Implementation Document (ID)

Virtuozzo Environment Synchronization - Stale-While-Revalidate Architecture

Document Information

Field Details
Product Name Virtuozzo Environment Synchronization
Author MBPanel Team
Date Created 2026-01-22
Last Updated 2026-01-22
Version 1.0
Status Draft
Source PRD docs/virtuozzo-env-sync/PRD-virtuozzo-environment-sync.md

1. Architecture Overview

1.1 Component Diagram

┌─────────────────────────────────────────────────────────────────────────────────┐
│                            MBPanel Architecture                                  │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                  │
│  ┌───────────────────────────────────────────────────────────────────────────┐ │
│  │                           API Layer (FastAPI)                               │ │
│  │  ┌─────────────────────────────────────────────────────────────────────┐   │ │
│  │  │  app/api/v1/environments.py                                         │   │ │
│  │  │  - GET /api/v1/environments  → list_environments()                 │   │ │
│  │  │  - POST /api/v1/environments/sync → sync_team_environments()        │   │ │
│  │  └─────────────────────────────────────────────────────────────────────┘   │ │
│  └─────────────────────────────────────────────────────────────────────────────┘ │
│                                         │                                         │
│                                         ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────────────┐ │
│  │                        Domain Layer (Service + Repository)                   │ │
│  │  ┌─────────────────────────────────────────────────────────────────────┐   │ │
│  │  │  app/domains/environments/                                          │   │ │
│  │  │  ├── service.py     - Business logic, sync orchestration            │   │ │
│  │  │  ├── repository.py  - Data access (SQLAlchemy queries)              │   │ │
│  │  │  ├── tasks.py       - Celery task definitions                       │   │ │
│  │  │  ├── router.py      - FastAPI route definitions                     │   │ │
│  │  │  └── schemas.py     - Pydantic models for validation                │   │ │
│  │  └─────────────────────────────────────────────────────────────────────┘   │ │
│  └─────────────────────────────────────────────────────────────────────────────┘ │
│                                         │                                         │
│              ┌──────────────────────────┼──────────────────────────┐              │
│              ▼                          ▼                          ▼              │
│  ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐    │
│  │   PostgreSQL        │   │   RabbitMQ          │   │   Virtuozzo API     │    │
│  │   (Cache)           │   │   (Queue Broker)    │   │   (External)        │    │
│  │                     │   │                     │   │                     │    │
│  │  - environments     │   │  - jobs.env.create  │   │  - GetEnvs          │    │
│  │  - environment_nodes│   │  - jobs.env.sync    │   │  - GetAccount       │    │
│  │  - teams            │   │  - events.topic     │   │                     │    │
│  └─────────────────────┘   └─────────────────────┘   └─────────────────────┘    │
│                                                                                  │
│  ┌─────────────────────────────────────────────────────────────────────────────┐ │
│  │                        Background Workers (Celery)                            │ │
│  │  ┌─────────────────────────────────────────────────────────────────────┐   │ │
│  │  │  app/domains/environments/tasks.py                                   │   │ │
│  │  │  - create_environment_task  (existing)                               │   │ │
│  │  │  - sync_environments_task    (to be added)                           │   │ │
│  │  └─────────────────────────────────────────────────────────────────────┘   │ │
│  └─────────────────────────────────────────────────────────────────────────────┘ │
│                                                                                  │
└─────────────────────────────────────────────────────────────────────────────────┘

1.2 Design Principles

Evidence from existing codebase:

  1. Service Layer Pattern (service.py:41-45)
     - Business logic lives in service functions
     - Dependency injection via Depends()
     - Example: list_environments(db: AsyncSession, current_user: AuthenticatedUser)

  2. Repository Pattern (repository.py:12-20)
     - Data access abstraction
     - Async SQLAlchemy queries
     - Example: list_team_environments(db: AsyncSession, team_id: int)

  3. Celery Task Pattern (tasks.py:14-18)
     - Tasks defined in tasks.py
     - asyncio.run() for async service calls
     - Late ack for at-least-once delivery

  4. API Router Pattern (router.py:11-17)
     - Thin HTTP layer that delegates to the service layer
     - Returns structured dictionaries
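The layering above can be sketched end-to-end as a minimal example. All names here are illustrative stand-ins for the MBPanel modules, and a plain dict stands in for the database session:

```python
import asyncio
from typing import Any

# Repository layer: the only place that touches the data store.
# (In MBPanel this is async SQLAlchemy; a dict stands in here.)
async def list_team_environments(db: dict[int, list[dict[str, Any]]], team_id: int) -> list[dict[str, Any]]:
    return db.get(team_id, [])

# Service layer: business logic, no HTTP concerns.
async def list_environments(db: dict[int, list[dict[str, Any]]], team_id: int) -> dict[str, Any]:
    rows = await list_team_environments(db, team_id)
    return {"items": rows, "count": len(rows)}

# Router layer: a thin handler that only delegates (in MBPanel, a FastAPI route).
async def environments_endpoint(db: dict[int, list[dict[str, Any]]], team_id: int) -> dict[str, Any]:
    return await list_environments(db, team_id)

fake_db = {7: [{"shortdomain": "app1"}, {"shortdomain": "app2"}]}
result = asyncio.run(environments_endpoint(fake_db, 7))
print(result["count"])  # 2
```

The point is the direction of dependencies: router calls service, service calls repository, and only the repository knows how data is stored.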

2. Database Layer Implementation

2.1 Schema Changes

File to create: backend/alembic/versions/005_add_environment_sync_tracking.py

Evidence from existing migration: 004_add_environment_fields.py:18-23

"""add environment sync tracking

Revision ID: 005_add_environment_sync_tracking
Revises: 004_add_environment_fields
Create Date: 2026-01-22

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = '005_add_environment_sync_tracking'
down_revision = '004_add_environment_fields'
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Add sync tracking columns to environments table
    # Evidence: environment.py:45-80 shows existing Environment model structure
    op.add_column('environments', sa.Column('last_synced_at', sa.DateTime(timezone=True), nullable=True))
    op.add_column('environments', sa.Column('last_sync_hash', sa.String(64), nullable=True))
    op.add_column('environments', sa.Column('sync_status', sa.String(20), nullable=True))
    op.add_column('environments', sa.Column('sync_error_message', sa.Text(), nullable=True))
    op.add_column('environments', sa.Column('api_calls_count', sa.Integer(), nullable=True, server_default='0'))
    op.add_column('environments', sa.Column('last_sync_duration_ms', sa.Integer(), nullable=True))

    # Create indexes for freshness queries
    # Evidence: environment.py:60 shows team_id already indexed
    op.create_index('idx_environments_team_sync', 'environments', ['team_id', 'last_synced_at'])
    op.create_index('idx_environments_sync_status', 'environments', ['sync_status'])


def downgrade() -> None:
    # Drop indexes first
    op.drop_index('idx_environments_sync_status', table_name='environments')
    op.drop_index('idx_environments_team_sync', table_name='environments')

    # Drop columns (reverse order)
    op.drop_column('environments', 'last_sync_duration_ms')
    op.drop_column('environments', 'api_calls_count')
    op.drop_column('environments', 'sync_error_message')
    op.drop_column('environments', 'sync_status')
    op.drop_column('environments', 'last_sync_hash')
    op.drop_column('environments', 'last_synced_at')

2.2 Update SQLAlchemy Model

File to modify: backend/app/infrastructure/database/models/environment.py

Evidence: Current model at environment.py:45-80

# Add to Environment class (after line 76, before nodes relationship)

# Sync tracking columns
last_synced_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True)
last_sync_hash: Mapped[str | None] = mapped_column(String(64), nullable=True)
sync_status: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True)
sync_error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
api_calls_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default='0')
last_sync_duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)

3. Configuration Layer Implementation

3.1 Add Configuration Variables

File to modify: backend/app/core/config.py

Evidence from existing config: config.py:31-56 shows Virtuozzo and RabbitMQ configuration pattern

# Add to Settings class (after line 56, in Virtuozzo section)

# Environment sync configuration
vz_env_ttl_seconds: int = 600  # 10 minutes - Data freshness window
vz_env_max_stale_seconds: int = 3600  # 1 hour - Show warning after this
vz_sync_queue_rate_limit: int = 50  # Requests per second max
vz_sync_timeout_seconds: int = 5  # Virtuozzo API timeout for sync
vz_sync_retry_attempts: int = 3  # Max retry attempts with exponential backoff
vz_sync_worker_concurrency: int = 10  # Number of worker processes

3.2 Add Celery Queue Configuration

File to modify: backend/app/infrastructure/background/celery_app.py

Evidence from existing config: celery_app.py:25-32 shows queue definition pattern

# Add to task_queues tuple (after line 32)
celery_app.conf.task_queues = (
    Queue(
        settings.rabbitmq_jobs_queue,
        Exchange(settings.rabbitmq_jobs_exchange, type="direct", durable=True),
        routing_key=settings.rabbitmq_jobs_routing_key,
        durable=True,
    ),
    # NEW: Sync queue
    Queue(
        "jobs.env.sync",
        Exchange(settings.rabbitmq_jobs_exchange, type="direct", durable=True),
        routing_key="jobs.env.sync",
        durable=True,
    ),
)

4. Service Layer Implementation

4.1 Core Service Functions

File to modify: backend/app/domains/environments/service.py

Evidence from existing service: service.py:41-45 shows list_environments pattern

4.1.1 Hash Computation Function

Add after imports (around line 39):

import hashlib
import json
from datetime import timedelta

# Add after line 39 (before _JOB_NAME constant)


def _compute_environment_hash(environments: list[dict[str, Any]]) -> str:
    """
    Compute SHA-256 hash for environment data change detection.

    Evidence: PRD section 6.2 specifies hash-based change detection.
    This prevents unnecessary database writes when data hasn't changed.
    """
    # sort_keys normalizes dict key order; list ordering still affects the hash
    normalized = json.dumps(environments, sort_keys=True)
    return hashlib.sha256(normalized.encode()).hexdigest()


def _get_freshness_threshold() -> datetime:
    """
    Get the TTL threshold for data freshness.

    Evidence: config.py will have vz_env_ttl_seconds (default 600).
    Note: this uses the application clock; PRD section 6.1 (TEST-CLOCK-001)
    recommends DB time to avoid clock skew between app and database hosts.
    """
    return datetime.now(timezone.utc) - timedelta(seconds=settings.vz_env_ttl_seconds)


def _get_max_stale_threshold() -> datetime:
    """
    Get the maximum staleness threshold for warnings.

    Evidence: config.py will have vz_env_max_stale_seconds (default 3600).
    """
    return datetime.now(timezone.utc) - timedelta(seconds=settings.vz_env_max_stale_seconds)
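A quick sanity check of the helpers above, with hard-coded values standing in for the settings object (a sketch, not the production wiring):

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone

TTL_SECONDS = 600  # stands in for settings.vz_env_ttl_seconds

def compute_environment_hash(environments: list[dict]) -> str:
    normalized = json.dumps(environments, sort_keys=True)
    return hashlib.sha256(normalized.encode()).hexdigest()

now = datetime.now(timezone.utc)
freshness_threshold = now - timedelta(seconds=TTL_SECONDS)

# A record updated 5 minutes ago is fresh; one from 30 minutes ago is stale.
fresh_update = now - timedelta(minutes=5)
stale_update = now - timedelta(minutes=30)
assert fresh_update > freshness_threshold
assert stale_update <= freshness_threshold

# The hash is stable under dict key reordering (sort_keys), so cosmetic
# differences in the API payload do not trigger a spurious re-sync...
a = compute_environment_hash([{"name": "env1", "status": "running"}])
b = compute_environment_hash([{"status": "running", "name": "env1"}])
assert a == b
# ...but a real value change produces a different hash.
c = compute_environment_hash([{"name": "env1", "status": "stopped"}])
assert a != c
```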

4.1.2 Sync Or Fetch Cached Function

Add after list_environments function (around line 46):

async def get_environments_with_metadata(
    *,
    db: AsyncSession,
    current_user: AuthenticatedUser,
) -> dict[str, Any]:
    """
    Get environments with sync metadata (stale-while-revalidate pattern).

    This implements the core stale-while-revalidate logic:
    1. Always return immediately with data from database
    2. If data is stale (> TTL), enqueue background refresh
    3. Return metadata about data freshness

    Evidence: PRD section 4.1 specifies the happy path flow.
    Evidence: service.py:565-587 shows existing _calculate_sync_status pattern.

    Args:
        db: Database session
        current_user: Authenticated user with team context

    Returns:
        Dictionary with items, meta object, and sync status

    Raises:
        HTTPException 403: If missing team context
        HTTPException 404: If no data exists and Virtuozzo call fails
    """
    if current_user.team_id is None:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Missing team context.")

    team_id = int(current_user.team_id)
    rows = await repository.list_team_environments(db=db, team_id=team_id)

    # Check if we have any data
    has_data = len(rows) > 0

    # Get the most recent update timestamp
    # Evidence: environment.py:12 shows TimestampMixin provides updated_at
    most_recent_update = None
    if has_data:
        most_recent_update = max((env.updated_at for env in rows if env.updated_at), default=None)

    # Determine freshness
    now = datetime.now(timezone.utc)
    freshness_threshold = _get_freshness_threshold()
    max_stale_threshold = _get_max_stale_threshold()

    is_stale = bool(most_recent_update and most_recent_update <= freshness_threshold)
    is_max_stale = bool(most_recent_update and most_recent_update <= max_stale_threshold)

    # Serialize environments
    items = [_serialize_environment(env) for env in rows]

    # Build metadata
    meta = {
        "last_synced_at": most_recent_update.isoformat() if most_recent_update else None,
        "is_stale": bool(is_stale),
        "stale_warning": "Data is over 1 hour old. May not reflect current state." if is_max_stale else None,
        "sync_in_progress": False,  # TODO: Check Redis for in-progress sync flag
    }

    # If stale and not first run, enqueue background sync
    # Evidence: PRD section 4.1 specifies fire-and-forget enqueue
    if is_stale and has_data:
        await _enqueue_sync_task(team_id=team_id, triggered_by="stale_data")
        logger.info(
            "env_sync_stale_enqueued",
            team_id=team_id,
            data_age_seconds=(now - most_recent_update).total_seconds() if most_recent_update else None,
        )

    return {
        "items": items,
        "meta": meta,
    }


async def _enqueue_sync_task(*, team_id: int, triggered_by: str) -> None:
    """
    Enqueue a background sync task for the team.

    Evidence: infrastructure/messaging/events.py:41-93 shows publish_event pattern.
    This uses RabbitMQ to queue the sync task.

    Args:
        team_id: Team ID to sync
        triggered_by: Reason for sync (stale_data, manual, etc.)
    """
    try:
        await publish_event(
            routing_key="jobs.env.sync",  # direct exchange: must exactly match the queue binding
            payload={
                "task_type": "sync_environments",
                "team_id": team_id,
                "triggered_by": triggered_by,
                "enqueued_at": datetime.now(timezone.utc).isoformat(),
            },
            exchange_name=settings.rabbitmq_jobs_exchange,
        )
        logger.info("env_sync_task_enqueued", team_id=team_id, triggered_by=triggered_by)
    except Exception as exc:
        # Don't fail request if queue is unavailable
        logger.error("env_sync_enqueue_failed", team_id=team_id, error=str(exc), exc_info=True)

4.1.3 Update sync_team_environments

Modify existing function (service.py:594-766)

Evidence: Current sync function at service.py:594-766

Add hash-based change detection and update sync tracking columns:

# In sync_team_environments function, modify the upsert logic
# Find the section around line 674 (existing_env = result.scalar_one_or_none())
# Add hash tracking:

# After line 674, add:
env_hash = _compute_environment_hash(info)

# Check if environment hash has changed
old_hash = existing_env.last_sync_hash if existing_env else None
if old_hash and old_hash == env_hash:
    logger.info("env_sync_unchanged", team_id=team_id, shortdomain=shortdomain)
    continue  # Skip unchanged environment

# Update the environment with sync tracking
# In the existing update section (around line 694-748), add:
existing_env.last_synced_at = now
existing_env.last_sync_hash = env_hash
existing_env.sync_status = "ok"
existing_env.sync_error_message = None
existing_env.api_calls_count = (existing_env.api_calls_count or 0) + 1

4.2 Repository Layer Updates

File to modify: backend/app/domains/environments/repository.py

Evidence: Existing repository at repository.py:12-20

Add new query functions:

# Add after list_team_environments (around line 21)


async def get_team_sync_status(
    *, db: AsyncSession, team_id: int
) -> dict[str, Any] | None:
    """
    Get sync status for a team's environments.

    Returns the most recent sync information across all environments.

    Evidence: environment.py:45-80 shows sync tracking columns.
    """
    stmt: Select[tuple[Environment]] = (
        select(Environment)
        .where(Environment.team_id == team_id)
        .order_by(Environment.last_synced_at.desc())
        .limit(1)
    )
    result = await db.execute(stmt)
    latest_env = result.scalar_one_or_none()

    if not latest_env or not latest_env.last_synced_at:
        return None

    return {
        "last_synced_at": latest_env.last_synced_at,
        "last_sync_hash": latest_env.last_sync_hash,
        "sync_status": latest_env.sync_status,
        "sync_error_message": latest_env.sync_error_message,
        "api_calls_count": latest_env.api_calls_count,
        "last_sync_duration_ms": latest_env.last_sync_duration_ms,
    }

5. Background Worker Implementation

5.1 Celery Task

File to modify: backend/app/domains/environments/tasks.py

Evidence: Existing task pattern at tasks.py:14-18

Add the sync worker task:

# Add to tasks.py (after create_environment_task)

@celery_app.task(
    name="domains.environments.sync_environments",
    bind=True,
    acks_late=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_kwargs={'max_retries': 3},
    retry_jitter=True,
)
def sync_environments_task(self, payload: dict[str, Any]) -> None:  # type: ignore[override]
    """
    Background worker for environment synchronization.

    Implements the rate-limited, hash-based sync logic from PRD section 4.4.

    Evidence: tasks.py:14-18 shows the existing task pattern.
    Evidence: service.py:594-766 shows sync_team_environments implementation.

    Args:
        payload: Dict with team_id, triggered_by, enqueued_at

    The worker:
    1. Checks rate limiting (semaphore)
    2. Fetches from Virtuozzo with timeout
    3. Validates response schema
    4. Compares hash for change detection
    5. Updates database atomically
    6. Logs metrics and errors
    """
    team_id = payload.get("team_id")
    triggered_by = payload.get("triggered_by", "manual")

    if team_id is None:
        logger.error("env_sync_missing_team_id", payload=payload)
        return  # malformed payload: retrying will not help

    logger.info(
        "env_sync_worker_started",
        team_id=team_id,
        triggered_by=triggered_by,
        task_id=self.request.id,
    )

    asyncio.run(_execute_sync_worker(team_id=team_id, triggered_by=triggered_by))
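With retry_backoff=True and max_retries=3, Celery spaces autoretries exponentially (roughly 2**retries seconds, randomized when retry_jitter=True). The schedule can be previewed with plain arithmetic; this is a sketch of my reading of Celery's documented behavior, not Celery itself, and exact values may vary by version:

```python
import random

def celery_backoff_delay(retries: int, backoff_max: int = 600, jitter: bool = True) -> float:
    """Approximate Celery's autoretry delay: 2**retries seconds, capped at
    retry_backoff_max, and (with jitter) drawn uniformly from [0, delay]."""
    delay = min(2 ** retries, backoff_max)
    return random.uniform(0, delay) if jitter else delay

# Without jitter, the schedule for max_retries=3 is 1s, 2s, 4s.
schedule = [celery_backoff_delay(n, jitter=False) for n in range(3)]
print(schedule)  # [1, 2, 4]
```

Jitter spreads retries out so that a burst of tasks failing together does not retry in lockstep against the Virtuozzo API.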


async def _execute_sync_worker(*, team_id: int, triggered_by: str) -> None:
    """
    Execute the sync worker logic asynchronously.

    Evidence: tasks.py:17 shows asyncio.run() pattern for async service calls.
    """
    from app.infrastructure.database.session import AsyncSessionLocal
    from app.core.identity.context import set_request_context
    from app.domains.environments import service as env_service
    from app.infrastructure.external.virtuozzo.service import fetch_environments_and_nodes
    from app.core.crypto import decrypt_str
    from app.infrastructure.database.models.team import Team

    async with AsyncSessionLocal() as db:
        try:
            # Get team with session key
            # Evidence: service.py:605-613 shows team fetch pattern
            team = await db.get(Team, team_id)
            if not team or not team.session_key_encrypted:
                logger.error("env_sync_team_not_found", team_id=team_id)
                return

            # Decrypt session key
            # Evidence: service.py:616-621 shows decryption pattern
            from app.core.crypto import CryptoError
            try:
                session_key = decrypt_str(team.session_key_encrypted)
            except CryptoError as exc:
                logger.error("env_sync_decrypt_failed", team_id=team_id, error=str(exc))
                return

            # Fetch from Virtuozzo with timeout
            # Evidence: service.py:627-636 shows fetch pattern
            import time
            start_time = time.time()

            try:
                vz_data = await fetch_environments_and_nodes(
                    session_key=session_key,
                    lazy=False,
                    owner_uid=int(team.vz_uid) if team.vz_uid else None,
                )
            except Exception as exc:
                logger.error(
                    "env_sync_virtuozzo_failed",
                    team_id=team_id,
                    error=str(exc),
                    error_type=type(exc).__name__,
                )
                # Don't overwrite DB with error - preserve stale data
                # Evidence: PRD section 6.1 specifies preserving stale on error
                return

            duration_ms = int((time.time() - start_time) * 1000)

            # Validate response
            # Evidence: PRD section 6.2 specifies schema validation
            if vz_data.get("result") != 0:
                logger.error(
                    "env_sync_virtuozzo_error",
                    team_id=team_id,
                    result_code=vz_data.get("result"),
                )
                return

            infos = vz_data.get("infos", [])
            if not isinstance(infos, list):
                logger.error("env_sync_invalid_response", team_id=team_id)
                return

            # Compute hash for change detection
            # Evidence: PRD section 6.2 specifies hash-based detection
            env_hash = env_service._compute_environment_hash(infos)

            # Check if hash changed
            # TODO: Store hash in Team table for comparison
            # For now, proceed with full sync

            # Import sync_team_environments to reuse logic
            # Evidence: service.py:594-766 has full sync implementation
            # We need to call it with proper context

            # Set request context for logging
            # Evidence: identity/context.py shows context pattern
            # set_request_context(team_id=team_id, user_id=None)

            # Perform the sync (reusing existing sync logic)
            # Note: This is a simplified version - full implementation would
            # extract the sync logic into a reusable function

            logger.info(
                "env_sync_worker_complete",
                team_id=team_id,
                duration_ms=duration_ms,
                env_count=len(infos),
            )

        except Exception as exc:
            logger.error(
                "env_sync_worker_failed",
                team_id=team_id,
                error=str(exc),
                exc_info=True,
            )

5.2 Rate Limiting Implementation

Create new file: backend/app/domains/environments/rate_limiter.py

Evidence: app/core/rate_limit.py:11-26 shows existing rate limiter pattern

"""
Rate limiting for Virtuozzo API sync operations.

Evidence: PRD section 4.4 specifies rate limiting at 50 req/sec.
Evidence: core/rate_limit.py shows Redis-based rate limiter pattern.
"""

import asyncio
from typing import Optional

from app.core.logging import get_logger

logger = get_logger(__name__)


class VirtuozzoRateLimiter:
    """
    Semaphore-based rate limiter for Virtuozzo API calls.

    Uses asyncio.Semaphore to cap the number of in-flight API calls to
    Virtuozzo, protecting it from traffic spikes during mass syncs. Note
    this is a concurrency cap, not a true requests-per-second limit; it
    only approximates the PRD target when calls complete within a second.

    Evidence: PRD section 4.4 specifies a 50 req/sec rate limit.

    Args:
        max_concurrent: Maximum concurrent API calls (default: 50)
    """

    def __init__(self, max_concurrent: int = 50) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._max_concurrent = max_concurrent

    async def __aenter__(self) -> "VirtuozzoRateLimiter":
        """Acquire semaphore before API call."""
        await self._semaphore.acquire()
        return self

    async def __aexit__(self, *args) -> None:
        """Release semaphore after API call."""
        self._semaphore.release()

    @property
    def available_slots(self) -> int:
        """Return the number of free slots (best-effort; reads a private attr)."""
        return self._semaphore._value  # type: ignore[attr-defined]


# Global rate limiter instance
_virtuozzo_limiter: Optional[VirtuozzoRateLimiter] = None


def get_virtuozzo_rate_limiter(max_concurrent: int = 50) -> VirtuozzoRateLimiter:
    """
    Get or create the global Virtuozzo rate limiter.

    Evidence: config.py has vz_sync_queue_rate_limit (default: 50)

    Args:
        max_concurrent: Maximum concurrent API calls

    Returns:
        VirtuozzoRateLimiter instance
    """
    global _virtuozzo_limiter
    if _virtuozzo_limiter is None:
        _virtuozzo_limiter = VirtuozzoRateLimiter(max_concurrent=max_concurrent)
    return _virtuozzo_limiter
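Usage of the limiter might look like the following. This is a self-contained sketch with a minimal stand-in class and stubbed API calls; the real caller would wrap fetch_environments_and_nodes with get_virtuozzo_rate_limiter():

```python
import asyncio

class SimpleLimiter:
    """Minimal stand-in for VirtuozzoRateLimiter, with peak tracking for the demo."""

    def __init__(self, max_concurrent: int) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._active = 0
        self.peak = 0  # highest number of concurrent holders observed

    async def __aenter__(self) -> "SimpleLimiter":
        await self._semaphore.acquire()
        self._active += 1
        self.peak = max(self.peak, self._active)
        return self

    async def __aexit__(self, *args) -> None:
        self._active -= 1
        self._semaphore.release()

async def call_api(limiter: SimpleLimiter, i: int) -> int:
    async with limiter:
        await asyncio.sleep(0.001)  # stands in for the Virtuozzo HTTP call
        return i

async def main() -> int:
    limiter = SimpleLimiter(max_concurrent=3)
    await asyncio.gather(*(call_api(limiter, i) for i in range(10)))
    return limiter.peak

peak = asyncio.run(main())
assert peak <= 3  # the semaphore never admits more than 3 callers at once
```

Ten tasks are launched concurrently, but the semaphore admits at most three at a time, which is exactly the protection the sync workers need during a mass refresh.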

6. API Layer Implementation

6.1 Update Router

File to modify: backend/app/domains/environments/router.py

Evidence: Existing router at router.py:11-26

Update the GET endpoint to include metadata:

from typing import Any

from fastapi import APIRouter, Depends, Response

from app.core.dependencies import csrf_protected, db_session, get_current_user
from app.core.identity.context import AuthenticatedUser
from app.domains.environments.service import get_environments_with_metadata, sync_team_environments

router = APIRouter()


@router.get("/")
async def environments(
    response: Response,
    user: AuthenticatedUser = Depends(get_current_user),
    db: AsyncSession = Depends(db_session),
) -> dict[str, Any]:
    """
    Get environments with sync metadata.

    Implements stale-while-revalidate pattern:
    - Always returns immediately (never blocks)
    - Includes metadata about data freshness
    - Enqueues background sync if data is stale

    Evidence: PRD section 5.3 specifies API contract with meta object.
    Evidence: router.py:11-17 shows existing GET endpoint pattern.
    """
    result = await get_environments_with_metadata(db=db, current_user=user)

    # Add response headers
    # Evidence: PRD section 5.3 specifies X-Data-Stale, X-Data-Last-Sync headers
    meta = result.get("meta", {})

    if meta.get("is_stale"):
        response.headers["X-Data-Stale"] = "true"
    else:
        response.headers["X-Data-Stale"] = "false"

    if meta.get("last_synced_at"):
        response.headers["X-Data-Last-Sync"] = meta["last_synced_at"]

    if meta.get("sync_in_progress"):
        response.headers["X-Sync-In-Progress"] = "true"

    return result


@router.post("/sync", dependencies=[Depends(csrf_protected)])
async def sync_environments(
    user: AuthenticatedUser = Depends(get_current_user),
    db: AsyncSession = Depends(db_session),
) -> dict[str, Any]:
    """
    Manually trigger environment synchronization.

    This is a blocking operation that syncs immediately.
    For automatic background sync, use GET /environments with stale detection.

    Evidence: router.py:20-25 shows existing POST /sync endpoint.
    Evidence: service.py:594-766 shows sync_team_environments implementation.
    """
    return await sync_team_environments(db=db, current_user=user)

7. Frontend Implementation

7.1 Update TypeScript Types

File to modify: frontend/src/features/environments/types/environment.types.ts

Evidence: Existing types at environment.types.ts:1-68

Add response metadata type:

// Add after EnvironmentListResponse interface (around line 47)

export interface EnvironmentListMeta {
  last_synced_at: string | null;
  is_stale: boolean;
  stale_warning: string | null;
  sync_in_progress: boolean;
}

export interface EnvironmentListResponseWithMeta {
  items: Environment[];
  meta: EnvironmentListMeta;
}

7.2 Update API Client

File to modify: frontend/src/features/environments/services/environmentService.ts

Evidence: Existing service at environmentService.ts:16-19

Update the return type:

// Update getEnvironments function
import type { EnvironmentListResponseWithMeta } from '../types/environment.types';

export async function getEnvironments(): Promise<EnvironmentListResponseWithMeta> {
  const response = await apiClient.get<EnvironmentListResponseWithMeta>('/api/v1/environments');
  return response;
}

7.3 Update Hook

File to modify: frontend/src/features/environments/hooks/useEnvironments.ts

Evidence: Existing hook at useEnvironments.ts:20-106

Update to handle metadata:

import { useState, useEffect, useCallback } from 'react';
import { getEnvironments, triggerSync } from '../services/environmentService';
import type { Environment, EnvironmentListMeta } from '../types/environment.types';

interface UseEnvironmentsResult {
  environments: Environment[];
  loading: boolean;
  error: string | null;
  meta: EnvironmentListMeta | null;
  refetch: () => Promise<void>;
  syncAndRefetch: () => Promise<void>;
}

/**
 * Hook to fetch and manage environments for the current team
 *
 * Updated to support stale-while-revalidate pattern with metadata.
 */
export function useEnvironments(): UseEnvironmentsResult {
  const [environments, setEnvironments] = useState<Environment[]>([]);
  const [meta, setMeta] = useState<EnvironmentListMeta | null>(null);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState<string | null>(null);

  const fetchEnvironments = useCallback(async () => {
    try {
      setLoading(true);
      setError(null);
      const data = await getEnvironments();
      setEnvironments(data.items);
      setMeta(data.meta);
    } catch (err: any) {
      let message = 'Failed to load environments';

      if (err?.data?.detail) {
        message = err.data.detail;
      } else if (err?.message) {
        message = err.message;
      }

      if (err?.status) {
        message = `${message} (${err.status})`;
      }

      setError(message);
      console.error('Failed to fetch environments:', err);
    } finally {
      setLoading(false);
    }
  }, []);

  const syncAndRefetch = useCallback(async () => {
    try {
      setLoading(true);
      setError(null);

      const syncResult = await triggerSync();
      console.log('Sync completed:', syncResult);

      if (syncResult.deleted_count && syncResult.deleted_count > 0) {
        console.info(`${syncResult.deleted_count} deleted environment(s) removed from database`);
      }

      const data = await getEnvironments();
      setEnvironments(data.items);
      setMeta(data.meta);
    } catch (err: any) {
      let message = 'Failed to sync environments';

      if (err?.data?.detail) {
        message = err.data.detail;
      } else if (err?.message) {
        message = err.message;
      }

      if (err?.status) {
        message = `${message} (${err.status})`;
      }

      setError(message);
      console.error('Failed to sync environments:', err);
    } finally {
      setLoading(false);
    }
  }, []);

  useEffect(() => {
    fetchEnvironments();
  }, [fetchEnvironments]);

  return {
    environments,
    loading,
    error,
    meta,
    refetch: fetchEnvironments,
    syncAndRefetch,
  };
}

7.4 Update Sites Page

File to modify: frontend/src/app/sites/page.tsx

Evidence: Existing page at sites/page.tsx:88-93

Update to use metadata:

// In SitesPage component (around line 88-93)
export default function SitesPage() {
  const [activeTab, setActiveTab] = useState<string>('all');
  const { environments, loading, error, refetch, syncAndRefetch, meta } = useEnvironments();

  // ... rest of component

  // Display stale warning if needed
  const showStaleWarning = meta?.is_stale || meta?.stale_warning;

  return (
    <div className="min-h-screen bg-slate-50 dark:bg-slate-900">
      <div className="max-w-7xl mx-auto px-4 md:px-6 lg:px-8 py-6">
        {/* Header with Refresh Button and Stale Warning */}
        <div className="mb-4 flex items-center justify-between">
          <div>
            <h1 className="text-2xl font-semibold text-gray-900 dark:text-white">
              Sites ({environments.length})
            </h1>
            {showStaleWarning && (
              <p className="text-sm text-yellow-600 dark:text-yellow-400 mt-1">
                {meta?.stale_warning || 'Data may be outdated. Refreshing in background...'}
              </p>
            )}
            {meta?.last_synced_at && (
              <p className="text-xs text-gray-500 dark:text-gray-400 mt-1">
                Last synced: {new Date(meta.last_synced_at).toLocaleString()}
              </p>
            )}
          </div>
          <button
            onClick={syncAndRefetch}
            disabled={loading}
            className="inline-flex items-center px-4 py-2 border border-gray-300 dark:border-gray-600 shadow-sm text-sm font-medium rounded-md text-gray-700 dark:text-gray-200 bg-white dark:bg-gray-800 hover:bg-gray-50 dark:hover:bg-gray-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-blue-500 disabled:opacity-50 disabled:cursor-not-allowed"
          >
            <RefreshCw className={`w-4 h-4 mr-2 ${loading ? 'animate-spin' : ''}`} />
            Refresh
          </button>
        </div>
        {/* ... rest of component */}
      </div>
    </div>
  );
}

8. Testing Implementation

8.1 Unit Tests

Create new file: backend/tests/unit/domains/environments/test_sync_service.py

Evidence: Existing test structure in backend/tests/unit/

"""
Unit tests for environment sync service.

Evidence: PRD section 8 specifies testing requirements.
Evidence: service.py:41-766 shows the functions being tested.
"""

from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, Mock, patch

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from app.domains.environments.service import (
    _compute_environment_hash,
    _get_freshness_threshold,
    get_environments_with_metadata,
)
from app.core.identity.context import AuthenticatedUser


class TestComputeEnvironmentHash:
    """Test hash computation for change detection."""

    def test_hash_is_consistent_for_same_data(self):
        """Same data should produce same hash."""
        data = [{"env_name": "staging", "status": "running"}]
        hash1 = _compute_environment_hash(data)
        hash2 = _compute_environment_hash(data)
        assert hash1 == hash2
        assert len(hash1) == 64  # SHA-256 hex length

    def test_hash_changes_for_different_data(self):
        """Different data should produce different hash."""
        data1 = [{"env_name": "staging", "status": "running"}]
        data2 = [{"env_name": "staging", "status": "stopped"}]
        hash1 = _compute_environment_hash(data1)
        hash2 = _compute_environment_hash(data2)
        assert hash1 != hash2

    def test_hash_is_order_independent(self):
        """Hash should be independent of key order."""
        data1 = [{"status": "running", "env_name": "staging"}]
        data2 = [{"env_name": "staging", "status": "running"}]
        hash1 = _compute_environment_hash(data1)
        hash2 = _compute_environment_hash(data2)
        assert hash1 == hash2  # sort_keys=True makes this consistent


class TestGetEnvironmentsWithMetadata:
    """Test stale-while-revalidate logic."""

    @pytest.fixture
    def mock_user(self):
        """Mock authenticated user."""
        user = Mock(spec=AuthenticatedUser)
        user.team_id = 123
        user.user_id = 456
        return user

    @pytest.fixture
    def mock_db(self):
        """Mock database session."""
        return AsyncMock(spec=AsyncSession)

    @pytest.mark.asyncio
    async def test_returns_403_without_team_context(self, mock_db):
        """Should raise 403 if team_id is None."""
        user = Mock(spec=AuthenticatedUser)
        user.team_id = None

        with pytest.raises(Exception) as exc_info:
            await get_environments_with_metadata(db=mock_db, current_user=user)

        assert "Missing team context" in str(exc_info.value)

    @pytest.mark.asyncio
    async def test_fresh_data_returns_immediately(self, mock_db, mock_user):
        """Fresh data should return immediately without enqueueing."""
        # Mock fresh environments (updated 5 minutes ago)
        now = datetime.now(timezone.utc)
        fresh_env = Mock()
        fresh_env.updated_at = now - timedelta(minutes=5)

        with patch('app.domains.environments.service.repository.list_team_environments', new_callable=AsyncMock) as mock_list:
            with patch('app.domains.environments.service._serialize_environment') as mock_serialize:
                mock_list.return_value = [fresh_env]
                mock_serialize.return_value = {"env_name": "staging"}

                with patch('app.domains.environments.service._enqueue_sync_task') as mock_enqueue:
                    result = await get_environments_with_metadata(db=mock_db, current_user=mock_user)

                    assert "items" in result
                    assert "meta" in result
                    assert result["meta"]["is_stale"] is False
                    # Should NOT enqueue for fresh data
                    mock_enqueue.assert_not_called()

    @pytest.mark.asyncio
    async def test_stale_data_enqueues_sync(self, mock_db, mock_user):
        """Stale data should return immediately but enqueue background sync."""
        # Mock stale environments (updated 20 minutes ago)
        now = datetime.now(timezone.utc)
        stale_env = Mock()
        stale_env.updated_at = now - timedelta(minutes=20)

        with patch('app.domains.environments.service.repository.list_team_environments', new_callable=AsyncMock) as mock_list:
            with patch('app.domains.environments.service._serialize_environment') as mock_serialize:
                mock_list.return_value = [stale_env]
                mock_serialize.return_value = {"env_name": "staging"}

                with patch('app.domains.environments.service._enqueue_sync_task') as mock_enqueue:
                    result = await get_environments_with_metadata(db=mock_db, current_user=mock_user)

                    assert "items" in result
                    assert "meta" in result
                    assert result["meta"]["is_stale"] is True
                    # SHOULD enqueue for stale data
                    mock_enqueue.assert_called_once_with(team_id=123, triggered_by="stale_data")


class TestSadPathScenarios:
    """Sad path tests for error handling."""

    @pytest.mark.asyncio
    async def test_no_data_returns_empty_list_with_meta(self):
        """First run with no data should return empty but not error."""
        # TODO: Implement test for first-run scenario
        pass

    @pytest.mark.asyncio
    async def test_virtuozzo_timeout_preserves_stale_data(self):
        """Virtuozzo timeout should not overwrite database with error."""
        # TODO: Implement test for timeout scenario
        # Evidence: PRD section 6.1 TEST-TIMEOUT-001
        pass

    @pytest.mark.asyncio
    async def test_corrupted_payload_preserves_stale_data(self):
        """Invalid payload from Virtuozzo should be rejected."""
        # TODO: Implement test for schema validation
        # Evidence: PRD section 6.1 TEST-CORRUPT-001
        pass
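The guard that TEST-CORRUPT-001 will exercise can be sketched as follows; the helper name and required fields are hypothetical, standing in for the real schema validation in the sync task:

```python
REQUIRED_FIELDS = {"env_name", "status"}  # hypothetical minimal schema


def apply_sync_payload(payload: object, existing: list[dict]) -> list[dict]:
    """Return the new rows only when the payload is well-formed;
    otherwise keep the existing (stale) rows untouched."""
    if not isinstance(payload, list):
        return existing
    for item in payload:
        if not isinstance(item, dict) or not REQUIRED_FIELDS <= item.keys():
            return existing  # corrupted entry: reject the whole payload
    return payload
```

The key property, per PRD section 6.1, is that a rejected payload never overwrites the cached rows.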

8.2 Integration Tests

Create new file: backend/tests/integration/test_env_sync_e2e.py

"""
Integration tests for environment sync end-to-end flow.

Evidence: PRD section 8.3 specifies E2E test requirements.
"""

import pytest
from httpx import AsyncClient


class TestEnvironmentSyncE2E:
    """End-to-end tests for stale-while-revalidate flow."""

    @pytest.mark.asyncio
    async def test_stale_while_revalidate_flow(self, async_client: AsyncClient, auth_headers: dict):
        """
        Test complete stale-while-revalidate flow:
        1. Request returns stale data immediately
        2. Background task is enqueued
        3. Worker processes sync
        4. Next request returns fresh data

        Evidence: PRD section 8.3 test specification.
        """
        # Step 1: Initial request (should be stale or trigger sync)
        response = await async_client.get("/api/v1/environments", headers=auth_headers)
        assert response.status_code == 200

        data = response.json()
        assert "items" in data
        assert "meta" in data

        # Check response headers
        assert "X-Data-Stale" in response.headers
        assert "X-Data-Last-Sync" in response.headers

        # Step 2: If stale, wait for background sync
        if data["meta"]["is_stale"]:
            # In real test, would wait for worker or trigger manually
            pass

        # Step 3: Request again (should be fresh)
        response = await async_client.get("/api/v1/environments", headers=auth_headers)
        assert response.status_code == 200

        data = response.json()
        assert data["meta"]["is_stale"] is False
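The X-Data-* headers asserted above have to be set explicitly by the endpoint; a framework-agnostic sketch (header names taken from the test, helper name hypothetical):

```python
def build_freshness_headers(meta: dict) -> dict[str, str]:
    """Map response metadata to the X-Data-* headers the E2E test checks."""
    headers = {"X-Data-Stale": "true" if meta.get("is_stale") else "false"}
    last_sync = meta.get("last_synced_at")
    if last_sync is not None:
        headers["X-Data-Last-Sync"] = str(last_sync)
    return headers
```

In FastAPI these would be attached via the injected Response object in the route handler.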

9. Observability Implementation

9.1 Metrics

File to modify: backend/app/domains/environments/service.py

Evidence: Existing logging at service.py:624, service.py:755

Add metrics emission:

# Add to sync_team_environments function

import time

from app.core.telemetry import get_meter

_meter = get_meter("app.domains.environments")

# Create counters and histograms
_env_sync_requests_counter = _meter.create_counter(
    "env_sync_requests_total",
    description="Total number of environment sync requests",
)

_env_sync_duration_histogram = _meter.create_histogram(
    "env_sync_duration_seconds",
    description="Environment sync duration in seconds",
)

_env_data_age_histogram = _meter.create_histogram(
    "env_data_age_seconds",
    description="Age of environment data in seconds",
)

# In sync_team_environments, add metrics recording
async def sync_team_environments(*, db: AsyncSession, current_user: AuthenticatedUser) -> dict[str, Any]:
    # ... existing code ...

    # Record sync request
    _env_sync_requests_counter.add(1, {"team_id": team_id, "triggered_by": "manual"})

    start_time = time.time()

    try:
        # ... sync logic ...

        # Record duration
        duration = time.time() - start_time
        _env_sync_duration_histogram.record(duration, {"team_id": team_id, "result": "success"})

    except Exception:
        duration = time.time() - start_time
        _env_sync_duration_histogram.record(duration, {"team_id": team_id, "result": "error"})
        raise

9.2 Structured Logging

Evidence: Existing logging pattern at service.py:110-115, service.py:624-636

Update logging to include sync-specific fields:

# Add to sync functions

logger.info(
    "env_sync_started",
    team_id=team_id,
    triggered_by="stale_data",
    current_data_age_seconds=age.total_seconds() if age else None,
    task_id=task_id,
)

logger.info(
    "env_sync_complete",
    team_id=team_id,
    duration_seconds=duration,
    env_count=len(environments),
    data_changed=(new_hash != old_hash),
    api_calls=1,
)

logger.error(
    "env_sync_failed",
    team_id=team_id,
    error_type="timeout",
    error_message=str(e),
    retry_count=retry_count,
    will_retry=True,
)

10. Deployment Checklist

10.1 Pre-Deployment

  • Run database migration: alembic upgrade head
  • Verify new columns exist in environments table
  • Add config variables to .env files
  • Update RabbitMQ queue configuration
  • Deploy new Celery worker code
  • Restart Celery workers

10.2 Post-Deployment Verification

  • Verify GET /environments returns meta object
  • Verify response headers are present
  • Check Celery workers pick up sync tasks
  • Monitor sync metrics in observability dashboard
  • Verify stale data triggers background sync
  • Test manual sync still works

10.3 Rollback Plan

If issues occur:

  1. Revert code deployment
  2. Rollback database migration: alembic downgrade -1
  3. Remove new config variables
  4. Restart Celery workers with old code


11. References

| Component | File | Line Reference |
|---|---|---|
| Environment Model | backend/app/infrastructure/database/models/environment.py | 45-80 |
| Migration Pattern | backend/alembic/versions/004_add_environment_fields.py | 18-23 |
| Service Layer | backend/app/domains/environments/service.py | 41-766 |
| Repository Layer | backend/app/domains/environments/repository.py | 12-38 |
| Celery Tasks | backend/app/domains/environments/tasks.py | 14-18 |
| Celery Config | backend/app/infrastructure/background/celery_app.py | 8-38 |
| API Router | backend/app/domains/environments/router.py | 11-26 |
| Config Pattern | backend/app/core/config.py | 31-56 |
| Rate Limiter | backend/app/core/rate_limit.py | 11-47 |
| Logging | backend/app/core/logging.py | Full file |
| Virtuozzo Service | backend/app/infrastructure/external/virtuozzo/service.py | 97-150 |

12. Revision History

| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-22 | MBPanel Team | Initial implementation document based on validated codebase patterns |

Appendix A: Configuration File Template

Add to env/backend-local.example:

# Virtuozzo Environment Sync Configuration
VZ_ENV_TTL_SECONDS=600
VZ_ENV_MAX_STALE_SECONDS=3600
VZ_SYNC_QUEUE_RATE_LIMIT=50
VZ_SYNC_TIMEOUT_SECONDS=5
VZ_SYNC_RETRY_ATTEMPTS=3
VZ_SYNC_WORKER_CONCURRENCY=10

Appendix B: SQL Queries for Verification

-- Verify new columns exist
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'environments'
AND column_name IN ('last_synced_at', 'last_sync_hash', 'sync_status');

-- Verify indexes exist
SELECT indexname
FROM pg_indexes
WHERE tablename = 'environments';

-- Check sync status of environments
SELECT
    team_id,
    env_name,
    last_synced_at,
    sync_status,
    api_calls_count,
    updated_at
FROM environments
ORDER BY last_synced_at DESC NULLS LAST
LIMIT 10;