Security Architecture
Deep dive into the security implementation of the authentication system.
Cryptography
Password Hashing
Algorithm: bcrypt with 12 rounds
# backend/app/core/security.py
import bcrypt

def get_password_hash(password: str) -> str:
    salt = bcrypt.gensalt(rounds=settings.bcrypt_rounds)  # 12
    return bcrypt.hashpw(password.encode(), salt).decode()

def verify_password(plain: str, hashed: str) -> bool:
    return bcrypt.checkpw(plain.encode(), hashed.encode())
Properties:
- Salt: Built-in to bcrypt, unique per password
- Cost Factor: 12 rounds (configurable via BCRYPT_ROUNDS)
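- Timing: bcrypt.checkpw compares the hashes in constant time, which prevents timing attacks on the comparison
- GPU/ASIC resistance: bcrypt's 4 KB working state makes it costlier to parallelize than a plain hash function, but it is not memory-hard; Argon2id or scrypt offer stronger resistance
- Input limit: bcrypt processes at most 72 bytes of input, so longer passwords must be rejected or pre-hashed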
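Because the cost factor is configurable, hashes created under an older setting stay weaker until they are re-hashed. The following is a minimal sketch of how the cost embedded in a stored hash could be checked at login time. `bcrypt_cost` and `needs_rehash` are hypothetical helpers, not part of the documented code, and the sample strings only follow bcrypt's format; they are not real password hashes.

```python
import re

# Modular-crypt layout of a bcrypt hash: $2b$<cost>$<22-char salt><31-char digest>
_BCRYPT_RE = re.compile(r"^\$2[abxy]\$(\d{2})\$[./A-Za-z0-9]{53}$")


def bcrypt_cost(hashed: str) -> int:
    """Return the cost factor embedded in a bcrypt hash string."""
    match = _BCRYPT_RE.match(hashed)
    if match is None:
        raise ValueError("not a bcrypt hash")
    return int(match.group(1))


def needs_rehash(hashed: str, target_rounds: int = 12) -> bool:
    """True when the stored hash was created with a lower cost than the target."""
    return bcrypt_cost(hashed) < target_rounds


# Placeholder strings in bcrypt's format (assumption: not real hashes)
old_hash = "$2b$10$" + "a" * 53
new_hash = "$2b$12$" + "a" * 53
print(needs_rehash(old_hash), needs_rehash(new_hash))
```

A login handler could call `needs_rehash` after a successful `verify_password` and store a fresh hash when it returns true.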
JWT Tokens
Algorithm: HS256 (HMAC-SHA256)
# Token payload
{
    "sub": "123",                  # user_id
    "type": "access" | "refresh",
    "user_name": "John Doe",
    "team_id": 456,
    "team_name": "Acme Corp",
    "role_id": 1,
    "role_name": "Owner",
    "permissions": ["*"],
    "jti": "uuid4-hex",            # unique token ID
    "iat": 1234567890,             # issued at
    "exp": 1234589490              # expiration (iat + 21600 s for a 6-hour access token)
}
Security Properties:
- Secret: 32+ character secret key
- TTL: Access (6h), Refresh (7d/1d)
- Revocation: Blacklist via Redis
- Server-side Validation: JTI checked against blacklist
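To make the properties above concrete, here is a minimal standard-library sketch of HS256 signing followed by the three server-side checks: signature, expiry, and JTI blacklist. `sign_hs256`, `verify_hs256`, and the in-memory `blacklist` set (standing in for Redis) are hypothetical names chosen for illustration. The source does not say which JWT library the project uses, and a production service would normally rely on a maintained one.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional, Set


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that JWT encoding strips
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_hs256(token: str, secret: str, blacklist: Set[str],
                 now: Optional[float] = None) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    # Check the signature first, in constant time, before trusting any claim
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if payload["exp"] <= (time.time() if now is None else now):
        raise ValueError("token expired")
    if payload["jti"] in blacklist:
        raise ValueError("token revoked")
    return payload
```

The blacklist lookup is what allows a token to be rejected before its `exp` passes; without it, a signed token stays valid for its full TTL.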
Encryption at Rest
Algorithm: Fernet (symmetric encryption)
Use Cases:
- Virtuozzo passwords
- Virtuozzo session keys
- TOTP secrets (future)
- Backup codes (future)
# backend/app/core/crypto.py
from cryptography.fernet import Fernet

def encrypt_str(plaintext: str) -> str:
    f = Fernet(settings.encryption_key.encode())
    return f.encrypt(plaintext.encode()).decode()

def decrypt_str(ciphertext: str) -> str:
    f = Fernet(settings.encryption_key.encode())
    return f.decrypt(ciphertext.encode()).decode()
Key Requirements:
- 32-byte key, URL-safe base64-encoded (44 characters), as produced by Fernet.generate_key()
- Rotate quarterly (recommended)
- Never log plaintext keys
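A common misconfiguration is supplying a 32-character passphrase where Fernet expects 32 bytes in base64 form. The sketch below shows how a key could be generated and sanity-checked at startup with the standard library alone. `generate_fernet_style_key` and `is_valid_fernet_key` are hypothetical helper names; Fernet performs an equivalent length check itself when constructed.

```python
import base64
import os


def generate_fernet_style_key() -> str:
    """32 random bytes, URL-safe base64-encoded (the shape Fernet expects)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()


def is_valid_fernet_key(key: str) -> bool:
    """Check that the key decodes to exactly 32 bytes."""
    try:
        raw = base64.urlsafe_b64decode(key.encode())
    except ValueError:  # binascii.Error is a ValueError subclass
        return False
    return len(raw) == 32


key = generate_fernet_style_key()
print(len(key), is_valid_fernet_key(key))
print(is_valid_fernet_key("x" * 32))  # a 32-character passphrase is not a valid key
```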
Token Security
Access Token
| Property | Value |
|---|---|
| Format | JWT (HS256) |
| TTL | 360 minutes (6 hours) |
| Storage | HttpOnly cookie |
| Scope | Single team |
| Revocation | Redis blacklist |
Cookie Attributes:
Refresh Token
| Property | Value |
|---|---|
| Format | JWT (HS256) + Redis session |
| TTL (Remember) | 10080 minutes (7 days) |
| TTL (Normal) | 1440 minutes (1 day) |
| Storage | HttpOnly cookie + Redis |
| Revocation | Blacklist + session deletion |
Redis Session State:
{
    "user_id": 123,
    "team_id": 456,
    "role_name": "Owner",
    "permissions": ["*"],
    "session_info": {
        "device_fingerprint": "sha256-hash",
        "ip": "1.2.3.4",
        "user_agent": "Mozilla/5.0...",
        "geo": {"city": "SF", "country": "US"}
    },
    "remember_me": true,
    "last_seen_at": "2025-01-22T10:30:00Z"
}
Token Lifecycle
┌─────────────┐
│ 1. Issue    │  create_access_token(), create_refresh_token()
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ 2. Validate │  decode_token(), check signature/expiry
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ 3. Use      │  Attach to request as AuthenticatedUser
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ 4. Rotate   │  refresh_tokens() - blacklist old, issue new
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ 5. Revoke   │  logout(), blacklist JTI, delete session
└─────────────┘
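The rotate and revoke steps are the ones most easily misread, so the sketch below walks through them with an in-memory stand-in for Redis. `TokenStore` and its methods are hypothetical and only track JTIs; they are not the project's `refresh_tokens()` or `logout()` implementations.

```python
import uuid


class TokenStore:
    """In-memory stand-in for the Redis blacklist and session store."""

    def __init__(self) -> None:
        self.blacklist: set = set()
        self.sessions: dict = {}  # jti -> user_id

    def issue(self, user_id: int) -> str:
        jti = uuid.uuid4().hex
        self.sessions[jti] = user_id
        return jti

    def validate(self, jti: str) -> int:
        if jti in self.blacklist or jti not in self.sessions:
            raise PermissionError("token revoked or unknown")
        return self.sessions[jti]

    def rotate(self, old_jti: str) -> str:
        # The old token must still be valid, and it becomes unusable afterwards
        user_id = self.validate(old_jti)
        self.revoke(old_jti)
        return self.issue(user_id)

    def revoke(self, jti: str) -> None:
        self.blacklist.add(jti)
        self.sessions.pop(jti, None)


store = TokenStore()
first = store.issue(user_id=123)
second = store.rotate(first)
print(store.validate(second))  # 123
```

Because `rotate` blacklists the old JTI, replaying a refresh token that has already been exchanged fails in `validate`.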
CSRF Protection
Double-Submit Cookie Pattern
┌──────────────┐                   ┌──────────────┐
│    CLIENT    │                   │   BACKEND    │
└──────┬───────┘                   └──────┬───────┘
       │                                  │
       │ 1. GET /auth/session-exchange    │
       ├─────────────────────────────────>│
       │                                  │
       │ 2. Set-Cookie: mbpanel_csrf      │
       │    = random-token                │
       │<─────────────────────────────────┤
       │                                  │
       │ 3. POST /auth/refresh            │
       │    Cookie: mbpanel_csrf=token    │
       │    Header: X-CSRF-Token=token    │
       ├─────────────────────────────────>│
       │                                  │
       │ 4. Verify cookie == header       │
       │ 5. Set-Cookie: new CSRF token    │
       │<─────────────────────────────────┤
       │                                  │
Implementation:
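# backend/app/core/csrf.py
import hmac

def verify_csrf(request: Request) -> None:
    cookie_token = request.cookies.get(settings.csrf_cookie_name)
    header_token = request.headers.get(settings.csrf_header_name)
    # Reject missing tokens explicitly: two absent values would otherwise compare equal
    if not cookie_token or not header_token:
        raise HTTPException(status_code=403, detail="CSRF token missing")
    # Constant-time comparison avoids leaking how much of the token matched
    if not hmac.compare_digest(cookie_token.encode(), header_token.encode()):
        raise HTTPException(status_code=403, detail="CSRF token mismatch")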
Protected Endpoints:
- POST /auth/refresh
- POST /auth/logout
- POST /auth/totp/*
- POST /auth/device/approve
- POST /auth/trusted-devices/revoke
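Steps 2 and 5 of the flow issue a fresh random token as a cookie. A minimal sketch of that step follows, using only the standard library. The function names and the cookie attributes (`Path`, `Secure`, `SameSite=Strict`) are assumptions, since the source does not list them. The cookie is deliberately not `HttpOnly`, because the double-submit pattern needs client-side script to copy the value into the `X-CSRF-Token` header.

```python
import hmac
import secrets
from http.cookies import SimpleCookie
from typing import Optional


def new_csrf_token() -> str:
    """Unpredictable token from the OS CSPRNG."""
    return secrets.token_urlsafe(32)


def csrf_set_cookie_header(token: str, cookie_name: str = "mbpanel_csrf") -> str:
    """Build the Set-Cookie header value (attribute choices are assumptions)."""
    cookie = SimpleCookie()
    cookie[cookie_name] = token
    cookie[cookie_name]["path"] = "/"
    cookie[cookie_name]["secure"] = True
    cookie[cookie_name]["samesite"] = "Strict"
    return cookie[cookie_name].OutputString()


def csrf_tokens_match(cookie_token: Optional[str], header_token: Optional[str]) -> bool:
    """Both values must be present and equal; compared in constant time."""
    if not cookie_token or not header_token:
        return False
    return hmac.compare_digest(cookie_token.encode(), header_token.encode())


token = new_csrf_token()
print(csrf_set_cookie_header(token))
```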
Rate Limiting
Login Rate Limiting
Implementation: Redis INCR counter
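# backend/app/core/rate_limit.py
async def check(key: str, limit: int, window_seconds: int) -> None:
    count = await redis.incr(key)
    if count == 1:
        # First hit in this window: start the expiry timer.
        # INCR and EXPIRE are separate commands, so a failure between them
        # leaves a counter that never expires.
        await redis.expire(key, window_seconds)
    if count > limit:
        raise HTTPException(status_code=429, detail="Too many attempts")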
Configuration:
| Setting | Default |
|---------|---------|
| Max Attempts | 5 per window |
| Window | 300 seconds (5 minutes) |
| Key Format | rl:login:{client_host} |
Behavior:
- Track attempts per IP address
- Return 429 when limit exceeded
- Log violation for abuse detection
- Reset after window expires
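The fixed-window behavior can be reproduced without Redis to see exactly when requests are rejected and when the counter resets. `FixedWindowLimiter` is a hypothetical in-memory analogue of the INCR/EXPIRE counter, with an injectable clock so the window can be advanced deterministically.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory analogue of the Redis INCR + EXPIRE counter."""

    def __init__(self, limit: int = 5, window_seconds: int = 300,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self._counters: Dict[str, Tuple[int, float]] = {}  # key -> (count, expires_at)

    def allow(self, key: str) -> bool:
        now = self.clock()
        count, expires_at = self._counters.get(key, (0, 0.0))
        if now >= expires_at:
            # Window elapsed or first hit: start a new window (EXPIRE on count == 1)
            count, expires_at = 0, now + self.window_seconds
        count += 1
        self._counters[key] = (count, expires_at)
        return count <= self.limit


fake_now = [0.0]
limiter = FixedWindowLimiter(clock=lambda: fake_now[0])
results = [limiter.allow("rl:login:1.2.3.4") for _ in range(6)]
print(results)       # five allowed attempts, then one rejection
fake_now[0] = 300.0  # the window has expired
print(limiter.allow("rl:login:1.2.3.4"))
```

A fixed window admits up to twice the limit across a window boundary (five attempts at the end of one window, five at the start of the next), which is an accepted trade-off of this simple scheme.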
Session Security
Idle Timeout
Purpose: Auto-logout inactive users
Implementation:
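# On every refresh
from datetime import timedelta

# The setting is in minutes; convert it before comparing with a timedelta
idle_limit = timedelta(minutes=settings.refresh_idle_timeout_minutes)
if now - session.last_seen_at > idle_limit:
    raise SessionInactive()
session.last_seen_at = now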
Configuration:
- Default: 60 minutes
- Per-user: Can be configured via team settings
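The idle check compares a time difference with a limit expressed in minutes, so the units must be converted before comparing. A small self-contained sketch, with `is_idle` as a hypothetical helper and the default taken from the 60-minute setting above:

```python
from datetime import datetime, timedelta


def is_idle(last_seen_at: datetime, now: datetime, idle_timeout_minutes: int = 60) -> bool:
    """True when more than idle_timeout_minutes have passed since the last refresh."""
    return now - last_seen_at > timedelta(minutes=idle_timeout_minutes)


# Timezone-aware timestamp; an explicit offset parses on every supported Python version
last_seen = datetime.fromisoformat("2025-01-22T10:30:00+00:00")
print(is_idle(last_seen, last_seen + timedelta(minutes=59)))
print(is_idle(last_seen, last_seen + timedelta(minutes=61)))
```

Both timestamps should be timezone-aware (or both naive); Python raises `TypeError` when the two kinds are subtracted.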
Absolute Timeout
Purpose: Force periodic re-authentication
Implementation:
- Built into JWT TTL (7 days / 1 day)
- User must re-login after expiration
Single-Session Enforcement
Purpose: Prevent multiple concurrent sessions
Implementation:
# On session-exchange
existing = await get_active_session(user_id)
if existing:
    # Require approval from existing session
    raise ConcurrentLoginPending()
# Replace with new session
await replace_active_session(user_id, new_session)
Device Security
Device Fingerprinting
Components:
- SHA-256 hash of user agent
- IP address
- Optional: Device-specific identifiers
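The source lists the fingerprint inputs but not how they are combined. One possible approach, shown purely as an assumption, is to hash all components together with length prefixes so that different splits of the same characters cannot collide. `device_fingerprint` is a hypothetical function.

```python
import hashlib
from typing import Optional


def device_fingerprint(user_agent: str, ip: str, device_id: Optional[str] = None) -> str:
    """SHA-256 over the components (combination scheme is an assumption)."""
    parts = [user_agent, ip, device_id or ""]
    # Length-prefix each part so ("ab", "c") and ("a", "bc") hash differently
    material = "".join(f"{len(p)}:{p}" for p in parts)
    return hashlib.sha256(material.encode()).hexdigest()


print(device_fingerprint("Mozilla/5.0...", "1.2.3.4"))
```

Including the IP address means the fingerprint changes whenever the client's network changes, which affects how often the approval flow is triggered.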
Usage:
Device Approval Flow
OTP Generation:
Storage:
# Redis: hashed OTP
key = f"device-otp:{user_id}:{fingerprint}"
await redis.setex(
    key,
    settings.device_approval_otp_ttl_seconds,  # 300
    hmac_sha256_hash(otp, settings.encryption_key)
)
Verification:
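# Hash the provided OTP with the same HMAC key used at storage time, then compare
stored_hash = await redis.get(key)
provided_hash = hmac_sha256_hash(provided_otp, settings.encryption_key)
# A missing entry means the OTP expired or was never issued
if stored_hash is None or not compare_digest(stored_hash, provided_hash):
    raise OTPInvalid()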
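The storage and verification steps only agree if both sides hash with the same secret. The sketch below shows the full round trip with the standard library. `generate_otp`, `hash_otp`, and `verify_otp` are hypothetical names, and the 6-digit length is an assumption, since the source does not state the OTP format.

```python
import hashlib
import hmac
import secrets
from typing import Optional


def generate_otp(digits: int = 6) -> str:
    """Zero-padded numeric code from the OS CSPRNG (length is an assumption)."""
    return f"{secrets.randbelow(10 ** digits):0{digits}d}"


def hash_otp(otp: str, secret_key: str) -> str:
    """Keyed hash, so a leaked store does not reveal the short code directly."""
    return hmac.new(secret_key.encode(), otp.encode(), hashlib.sha256).hexdigest()


def verify_otp(provided_otp: str, stored_hash: Optional[str], secret_key: str) -> bool:
    if stored_hash is None:  # expired or never issued
        return False
    return hmac.compare_digest(stored_hash, hash_otp(provided_otp, secret_key))


secret_key = "example-secret"  # placeholder value for the demo
otp = generate_otp()
stored = hash_otp(otp, secret_key)
print(verify_otp(otp, stored, secret_key))
```

Because a numeric code has a small search space, the verification endpoint also needs attempt limiting; the hash protects the stored value, not the guessing surface.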
Suspicious Login Detection
Geo-IP Lookup
Provider: ip-api.com (free tier)
Implementation:
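# backend/app/infrastructure/external/geo/ip_api.py
import httpx

async def lookup_geo(ip: str) -> dict:
    # httpx.get() is synchronous and cannot be awaited; use an AsyncClient
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://ip-api.com/json/{ip}")
    response.raise_for_status()
    data = response.json()  # parse the body once
    return {
        "city": data["city"],
        "country": data["country"],
        "lat": data["lat"],
        "lon": data["lon"]
    }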
Detection Logic
# Compare with recent login activity
recent_logins = await get_recent_logins(user_id, hours=24)
for login in recent_logins:
    if login.geo != current_geo:
        # Different location - suspicious
        return True
    if login.device_fingerprint != current_fingerprint:
        # Different device - suspicious
        return True
# Same location and device - safe
return False
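The same rule can be written as a pure function, which makes its edge cases easy to test. `LoginRecord` and `is_suspicious` are hypothetical names; the comparison itself follows the logic above.

```python
from dataclasses import dataclass
from typing import Iterable, Tuple


@dataclass(frozen=True)
class LoginRecord:
    geo: Tuple[str, str]  # (city, country)
    device_fingerprint: str


def is_suspicious(current: LoginRecord, recent_logins: Iterable[LoginRecord]) -> bool:
    """Flag the login if any recent login differs in location or device."""
    return any(
        login.geo != current.geo
        or login.device_fingerprint != current.device_fingerprint
        for login in recent_logins
    )


home = LoginRecord(("SF", "US"), "fp-laptop")
print(is_suspicious(home, [home]))                                    # same place and device
print(is_suspicious(LoginRecord(("Berlin", "DE"), "fp-laptop"), [home]))  # new location
print(is_suspicious(home, []))                                        # no history at all
```

Two consequences follow directly from the rule: a user with no logins in the last 24 hours is never flagged, and a user who alternates between two devices is flagged on every switch.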
State Machine
┌─────────┐  suspicious login  ┌─────────┐
│ PENDING │ ──────────────────>│  DENY   │
└─────────┘                    └─────────┘
     │                              │
     │ approve                      │
     ▼                              │
┌─────────┐                         │
│  ALLOW  │ <───────────────────────┘
└─────────┘
Input Validation
Pydantic Schemas
Email Validation:
Password Validation:
OTP Validation:
SQL Injection Prevention
Repository Pattern:
# Always use parameterized queries
stmt = select(User).where(User.email == email)
# NOT: f"SELECT * FROM users WHERE email = '{email}'"
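The difference between the two forms is easiest to see with a classic injection payload. The sketch below uses the standard library's `sqlite3` rather than the project's ORM, and the table layout is an assumption made for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("INSERT INTO users (email) VALUES (?)", ("alice@example.com",))


def find_user_unsafe(email: str) -> list:
    # Vulnerable: the input becomes part of the SQL text
    return conn.execute(f"SELECT id FROM users WHERE email = '{email}'").fetchall()


def find_user_safe(email: str) -> list:
    # Safe: the driver passes the value separately from the statement
    return conn.execute("SELECT id FROM users WHERE email = ?", (email,)).fetchall()


payload = "' OR '1'='1"
print(find_user_unsafe(payload))  # the injected OR clause matches every row
print(find_user_safe(payload))    # treated as a literal string, matches nothing
```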
Output Encoding
PII Filtering
Never Log:
- Passwords (plaintext or hashed)
- API keys or secrets
- JWT tokens (log first 8 chars of JTI only)
- Credit card numbers
- Full IP addresses (hash in production logs)
- Email addresses (partial mask)
Redaction Example:
def redact_email(email: str) -> str:
    if '@' not in email:
        return '***@***.***'
    local, domain = email.split('@', 1)
    # local[:1] avoids an IndexError when the local part is empty ("@example.com")
    return f"{local[:1]}***@{domain}"
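The JTI and IP rules from the list above can be handled the same way. The helpers below are a sketch under stated assumptions: `mask_jti`, `hash_ip`, and the `log_salt` parameter are hypothetical, and the keyed hash is a design choice made here because the IPv4 address space is small enough that an unkeyed hash could be reversed by enumeration.

```python
import hashlib
import hmac


def mask_jti(jti: str) -> str:
    """Keep only the first 8 characters of a token ID for log correlation."""
    return jti[:8] + "..."


def hash_ip(ip: str, log_salt: str) -> str:
    """Keyed, truncated hash: stable per IP, not reversible without the salt."""
    return hmac.new(log_salt.encode(), ip.encode(), hashlib.sha256).hexdigest()[:16]


print(mask_jti("0123456789abcdef0123456789abcdef"))
print(hash_ip("1.2.3.4", log_salt="example-salt"))
```

The same IP always maps to the same value under one salt, so abuse patterns remain visible in the logs without storing the address itself.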
Related Documentation
- Architecture Overview - System design
- RBAC Design - Permission system
- Security Posture - Manager-facing security
- Environment Variables - Configuration reference