Security Implementation Guide

Security guidelines for developers implementing authentication features.


Core Security Principles

1. Never Trust User Input

❌ Bad:

# SQL injection vulnerability
query = f"SELECT * FROM users WHERE email = '{email}'"

✅ Good:

# Use parameterized queries via SQLAlchemy
stmt = select(User).where(User.email == email)

2. Validate Everything

❌ Bad:

def update_password(user_id: int, new_password: str):
    user.password = new_password  # No validation!

✅ Good:

class PasswordUpdate(BaseModel):
    new_password: str = Field(min_length=8)

def update_password(user_id: int, data: PasswordUpdate):
    user.hashed_password = get_password_hash(data.new_password)

3. Use HttpOnly Cookies for Auth Tokens

❌ Bad:

// Store token in localStorage (XSS vulnerable)
localStorage.setItem('token', token)

✅ Good:

# Let backend set HttpOnly cookie
response.set_cookie(
    key="mbpanel_access",
    value=token,
    httponly=True,
    secure=True
)
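
To see what these flags become on the wire, the sketch below builds the Set-Cookie header value with the standard library's http.cookies rather than the framework's response.set_cookie. The helper name build_auth_cookie and the SameSite and Path attributes are assumptions added for illustration; the snippet above sets only HttpOnly and Secure.

```python
from http.cookies import SimpleCookie


def build_auth_cookie(token: str) -> str:
    """Return the Set-Cookie header value for the access-token cookie."""
    cookie = SimpleCookie()
    cookie["mbpanel_access"] = token
    morsel = cookie["mbpanel_access"]
    morsel["httponly"] = True   # not readable from JavaScript
    morsel["secure"] = True     # only sent over HTTPS
    morsel["samesite"] = "Lax"  # assumption: restrict cross-site sending
    morsel["path"] = "/"        # assumption: valid for the whole site
    return morsel.OutputString()


header_value = build_auth_cookie("example-token")
print(header_value)
```

Because the cookie is HttpOnly, frontend code never reads the token; the browser attaches it to requests automatically.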

4. Never Log Secrets

❌ Bad:

logger.info(f"User logged in: {user.email}, token: {token}")

✅ Good:

logger.info("User logged in", user_id=user.id, jti=token.jti[:8])


Input Validation

Pydantic Schemas

Always use Pydantic for request validation:

from pydantic import BaseModel, EmailStr, Field

class LoginRequest(BaseModel):
    email: EmailStr  # Validates email format
    password: str = Field(min_length=8)  # Min length
    remember_me: bool = False  # Type-safe boolean

class OTPApproveRequest(BaseModel):
    otp_code: str = Field(
        pattern=r'^\d{6}$',
        description="6-digit OTP code"
    )

SQL Injection Prevention

Keep all queries in repository functions that build parameterized statements, never formatted strings:

# ❌ BAD - String formatting
async def get_user_by_email(email: str):
    query = f"SELECT * FROM users WHERE email = '{email}'"
    return await db.execute(query)

# ✅ GOOD - Parameterized
async def get_user_by_email(email: str):
    stmt = select(User).where(User.email == email)
    return await db.scalar(stmt)
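
The difference is easy to reproduce without the application's database. The sketch below uses the standard library's sqlite3 with an in-memory table instead of SQLAlchemy; the table layout and the function names find_unsafe and find_safe are assumptions made for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [("alice@example.com",), ("bob@example.com",)],
)


def find_unsafe(email: str) -> list:
    # String formatting: the input becomes part of the SQL text.
    return conn.execute(
        f"SELECT email FROM users WHERE email = '{email}'"
    ).fetchall()


def find_safe(email: str) -> list:
    # Parameter binding: the input is only ever treated as a value.
    return conn.execute(
        "SELECT email FROM users WHERE email = ?", (email,)
    ).fetchall()


payload = "x' OR '1'='1"
leaked_rows = find_unsafe(payload)  # the injected OR clause matches every row
safe_rows = find_safe(payload)      # no user has this literal email
```

With the injected payload, the formatted query returns every user, while the bound parameter matches nothing.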

Output Encoding

PII Redaction

Never log personally identifiable information:

from app.core.logging import get_logger

logger = get_logger(__name__)

# ❌ BAD - Logs email
logger.info(f"Password reset for {email}")

# ✅ GOOD - Redacts email
logger.info(
    "password_reset_requested",
    email=redact_email(email)  # "j***@example.com"
)

# ❌ BAD - Logs full token
logger.debug(f"Token: {token}")

# ✅ GOOD - Logs only first 8 chars of JTI
logger.debug("token_issued", jti=token.jti[:8])
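
Call-site discipline can be backed by a safety net. As a sketch of that idea, the standard-library logging filter below masks anything shaped like a JWT before it reaches a handler. It uses plain logging rather than the project's get_logger, and the names RedactJWTFilter and ListHandler are hypothetical.

```python
import logging
import re

# Matches the three dot-separated base64url segments of a JWT.
_JWT_PATTERN = re.compile(r"eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+")


class RedactJWTFilter(logging.Filter):
    """Hypothetical filter that masks JWT-shaped strings in log messages."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the message once, mask any token, and drop the raw args.
        record.msg = _JWT_PATTERN.sub("[REDACTED_JWT]", record.getMessage())
        record.args = ()
        return True


class ListHandler(logging.Handler):
    """Collects messages in memory so the effect is easy to inspect."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


demo_logger = logging.getLogger("redaction_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False
handler = ListHandler()
handler.addFilter(RedactJWTFilter())
demo_logger.addHandler(handler)

demo_logger.debug("Token: %s", "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIn0.c2lnbmF0dXJl")
```

A filter like this only catches patterns it knows about, so it complements the rule above rather than replacing it.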

Redaction Helper

def redact_email(email: str) -> str:
    """Redact email for logging."""
    if '@' not in email:
        return '***@***.***'
    local, domain = email.split('@', 1)
    # Keep only the first character; a one-character local part is masked entirely
    local = local[0] + '***' if len(local) > 1 else '***'
    return f"{local}@{domain}"

def redact_ip(ip: str) -> str:
    """Redact IP address for logging."""
    parts = ip.split('.')
    if len(parts) == 4:
        return f"{parts[0]}.{parts[1]}.***.***"
    return "***.***.***.***"
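
The redact_ip helper above masks every non-IPv4 input completely, including IPv6 addresses. If keeping a coarse IPv6 prefix is useful, one possible variant uses the standard library's ipaddress module. The name redact_ip_any, the /48 prefix, and the invalid-input marker are assumptions for illustration.

```python
import ipaddress


def redact_ip_any(ip: str) -> str:
    """Hypothetical variant of redact_ip that also handles IPv6."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return "[invalid-ip]"
    if addr.version == 4:
        # Keep the first two octets, mask the rest.
        first, second, _, _ = str(addr).split(".")
        return f"{first}.{second}.***.***"
    # IPv6: keep the first three 16-bit groups (a /48 prefix), mask the rest.
    groups = addr.exploded.split(":")
    return ":".join(groups[:3] + ["****"] * 5)


print(redact_ip_any("192.168.10.20"))
print(redact_ip_any("2001:db8:85a3::8a2e:370:7334"))
```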

Secrets Management

Never Hardcode Secrets

❌ Bad:

API_KEY = "sk-1234567890abcdef"  # pragma: allowlist secret (Never commit!)

✅ Good:

from app.core.config import settings

API_KEY = settings.virtuozzo_api_key  # From environment

Environment Variable Requirements

Variable               Required?  Description
JWT_SECRET_KEY         ✅ Yes     Min 32 characters
ENCRYPTION_KEY         ✅ Yes     Base64 urlsafe 32-byte
BCRYPT_ROUNDS          ✅ Yes     Min 4, recommended 12
REDIS_URL              ✅ Yes     Redis connection
POSTMARK_SERVER_TOKEN  ✅ Yes     Email service
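
These requirements can be checked once at startup so that a misconfigured deployment fails fast. The sketch below is one way to do that with the standard library only. The function validate_env is hypothetical, and how the real settings object validates its fields is not shown in this guide.

```python
import base64

REQUIRED_VARS = (
    "JWT_SECRET_KEY",
    "ENCRYPTION_KEY",
    "BCRYPT_ROUNDS",
    "REDIS_URL",
    "POSTMARK_SERVER_TOKEN",
)


def validate_env(env: dict) -> list:
    """Return a list of problems; an empty list means the config looks valid."""
    errors = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]

    jwt_secret = env.get("JWT_SECRET_KEY", "")
    if jwt_secret and len(jwt_secret) < 32:
        errors.append("JWT_SECRET_KEY must be at least 32 characters")

    encryption_key = env.get("ENCRYPTION_KEY", "")
    if encryption_key:
        try:
            decoded = base64.urlsafe_b64decode(encryption_key)
        except ValueError:  # binascii.Error is a ValueError subclass
            decoded = b""
        if len(decoded) != 32:
            errors.append("ENCRYPTION_KEY must be URL-safe Base64 of 32 bytes")

    rounds = env.get("BCRYPT_ROUNDS", "")
    if rounds:
        try:
            rounds_ok = int(rounds) >= 4
        except ValueError:
            rounds_ok = False
        if not rounds_ok:
            errors.append("BCRYPT_ROUNDS must be an integer of at least 4")

    return errors
```

A caller would typically pass dict(os.environ) and refuse to start if the returned list is not empty.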

Secrets Rotation

Plan for secret rotation:

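from jose import JWTError, jwt  # assumption: python-jose, as the JWTError name suggests

# Support multiple active secrets during rotation
JWT_SECRETS = [
    settings.jwt_secret_key,  # Current
    settings.jwt_secret_key_v2  # Next (during rotation)
]

def decode_token(token: str):
    # Try each active secret; pin the algorithm so the token header cannot choose it
    for secret in JWT_SECRETS:
        try:
            # assumption: tokens are signed with HS256; use your configured algorithm
            return jwt.decode(token, secret, algorithms=["HS256"])
        except JWTError:
            continue
    raise TokenValidationError("Invalid token")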
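
The same "accept any active secret, sign with the current one" idea can be tried without a JWT library. The sketch below swaps in plain HMAC-SHA256 signatures from the standard library; the function names and the example secrets are assumptions, not the project's code.

```python
import hashlib
import hmac


def sign(message: bytes, secret: bytes) -> str:
    """Sign with a single secret (always the current one when issuing)."""
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify_with_rotation(message: bytes, signature: str, secrets: list) -> bool:
    """Accept a signature made with any currently active secret."""
    return any(
        hmac.compare_digest(sign(message, secret), signature)
        for secret in secrets
    )


old_secret = b"old-secret-value"  # illustrative values only
new_secret = b"new-secret-value"
active_secrets = [new_secret, old_secret]

signed_before_rotation = sign(b"session:42", old_secret)
still_valid = verify_with_rotation(b"session:42", signed_before_rotation, active_secrets)
```

Once every token signed with the old secret has expired, the old secret can be dropped from the list.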

XSS Prevention

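Content Security Policy

Note that the middleware below configures CORS, which limits the origins allowed to call the API with credentials. It does not emit a Content-Security-Policy header; that header has to be set separately.

# backend/app/core/app_factory.py
from fastapi.middleware.cors import CORSMiddleware
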
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins_list,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
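
A Content-Security-Policy value is a semicolon-separated list of directives, each followed by its allowed sources. As a minimal sketch, the hypothetical helper build_csp below assembles such a value. The directives shown are an illustrative assumption, not the project's policy.

```python
def build_csp(directives: dict) -> str:
    """Join directive names and their sources into a CSP header value."""
    return "; ".join(
        f"{name} {' '.join(sources)}" for name, sources in directives.items()
    )


# Assumed example policy: same-origin resources only, no framing.
csp_value = build_csp(
    {
        "default-src": ["'self'"],
        "script-src": ["'self'"],
        "frame-ancestors": ["'none'"],
    }
)
print(csp_value)
```

The resulting string would be sent as the Content-Security-Policy response header, for example from a response middleware.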

HTML Escaping

import html

# ❌ BAD - Unescaped output
def render_username(username: str) -> str:
    return f"<div>Hello {username}</div>"

# ✅ GOOD - Escaped output
def render_username(username: str) -> str:
    safe = html.escape(username)
    return f"<div>Hello {safe}</div>"
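
For reference, this is what html.escape does to a script payload. With its default quote=True it also escapes quote characters, which matters when the value lands inside an HTML attribute.

```python
import html

payload = '<script>alert("xss")</script>'
escaped = html.escape(payload)  # quote=True by default
greeting = f"<div>Hello {escaped}</div>"
print(greeting)
```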

CSRF Protection

Apply to State-Changing Endpoints

All endpoints that modify state must require CSRF protection:

from app.core.dependencies import csrf_protected

@router.post("/auth/refresh")
async def refresh_tokens(
    _: None = Depends(csrf_protected),  # Required!
    # ...
):
    pass

Frontend Implementation

// Next.js/React
const csrfToken = getCookie('mbpanel_csrf');

fetch('/api/v1/auth/refresh', {
  method: 'POST',
  credentials: 'include',
  headers: {
    'X-CSRF-Token': csrfToken,
    'Content-Type': 'application/json'
  }
});
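
The frontend reads the mbpanel_csrf cookie and echoes it in the X-CSRF-Token header, which is the double-submit cookie pattern. How csrf_protected checks this is not shown here. As a rough sketch of the server side, the hypothetical helpers below issue a token and compare cookie and header in constant time.

```python
import hmac
import secrets
from typing import Optional


def issue_csrf_token() -> str:
    """Generate an unpredictable token to store in the CSRF cookie."""
    return secrets.token_urlsafe(32)


def csrf_token_matches(cookie_value: Optional[str], header_value: Optional[str]) -> bool:
    """Return True only if both values are present and identical."""
    if not cookie_value or not header_value:
        return False
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(cookie_value.encode(), header_value.encode())


token = issue_csrf_token()
```

A cross-site attacker can make the browser send the cookie but cannot read it, so it cannot supply a matching header.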

Rate Limiting

Apply to Sensitive Endpoints

from app.core.rate_limit import enforce_login_rate_limit

@router.post("/auth/login")
async def login(
    _: None = Depends(enforce_login_rate_limit),  # Required!
    # ...
):
    pass

Custom Rate Limits

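from fastapi import Request

async def custom_rate_limit(request: Request) -> None:
    # request.client can be None, so fall back to a shared bucket
    client_host = request.client.host if request.client else "unknown"
    key = f"rl:custom:{client_host}"
    limiter = RateLimiter()
    await limiter.check(
        key=key,
        limit=10,  # 10 requests
        window_seconds=60  # per minute
    )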
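
The internals of RateLimiter are not shown in this guide. To make the limit and window_seconds semantics concrete, here is a small in-memory fixed-window limiter. It is a synchronous, single-process stand-in, and the class and exception names are hypothetical; with REDIS_URL among the required settings, the real limiter presumably keeps its counters in Redis.

```python
import time


class RateLimitExceeded(Exception):
    """Raised when a key has used up its quota for the current window."""


class InMemoryRateLimiter:
    """Fixed-window counter per key; suitable for a single process only."""

    def __init__(self, clock=time.monotonic) -> None:
        self._clock = clock
        self._windows = {}  # key -> (window_start, request_count)

    def check(self, key: str, limit: int, window_seconds: float) -> None:
        now = self._clock()
        start, count = self._windows.get(key, (now, 0))
        if now - start >= window_seconds:
            # The previous window is over: start a fresh one.
            start, count = now, 0
        if count >= limit:
            raise RateLimitExceeded(key)
        self._windows[key] = (start, count + 1)
```

Injecting the clock keeps the limiter testable without sleeping.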

Session Security

Idle Timeout

Enforce the idle timeout on every refresh:

# In refresh_tokens()
idle_limit = timedelta(minutes=settings.refresh_idle_timeout_minutes)
if now - session.last_seen_at > idle_limit:
    # Idle for too long: revoke the session and reject the refresh
    await revoke_session(session)
    raise SessionInactive()
# Still active: record the activity
session.last_seen_at = now
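
The idle check is a comparison between elapsed time and a timedelta, which is easy to isolate and unit test. The helper below is a hypothetical extraction of that comparison; it assumes both timestamps are timezone-aware datetimes and that the timeout is configured in minutes.

```python
from datetime import datetime, timedelta, timezone


def is_idle_expired(last_seen_at: datetime, now: datetime, idle_timeout_minutes: int) -> bool:
    """Return True when the session has been idle longer than the timeout."""
    return now - last_seen_at > timedelta(minutes=idle_timeout_minutes)


last_seen = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
after_10_min = last_seen + timedelta(minutes=10)
after_45_min = last_seen + timedelta(minutes=45)
```

With a 30-minute timeout, a refresh after 10 minutes is accepted and one after 45 minutes is rejected.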

Absolute Timeout

The absolute timeout is enforced by the JWT's expiry (TTL):

# Token automatically expires
access_token = create_access_token(
    # ...
    expires_delta=timedelta(minutes=settings.access_token_exp_minutes)
)

Error Handling

Never Expose Internal Details

❌ Bad:

except Exception as exc:
    return {"error": str(exc)}  # Leaks implementation!

✅ Good:

except Exception as exc:
    logger.error("internal_error", exc_info=True)
    return {"error": "An error occurred"}
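
A generic message is harder to debug unless it can be tied back to the log entry. One common approach, sketched here with the standard library, is to return an opaque error ID and log the same ID with the full traceback. The function safe_error_response and the error_id field are assumptions, not part of the project's API.

```python
import logging
import uuid

logger = logging.getLogger("app.errors")


def safe_error_response(exc: Exception) -> dict:
    """Log the full exception server-side and return a client-safe payload."""
    error_id = uuid.uuid4().hex
    # The traceback and exception message go to the log only.
    logger.error("internal_error error_id=%s", error_id, exc_info=exc)
    return {"error": "An error occurred", "error_id": error_id}


try:
    raise RuntimeError("connection to db-internal-01 failed")
except Exception as exc:
    response_body = safe_error_response(exc)
```

Support staff can then look up the error ID in the logs without the client ever seeing internal details.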

Use Proper HTTP Status Codes

Status  Use Case
200     Success
201     Created
400     Bad Request (malformed request)
401     Unauthorized (missing or invalid credentials)
403     Forbidden (authenticated, but no permission)
404     Not Found
422     Validation Error (request fails schema validation)
429     Rate Limited
500     Internal Server Error

Secure Dependencies

Use Constraints File

# Install with constraints
pip install --no-deps -c constraints.txt -e .

Regular Audits

# Check for vulnerabilities
pip-audit

# Or
safety check

Code Review Checklist

Before merging auth code:

  • No hardcoded secrets
  • All inputs validated with Pydantic
  • SQL uses parameterized queries
  • No sensitive data in logs
  • CSRF protection on state-changing endpoints
  • Rate limiting on auth endpoints
  • Proper error handling (no exposure)
  • Tests cover sad paths
  • Permission checks where needed