Skip to content

Authentication System - Testing Guide

Testing patterns and strategies for the authentication system.


Testing Philosophy

Coverage Requirements

Component Minimum Coverage
Overall Backend 85%
Core (app/core/) 95%
Auth Module 90%
Security Files 95% (jwt.py, security.py)

Test Types

Type Purpose Location
Unit Test functions in isolation tests/unit/
Integration Test component interactions tests/integration/
E2E Test complete flows tests/e2e/
Mutation Verify test quality mutmut run

Unit Testing

Password Hashing

# tests/unit/core/test_security.py
import pytest
from app.core.security import get_password_hash, verify_password

def test_password_hash_roundtrip():
    """Test that password can be hashed and verified."""
    password = "SecurePassword123!"  # pragma: allowlist secret
    hashed = get_password_hash(password)

    assert hashed != password  # Not plaintext
    assert verify_password(password, hashed)  # Correct password works
    assert not verify_password("wrong", hashed)  # Wrong password fails

def test_password_hash_salt():
    """Test that each hash is unique (salt)."""
    password = "SamePassword"  # pragma: allowlist secret
    hash1 = get_password_hash(password)
    hash2 = get_password_hash(password)

    assert hash1 != hash2  # Unique salts

def test_password_hash_constant_time():
    """Test that verification is constant-time."""
    password = "Password123!"  # pragma: allowlist secret
    hashed = get_password_hash(password)

    # Timing attacks shouldn't reveal password length
    import time
    start = time.time()
    verify_password(password, hashed)
    duration = time.time() - start
    assert duration > 0  # Should take some time

JWT Token Creation/Validation

# tests/unit/core/test_jwt.py
import pytest
from app.core.jwt import create_access_token, decode_token, TokenValidationError

def test_access_token_creation():
    """Test access token structure."""
    token = create_access_token(
        subject="123",
        user_name="Test User",
        team_id=1,
        team_name="Test Team",
        role_id=1,
        role_name="Owner",
        permissions=["*"]
    )

    assert token.token_type == "access"
    assert token.expires_at is not None
    assert token.jti is not None

def test_token_decode_valid():
    """Test decoding a valid token."""
    token = create_access_token(
        subject="123",
        user_name="Test User",
        team_id=1,
        team_name="Test Team",
        role_id=1,
        role_name="Owner",
        permissions=["*"]
    )

    payload = decode_token(token.token, expected_type="access")

    assert payload.subject == "123"
    assert payload.team_id == 1
    assert payload.permissions == ("*",)

def test_token_decode_invalid_signature():
    """Test that invalid signature raises error."""
    fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.fake.signature"

    with pytest.raises(TokenValidationError):
        decode_token(fake_token)

def test_token_decode_expired():
    """Test that expired token raises error."""
    # Create expired token (manually or mock time)
    token = create_access_token(
        subject="123",
        user_name="Test",
        team_id=1,
        team_name="Test",
        role_id=1,
        role_name="Owner",
        permissions=["*"]
    )
    # Mock time to be past expiration...

    with pytest.raises(TokenValidationError):
        decode_token(token.token)

Permission Checking

# tests/unit/core/test_permissions.py
from app.core.permissions import has_permission, WILDCARD_PERMISSION
from app.core.identity.context import AuthenticatedUser

def test_wildcard_permission():
    """Test that wildcard grants all permissions."""
    user = AuthenticatedUser(
        user_id=1,
        team_id=1,
        role_id=1,
        role_name="Owner",
        permissions=frozenset([WILDCARD_PERMISSION]),
        token_jti="abc"
    )

    assert has_permission(user, "any.permission")
    assert has_permission(user, "site.create")
    assert has_permission(user, "nonexistent.permission")

def test_exact_match_permission():
    """Test exact permission matching."""
    user = AuthenticatedUser(
        user_id=1,
        team_id=1,
        role_id=2,
        role_name="Developer",
        permissions=frozenset(["site.create", "site.view"]),
        token_jti="abc"
    )

    assert has_permission(user, "site.create")
    assert has_permission(user, "site.view")
    assert not has_permission(user, "site.delete")
    assert not has_permission(user, "team.manage")

def test_permission_case_sensitive():
    """Test that permissions are case-sensitive."""
    user = AuthenticatedUser(
        user_id=1,
        team_id=1,
        role_id=2,
        role_name="Developer",
        permissions=frozenset(["site.create"]),
        token_jti="abc"
    )

    assert has_permission(user, "site.create")
    assert not has_permission(user, "Site.Create")
    assert not has_permission(user, "SITE.CREATE")

Integration Testing

Login Flow

# tests/integration/api/test_login_flow.py
import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_login_success_flow(async_client: AsyncClient, db_session):
    """Test successful login through session exchange."""
    # Step 1: Login with credentials
    response = await async_client.post("/api/v1/auth/login", json={
        "email": "test@example.com",
        "password": "testpass123",  # pragma: allowlist secret
        "remember_me": False
    })

    assert response.status_code == 200
    data = response.json()
    assert "pre_auth_token" in data
    assert "teams" in data

    pre_auth_token = data["pre_auth_token"]
    team_id = data["teams"][0]["team_id"]

    # Step 2: Exchange session
    response = await async_client.post("/api/v1/auth/session-exchange", json={
        "pre_auth_token": pre_auth_token,
        "team_id": team_id,
        "session_info": None
    })

    assert response.status_code == 200
    data = response.json()
    assert "user_id" in data

    # Verify cookies are set
    cookies = response.cookies
    assert "mbpanel_access" in cookies
    assert "mbpanel_refresh" in cookies
    assert "mbpanel_csrf" in cookies

@pytest.mark.asyncio
async def test_login_invalid_credentials(async_client: AsyncClient):
    """Test login with invalid credentials."""
    response = await async_client.post("/api/v1/auth/login", json={
        "email": "test@example.com",
        "password": "wrongpassword"  # pragma: allowlist secret
    })

    assert response.status_code == 401

@pytest.mark.asyncio
async def test_login_unverified_email(async_client: AsyncClient):
    """Test login with unverified email."""
    # Create unverified user...

    response = await async_client.post("/api/v1/auth/login", json={
        "email": "unverified@example.com",
        "password": "testpass123"  # pragma: allowlist secret
    })

    assert response.status_code == 422
    assert "Email verification required" in response.json()["detail"]

Token Refresh Flow

@pytest.mark.asyncio
async def test_token_refresh_with_csrf(async_client: AsyncClient):
    """Test token refresh with CSRF protection."""
    # First, login to get cookies
    # ... (login steps from above)

    cookies = response.cookies
    csrf_token = cookies.get("mbpanel_csrf")

    # Refresh with CSRF token
    response = await async_client.post(
        "/api/v1/auth/refresh",
        cookies=cookies,
        headers={"X-CSRF-Token": csrf_token}
    )

    assert response.status_code == 200
    data = response.json()
    assert "access_token" in data
    assert "refresh_token" in data

    # Verify new cookies are set
    new_cookies = response.cookies
    assert "mbpanel_access" in new_cookies
    assert "mbpanel_refresh" in new_cookies

@pytest.mark.asyncio
async def test_token_refresh_without_csrf_fails(async_client: AsyncClient):
    """Test that refresh without CSRF fails."""
    cookies = {"mbpanel_refresh": "valid-token"}

    response = await async_client.post(
        "/api/v1/auth/refresh",
        cookies=cookies
        # No CSRF header
    )

    assert response.status_code == 403
    assert "CSRF" in response.json()["detail"]

Permission Enforcement

@pytest.mark.asyncio
async def test_permission_enforced_endpoint(
    async_client: AsyncClient,
    authenticated_user_cookies  # Fixture for auth cookies
):
    """Test that permission is enforced on endpoint."""
    # User has "site.view" but not "site.delete"

    response = await async_client.delete(
        "/api/v1/sites/1",
        cookies=authenticated_user_cookies
    )

    assert response.status_code == 403
    assert "Missing permission" in response.json()["detail"]

@pytest.mark.asyncio
async def test_wildcard_permission_grants_access(
    async_client: AsyncClient,
    owner_cookies  # Owner has wildcard
):
    """Test that wildcard permission grants access."""
    response = await async_client.delete(
        "/api/v1/sites/1",
        cookies=owner_cookies
    )

    # Should not get 403 (may get other status)
    assert response.status_code != 403

E2E Testing

Complete Registration Flow

# tests/e2e/test_registration_e2e.py
@pytest.mark.e2e
async def test_door_a_registration_flow(async_client: AsyncClient):
    """Test complete Door A registration flow."""
    email = f"test-{uuid4()}@example.com"

    # Step 1: Register
    response = await async_client.post("/api/v1/auth/register-owner", json={
        "email": email,
        "password": "SecurePass123!",  # pragma: allowlist secret
        "name": "Test User",
        "team_name": "Test Team",
        "team_slug": "test-team"
    })

    assert response.status_code == 201
    data = response.json()
    user_id = data["user_id"]
    team_id = data["team_id"]

    # Step 2: Get verification token from DB/email
    # (In test, extract from DB)
    verification_token = await get_email_verification_token(email)

    # Step 3: Verify email
    response = await async_client.post("/api/v1/auth/email/verification/confirm", json={
        "token": verification_token
    })

    assert response.status_code == 200

    # Step 4: Login
    response = await async_client.post("/api/v1/auth/login", json={
        "email": email,
        "password": "SecurePass123!"  # pragma: allowlist secret
    })

    assert response.status_code == 200
    pre_auth_token = response.json()["pre_auth_token"]

    # Step 5: Exchange session
    response = await async_client.post("/api/v1/auth/session-exchange", json={
        "pre_auth_token": pre_auth_token,
        "team_id": team_id
    })

    assert response.status_code == 200
    assert response.json()["role_name"] == "Owner"

Suspicious Login Flow

@pytest.mark.e2e
async def test_suspicious_login_approval_flow(async_client: AsyncClient):
    """Test suspicious login detection and approval."""
    # Create user with existing login from known location

    # Login from NEW location (different IP/geo)
    response = await async_client.post("/api/v1/auth/login", json={
        "email": "user@example.com",
        "password": "password123"  # pragma: allowlist secret
    })

    pre_auth_token = response.json()["pre_auth_token"]

    response = await async_client.post("/api/v1/auth/session-exchange", json={
        "pre_auth_token": pre_auth_token,
        "team_id": 1,
        "session_info": {
            "device_fingerprint": "new-device",
            "ip": "1.2.3.4",
            "user_agent": "Mozilla/5.0...",
            "geo": {"city": "New York", "country": "US"}
        }
    })

    # Should return pending state
    assert response.status_code == 423
    assert response.json()["detail"]["status"] == "pending"

    alert_token = response.json()["detail"]["token"]

    # Get alert token from Redis
    # Approve the login
    response = await async_client.post("/api/v1/auth/login/approve", json={
        "token": alert_token
    })

    assert response.status_code == 200

    # Now session-exchange should work
    response = await async_client.post("/api/v1/auth/session-exchange", json={
        "pre_auth_token": pre_auth_token,
        "team_id": 1
    })

    assert response.status_code == 200

Mutation Testing

Running Mutation Tests

# Run mutation tests on security-critical files
mutmut run --paths-to-mutate app/core/jwt.py,app/core/security.py

# Expected: All mutations should be killed (tests fail)
# If tests pass with mutation, the test needs improvement

Example Mutation Test

# Tests should catch mutations like:
def test_password_hash_mutation():
    """Mutation: Changing != to == in verify_password."""
    # Original: return bcrypt.checkpw(plain, hashed)
    # Mutated: return not bcrypt.checkpw(plain, hashed)

    # Test should FAIL if mutation survives
    assert verify_password("pass", get_password_hash("pass"))
    # If mutation changes logic, test fails

Testing with Fixtures

Auth Fixtures

# tests/conftest.py
import pytest
from httpx import AsyncClient

@pytest.fixture
async def authenticated_user(async_client: AsyncClient):
    """Create and login a test user."""
    # Create user in DB
    user = await create_test_user()

    # Login
    response = await async_client.post("/api/v1/auth/login", json={
        "email": user.email,
        "password": "testpass123"  # pragma: allowlist secret
    })
    pre_auth_token = response.json()["pre_auth_token"]

    # Exchange session
    response = await async_client.post("/api/v1/auth/session-exchange", json={
        "pre_auth_token": pre_auth_token,
        "team_id": user.team_id
    })

    # Return cookies for authenticated requests
    return response.cookies

@pytest.fixture
async def owner_user(async_client: AsyncClient):
    """Create and login an Owner user."""
    return await create_user_with_role("Owner")

@pytest.fixture
async def developer_user(async_client: AsyncClient):
    """Create and login a Developer user."""
    return await create_user_with_role("Developer")

Mock Fixtures

@pytest.fixture
def mock_geo_ip_lookup(monkeypatch):
    """Mock Geo-IP lookup to avoid external API calls."""
    async def mock_lookup(ip: str):
        return {"city": "San Francisco", "country": "US"}

    monkeypatch.setattr(
        "app.infrastructure.external.geo.ip_api.lookup_geo",
        mock_lookup
    )

@pytest.fixture
def mock_emailer(monkeypatch):
    """Mock emailer to avoid sending real emails."""
    class MockEmailer:
        def __init__(self):
            self.sent_emails = []

        async def send(self, to: str, subject: str, body: str):
            self.sent_emails.append({"to": to, "subject": subject})

    return MockEmailer()

Running Tests

Unit Tests Only (No Infrastructure)

# Fast - no Docker required
pytest tests/unit/ -v
pytest tests/unit/modules/auth/ -v
pytest tests/unit/core/ -v

Integration Tests (Requires Infrastructure)

# Start services first
make up

# Run integration tests
pytest tests/integration/ -v
pytest tests/integration/api/ -v

With Coverage

# Generate coverage report
pytest tests/unit/modules/auth/ --cov=app/modules/auth --cov-report=html

# View report
open htmlcov/index.html

Coverage Thresholds

# Enforce minimum coverage
pytest tests/unit/ --cov=app/core --cov-fail-under=95
pytest tests/unit/modules/auth/ --cov=app/modules/auth --cov-fail-under=90

Sad Path Testing

Test Failure Scenarios

Scenario Test
Invalid credentials 401 response
Expired token TokenValidationError
Wrong CSRF token 403 response
Rate limit exceeded 429 response
Suspicious login Pending state
Concurrent login 423 Locked
Missing permission 403 Forbidden

Example Sad Path Test

@pytest.mark.asyncio
async def test_login_with_expired_preauth_token(async_client: AsyncClient):
    """Test that expired pre-auth token is rejected."""
    # Create token that's already expired in Redis
    expired_token = "expired-token-123"

    response = await async_client.post("/api/v1/auth/session-exchange", json={
        "pre_auth_token": expired_token,
        "team_id": 1
    })

    assert response.status_code == 401
    assert "expired" in response.json()["detail"].lower()