Architecture Design Document (ADD)

Virtuozzo Environment Synchronization - Stale-While-Revalidate Implementation

Document Information

| Field | Details |
|---|---|
| Project Name | MBPanel Virtuozzo Environment Sync |
| Author | Technical Architecture Team |
| Date Created | 2026-01-09 |
| Last Updated | 2026-01-22 |
| Version | 2.0 |
| Status | Draft - Architecture Refresh |
| Related PRD | PRD-virtuozzo-environment-sync.md |
| Related ARD | ARD-virtuozzo-environment-sync.md |

1. Introduction

1.1 Purpose

This document defines the technical architecture for implementing the Stale-While-Revalidate pattern, a distributed caching strategy that serves stale data immediately while asynchronously refreshing it from the source. This is a classic distributed-systems optimization problem involving consistency vs. availability trade-offs, cache invalidation strategies, and scalable queue-based processing.

1.2 Architectural Shift (v2.0)

Previous Design (v1.0):
- Lazy sync on read + distributed locking
- Background Celery cron iterating through all teams
- Redis hot cache + PostgreSQL warm cache

New Design (v2.0 - Stale-While-Revalidate):
- Always return immediately (fresh or stale), never block
- Queue-based refresh (event-driven, not cron-based)
- Active-user priority (only logged-in users trigger refresh)
- Rate-limited worker pool (protects Virtuozzo from spikes)
- PostgreSQL-only cache (simpler, survives restarts)

1.3 Scope

In Scope:
- Stale-While-Revalidate pattern implementation
- Queue-based background refresh system
- Rate-limited Virtuozzo API calls
- Graceful degradation (serve stale on errors)
- Data integrity validation (hash-based change detection)
- Observability (metrics, logging, tracing)

Out of Scope:
- Virtuozzo authentication (existing app/modules/sessions/)
- Environment provisioning (existing app/domains/environments/tasks.py)
- Real-time WebSocket sync (SSE for notifications only)


2. Architecture Overview

2.1 Core Pattern: Stale-While-Revalidate

┌─────────────────────────────────────────────────────────────────────────┐
│                    STALE-WHILE-REVALIDATE PATTERN                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  REQUEST → Check DB → Return Data (ALWAYS)                               │
│                    │                                                     │
│                    ├─ Fresh (< TTL)    → Done (instant)                 │
│                    └─ Stale (> TTL)    → Return stale + enqueue refresh │
│                                                                          │
│  BACKGROUND WORKER (Independent Process)                                 │
│  → Process queue at fixed rate (rate-limited)                           │
│  → Call Virtuozzo API                                                    │
│  → Validate payload                                                      │
│  → Update DB if changed                                                  │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘
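The decision flow above can be sketched as a small, framework-free function. This is an illustrative reduction of the pattern, not the real MBPanel API: `CacheEntry`, `get_with_swr`, and the `enqueue_refresh` callback are hypothetical names, and the TTL mirrors the 10-minute default used later in this document.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional

TTL_SECONDS = 600  # illustrative: matches the 10-minute default used later


@dataclass
class CacheEntry:
    data: list
    synced_at: float  # epoch seconds of the last successful sync


def get_with_swr(
    entry: Optional[CacheEntry],
    enqueue_refresh: Callable[[], None],
    now: Optional[float] = None,
) -> tuple[list, bool]:
    """Return (data, is_stale). Never blocks once cached data exists."""
    now = time.time() if now is None else now
    if entry is None:
        # First run: no cached data -- the only case that must block upstream
        return [], False
    is_stale = (now - entry.synced_at) >= TTL_SECONDS
    if is_stale:
        enqueue_refresh()  # fire-and-forget; the caller still gets data now
    return entry.data, is_stale
```

Fresh data returns with no side effect; stale data still returns immediately but triggers exactly one refresh enqueue per request.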

2.2 Context Diagram

graph TB
    subgraph "External Systems"
        VZ[Virtuozzo API<br/>environments source of truth]
    end

    subgraph "MBPanel Platform"
        subgraph "Frontend"
            UI[Next.js 16<br/>Stale Badge UI]
        end

        subgraph "API Layer"
            API[FastAPI Backend<br/>environments router]
        end

        subgraph "Queue System"
            Q[Refresh Queue<br/>RabbitMQ/Redis]
            W1[Worker 1]
            W2[Worker 2]
            WN[Worker N]
        end

        subgraph "Business Logic"
            SYNC[EnvironmentSyncService<br/>stale-while-revalidate]
        end

        subgraph "Infrastructure"
            DB[(PostgreSQL<br/>primary cache)]
        end
    end

    UI --> API
    API --> SYNC
    SYNC --> DB
    SYNC -->|enqueue| Q
    Q --> W1
    Q --> W2
    Q --> WN
    W1 --> VZ
    W2 --> VZ
    WN --> VZ

    style VZ fill:#f9f,stroke:#333,stroke-width:4px
    style Q fill:#ff9,stroke:#333,stroke-width:3px
    style SYNC fill:#bbf,stroke:#333,stroke-width:2px

2.3 Component Overview

| Component | Responsibility | Technology |
|---|---|---|
| EnvironmentSyncService | Orchestrate stale-while-revalidate logic | Python 3.12+, async/await |
| Refresh Queue | Hold pending sync tasks | RabbitMQ / Redis |
| Worker Pool | Process queue, call Virtuozzo | Celery / custom asyncio workers |
| PostgreSQL | Primary cache, survives restarts | SQLAlchemy 2.0 async |
| VirtuozzoClient | External API calls (EXISTS) | httpx async |

3. Detailed Component Design

3.1 EnvironmentSyncService

Location: app/domains/environments/sync_service.py (new file)

Responsibilities:
1. Check database for cached environments
2. Determine freshness based on last_synced_at timestamp
3. Return data immediately (fresh or stale)
4. Enqueue background sync task if stale
5. Never block the user request

Interface:

class EnvironmentSyncService:
    def __init__(self, db: AsyncSession) -> None: ...

    async def get_environments(
        self,
        *,
        team_id: int,
    ) -> tuple[list[dict[str, Any]], SyncMetadata]:
        """
        Return environments from cache.
        Always returns immediately (fresh or stale).
        Enqueues background refresh if stale.
        """

    async def _enqueue_sync(
        self,
        *,
        team_id: int,
        triggered_by: str,  # e.g. "stale_data", "manual", "first_run"
    ) -> str:
        """
        Enqueue sync task to background queue.
        Returns task_id for tracking.
        """

Implementation:

# app/domains/environments/sync_service.py
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, NamedTuple

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.logging import get_logger
from app.domains.environments.repository import list_team_environments
from app.infrastructure.queue.client import enqueue_task

logger = get_logger(__name__)

# Configuration
_TTL_SECONDS = settings.vz_env_ttl_seconds or 600  # 10 minutes default
_MAX_STALE_SECONDS = settings.vz_env_max_stale_seconds or 3600  # 1 hour


class EnvironmentSyncService:
    """Stale-While-Revalidate pattern for environment synchronization."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_environments(
        self,
        *,
        team_id: int,
    ) -> tuple[list[dict[str, Any]], SyncMetadata]:
        """
        Get environments for a team.

        ALWAYS returns immediately with data from the database.
        If data is stale (> TTL), enqueues background refresh.

        Returns:
            (environments, metadata): Tuple of environment list and sync metadata
        """
        # 1. Query database (always fast, < 50ms)
        rows = await list_team_environments(db=self.db, team_id=team_id)

        # 2. Check if we have any data
        if not rows:
            # First run: no cached data exists
            # This is unavoidable - user must wait for initial fetch
            logger.info("env_first_run", team_id=team_id)
            return [], SyncMetadata(
                has_data=False,
                is_stale=False,
                should_block=True,
                reason="first_run"
            )

        # 3. Check freshness
        most_recent_sync = max(
            (row.last_synced_at for row in rows if row.last_synced_at),
            default=None
        )

        if most_recent_sync is None:
            # Legacy data without timestamp - treat as stale
            logger.warning("env_no_timestamp", team_id=team_id)
            await self._enqueue_sync(team_id=team_id, triggered_by="missing_timestamp")
            return self._serialize(rows), SyncMetadata(
                has_data=True,
                is_stale=True,
                should_block=False,
                last_synced_at=None,
                age_seconds=None,
                reason="missing_timestamp"
            )

        # 4. Calculate data age
        now = datetime.now(timezone.utc)
        age = now - most_recent_sync
        age_seconds = age.total_seconds()

        is_fresh = age_seconds < _TTL_SECONDS
        is_max_stale = age_seconds > _MAX_STALE_SECONDS

        # 5. Return metadata
        metadata = SyncMetadata(
            has_data=True,
            is_stale=not is_fresh,
            is_max_stale=is_max_stale,
            should_block=False,  # NEVER block after first run
            last_synced_at=most_recent_sync,
            age_seconds=age_seconds,
            ttl_seconds=_TTL_SECONDS,
        )

        # 6. Enqueue background sync if stale
        if not is_fresh:
            staleness_bucket = self._get_staleness_bucket(age_seconds)
            logger.info(
                "env_stale_enqueueing",
                team_id=team_id,
                age_seconds=age_seconds,
                staleness_bucket=staleness_bucket,
            )
            await self._enqueue_sync(
                team_id=team_id,
                triggered_by=f"staleness_{staleness_bucket}"
            )
            metadata = metadata._replace(
                sync_enqueued=True,
                reason="stale_data"
            )
        else:
            metadata = metadata._replace(reason="fresh_data")

        return self._serialize(rows), metadata

    async def _enqueue_sync(
        self,
        *,
        team_id: int,
        triggered_by: str,
    ) -> str:
        """Enqueue background sync task."""
        from app.infrastructure.queue.tasks import sync_environments_task

        task_id = await enqueue_task(
            task_name="sync_environments",
            payload={
                "team_id": team_id,
                "triggered_by": triggered_by,
                "enqueued_at": datetime.now(timezone.utc).isoformat(),
            },
            priority="normal",
        )

        logger.info(
            "env_sync_enqueued",
            team_id=team_id,
            task_id=task_id,
            triggered_by=triggered_by,
        )
        return task_id

    def _serialize(self, rows: list[Any]) -> list[dict[str, Any]]:
        """Serialize environment rows to dict."""
        return [
            {
                "env_name": row.env_name,
                "display_name": row.display_name,
                "shortdomain": row.shortdomain,
                "app_id": row.app_id,
                "status": row.status,
                "last_synced_at": row.last_synced_at.isoformat() if row.last_synced_at else None,
            }
            for row in rows
        ]

    def _get_staleness_bucket(self, age_seconds: float) -> str:
        """Get staleness bucket for logging/metrics."""
        if age_seconds < _TTL_SECONDS:
            return "fresh"
        elif age_seconds < 3600:  # 1 hour
            return "stale"
        else:
            return "very_stale"


class SyncMetadata(NamedTuple):
    """Metadata about environment sync status."""
    has_data: bool
    is_stale: bool
    should_block: bool
    last_synced_at: datetime | None = None
    age_seconds: float | None = None
    ttl_seconds: int = _TTL_SECONDS
    is_max_stale: bool = False
    sync_enqueued: bool = False
    reason: str = ""

3.2 Background Worker

Location: app/domains/environments/workers.py (new file)

Responsibilities:
1. Process sync tasks from queue
2. Rate-limit Virtuozzo API calls
3. Validate Virtuozzo response
4. Update database atomically
5. Handle errors gracefully (never corrupt DB)

Implementation:

# app/domains/environments/workers.py
from __future__ import annotations

import hashlib
import json
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.crypto import decrypt_str
from app.core.logging import get_logger
from app.infrastructure.database.models.environment import Environment
from app.infrastructure.database.models.team import Team
from app.infrastructure.database.session import AsyncSessionLocal  # adjust to the actual session factory location
from app.infrastructure.external.virtuozzo import client as virtuozzo_client
from app.infrastructure.queue.client import task_context

logger = get_logger(__name__)

_RATE_LIMIT_PER_SECOND = settings.vz_sync_queue_rate_limit or 50
_TIMEOUT_SECONDS = settings.vz_sync_timeout_seconds or 5
_MAX_RETRY_ATTEMPTS = 3


async def sync_environments_worker(task_payload: dict[str, Any]) -> None:
    """
    Background worker to sync environments from Virtuozzo.

    Sad Path Engineering:
    - NEVER overwrites DB with error data
    - Validates payload before writing
    - Uses atomic transactions
    - Logs all failures for observability
    """
    team_id = task_payload["team_id"]
    triggered_by = task_payload.get("triggered_by", "unknown")

    logger.info(
        "worker_sync_started",
        team_id=team_id,
        triggered_by=triggered_by,
        task_id=task_context.get_task_id(),
    )

    async with AsyncSessionLocal() as db:
        try:
            # 1. Get team and session key
            team = await db.get(Team, team_id)
            if not team:
                logger.error("worker_team_not_found", team_id=team_id)
                return

            if not team.session_key_encrypted:
                logger.error("worker_no_session_key", team_id=team_id)
                return

            # 2. Decrypt session key
            try:
                session_key = decrypt_str(team.session_key_encrypted)
            except Exception as exc:
                logger.error("worker_decrypt_failed", team_id=team_id, error=str(exc))
                return

            # 3. Call Virtuozzo API with timeout, after passing the shared
            # rate limiter (section 4.3) so workers never exceed the global cap
            from app.infrastructure.queue.rate_limiter import virtuozzo_rate_limiter

            await virtuozzo_rate_limiter.acquire()
            start_time = datetime.now(timezone.utc)
            try:
                response = await virtuozzo_client.get_envs(
                    session=session_key,
                    lazy=True,  # Optimize: skip node details
                )
                duration_ms = int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
            except TimeoutError:
                logger.error(
                    "worker_virtuozzo_timeout",
                    team_id=team_id,
                    timeout_seconds=_TIMEOUT_SECONDS,
                )
                # DO NOT update DB - stale data is better than error
                return
            except Exception as exc:
                logger.error(
                    "worker_virtuozzo_error",
                    team_id=team_id,
                    error_type=type(exc).__name__,
                    error_message=str(exc),
                )
                # DO NOT update DB
                return

            # 4. Validate response structure
            if not _validate_virtuozzo_response(response):
                logger.error(
                    "worker_invalid_response",
                    team_id=team_id,
                    response_keys=list(response.keys()) if isinstance(response, dict) else type(response).__name__,
                )
                # DO NOT update DB - protect from corruption
                return

            # 5. Compute hash for change detection
            new_hash = _compute_env_hash(response.get("infos", []))
            old_hash = team.last_env_sync_hash

            if new_hash == old_hash:
                logger.info(
                    "worker_unchanged",
                    team_id=team_id,
                    hash=new_hash[:8],  # Log first 8 chars
                )
                # Update timestamp anyway (shows we checked)
                team.last_sync_checked_at = datetime.now(timezone.utc)
                await db.commit()
                return

            # 6. Atomic database update
            await _upsert_environments(db, team_id, response)

            # 7. Update team metadata
            team.last_env_sync_hash = new_hash
            team.last_env_sync_at = datetime.now(timezone.utc)
            team.last_sync_duration_ms = duration_ms

            await db.commit()

            logger.info(
                "worker_sync_complete",
                team_id=team_id,
                env_count=len(response.get("infos", [])),
                duration_ms=duration_ms,
                hash=new_hash[:8],
            )

        except Exception as exc:
            logger.error(
                "worker_unexpected_error",
                team_id=team_id,
                error_type=type(exc).__name__,
                error_message=str(exc),
                exc_info=True,
            )
            # DO NOT commit transaction - let it rollback


def _validate_virtuozzo_response(data: dict[str, Any]) -> bool:
    """Validate Virtuozzo response structure before DB write."""
    # Check required top-level fields
    if not isinstance(data, dict):
        return False

    if "result" not in data:
        return False

    if data["result"] != 0:
        logger.warning("virtuozzo_nonzero_result", result=data["result"])
        return False

    # Check infos is a list
    infos = data.get("infos")
    if not isinstance(infos, list):
        return False

    # Check each env has required structure
    for item in infos:
        if not isinstance(item, dict):
            return False
        if "env" not in item:
            return False

    return True


def _compute_env_hash(infos: list[dict]) -> str:
    """Compute SHA-256 hash of environment list for change detection."""
    normalized = json.dumps(infos, sort_keys=True)
    return hashlib.sha256(normalized.encode()).hexdigest()


async def _upsert_environments(
    db: AsyncSession,
    team_id: int,
    response: dict[str, Any],
) -> None:
    """Atomically upsert environments from Virtuozzo response."""
    now = datetime.now(timezone.utc)
    infos = response.get("infos", [])

    # Track which environments exist in Virtuozzo
    vz_shortdomains = set()

    for item in infos:
        env_data = item.get("env", {})
        if not env_data:
            continue

        shortdomain = env_data.get("shortdomain") or env_data.get("domain")
        if not shortdomain:
            continue

        vz_shortdomains.add(shortdomain)

        # Check if exists
        existing = await db.execute(
            select(Environment).where(
                Environment.team_id == team_id,
                Environment.shortdomain == shortdomain,
            )
        )
        env = existing.scalar_one_or_none()

        if env:
            # Update existing
            env.display_name = env_data.get("displayName") or env_data.get("name") or shortdomain
            env.app_id = env_data.get("appid")
            env.status = str(env_data.get("status", "unknown"))
            env.owner_uid = env_data.get("owner")
            env.region = env_data.get("region")
            env.last_synced_at = now
            env.sync_status = "ok"
            env.sync_error_message = None
        else:
            # Create new
            env = Environment(
                team_id=team_id,
                env_name=env_data.get("name") or shortdomain,
                shortdomain=shortdomain,
                display_name=env_data.get("displayName") or shortdomain,
                app_id=env_data.get("appid"),
                status=str(env_data.get("status", "unknown")),
                owner_uid=env_data.get("owner"),
                region=env_data.get("region"),
                last_synced_at=now,
                sync_status="ok",
                created_at=now,
                updated_at=now,
            )
            db.add(env)

    # Delete environments that no longer exist in Virtuozzo
    await db.execute(
        delete(Environment).where(
            Environment.team_id == team_id,
            Environment.shortdomain.not_in(vz_shortdomains),
        )
    )
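The hash-based change detection above can be exercised standalone. Because `json.dumps(..., sort_keys=True)` normalizes key order, two payloads that differ only in key ordering produce the same digest (the helper below duplicates `_compute_env_hash` so the snippet runs on its own):

```python
import hashlib
import json


def compute_env_hash(infos: list[dict]) -> str:
    """Same normalization as _compute_env_hash above: sorted keys, SHA-256."""
    normalized = json.dumps(infos, sort_keys=True)
    return hashlib.sha256(normalized.encode()).hexdigest()


a = [{"env": {"shortdomain": "app1", "status": 1}}]
b = [{"env": {"status": 1, "shortdomain": "app1"}}]  # same data, reordered keys
c = [{"env": {"shortdomain": "app1", "status": 2}}]  # status actually changed

assert compute_env_hash(a) == compute_env_hash(b)  # no spurious "changed"
assert compute_env_hash(a) != compute_env_hash(c)  # real change detected
```

One caveat: list order still matters. If Virtuozzo ever returns the same environments in a different order, the hash changes and an unnecessary upsert runs; sorting `infos` by shortdomain before hashing would make the digest order-insensitive.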

3.3 API Router Integration

Location: app/domains/environments/router.py (modify existing)

Changes:

# BEFORE (current code)
@router.get("/")
async def environments(
    user: AuthenticatedUser = Depends(get_current_user),
    db: AsyncSession = Depends(db_session),
) -> dict[str, list]:
    items = await list_environments(db=db, current_user=user)
    return {"items": items}

# AFTER (with stale-while-revalidate)
from typing import Any

from fastapi import HTTPException
from fastapi.responses import JSONResponse

from app.core.logging import get_logger
from app.domains.environments.sync_service import EnvironmentSyncService

logger = get_logger(__name__)

@router.get("/")
async def environments(
    user: AuthenticatedUser = Depends(get_current_user),
    db: AsyncSession = Depends(db_session),
) -> dict[str, Any]:
    """
    Get environments for the current team.

    Returns:
        - items: List of environments
        - meta: Sync metadata (is_stale, last_synced_at, etc.)

    Headers:
        - X-Data-Stale: true if data is older than TTL
        - X-Data-Last-Sync: ISO timestamp of last sync
        - X-Sync-In-Progress: true if background sync is running
    """
    if user.team_id is None:
        raise HTTPException(status_code=403, detail="Missing team context")

    sync_service = EnvironmentSyncService(db)
    items, metadata = await sync_service.get_environments(team_id=user.team_id)

    # Set response headers
    headers = {
        "X-Data-Stale": str(metadata.is_stale).lower(),
        "X-Data-Last-Sync": metadata.last_synced_at.isoformat() if metadata.last_synced_at else "",
        "X-Sync-In-Progress": str(metadata.sync_enqueued).lower(),
    }

    # Special case: first run (no data)
    if metadata.should_block:
        # Trigger synchronous fetch for first run
        # This is unavoidable - user must wait for initial data
        logger.info("env_first_run_blocking", team_id=user.team_id)
        # ... synchronous fetch logic ...

    return JSONResponse(
        content={
            "items": items,
            "meta": {
                "last_synced_at": metadata.last_synced_at.isoformat() if metadata.last_synced_at else None,
                "is_stale": metadata.is_stale,
                "is_max_stale": metadata.is_max_stale,
                "age_seconds": metadata.age_seconds,
                "ttl_seconds": metadata.ttl_seconds,
                "sync_enqueued": metadata.sync_enqueued,
                "reason": metadata.reason,
            }
        },
        headers=headers,
    )

4. Queue System Design

4.1 Queue Architecture

graph TB
    subgraph "Producers"
        API[FastAPI<br/>environments endpoint]
    end

    subgraph "Queue Layer"
        Q[RabbitMQ / Redis Queue<br/>env_sync_queue]
    end

    subgraph "Consumer Layer"
        RATE[Rate Limiter<br/>50 req/sec]
        W1[Worker 1]
        W2[Worker 2]
        WN[Worker N]
    end

    subgraph "External"
        VZ[Virtuozzo API]
    end

    API -->|enqueue| Q
    Q --> RATE
    RATE --> W1
    RATE --> W2
    RATE --> WN
    W1 --> VZ
    W2 --> VZ
    WN --> VZ

    style Q fill:#ff9,stroke:#333,stroke-width:3px
    style RATE fill:#faa,stroke:#333,stroke-width:2px

4.2 Queue Message Schema

{
  "task_id": "uuid",
  "task_type": "sync_environments",
  "team_id": 123,
  "priority": "normal",
  "triggered_by": "stale_data",
  "enqueued_at": "2026-01-22T10:30:00Z",
  "retry_count": 0,
  "max_retries": 3
}
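A minimal producer-side helper for building messages in this shape. The field names mirror the schema above; the dataclass itself, the `uuid4` task ids, and the defaults are illustrative assumptions, not the real queue client:

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass
class SyncTaskMessage:
    """Wire format for env_sync_queue messages (hypothetical helper)."""
    team_id: int
    triggered_by: str
    task_type: str = "sync_environments"
    priority: str = "normal"
    retry_count: int = 0
    max_retries: int = 3
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    enqueued_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_json(self) -> str:
        """Serialize to the JSON shape shown above."""
        return json.dumps(asdict(self))


msg = SyncTaskMessage(team_id=123, triggered_by="stale_data")
payload = json.loads(msg.to_json())
```

Consumers can bump `retry_count` and re-enqueue on transient failures, dropping the message (or dead-lettering it) once `retry_count` reaches `max_retries`.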

4.3 Rate Limiting Strategy

Token Bucket Algorithm:

# app/infrastructure/queue/rate_limiter.py
import asyncio
import time


class RateLimiter:
    """Token bucket rate limiter for Virtuozzo API calls."""

    def __init__(self, rate: int, per: float = 1.0):
        """
        Args:
            rate: Maximum requests allowed per period
            per: Time period in seconds (default: 1)
        """
        self.rate = rate
        self.per = per
        self.tokens = float(rate)
        self.last_update = time.monotonic()  # monotonic clock is immune to wall-clock jumps
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Acquire a token. Sleeps (holding the lock) if the rate is exceeded."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_update

            # Replenish tokens accrued since the last call, capped at bucket size
            self.tokens = min(
                self.rate,
                self.tokens + elapsed * (self.rate / self.per),
            )
            self.last_update = now

            if self.tokens < 1:
                # Sleep until exactly one token has accrued, then spend it.
                # last_update is advanced past the sleep so the wait is not
                # double-counted as replenishment for the next caller.
                wait_time = (1 - self.tokens) * (self.per / self.rate)
                await asyncio.sleep(wait_time)
                self.last_update = time.monotonic()
                self.tokens = 0.0
            else:
                self.tokens -= 1


# Global rate limiter instance
virtuozzo_rate_limiter = RateLimiter(rate=50, per=1)
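A sketch of how a pool of asyncio workers would consume the queue through a shared limiter. The `SimpleRateLimiter` below is a deliberately simplified inline stand-in (fixed spacing rather than a token bucket) so the snippet runs standalone; the `{"team_id": ...}` payloads and sentinel-based shutdown are illustrative choices:

```python
import asyncio
import time


class SimpleRateLimiter:
    """Inline stand-in for the token bucket above, for a runnable demo."""

    def __init__(self, rate: int):
        self.interval = 1.0 / rate
        self._last = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            wait = self._last + self.interval - time.monotonic()
            if wait > 0:
                await asyncio.sleep(wait)
            self._last = time.monotonic()


async def worker(queue: asyncio.Queue, limiter: SimpleRateLimiter, done: list) -> None:
    while True:
        task = await queue.get()
        if task is None:  # sentinel: shut this worker down
            queue.task_done()
            return
        await limiter.acquire()  # every (simulated) Virtuozzo call passes the shared limiter
        done.append(task["team_id"])
        queue.task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    limiter = SimpleRateLimiter(rate=1000)  # high rate so the demo finishes instantly
    for team_id in range(5):
        queue.put_nowait({"team_id": team_id})
    done: list = []
    workers = [asyncio.create_task(worker(queue, limiter, done)) for _ in range(3)]
    await queue.join()  # wait for all real tasks to be processed
    for _ in workers:
        queue.put_nowait(None)  # one sentinel per worker
    await asyncio.gather(*workers)
    return done


processed = asyncio.run(main())
```

Because all workers share one limiter instance, adding workers increases parallelism for queue handling without increasing the aggregate call rate against Virtuozzo.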

5. Database Schema

5.1 Changes to environments Table

-- Add sync tracking columns
ALTER TABLE environments
ADD COLUMN last_synced_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
ADD COLUMN sync_status VARCHAR(20) DEFAULT 'ok',
ADD COLUMN sync_error_message TEXT,
ADD COLUMN last_sync_duration_ms INTEGER;

-- Add index for freshness queries
CREATE INDEX idx_environments_team_sync
ON environments(team_id, last_synced_at);

-- Add index for status queries
CREATE INDEX idx_environments_sync_status
ON environments(sync_status);
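The freshness check in section 3.1 reduces to a single aggregate over (team_id, last_synced_at), which is exactly what idx_environments_team_sync serves. A sqlite3 sketch of the query shape (production uses PostgreSQL; table and column names as in the migration above, sample data invented):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE environments (team_id INTEGER, shortdomain TEXT, last_synced_at TEXT)"
)
# Composite index from the migration above; it serves the MAX() lookup below
conn.execute(
    "CREATE INDEX idx_environments_team_sync ON environments(team_id, last_synced_at)"
)

now = datetime.now(timezone.utc)
conn.executemany(
    "INSERT INTO environments VALUES (?, ?, ?)",
    [
        (1, "app1", (now - timedelta(minutes=5)).isoformat()),
        (1, "app2", (now - timedelta(minutes=20)).isoformat()),
        (2, "app3", (now - timedelta(hours=2)).isoformat()),
    ],
)

# The most recent sync for a team drives the fresh/stale decision (TTL = 600s)
(last_synced,) = conn.execute(
    "SELECT MAX(last_synced_at) FROM environments WHERE team_id = ?", (1,)
).fetchone()
age_seconds = (now - datetime.fromisoformat(last_synced)).total_seconds()
is_stale = age_seconds >= 600
```

Here team 1's newest row is 5 minutes old, so `is_stale` is False even though an older sibling row exists; the service deliberately keys freshness off the most recent sync, not the oldest.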

5.2 Changes to teams Table

-- Add environment sync tracking
ALTER TABLE teams
ADD COLUMN last_env_sync_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN last_env_sync_hash VARCHAR(64),
ADD COLUMN last_sync_checked_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN last_sync_duration_ms INTEGER;

5.3 Optional: Sync Queue Tracking Table

-- For observability and debugging
CREATE TABLE environment_sync_queue_log (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) UNIQUE,
    team_id INTEGER NOT NULL,
    triggered_by VARCHAR(50),
    enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    status VARCHAR(20),  -- pending, processing, completed, failed
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    env_count INTEGER,
    duration_ms INTEGER,
    hash_before VARCHAR(64),
    hash_after VARCHAR(64)
);

6. Configuration

6.1 Environment Variables

# Environment Cache Settings
VZ_ENV_TTL_SECONDS=600              # 10 minutes - Fresh data window
VZ_ENV_MAX_STALE_SECONDS=3600       # 1 hour - Max staleness before warning
VZ_SYNC_QUEUE_RATE_LIMIT=50         # 50 requests per second max
VZ_SYNC_TIMEOUT_SECONDS=5           # Virtuozzo API timeout
VZ_SYNC_RETRY_ATTEMPTS=3            # Max retry attempts
VZ_SYNC_WORKER_CONCURRENCY=10       # Number of worker processes
VZ_SYNC_ENABLE_QUEUE=true           # Enable queue-based sync

6.2 Feature Flags

# app/core/config.py
class Settings(BaseSettings):
    # ... existing settings ...

    # Environment sync settings
    vz_env_ttl_seconds: int = 600
    vz_env_max_stale_seconds: int = 3600
    vz_sync_queue_rate_limit: int = 50
    vz_sync_timeout_seconds: int = 5
    vz_sync_retry_attempts: int = 3
    vz_sync_worker_concurrency: int = 10
    vz_sync_enable_queue: bool = True

    # Queue backend
    sync_queue_backend: Literal["rabbitmq", "redis"] = "rabbitmq"

7. Observability

7.1 Metrics

# app/domains/environments/metrics.py
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
env_requests_total = Counter(
    "env_requests_total",
    "Total environment requests",
    ["team_id", "status"]  # status: fresh, stale, first_run
)

env_stale_served_total = Counter(
    "env_stale_served_total",
    "Total stale environment responses",
    ["team_id", "staleness_bucket"]  # bucket: stale, very_stale
)

# Sync metrics
env_sync_duration_seconds = Histogram(
    "env_sync_duration_seconds",
    "Environment sync duration",
    ["team_id", "result"]  # result: success, timeout, error
)

env_sync_queue_depth = Gauge(
    "env_sync_queue_depth",
    "Current sync queue depth",
    ["priority"]
)

# API metrics
virtuozzo_api_requests_total = Counter(
    "virtuozzo_api_requests_total",
    "Total Virtuozzo API requests",
    ["endpoint", "status_code"]
)

virtuozzo_api_duration_seconds = Histogram(
    "virtuozzo_api_duration_seconds",
    "Virtuozzo API call duration",
    ["endpoint"]
)

7.2 Structured Logging

# Log examples
logger.info(
    "env_request",
    team_id=team_id,
    has_data=True,
    is_stale=False,
    age_seconds=45,
    sync_enqueued=False,
)

logger.info(
    "sync_started",
    team_id=team_id,
    triggered_by="stale_data",
    task_id=task_id,
)

logger.info(
    "sync_complete",
    team_id=team_id,
    duration_seconds=3.2,
    env_count=5,
    data_changed=True,
    hash_old="abc123...",
    hash_new="def456...",
)

logger.error(
    "sync_failed",
    team_id=team_id,
    error_type="timeout",
    error_message="Connection timeout after 5 seconds",
    will_retry=True,
    retry_count=1,
)

8. Testing Strategy

8.1 Unit Tests

# tests/unit/test_sync_service.py
# (`db`, `insert_test_envs`, and `clear_environments` are project test fixtures/helpers)
import pytest

from app.domains.environments.sync_service import EnvironmentSyncService

@pytest.mark.asyncio
async def test_fresh_data_returned_immediately():
    """Fresh data (< TTL) returns immediately without queueing."""
    # Setup: DB has data from 5 minutes ago
    await insert_test_envs(team_id=1, age_minutes=5)

    # Act
    service = EnvironmentSyncService(db)
    items, metadata = await service.get_environments(team_id=1)

    # Assert
    assert len(items) == 5
    assert metadata.has_data is True
    assert metadata.is_stale is False
    assert metadata.sync_enqueued is False


@pytest.mark.asyncio
async def test_stale_data_returned_with_enqueue():
    """Stale data (> TTL) returns immediately with background sync queued."""
    # Setup: DB has data from 20 minutes ago
    await insert_test_envs(team_id=1, age_minutes=20)

    # Act
    service = EnvironmentSyncService(db)
    items, metadata = await service.get_environments(team_id=1)

    # Assert
    assert len(items) == 5  # Data returned immediately
    assert metadata.is_stale is True
    assert metadata.sync_enqueued is True
    assert metadata.should_block is False  # NEVER blocks


@pytest.mark.asyncio
async def test_first_run_blocks_for_fetch():
    """First run (no data) blocks for initial fetch."""
    # Setup: No data in DB
    await clear_environments(team_id=1)

    # Act
    service = EnvironmentSyncService(db)
    items, metadata = await service.get_environments(team_id=1)

    # Assert
    assert items == []
    assert metadata.has_data is False
    assert metadata.should_block is True

8.2 Sad Path Tests

# tests/sad_path/test_worker_failures.py
# (`insert_test_envs` and `get_environments` are project test fixtures/helpers)
from unittest.mock import patch

import pytest

from app.domains.environments.workers import sync_environments_worker

@pytest.mark.asyncio
async def test_timeout_preserves_stale_data():
    """Virtuozzo timeout preserves stale data in DB."""
    # Setup: DB has valid data
    await insert_test_envs(team_id=1)

    # Mock: Virtuozzo times out
    with patch("app.domains.environments.workers.virtuozzo_client.get_envs", side_effect=TimeoutError):
        await sync_environments_worker({"team_id": 1})

    # Assert: DB unchanged
    data = await get_environments(team_id=1)
    assert len(data) == 5  # Original data preserved


@pytest.mark.asyncio
async def test_invalid_payload_rejected():
    """Invalid Virtuozzo response is rejected, stale data preserved."""
    # Setup: DB has valid data
    await insert_test_envs(team_id=1)

    # Mock: Virtuozzo returns invalid structure
    with patch("app.domains.environments.workers.virtuozzo_client.get_envs", return_value={"invalid": "payload"}):
        await sync_environments_worker({"team_id": 1})

    # Assert: DB unchanged
    data = await get_environments(team_id=1)
    assert len(data) == 5


@pytest.mark.asyncio
async def test_worker_crash_rolls_back_transaction():
    """Worker crash during upsert rolls back transaction."""
    # Setup: DB has data
    await insert_test_envs(team_id=1)
    old_data = await get_environments(team_id=1)

    # Mock: worker crashes mid-transaction; the worker's top-level handler
    # catches the error, logs it, and lets the session roll back (it does not re-raise)
    with patch("app.domains.environments.workers._upsert_environments", side_effect=RuntimeError):
        await sync_environments_worker({"team_id": 1})

    # Assert: Transaction rolled back
    new_data = await get_environments(team_id=1)
    assert len(new_data) == len(old_data)

9. Deployment Checklist

| Phase | Task | Status |
|---|---|---|
| Infrastructure | Set up RabbitMQ / Redis queue | |
| Infrastructure | Configure worker processes | |
| Database | Run migration to add sync columns | |
| Backend | Implement EnvironmentSyncService | |
| Backend | Implement background worker | |
| Backend | Update router to use sync service | |
| Testing | Unit tests (85% coverage) | |
| Testing | Sad path tests (3:1 ratio) | |
| Testing | Integration tests | |
| Observability | Configure metrics | |
| Observability | Configure structured logging | |
| Documentation | Update runbooks | |
| Frontend | Implement stale badge UI | |

10. Revision History

| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-09 | MBPanel Team | Initial design (lazy sync + distributed locking) |
| 2.0 | 2026-01-22 | MBPanel Team | Major refactor: Stale-While-Revalidate + queue-based architecture |