Architecture Design Document (ADD)¶
Virtuozzo Environment Synchronization - Stale-While-Revalidate Implementation¶
Document Information¶
| Field | Details |
|---|---|
| Project Name | MBPanel Virtuozzo Environment Sync |
| Author | Technical Architecture Team |
| Date Created | 2026-01-09 |
| Last Updated | 2026-01-22 |
| Version | 2.0 |
| Status | Draft - Architecture Refresh |
| Related PRD | PRD-virtuozzo-environment-sync.md |
| Related ARD | ARD-virtuozzo-environment-sync.md |
1. Introduction¶
1.1 Purpose¶
This document defines the technical architecture for implementing the Stale-While-Revalidate pattern, a distributed caching strategy that serves stale data immediately while asynchronously refreshing it from the source. The problem is a classic distributed-systems optimization: a consistency-versus-availability trade-off combined with cache invalidation strategy and scalable queue-based processing.
1.2 Architectural Shift (v2.0)¶
Previous Design (v1.0):
- Lazy sync on read + distributed locking
- Background Celery cron iterating through all teams
- Redis hot cache + PostgreSQL warm cache

New Design (v2.0 - Stale-While-Revalidate):
- Always return immediately (fresh or stale), never block
- Queue-based refresh (event-driven, not cron-based)
- Active-user priority (only logged-in users trigger refresh)
- Rate-limited worker pool (protects Virtuozzo from spikes)
- PostgreSQL-only cache (simpler, survives restarts)
1.3 Scope¶
In Scope:
- Stale-While-Revalidate pattern implementation
- Queue-based background refresh system
- Rate-limited Virtuozzo API calls
- Graceful degradation (serve stale on errors)
- Data integrity validation (hash-based change detection)
- Observability (metrics, logging, tracing)
Out of Scope:
- Virtuozzo authentication (existing app/modules/sessions/)
- Environment provisioning (existing app/domains/environments/tasks.py)
- Real-time WebSocket sync (SSE for notifications only)
2. Architecture Overview¶
2.1 Core Pattern: Stale-While-Revalidate¶
┌─────────────────────────────────────────────────────────────────────────┐
│ STALE-WHILE-REVALIDATE PATTERN │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ REQUEST → Check DB → Return Data (ALWAYS) │
│ │ │
│ ├─ Fresh (< TTL) → Done (instant) │
│ └─ Stale (> TTL) → Return stale + enqueue refresh │
│ │
│ BACKGROUND WORKER (Independent Process) │
│ → Process queue at fixed rate (rate-limited) │
│ → Call Virtuozzo API │
│ → Validate payload │
│ → Update DB if changed │
│ │
└─────────────────────────────────────────────────────────────────────────┘
2.2 Context Diagram¶
graph TB
subgraph "External Systems"
VZ[Virtuozzo API<br/>environments source of truth]
end
subgraph "MBPanel Platform"
subgraph "Frontend"
UI[Next.js 16<br/>Stale Badge UI]
end
subgraph "API Layer"
API[FastAPI Backend<br/>environments router]
end
subgraph "Queue System"
Q[Refresh Queue<br/>RabbitMQ/Redis]
W1[Worker 1]
W2[Worker 2]
WN[Worker N]
end
subgraph "Business Logic"
SYNC[EnvironmentSyncService<br/>stale-while-revalidate]
end
subgraph "Infrastructure"
DB[(PostgreSQL<br/>primary cache)]
end
end
UI --> API
API --> SYNC
SYNC --> DB
SYNC -->|enqueue| Q
Q --> W1
Q --> W2
Q --> WN
W1 --> VZ
W2 --> VZ
WN --> VZ
style VZ fill:#f9f,stroke:#333,stroke-width:4px
style Q fill:#ff9,stroke:#333,stroke-width:3px
style SYNC fill:#bbf,stroke:#333,stroke-width:2px
2.3 Component Overview¶
| Component | Responsibility | Technology |
|---|---|---|
| EnvironmentSyncService | Orchestrate stale-while-revalidate logic | Python 3.12+, async/await |
| Refresh Queue | Hold pending sync tasks | RabbitMQ / Redis |
| Worker Pool | Process queue, call Virtuozzo | Celery / Custom asyncio workers |
| PostgreSQL | Primary cache, survives restarts | SQLAlchemy 2.0 async |
| VirtuozzoClient | External API calls (EXISTS) | httpx async |
3. Detailed Component Design¶
3.1 EnvironmentSyncService¶
Location: app/domains/environments/sync_service.py (new file)
Responsibilities:
1. Check database for cached environments
2. Determine freshness based on last_synced_at timestamp
3. Return data immediately (fresh or stale)
4. Enqueue background sync task if stale
5. Never block the user request
Interface:
class EnvironmentSyncService:
    def __init__(self, db: AsyncSession) -> None: ...

    async def get_environments(
        self,
        *,
        team_id: int,
    ) -> tuple[list[dict[str, Any]], SyncMetadata]:
        """
        Return environments from cache.
        Always returns immediately (fresh or stale).
        Enqueues a background refresh if stale.
        """

    async def _enqueue_sync(
        self,
        *,
        team_id: int,
        triggered_by: str,  # "stale_data", "manual", "first_run"
    ) -> str:
        """
        Enqueue sync task to the background queue.
        Returns task_id for tracking.
        """
Implementation:
# app/domains/environments/sync_service.py
from __future__ import annotations
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.domains.environments.repository import list_team_environments
from app.infrastructure.queue.client import enqueue_task
logger = get_logger(__name__)
# Configuration
_TTL_SECONDS = settings.vz_env_ttl_seconds or 600 # 10 minutes default
_MAX_STALE_SECONDS = settings.vz_env_max_stale_seconds or 3600 # 1 hour
class EnvironmentSyncService:
"""Stale-While-Revalidate pattern for environment synchronization."""
def __init__(self, db: AsyncSession):
self.db = db
async def get_environments(
self,
*,
team_id: int,
) -> tuple[list[dict[str, Any]], SyncMetadata]:
"""
Get environments for a team.
ALWAYS returns immediately with data from the database.
If data is stale (> TTL), enqueues background refresh.
Returns:
(environments, metadata): Tuple of environment list and sync metadata
"""
# 1. Query database (always fast, < 50ms)
rows = await list_team_environments(db=self.db, team_id=team_id)
# 2. Check if we have any data
if not rows:
# First run: no cached data exists
# This is unavoidable - user must wait for initial fetch
logger.info("env_first_run", team_id=team_id)
return [], SyncMetadata(
has_data=False,
is_stale=False,
should_block=True,
reason="first_run"
)
# 3. Check freshness
most_recent_sync = max(
(row.last_synced_at for row in rows if row.last_synced_at),
default=None
)
if most_recent_sync is None:
# Legacy data without timestamp - treat as stale
logger.warning("env_no_timestamp", team_id=team_id)
await self._enqueue_sync(team_id=team_id, triggered_by="missing_timestamp")
return self._serialize(rows), SyncMetadata(
has_data=True,
is_stale=True,
should_block=False,
last_synced_at=None,
age_seconds=None,
reason="missing_timestamp"
)
# 4. Calculate data age
now = datetime.now(timezone.utc)
age = now - most_recent_sync
age_seconds = age.total_seconds()
is_fresh = age_seconds < _TTL_SECONDS
is_max_stale = age_seconds > _MAX_STALE_SECONDS
# 5. Return metadata
metadata = SyncMetadata(
has_data=True,
is_stale=not is_fresh,
is_max_stale=is_max_stale,
should_block=False, # NEVER block after first run
last_synced_at=most_recent_sync,
age_seconds=age_seconds,
ttl_seconds=_TTL_SECONDS,
)
# 6. Enqueue background sync if stale
if not is_fresh:
staleness_bucket = self._get_staleness_bucket(age_seconds)
logger.info(
"env_stale_enqueueing",
team_id=team_id,
age_seconds=age_seconds,
staleness_bucket=staleness_bucket,
)
await self._enqueue_sync(
team_id=team_id,
triggered_by=f"staleness_{staleness_bucket}"
)
# dataclasses have no _replace; dataclasses.replace returns an updated copy
metadata = replace(
metadata,
sync_enqueued=True,
reason="stale_data",
)
else:
metadata = replace(metadata, reason="fresh_data")
return self._serialize(rows), metadata
async def _enqueue_sync(
self,
*,
team_id: int,
triggered_by: str,
) -> str:
"""Enqueue background sync task."""
from app.infrastructure.queue.tasks import sync_environments_task
task_id = await enqueue_task(
task_name="sync_environments",
payload={
"team_id": team_id,
"triggered_by": triggered_by,
"enqueued_at": datetime.now(timezone.utc).isoformat(),
},
priority="normal",
)
logger.info(
"env_sync_enqueued",
team_id=team_id,
task_id=task_id,
triggered_by=triggered_by,
)
return task_id
def _serialize(self, rows: list[Any]) -> list[dict[str, Any]]:
"""Serialize environment rows to dict."""
return [
{
"env_name": row.env_name,
"display_name": row.display_name,
"shortdomain": row.shortdomain,
"app_id": row.app_id,
"status": row.status,
"last_synced_at": row.last_synced_at.isoformat() if row.last_synced_at else None,
}
for row in rows
]
def _get_staleness_bucket(self, age_seconds: float) -> str:
"""Get staleness bucket for logging/metrics."""
if age_seconds < _TTL_SECONDS:
return "fresh"
elif age_seconds < 3600: # 1 hour
return "stale"
else:
return "very_stale"
@dataclass
class SyncMetadata:
"""Metadata about environment sync status."""
has_data: bool
is_stale: bool
should_block: bool
last_synced_at: datetime | None = None
age_seconds: float | None = None
ttl_seconds: int = _TTL_SECONDS
is_max_stale: bool = False
sync_enqueued: bool = False
reason: str = ""
3.2 Background Worker¶
Location: app/domains/environments/workers.py (new file)
Responsibilities:
1. Process sync tasks from the queue
2. Rate-limit Virtuozzo API calls
3. Validate the Virtuozzo response
4. Update the database atomically
5. Handle errors gracefully (never corrupt the DB)
Implementation:
# app/domains/environments/workers.py
from __future__ import annotations
import hashlib
import json
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.infrastructure.database.session import AsyncSessionLocal  # path assumed for the session factory used below
from app.core.config import settings
from app.core.crypto import decrypt_str
from app.core.logging import get_logger
from app.infrastructure.database.models.environment import Environment
from app.infrastructure.database.models.team import Team
from app.infrastructure.external.virtuozzo import client as virtuozzo_client
from app.infrastructure.queue.client import task_context
logger = get_logger(__name__)
_RATE_LIMIT_PER_SECOND = settings.vz_sync_queue_rate_limit or 50
_TIMEOUT_SECONDS = settings.vz_sync_timeout_seconds or 5
_MAX_RETRY_ATTEMPTS = 3
async def sync_environments_worker(task_payload: dict[str, Any]) -> None:
"""
Background worker to sync environments from Virtuozzo.
Sad Path Engineering:
- NEVER overwrites DB with error data
- Validates payload before writing
- Uses atomic transactions
- Logs all failures for observability
"""
team_id = task_payload["team_id"]
triggered_by = task_payload.get("triggered_by", "unknown")
logger.info(
"worker_sync_started",
team_id=team_id,
triggered_by=triggered_by,
task_id=task_context.get_task_id(),
)
async with AsyncSessionLocal() as db:
try:
# 1. Get team and session key
team = await db.get(Team, team_id)
if not team:
logger.error("worker_team_not_found", team_id=team_id)
return
if not team.session_key_encrypted:
logger.error("worker_no_session_key", team_id=team_id)
return
# 2. Decrypt session key
try:
session_key = decrypt_str(team.session_key_encrypted)
except Exception as exc:
logger.error("worker_decrypt_failed", team_id=team_id, error=str(exc))
return
# 3. Rate-limit, then call Virtuozzo API with timeout
from app.infrastructure.queue.rate_limiter import virtuozzo_rate_limiter
await virtuozzo_rate_limiter.acquire()  # token bucket from section 4.3
start_time = datetime.now(timezone.utc)
try:
response = await virtuozzo_client.get_envs(
session=session_key,
lazy=True, # Optimize: skip node details
)
duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
except TimeoutError:  # assumes VirtuozzoClient normalizes httpx timeouts to TimeoutError
logger.error(
"worker_virtuozzo_timeout",
team_id=team_id,
timeout_seconds=_TIMEOUT_SECONDS,
)
# DO NOT update DB - stale data is better than error
return
except Exception as exc:
logger.error(
"worker_virtuozzo_error",
team_id=team_id,
error_type=type(exc).__name__,
error_message=str(exc),
)
# DO NOT update DB
return
# 4. Validate response structure
if not _validate_virtuozzo_response(response):
logger.error(
"worker_invalid_response",
team_id=team_id,
response_keys=list(response.keys()) if isinstance(response, dict) else str(type(response)),
)
# DO NOT update DB - protect from corruption
return
# 5. Compute hash for change detection
new_hash = _compute_env_hash(response.get("infos", []))
old_hash = team.last_env_sync_hash
if new_hash == old_hash:
logger.info(
"worker_unchanged",
team_id=team_id,
hash=new_hash[:8], # Log first 8 chars
)
# Update timestamp anyway (shows we checked)
team.last_sync_checked_at = datetime.now(timezone.utc)
await db.commit()
return
# 6. Atomic database update
await _upsert_environments(db, team_id, response)
# 7. Update team metadata
team.last_env_sync_hash = new_hash
team.last_env_sync_at = datetime.now(timezone.utc)
team.last_sync_duration_ms = duration_ms
await db.commit()
logger.info(
"worker_sync_complete",
team_id=team_id,
env_count=len(response.get("infos", [])),
duration_ms=duration_ms,
hash=new_hash[:8],
)
except Exception as exc:
logger.error(
"worker_unexpected_error",
team_id=team_id,
error_type=type(exc).__name__,
error_message=str(exc),
exc_info=True,
)
# DO NOT commit transaction - let it rollback
def _validate_virtuozzo_response(data: dict[str, Any]) -> bool:
"""Validate Virtuozzo response structure before DB write."""
# Check required top-level fields
if not isinstance(data, dict):
return False
if "result" not in data:
return False
if data["result"] != 0:
logger.warning("virtuozzo_nonzero_result", result=data["result"])
return False
# Check infos is a list
infos = data.get("infos")
if not isinstance(infos, list):
return False
# Check each env has required structure
for item in infos:
if not isinstance(item, dict):
return False
if "env" not in item:
return False
return True
def _compute_env_hash(infos: list[dict]) -> str:
"""Compute SHA-256 hash of environment list for change detection."""
normalized = json.dumps(infos, sort_keys=True)
return hashlib.sha256(normalized.encode()).hexdigest()
async def _upsert_environments(
db: AsyncSession,
team_id: int,
response: dict[str, Any],
) -> None:
"""Atomically upsert environments from Virtuozzo response."""
now = datetime.now(timezone.utc)
infos = response.get("infos", [])
# Track which environments exist in Virtuozzo
vz_shortdomains = set()
for item in infos:
env_data = item.get("env", {})
if not env_data:
continue
shortdomain = env_data.get("shortdomain") or env_data.get("domain")
if not shortdomain:
continue
vz_shortdomains.add(shortdomain)
# Check if exists
existing = await db.execute(
select(Environment).where(
Environment.team_id == team_id,
Environment.shortdomain == shortdomain,
)
)
env = existing.scalar_one_or_none()
if env:
# Update existing
env.display_name = env_data.get("displayName") or env_data.get("name") or shortdomain
env.app_id = env_data.get("appid")
env.status = str(env_data.get("status", "unknown"))
env.owner_uid = env_data.get("owner")
env.region = env_data.get("region")
env.last_synced_at = now
env.sync_status = "ok"
env.sync_error_message = None
else:
# Create new
env = Environment(
team_id=team_id,
env_name=env_data.get("name") or shortdomain,
shortdomain=shortdomain,
display_name=env_data.get("displayName") or shortdomain,
app_id=env_data.get("appid"),
status=str(env_data.get("status", "unknown")),
owner_uid=env_data.get("owner"),
region=env_data.get("region"),
last_synced_at=now,
sync_status="ok",
created_at=now,
updated_at=now,
)
db.add(env)
# Delete environments that no longer exist in Virtuozzo
await db.execute(
delete(Environment).where(
Environment.team_id == team_id,
Environment.shortdomain.not_in(vz_shortdomains),
)
)
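The change-detection step above relies on `_compute_env_hash` producing canonical JSON. One property worth noting: `sort_keys=True` makes the hash insensitive to key ordering inside each object, but not to the order of the environment list itself, so a reordered list still registers as a change. A standalone check of the scheme:

```python
import hashlib
import json

def compute_env_hash(infos: list) -> str:
    """Same scheme as the worker's _compute_env_hash: canonical JSON, then SHA-256."""
    return hashlib.sha256(json.dumps(infos, sort_keys=True).encode()).hexdigest()

a = [{"env": {"name": "prod", "status": 1}}]
b = [{"env": {"status": 1, "name": "prod"}}]   # same data, keys reordered
c = [{"env": {"name": "prod", "status": 2}}]   # value changed

assert compute_env_hash(a) == compute_env_hash(b)  # key order does not matter
assert compute_env_hash(a) != compute_env_hash(c)  # value change is detected
```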
3.3 API Router Integration¶
Location: app/domains/environments/router.py (modify existing)
Changes:
# BEFORE (current code)
@router.get("/")
async def environments(
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(db_session),
) -> dict[str, list]:
items = await list_environments(db=db, current_user=user)
return {"items": items}
# AFTER (with stale-while-revalidate)
from typing import Any

from fastapi import Depends, HTTPException
from fastapi.responses import JSONResponse

from app.core.logging import get_logger
from app.domains.environments.sync_service import EnvironmentSyncService

logger = get_logger(__name__)
@router.get("/")
async def environments(
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(db_session),
) -> dict[str, Any]:
"""
Get environments for the current team.
Returns:
- items: List of environments
- meta: Sync metadata (is_stale, last_synced_at, etc.)
Headers:
- X-Data-Stale: true if data is older than TTL
- X-Data-Last-Sync: ISO timestamp of last sync
- X-Sync-In-Progress: true if background sync is running
"""
if user.team_id is None:
raise HTTPException(status_code=403, detail="Missing team context")
sync_service = EnvironmentSyncService(db)
items, metadata = await sync_service.get_environments(team_id=user.team_id)
# Set response headers
headers = {
"X-Data-Stale": str(metadata.is_stale).lower(),
"X-Data-Last-Sync": metadata.last_synced_at.isoformat() if metadata.last_synced_at else "",
"X-Sync-In-Progress": str(metadata.sync_enqueued).lower(),
}
# Special case: first run (no data)
if metadata.should_block:
# Trigger synchronous fetch for first run
# This is unavoidable - user must wait for initial data
logger.info("env_first_run_blocking", team_id=user.team_id)
# ... synchronous fetch logic ...
return JSONResponse(
content={
"items": items,
"meta": {
"last_synced_at": metadata.last_synced_at.isoformat() if metadata.last_synced_at else None,
"is_stale": metadata.is_stale,
"is_max_stale": metadata.is_max_stale,
"age_seconds": metadata.age_seconds,
"ttl_seconds": metadata.ttl_seconds,
"sync_enqueued": metadata.sync_enqueued,
"reason": metadata.reason,
}
},
headers=headers,
)
4. Queue System Design¶
4.1 Queue Architecture¶
graph TB
subgraph "Producers"
API[FastAPI<br/>environments endpoint]
end
subgraph "Queue Layer"
Q[RabbitMQ / Redis Queue<br/>env_sync_queue]
end
subgraph "Consumer Layer"
RATE[Rate Limiter<br/>50 req/sec]
W1[Worker 1]
W2[Worker 2]
WN[Worker N]
end
subgraph "External"
VZ[Virtuozzo API]
end
API -->|enqueue| Q
Q --> RATE
RATE --> W1
RATE --> W2
RATE --> WN
W1 --> VZ
W2 --> VZ
WN --> VZ
style Q fill:#ff9,stroke:#333,stroke-width:3px
style RATE fill:#faa,stroke:#333,stroke-width:2px
4.2 Queue Message Schema¶
{
"task_id": "uuid",
"task_type": "sync_environments",
"team_id": 123,
"priority": "normal",
"triggered_by": "stale_data",
"enqueued_at": "2026-01-22T10:30:00Z",
"retry_count": 0,
"max_retries": 3
}
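For type safety on both sides of the queue, the message can be mirrored by a small dataclass that rejects malformed payloads before a worker touches them. A stdlib-only sketch (field names follow the schema above; the class itself is illustrative, not existing code):

```python
from dataclasses import dataclass

@dataclass
class SyncTaskMessage:
    task_id: str
    team_id: int
    triggered_by: str
    enqueued_at: str                      # ISO-8601 timestamp
    task_type: str = "sync_environments"
    priority: str = "normal"
    retry_count: int = 0
    max_retries: int = 3

    @classmethod
    def from_payload(cls, payload: dict) -> "SyncTaskMessage":
        """Fail fast on payloads missing required fields."""
        missing = {"task_id", "team_id", "triggered_by", "enqueued_at"} - payload.keys()
        if missing:
            raise ValueError(f"queue message missing fields: {sorted(missing)}")
        return cls(**payload)

msg = SyncTaskMessage.from_payload({
    "task_id": "uuid",
    "team_id": 123,
    "triggered_by": "stale_data",
    "enqueued_at": "2026-01-22T10:30:00Z",
})
```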
4.3 Rate Limiting Strategy¶
Token Bucket Algorithm:
# app/infrastructure/queue/rate_limiter.py
import asyncio
import time

class RateLimiter:
    """Token bucket rate limiter for Virtuozzo API calls."""

    def __init__(self, rate: int, per: int = 1):
        """
        Args:
            rate: Maximum requests allowed
            per: Time period in seconds (default: 1)
        """
        self.rate = rate
        self.per = per
        self.tokens = float(rate)
        # Monotonic clock is immune to wall-clock jumps (NTP, DST)
        self.last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Acquire a token. Blocks if rate limit exceeded."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            # Replenish tokens based on elapsed time, capped at burst size
            self.tokens = min(
                float(self.rate),
                self.tokens + elapsed * (self.rate / self.per),
            )
            self.last_update = now
            # Wait if no full token is available
            if self.tokens < 1:
                wait_time = (1 - self.tokens) * (self.per / self.rate)
                await asyncio.sleep(wait_time)
                self.tokens = 0.0
            else:
                self.tokens -= 1
# Global rate limiter instance
virtuozzo_rate_limiter = RateLimiter(rate=50, per=1)
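The replenishment arithmetic can be verified in isolation by driving the bucket with explicit timestamps instead of a real clock. A synchronous illustration of the same math (this helper is for demonstration only, not part of the module):

```python
class FakeClockBucket:
    """Token-bucket arithmetic from RateLimiter, driven by explicit timestamps."""

    def __init__(self, rate: int, per: float = 1.0, now: float = 0.0):
        self.rate = rate
        self.per = per
        self.tokens = float(rate)
        self.last_update = now

    def try_acquire(self, now: float) -> bool:
        """Replenish up to `rate` tokens for elapsed time, then spend one if available."""
        elapsed = now - self.last_update
        self.tokens = min(float(self.rate), self.tokens + elapsed * (self.rate / self.per))
        self.last_update = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = FakeClockBucket(rate=2, per=1.0)            # 2 requests per second
burst = [bucket.try_acquire(0.0) for _ in range(3)]  # instant burst drains the bucket
later = bucket.try_acquire(1.0)                      # one second later: refilled
```

The burst succeeds twice then fails, and a full second of elapsed time restores capacity: exactly the behavior the async version enforces by sleeping instead of returning False.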
5. Database Schema¶
5.1 Changes to environments Table¶
-- Add sync tracking columns
ALTER TABLE environments
ADD COLUMN last_synced_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
ADD COLUMN sync_status VARCHAR(20) DEFAULT 'ok',
ADD COLUMN sync_error_message TEXT,
ADD COLUMN last_sync_duration_ms INTEGER;
-- Add index for freshness queries
CREATE INDEX idx_environments_team_sync
ON environments(team_id, last_synced_at);
-- Add index for status queries
CREATE INDEX idx_environments_sync_status
ON environments(sync_status);
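The composite index exists to serve the freshness check the sync service performs on every request, which needs only the newest per-team timestamp:

```sql
-- Newest sync timestamp for a team; satisfied by idx_environments_team_sync
SELECT MAX(last_synced_at)
FROM environments
WHERE team_id = :team_id;
```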
5.2 Changes to teams Table¶
-- Add environment sync tracking
ALTER TABLE teams
ADD COLUMN last_env_sync_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN last_env_sync_hash VARCHAR(64),
ADD COLUMN last_sync_checked_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN last_sync_duration_ms INTEGER;
5.3 Optional: Sync Queue Tracking Table¶
-- For observability and debugging
CREATE TABLE environment_sync_queue_log (
id SERIAL PRIMARY KEY,
task_id VARCHAR(36) UNIQUE,
team_id INTEGER NOT NULL,
triggered_by VARCHAR(50),
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
status VARCHAR(20), -- pending, processing, completed, failed
error_message TEXT,
retry_count INTEGER DEFAULT 0,
env_count INTEGER,
duration_ms INTEGER,
hash_before VARCHAR(64),
hash_after VARCHAR(64)
);
6. Configuration¶
6.1 Environment Variables¶
# Environment Cache Settings
VZ_ENV_TTL_SECONDS=600 # 10 minutes - Fresh data window
VZ_ENV_MAX_STALE_SECONDS=3600 # 1 hour - Max staleness before warning
VZ_SYNC_QUEUE_RATE_LIMIT=50 # 50 requests per second max
VZ_SYNC_TIMEOUT_SECONDS=5 # Virtuozzo API timeout
VZ_SYNC_RETRY_ATTEMPTS=3 # Max retry attempts
VZ_SYNC_WORKER_CONCURRENCY=10 # Number of worker processes
VZ_SYNC_ENABLE_QUEUE=true # Enable queue-based sync
6.2 Feature Flags¶
# app/core/config.py
class Settings(BaseSettings):
# ... existing settings ...
# Environment sync settings
vz_env_ttl_seconds: int = 600
vz_env_max_stale_seconds: int = 3600
vz_sync_queue_rate_limit: int = 50
vz_sync_timeout_seconds: int = 5
vz_sync_retry_attempts: int = 3
vz_sync_worker_concurrency: int = 10
vz_sync_enable_queue: bool = True
# Queue backend
sync_queue_backend: Literal["rabbitmq", "redis"] = "rabbitmq"
7. Observability¶
7.1 Metrics¶
# app/domains/environments/metrics.py
from prometheus_client import Counter, Histogram, Gauge
# Request metrics
# NOTE: team_id is a high-cardinality Prometheus label; drop or bucket it if the team count grows large
env_requests_total = Counter(
"env_requests_total",
"Total environment requests",
["team_id", "status"] # status: fresh, stale, first_run
)
env_stale_served_total = Counter(
"env_stale_served_total",
"Total stale environment responses",
["team_id", "staleness_bucket"] # bucket: stale, very_stale
)
# Sync metrics
env_sync_duration_seconds = Histogram(
"env_sync_duration_seconds",
"Environment sync duration",
["team_id", "result"] # result: success, timeout, error
)
env_sync_queue_depth = Gauge(
"env_sync_queue_depth",
"Current sync queue depth",
["priority"]
)
# API metrics
virtuozzo_api_requests_total = Counter(
"virtuozzo_api_requests_total",
"Total Virtuozzo API requests",
["endpoint", "status_code"]
)
virtuozzo_api_duration_seconds = Histogram(
"virtuozzo_api_duration_seconds",
"Virtuozzo API call duration",
["endpoint"]
)
7.2 Structured Logging¶
# Log examples
logger.info(
"env_request",
team_id=team_id,
has_data=True,
is_stale=False,
age_seconds=45,
sync_enqueued=False,
)
logger.info(
"sync_started",
team_id=team_id,
triggered_by="stale_data",
task_id=task_id,
)
logger.info(
"sync_complete",
team_id=team_id,
duration_seconds=3.2,
env_count=5,
data_changed=True,
hash_old="abc123...",
hash_new="def456...",
)
logger.error(
"sync_failed",
team_id=team_id,
error_type="timeout",
error_message="Connection timeout after 5 seconds",
will_retry=True,
retry_count=1,
)
8. Testing Strategy¶
8.1 Unit Tests¶
# tests/unit/test_sync_service.py
@pytest.mark.asyncio
async def test_fresh_data_returned_immediately():
"""Fresh data (< TTL) returns immediately without queueing."""
# Setup: DB has data from 5 minutes ago
await insert_test_envs(team_id=1, age_minutes=5)
# Act
service = EnvironmentSyncService(db)
items, metadata = await service.get_environments(team_id=1)
# Assert
assert len(items) == 5
assert metadata.has_data is True
assert metadata.is_stale is False
assert metadata.sync_enqueued is False
@pytest.mark.asyncio
async def test_stale_data_returned_with_enqueue():
"""Stale data (> TTL) returns immediately with background sync queued."""
# Setup: DB has data from 20 minutes ago
await insert_test_envs(team_id=1, age_minutes=20)
# Act
service = EnvironmentSyncService(db)
items, metadata = await service.get_environments(team_id=1)
# Assert
assert len(items) == 5 # Data returned immediately
assert metadata.is_stale is True
assert metadata.sync_enqueued is True
assert metadata.should_block is False # NEVER blocks
@pytest.mark.asyncio
async def test_first_run_blocks_for_fetch():
"""First run (no data) blocks for initial fetch."""
# Setup: No data in DB
await clear_environments(team_id=1)
# Act
service = EnvironmentSyncService(db)
items, metadata = await service.get_environments(team_id=1)
# Assert
assert items == []
assert metadata.has_data is False
assert metadata.should_block is True
8.2 Sad Path Tests¶
# tests/sad_path/test_worker_failures.py
@pytest.mark.asyncio
async def test_timeout_preserves_stale_data():
"""Virtuozzo timeout preserves stale data in DB."""
# Setup: DB has valid data
await insert_test_envs(team_id=1)
# Mock: Virtuozzo times out
with patch("app.domains.environments.workers.virtuozzo_client.get_envs", side_effect=TimeoutError):
await sync_environments_worker({"team_id": 1})
# Assert: DB unchanged
data = await get_environments(team_id=1)
assert len(data) == 5 # Original data preserved
@pytest.mark.asyncio
async def test_invalid_payload_rejected():
"""Invalid Virtuozzo response is rejected, stale data preserved."""
# Setup: DB has valid data
await insert_test_envs(team_id=1)
# Mock: Virtuozzo returns invalid structure
with patch("app.domains.environments.workers.virtuozzo_client.get_envs", return_value={"invalid": "payload"}):
await sync_environments_worker({"team_id": 1})
# Assert: DB unchanged
data = await get_environments(team_id=1)
assert len(data) == 5
@pytest.mark.asyncio
async def test_worker_crash_rolls_back_transaction():
"""Worker crash during upsert rolls back transaction."""
# Setup: DB has data
await insert_test_envs(team_id=1)
old_data = await get_environments(team_id=1)
# Mock: worker fails mid-upsert (the worker catches the error, logs it, and never commits)
with patch("app.domains.environments.workers._upsert_environments", side_effect=RuntimeError):
await sync_environments_worker({"team_id": 1})
# Assert: transaction rolled back, original data intact
new_data = await get_environments(team_id=1)
assert len(new_data) == len(old_data)
9. Deployment Checklist¶
| Phase | Task | Status |
|---|---|---|
| Infrastructure | Set up RabbitMQ / Redis queue | |
| Infrastructure | Configure worker processes | |
| Database | Run migration to add sync columns | |
| Backend | Implement EnvironmentSyncService | |
| Backend | Implement background worker | |
| Backend | Update router to use sync service | |
| Testing | Unit tests (85% coverage) | |
| Testing | Sad path tests (3:1 ratio) | |
| Testing | Integration tests | |
| Observability | Configure metrics | |
| Observability | Configure structured logging | |
| Documentation | Update runbooks | |
| Frontend | Implement stale badge UI | |
10. Revision History¶
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-09 | MBPanel Team | Initial design (lazy sync + distributed locking) |
| 2.0 | 2026-01-22 | MBPanel Team | Major refactor: Stale-While-Revalidate + Queue-based architecture |