Skip to content

Security Architecture

Deep dive into the security implementation of the authentication system.


Cryptography

Password Hashing

Algorithm: bcrypt with 12 rounds

# backend/app/core/security.py
import bcrypt

def get_password_hash(password: str) -> str:
    salt = bcrypt.gensalt(rounds=settings.bcrypt_rounds)  # 12
    return bcrypt.hashpw(password.encode(), salt).decode()

def verify_password(plain: str, hashed: str) -> bool:
    return bcrypt.checkpw(plain.encode(), hashed.encode())

Properties: - Salt: Built-in to bcrypt, unique per password - Cost Factor: 12 rounds (configurable via BCRYPT_ROUNDS) - Timing: Constant-time comparison prevents timing attacks - Memory: Hardened against GPU/ASIC attacks

JWT Tokens

Algorithm: HS256 (HMAC-SHA256)

# Token payload
{
  "sub": "123",              # user_id
  "type": "access" | "refresh",
  "user_name": "John Doe",
  "team_id": 456,
  "team_name": "Acme Corp",
  "role_id": 1,
  "role_name": "Owner",
  "permissions": ["*"],
  "jti": "uuid4-hex",       # unique token ID
  "iat": 1234567890,         # issued at
  "exp": 1234585890          # expiration
}

Security Properties: - Secret: 32+ character secret key - TTL: Access (6h), Refresh (7d/1d) - Revocation: Blacklist via Redis - Server-side Validation: JTI checked against blacklist

Encryption at Rest

Algorithm: Fernet (symmetric encryption)

Use Cases: - Virtuozzo passwords - Virtuozzo session keys - TOTP secrets (future) - Backup codes (future)

# backend/app/core/crypto.py
from cryptography.fernet import Fernet

def encrypt_str(plaintext: str) -> str:
    f = Fernet(settings.encryption_key.encode())
    return f.encrypt(plaintext.encode()).decode()

def decrypt_str(ciphertext: str) -> str:
    f = Fernet(settings.encryption_key.encode())
    return f.decrypt(ciphertext.encode()).decode()

Key Requirements: - 32-byte base64 urlsafe key - Rotate quarterly (recommended) - Never log plaintext keys


Token Security

Access Token

Property Value
Format JWT (HS256)
TTL 360 minutes (6 hours)
Storage HttpOnly cookie
Scope Single team
Revocation Redis blacklist

Cookie Attributes:

Set-Cookie: mbpanel_access=<token>;
  HttpOnly;
  Secure;
  SameSite=Lax;
  Path=/;
  Max-Age=21600

Refresh Token

Property Value
Format JWT (HS256) + Redis session
TTL (Remember) 10080 minutes (7 days)
TTL (Normal) 1440 minutes (1 day)
Storage HttpOnly cookie + Redis
Revocation Blacklist + session deletion

Redis Session State:

{
  "user_id": 123,
  "team_id": 456,
  "role_name": "Owner",
  "permissions": ["*"],
  "session_info": {
    "device_fingerprint": "sha256-hash",
    "ip": "1.2.3.4",
    "user_agent": "Mozilla/5.0...",
    "geo": {"city": "SF", "country": "US"}
  },
  "remember_me": true,
  "last_seen_at": "2025-01-22T10:30:00Z"
}

Token Lifecycle

┌─────────────┐
│ 1. Issue    │ create_access_token(), create_refresh_token()
└──────┬──────┘
┌─────────────┐
│ 2. Validate │ decode_token(), check signature/expiry
└──────┬──────┘
┌─────────────┐
│ 3. Use      │ Attach to request as AuthenticatedUser
└──────┬──────┘
┌─────────────┐
│ 4. Rotate    │ refresh_tokens() - blacklist old, issue new
└──────┬──────┘
┌─────────────┐
│ 5. Revoke   │ logout(), blacklist JTI, delete session
└─────────────┘

CSRF Protection

┌──────────────┐                  ┌──────────────┐
│   CLIENT     │                  │   BACKEND    │
└──────┬───────┘                  └──────┬───────┘
       │                                 │
       │  1. GET /auth/session-exchange   │
       ├─────────────────────────────────>│
       │                                 │
       │  2. Set-Cookie: mbpanel_csrf    │
       │     = random-token               │
       │<─────────────────────────────────┤
       │                                 │
       │  3. POST /auth/refresh          │
       │     Cookie: mbpanel_csrf=token   │
       │     Header: X-CSRF-Token=token   │
       ├─────────────────────────────────>│
       │                                 │
       │  4. Verify cookie == header     │
       │  5. Set-Cookie: new CSRF token  │
       │<─────────────────────────────────┤
       │                                 │

Implementation:

# backend/app/core/csrf.py
def verify_csrf(request: Request) -> None:
    cookie_token = request.cookies.get(settings.csrf_cookie_name)
    header_token = request.headers.get(settings.csrf_header_name)
    if cookie_token != header_token:
        raise HTTPException(status_code=403, detail="CSRF token mismatch")

Protected Endpoints: - POST /auth/refresh - POST /auth/logout - POST /auth/totp/* - POST /auth/device/approve - POST /auth/trusted-devices/revoke


Rate Limiting

Login Rate Limiting

Implementation: Redis INCR counter

# backend/app/core/rate_limit.py
async def check(key: str, limit: int, window_seconds: int) -> None:
    count = await redis.incr(key)
    if count == 1:
        await redis.expire(key, window_seconds)
    if count > limit:
        raise HTTPException(status_code=429, detail="Too many attempts")

Configuration: | Setting | Default | |---------|---------| | Max Attempts | 5 per window | | Window | 300 seconds (5 minutes) | | Key Format | rl:login:{client_host} |

Behavior: - Track attempts per IP address - Return 429 when limit exceeded - Log violation for abuse detection - Reset after window expires


Session Security

Idle Timeout

Purpose: Auto-logout inactive users

Implementation:

# On every refresh
if now - session.last_seen_at > settings.refresh_idle_timeout_minutes:
    raise SessionInactive()
session.last_seen_at = now

Configuration: - Default: 60 minutes - Per-user: Can be configured via team settings

Absolute Timeout

Purpose: Force periodic re-authentication

Implementation: - Built into JWT TTL (7 days / 1 day) - User must re-login after expiration

Single-Session Enforcement

Purpose: Prevent multiple concurrent sessions

Implementation:

# On session-exchange
existing = await get_active_session(user_id)
if existing:
    # Require approval from existing session
    raise ConcurrentLoginPending()
# Replace with new session
await replace_active_session(user_id, new_session)


Device Security

Device Fingerprinting

Components: - SHA-256 hash of user agent - IP address - Optional: Device-specific identifiers

Usage:

fingerprint = hashlib.sha256(
    f"{user_agent}:{ip_address}".encode()
).hexdigest()

Device Approval Flow

OTP Generation:

import pyotp
otp = pyotp.random_base36(length=6)  # 6-digit code

Storage:

# Redis: hashed OTP
key = f"device-otp:{user_id}:{fingerprint}"
await redis.setex(
    key,
    settings.device_approval_otp_ttl_seconds,  # 300
    hmac_sha256_hash(otp, settings.encryption_key)
)

Verification:

# Hash provided OTP and compare
stored_hash = await redis.get(key)
provided_hash = hmac_sha256_hash(provided_otp, key)
if not compare_digest(stored_hash, provided_hash):
    raise OTPInvalid()


Suspicious Login Detection

Geo-IP Lookup

Provider: ip-api.com (free tier)

Implementation:

# backend/app/infrastructure/external/geo/ip_api.py
async def lookup_geo(ip: str) -> dict:
    response = await httpx.get(f"http://ip-api.com/json/{ip}")
    return {
        "city": response.json()["city"],
        "country": response.json()["country"],
        "lat": response.json()["lat"],
        "lon": response.json()["lon"]
    }

Detection Logic

# Compare with recent login activity
recent_logins = await get_recent_logins(user_id, hours=24)

for login in recent_logins:
    if login.geo != current_geo:
        # Different location - suspicious
        return True
    if login.device_fingerprint != current_fingerprint:
        # Different device - suspicious
        return True

# Same location and device - safe
return False

State Machine

┌─────────┐  suspicious login  ┌─────────┐
│  PENDING │ ──────────────────>│  DENY   │
└─────────┘                    └─────────┘
    │                               │
    │ approve                       │
    ▼                               │
┌─────────┐                        │
│  ALLOW  │ <───────────────────────┘
└─────────┘

Input Validation

Pydantic Schemas

Email Validation:

class EmailRequest(BaseModel):
    email: EmailStr  # Validates email format

Password Validation:

class PasswordRequest(BaseModel):
    new_password: str = Field(min_length=8)

OTP Validation:

class OTPApproveRequest(BaseModel):
    otp_code: str = Field(pattern=r'^\d{6}$')  # Exactly 6 digits

SQL Injection Prevention

Repository Pattern:

# Always use parameterized queries
stmt = select(User).where(User.email == email)
# NOT: f"SELECT * FROM users WHERE email = '{email}'"


Output Encoding

PII Filtering

Never Log: - Passwords (plaintext or hashed) - API keys or secrets - JWT tokens (log first 8 chars of JTI only) - Credit card numbers - Full IP addresses (hash in production logs) - Email addresses (partial mask)

Redaction Example:

def redact_email(email: str) -> str:
    if '@' not in email:
        return '***@***.***'
    local, domain = email.split('@', 1)
    return f"{local[0]}***@{domain}"