Authentication System - Testing Guide
Testing patterns and strategies for the authentication system.
Testing Philosophy
Coverage Requirements
| Component |
Minimum Coverage |
| Overall Backend |
85% |
Core (app/core/) |
95% |
| Auth Module |
90% |
| Security Files |
95% (jwt.py, security.py) |
Test Types
| Type |
Purpose |
Location |
| Unit |
Test functions in isolation |
tests/unit/ |
| Integration |
Test component interactions |
tests/integration/ |
| E2E |
Test complete flows |
tests/e2e/ |
| Mutation |
Verify test quality |
mutmut run |
Unit Testing
Password Hashing
# tests/unit/core/test_security.py
import pytest
from app.core.security import get_password_hash, verify_password
def test_password_hash_roundtrip():
"""Test that password can be hashed and verified."""
password = "SecurePassword123!" # pragma: allowlist secret
hashed = get_password_hash(password)
assert hashed != password # Not plaintext
assert verify_password(password, hashed) # Correct password works
assert not verify_password("wrong", hashed) # Wrong password fails
def test_password_hash_salt():
"""Test that each hash is unique (salt)."""
password = "SamePassword" # pragma: allowlist secret
hash1 = get_password_hash(password)
hash2 = get_password_hash(password)
assert hash1 != hash2 # Unique salts
def test_password_hash_constant_time():
"""Test that verification is constant-time."""
password = "Password123!" # pragma: allowlist secret
hashed = get_password_hash(password)
# Timing attacks shouldn't reveal password length
import time
start = time.time()
verify_password(password, hashed)
duration = time.time() - start
assert duration > 0 # Should take some time
JWT Token Creation/Validation
# tests/unit/core/test_jwt.py
import pytest
from app.core.jwt import create_access_token, decode_token, TokenValidationError
def test_access_token_creation():
"""Test access token structure."""
token = create_access_token(
subject="123",
user_name="Test User",
team_id=1,
team_name="Test Team",
role_id=1,
role_name="Owner",
permissions=["*"]
)
assert token.token_type == "access"
assert token.expires_at is not None
assert token.jti is not None
def test_token_decode_valid():
"""Test decoding a valid token."""
token = create_access_token(
subject="123",
user_name="Test User",
team_id=1,
team_name="Test Team",
role_id=1,
role_name="Owner",
permissions=["*"]
)
payload = decode_token(token.token, expected_type="access")
assert payload.subject == "123"
assert payload.team_id == 1
assert payload.permissions == ("*",)
def test_token_decode_invalid_signature():
"""Test that invalid signature raises error."""
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.fake.signature"
with pytest.raises(TokenValidationError):
decode_token(fake_token)
def test_token_decode_expired():
"""Test that expired token raises error."""
# Create expired token (manually or mock time)
token = create_access_token(
subject="123",
user_name="Test",
team_id=1,
team_name="Test",
role_id=1,
role_name="Owner",
permissions=["*"]
)
# Mock time to be past expiration...
with pytest.raises(TokenValidationError):
decode_token(token.token)
Permission Checking
# tests/unit/core/test_permissions.py
from app.core.permissions import has_permission, WILDCARD_PERMISSION
from app.core.identity.context import AuthenticatedUser
def test_wildcard_permission():
"""Test that wildcard grants all permissions."""
user = AuthenticatedUser(
user_id=1,
team_id=1,
role_id=1,
role_name="Owner",
permissions=frozenset([WILDCARD_PERMISSION]),
token_jti="abc"
)
assert has_permission(user, "any.permission")
assert has_permission(user, "site.create")
assert has_permission(user, "nonexistent.permission")
def test_exact_match_permission():
"""Test exact permission matching."""
user = AuthenticatedUser(
user_id=1,
team_id=1,
role_id=2,
role_name="Developer",
permissions=frozenset(["site.create", "site.view"]),
token_jti="abc"
)
assert has_permission(user, "site.create")
assert has_permission(user, "site.view")
assert not has_permission(user, "site.delete")
assert not has_permission(user, "team.manage")
def test_permission_case_sensitive():
"""Test that permissions are case-sensitive."""
user = AuthenticatedUser(
user_id=1,
team_id=1,
role_id=2,
role_name="Developer",
permissions=frozenset(["site.create"]),
token_jti="abc"
)
assert has_permission(user, "site.create")
assert not has_permission(user, "Site.Create")
assert not has_permission(user, "SITE.CREATE")
Integration Testing
Login Flow
# tests/integration/api/test_login_flow.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_login_success_flow(async_client: AsyncClient, db_session):
"""Test successful login through session exchange."""
# Step 1: Login with credentials
response = await async_client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "testpass123", # pragma: allowlist secret
"remember_me": False
})
assert response.status_code == 200
data = response.json()
assert "pre_auth_token" in data
assert "teams" in data
pre_auth_token = data["pre_auth_token"]
team_id = data["teams"][0]["team_id"]
# Step 2: Exchange session
response = await async_client.post("/api/v1/auth/session-exchange", json={
"pre_auth_token": pre_auth_token,
"team_id": team_id,
"session_info": None
})
assert response.status_code == 200
data = response.json()
assert "user_id" in data
# Verify cookies are set
cookies = response.cookies
assert "mbpanel_access" in cookies
assert "mbpanel_refresh" in cookies
assert "mbpanel_csrf" in cookies
@pytest.mark.asyncio
async def test_login_invalid_credentials(async_client: AsyncClient):
"""Test login with invalid credentials."""
response = await async_client.post("/api/v1/auth/login", json={
"email": "test@example.com",
"password": "wrongpassword" # pragma: allowlist secret
})
assert response.status_code == 401
@pytest.mark.asyncio
async def test_login_unverified_email(async_client: AsyncClient):
"""Test login with unverified email."""
# Create unverified user...
response = await async_client.post("/api/v1/auth/login", json={
"email": "unverified@example.com",
"password": "testpass123" # pragma: allowlist secret
})
assert response.status_code == 422
assert "Email verification required" in response.json()["detail"]
Token Refresh Flow
@pytest.mark.asyncio
async def test_token_refresh_with_csrf(async_client: AsyncClient):
"""Test token refresh with CSRF protection."""
# First, login to get cookies
# ... (login steps from above)
cookies = response.cookies
csrf_token = cookies.get("mbpanel_csrf")
# Refresh with CSRF token
response = await async_client.post(
"/api/v1/auth/refresh",
cookies=cookies,
headers={"X-CSRF-Token": csrf_token}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
# Verify new cookies are set
new_cookies = response.cookies
assert "mbpanel_access" in new_cookies
assert "mbpanel_refresh" in new_cookies
@pytest.mark.asyncio
async def test_token_refresh_without_csrf_fails(async_client: AsyncClient):
"""Test that refresh without CSRF fails."""
cookies = {"mbpanel_refresh": "valid-token"}
response = await async_client.post(
"/api/v1/auth/refresh",
cookies=cookies
# No CSRF header
)
assert response.status_code == 403
assert "CSRF" in response.json()["detail"]
Permission Enforcement
@pytest.mark.asyncio
async def test_permission_enforced_endpoint(
async_client: AsyncClient,
authenticated_user_cookies # Fixture for auth cookies
):
"""Test that permission is enforced on endpoint."""
# User has "site.view" but not "site.delete"
response = await async_client.delete(
"/api/v1/sites/1",
cookies=authenticated_user_cookies
)
assert response.status_code == 403
assert "Missing permission" in response.json()["detail"]
@pytest.mark.asyncio
async def test_wildcard_permission_grants_access(
async_client: AsyncClient,
owner_cookies # Owner has wildcard
):
"""Test that wildcard permission grants access."""
response = await async_client.delete(
"/api/v1/sites/1",
cookies=owner_cookies
)
# Should not get 403 (may get other status)
assert response.status_code != 403
E2E Testing
Complete Registration Flow
# tests/e2e/test_registration_e2e.py
@pytest.mark.e2e
async def test_door_a_registration_flow(async_client: AsyncClient):
"""Test complete Door A registration flow."""
email = f"test-{uuid4()}@example.com"
# Step 1: Register
response = await async_client.post("/api/v1/auth/register-owner", json={
"email": email,
"password": "SecurePass123!", # pragma: allowlist secret
"name": "Test User",
"team_name": "Test Team",
"team_slug": "test-team"
})
assert response.status_code == 201
data = response.json()
user_id = data["user_id"]
team_id = data["team_id"]
# Step 2: Get verification token from DB/email
# (In test, extract from DB)
verification_token = await get_email_verification_token(email)
# Step 3: Verify email
response = await async_client.post("/api/v1/auth/email/verification/confirm", json={
"token": verification_token
})
assert response.status_code == 200
# Step 4: Login
response = await async_client.post("/api/v1/auth/login", json={
"email": email,
"password": "SecurePass123!" # pragma: allowlist secret
})
assert response.status_code == 200
pre_auth_token = response.json()["pre_auth_token"]
# Step 5: Exchange session
response = await async_client.post("/api/v1/auth/session-exchange", json={
"pre_auth_token": pre_auth_token,
"team_id": team_id
})
assert response.status_code == 200
assert response.json()["role_name"] == "Owner"
Suspicious Login Flow
@pytest.mark.e2e
async def test_suspicious_login_approval_flow(async_client: AsyncClient):
"""Test suspicious login detection and approval."""
# Create user with existing login from known location
# Login from NEW location (different IP/geo)
response = await async_client.post("/api/v1/auth/login", json={
"email": "user@example.com",
"password": "password123" # pragma: allowlist secret
})
pre_auth_token = response.json()["pre_auth_token"]
response = await async_client.post("/api/v1/auth/session-exchange", json={
"pre_auth_token": pre_auth_token,
"team_id": 1,
"session_info": {
"device_fingerprint": "new-device",
"ip": "1.2.3.4",
"user_agent": "Mozilla/5.0...",
"geo": {"city": "New York", "country": "US"}
}
})
# Should return pending state
assert response.status_code == 423
assert response.json()["detail"]["status"] == "pending"
alert_token = response.json()["detail"]["token"]
# Get alert token from Redis
# Approve the login
response = await async_client.post("/api/v1/auth/login/approve", json={
"token": alert_token
})
assert response.status_code == 200
# Now session-exchange should work
response = await async_client.post("/api/v1/auth/session-exchange", json={
"pre_auth_token": pre_auth_token,
"team_id": 1
})
assert response.status_code == 200
Mutation Testing
Running Mutation Tests
# Run mutation tests on security-critical files
mutmut run --paths-to-mutate app/core/jwt.py,app/core/security.py
# Expected: All mutations should be killed (tests fail)
# If tests pass with mutation, the test needs improvement
Example Mutation Test
# Tests should catch mutations like:
def test_password_hash_mutation():
"""Mutation: Changing != to == in verify_password."""
# Original: return bcrypt.checkpw(plain, hashed)
# Mutated: return not bcrypt.checkpw(plain, hashed)
# Test should FAIL if mutation survives
assert verify_password("pass", get_password_hash("pass"))
# If mutation changes logic, test fails
Testing with Fixtures
Auth Fixtures
# tests/conftest.py
import pytest
from httpx import AsyncClient
@pytest.fixture
async def authenticated_user(async_client: AsyncClient):
"""Create and login a test user."""
# Create user in DB
user = await create_test_user()
# Login
response = await async_client.post("/api/v1/auth/login", json={
"email": user.email,
"password": "testpass123" # pragma: allowlist secret
})
pre_auth_token = response.json()["pre_auth_token"]
# Exchange session
response = await async_client.post("/api/v1/auth/session-exchange", json={
"pre_auth_token": pre_auth_token,
"team_id": user.team_id
})
# Return cookies for authenticated requests
return response.cookies
@pytest.fixture
async def owner_user(async_client: AsyncClient):
"""Create and login an Owner user."""
return await create_user_with_role("Owner")
@pytest.fixture
async def developer_user(async_client: AsyncClient):
"""Create and login a Developer user."""
return await create_user_with_role("Developer")
Mock Fixtures
@pytest.fixture
def mock_geo_ip_lookup(monkeypatch):
"""Mock Geo-IP lookup to avoid external API calls."""
async def mock_lookup(ip: str):
return {"city": "San Francisco", "country": "US"}
monkeypatch.setattr(
"app.infrastructure.external.geo.ip_api.lookup_geo",
mock_lookup
)
@pytest.fixture
def mock_emailer(monkeypatch):
"""Mock emailer to avoid sending real emails."""
class MockEmailer:
def __init__(self):
self.sent_emails = []
async def send(self, to: str, subject: str, body: str):
self.sent_emails.append({"to": to, "subject": subject})
return MockEmailer()
Running Tests
Unit Tests Only (No Infrastructure)
# Fast - no Docker required
pytest tests/unit/ -v
pytest tests/unit/modules/auth/ -v
pytest tests/unit/core/ -v
Integration Tests (Requires Infrastructure)
# Start services first
make up
# Run integration tests
pytest tests/integration/ -v
pytest tests/integration/api/ -v
With Coverage
# Generate coverage report
pytest tests/unit/modules/auth/ --cov=app/modules/auth --cov-report=html
# View report
open htmlcov/index.html
Coverage Thresholds
# Enforce minimum coverage
pytest tests/unit/ --cov=app/core --cov-fail-under=95
pytest tests/unit/modules/auth/ --cov=app/modules/auth --cov-fail-under=90
Sad Path Testing
Test Failure Scenarios
| Scenario |
Test |
| Invalid credentials |
401 response |
| Expired token |
TokenValidationError |
| Wrong CSRF token |
403 response |
| Rate limit exceeded |
429 response |
| Suspicious login |
Pending state |
| Concurrent login |
423 Locked |
| Missing permission |
403 Forbidden |
Example Sad Path Test
@pytest.mark.asyncio
async def test_login_with_expired_preauth_token(async_client: AsyncClient):
"""Test that expired pre-auth token is rejected."""
# Create token that's already expired in Redis
expired_token = "expired-token-123"
response = await async_client.post("/api/v1/auth/session-exchange", json={
"pre_auth_token": expired_token,
"team_id": 1
})
assert response.status_code == 401
assert "expired" in response.json()["detail"].lower()