# Implementation Document (ID)

**Virtuozzo Environment Synchronization - Stale-While-Revalidate Architecture**

## Document Information
| Field | Details |
|---|---|
| Product Name | Virtuozzo Environment Synchronization |
| Author | MBPanel Team |
| Date Created | 2026-01-22 |
| Last Updated | 2026-01-22 |
| Version | 1.0 |
| Status | Draft |
| Source PRD | docs/virtuozzo-env-sync/PRD-virtuozzo-environment-sync.md |
## 1. Architecture Overview

### 1.1 Component Diagram
┌─────────────────────────────────────────────────────────────────────────────────┐
│ MBPanel Architecture │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ API Layer (FastAPI) │ │
│ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │
│ │ │ app/api/v1/environments.py │ │ │
│ │ │ - GET /api/v1/environments → list_environments() │ │ │
│ │ │ - POST /api/v1/environments/sync → sync_team_environments() │ │ │
│ │ └─────────────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ Domain Layer (Service + Repository) │ │
│ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │
│ │ │ app/domains/environments/ │ │ │
│ │ │ ├── service.py - Business logic, sync orchestration │ │ │
│ │ │ ├── repository.py - Data access (SQLAlchemy queries) │ │ │
│ │ │ ├── tasks.py - Celery task definitions │ │ │
│ │ │ ├── router.py - FastAPI route definitions │ │ │
│ │ │ └── schemas.py - Pydantic models for validation │ │ │
│ │ └─────────────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┼──────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ PostgreSQL │ │ RabbitMQ │ │ Virtuozzo API │ │
│ │ (Cache) │ │ (Queue Broker) │ │ (External) │ │
│ │ │ │ │ │ │ │
│ │ - environments │ │ - jobs.env.create │ │ - GetEnvs │ │
│ │ - environment_nodes│ │ - jobs.env.sync │ │ - GetAccount │ │
│ │ - teams │ │ - events.topic │ │ │ │
│ └─────────────────────┘ └─────────────────────┘ └─────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ Background Workers (Celery) │ │
│ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │
│ │ │ app/domains/environments/tasks.py │ │ │
│ │ │ - create_environment_task (existing) │ │ │
│ │ │ - sync_environments_task (to be added) │ │ │
│ │ └─────────────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
### 1.2 Design Principles

Evidence from existing codebase:

- **Service Layer Pattern** (service.py:41-45) - Business logic in service functions
  - Dependency injection via `Depends()`
  - Example: `list_environments(db: AsyncSession, current_user: AuthenticatedUser)`
- **Repository Pattern** (repository.py:12-20) - Data access abstraction
  - Async SQLAlchemy queries
  - Example: `list_team_environments(db: AsyncSession, team_id: int)`
- **Celery Task Pattern** (tasks.py:14-18) - Tasks defined in `tasks.py`
  - `asyncio.run()` for async service calls
  - Late ack for at-least-once delivery
- **API Router Pattern** (router.py:11-17) - Thin HTTP layer
  - Delegates to service layer
  - Returns structured dictionaries
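The layering these patterns describe can be sketched minimally; every name below is an illustrative placeholder, not the actual MBPanel code:

```python
# Minimal sketch of the router → service → repository layering.
# All names here are illustrative stand-ins, not real MBPanel signatures.

async def repository_list(db: object, team_id: int) -> list[dict]:
    # Repository layer: data access only (a SQLAlchemy query in practice)
    return [{"env_name": "staging", "team_id": team_id}]

async def service_list(db: object, team_id: int) -> dict:
    # Service layer: business logic; orchestrates repository calls
    rows = await repository_list(db, team_id)
    return {"items": rows, "count": len(rows)}

async def router_get(db: object, team_id: int) -> dict:
    # Router layer: thin HTTP handler that only delegates to the service
    return await service_list(db, team_id)
```

Each layer depends only on the one directly beneath it, which is what makes the service and repository functions testable without HTTP machinery.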
## 2. Database Layer Implementation

### 2.1 Schema Changes
File to create: backend/alembic/versions/005_add_environment_sync_tracking.py
Evidence from existing migration: 004_add_environment_fields.py:18-23
"""add environment sync tracking
Revision ID: 005_add_environment_sync_tracking
Revises: 004_add_environment_fields
Create Date: 2026-01-22
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '005_add_environment_sync_tracking'
down_revision = '004_add_environment_fields'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add sync tracking columns to environments table
# Evidence: environment.py:45-80 shows existing Environment model structure
op.add_column('environments', sa.Column('last_synced_at', sa.DateTime(timezone=True), nullable=True))
op.add_column('environments', sa.Column('last_sync_hash', sa.String(64), nullable=True))
op.add_column('environments', sa.Column('sync_status', sa.String(20), nullable=True))
op.add_column('environments', sa.Column('sync_error_message', sa.Text(), nullable=True))
op.add_column('environments', sa.Column('api_calls_count', sa.Integer(), nullable=True, server_default='0'))
op.add_column('environments', sa.Column('last_sync_duration_ms', sa.Integer(), nullable=True))
# Create indexes for freshness queries
# Evidence: environment.py:60 shows team_id already indexed
op.create_index('idx_environments_team_sync', 'environments', ['team_id', 'last_synced_at'])
op.create_index('idx_environments_sync_status', 'environments', ['sync_status'])
def downgrade() -> None:
# Drop indexes first
op.drop_index('idx_environments_sync_status', table_name='environments')
op.drop_index('idx_environments_team_sync', table_name='environments')
# Drop columns (reverse order)
op.drop_column('environments', 'last_sync_duration_ms')
op.drop_column('environments', 'api_calls_count')
op.drop_column('environments', 'sync_error_message')
op.drop_column('environments', 'sync_status')
op.drop_column('environments', 'last_sync_hash')
op.drop_column('environments', 'last_synced_at')
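As a sketch of the freshness query that `idx_environments_team_sync` is meant to serve, using a stand-in table that carries only the indexed columns (not the real Environment model):

```python
from datetime import datetime, timedelta, timezone

import sqlalchemy as sa

# Stand-in table with only the columns relevant to the index (illustrative)
metadata = sa.MetaData()
environments = sa.Table(
    "environments",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("team_id", sa.Integer),
    sa.Column("last_synced_at", sa.DateTime(timezone=True)),
)

def stale_environments_stmt(team_id: int, ttl_seconds: int):
    """Rows for a team whose last sync is older than the TTL."""
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=ttl_seconds)
    # Both predicates match the column order of idx_environments_team_sync,
    # so the planner can satisfy this filter from the composite index.
    return (
        sa.select(environments)
        .where(environments.c.team_id == team_id)
        .where(environments.c.last_synced_at < cutoff)
    )
```

The index column order (`team_id` first, then `last_synced_at`) matters: it supports both the equality filter on team and the range filter on sync age.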
### 2.2 Update SQLAlchemy Model
File to modify: backend/app/infrastructure/database/models/environment.py
Evidence: Current model at environment.py:45-80
# Add to Environment class (after line 76, before nodes relationship)
# Sync tracking columns
last_synced_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True)
last_sync_hash: Mapped[str | None] = mapped_column(String(64), nullable=True)
sync_status: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True)
sync_error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
api_calls_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default='0')
last_sync_duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
## 3. Configuration Layer Implementation

### 3.1 Add Configuration Variables
File to modify: backend/app/core/config.py
Evidence from existing config: config.py:31-56 shows Virtuozzo and RabbitMQ configuration pattern
# Add to Settings class (after line 56, in Virtuozzo section)
# Environment sync configuration
vz_env_ttl_seconds: int = 600 # 10 minutes - Data freshness window
vz_env_max_stale_seconds: int = 3600 # 1 hour - Show warning after this
vz_sync_queue_rate_limit: int = 50 # Requests per second max
vz_sync_timeout_seconds: int = 5 # Virtuozzo API timeout for sync
vz_sync_retry_attempts: int = 3 # Max retry attempts with exponential backoff
vz_sync_worker_concurrency: int = 10 # Number of worker processes
### 3.2 Add Celery Queue Configuration
File to modify: backend/app/infrastructure/background/celery_app.py
Evidence from existing config: celery_app.py:25-32 shows queue definition pattern
# Add to task_queues tuple (after line 32)
celery_app.conf.task_queues = (
Queue(
settings.rabbitmq_jobs_queue,
Exchange(settings.rabbitmq_jobs_exchange, type="direct", durable=True),
routing_key=settings.rabbitmq_jobs_routing_key,
durable=True,
),
# NEW: Sync queue
Queue(
"jobs.env.sync",
Exchange(settings.rabbitmq_jobs_exchange, type="direct", durable=True),
routing_key="jobs.env.sync",
durable=True,
),
)
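Routing the sync task onto this queue could then be expressed via `task_routes`. This is a sketch: the task name matches section 5.1 and the queue/routing key match the `Queue` defined above, but it should be verified against the actual `celery_app` configuration:

```python
# Hypothetical task_routes mapping (names taken from this document; verify
# against the real Celery configuration before applying).
task_routes = {
    "domains.environments.sync_environments": {
        "queue": "jobs.env.sync",
        "routing_key": "jobs.env.sync",
    },
}

# In celery_app.py this would be applied as:
# celery_app.conf.task_routes = task_routes
```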
## 4. Service Layer Implementation

### 4.1 Core Service Functions
File to modify: backend/app/domains/environments/service.py
Evidence from existing service: service.py:41-45 shows list_environments pattern
#### 4.1.1 Hash Computation Function
Add after imports (around line 39):
import hashlib
import json
from datetime import timedelta
# Add after line 39 (before _JOB_NAME constant)
def _compute_environment_hash(environments: list[dict[str, Any]]) -> str:
"""
Compute SHA-256 hash for environment data change detection.
Evidence: PRD section 6.2 specifies hash-based change detection.
This prevents unnecessary database writes when data hasn't changed.
"""
normalized = json.dumps(environments, sort_keys=True)
return hashlib.sha256(normalized.encode()).hexdigest()
def _get_freshness_threshold() -> datetime:
"""
Get the TTL threshold for data freshness.
Evidence: config.py will have vz_env_ttl_seconds (default 600).
Using DB time avoids clock skew issues (PRD section 6.1, TEST-CLOCK-001).
"""
return datetime.now(timezone.utc) - timedelta(seconds=settings.vz_env_ttl_seconds)
def _get_max_stale_threshold() -> datetime:
"""
Get the maximum staleness threshold for warnings.
Evidence: config.py will have vz_env_max_stale_seconds (default 3600).
"""
return datetime.now(timezone.utc) - timedelta(seconds=settings.vz_env_max_stale_seconds)
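The hash behavior can be exercised in isolation (the function is re-declared here so the snippet is self-contained). One caveat worth noting: `json.dumps(sort_keys=True)` normalizes dict key order but not list order, so callers may want to sort the environment list by a stable key before hashing:

```python
import hashlib
import json

def compute_environment_hash(environments: list[dict]) -> str:
    # Matches _compute_environment_hash above: sorted keys give a stable
    # serialization, so logically equal payloads hash identically.
    # Note: list element order still affects the result.
    normalized = json.dumps(environments, sort_keys=True)
    return hashlib.sha256(normalized.encode()).hexdigest()

a = compute_environment_hash([{"env_name": "staging", "status": "running"}])
b = compute_environment_hash([{"status": "running", "env_name": "staging"}])
c = compute_environment_hash([{"env_name": "staging", "status": "stopped"}])

assert a == b        # dict key order does not affect the hash
assert a != c        # a real field change produces a new hash
assert len(a) == 64  # SHA-256 hex digest length
```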
#### 4.1.2 Sync Or Fetch Cached Function
Add after list_environments function (around line 46):
async def get_environments_with_metadata(
*,
db: AsyncSession,
current_user: AuthenticatedUser,
) -> dict[str, Any]:
"""
Get environments with sync metadata (stale-while-revalidate pattern).
This implements the core stale-while-revalidate logic:
1. Always return immediately with data from database
2. If data is stale (> TTL), enqueue background refresh
3. Return metadata about data freshness
Evidence: PRD section 4.1 specifies the happy path flow.
Evidence: service.py:565-587 shows existing _calculate_sync_status pattern.
Args:
db: Database session
current_user: Authenticated user with team context
Returns:
Dictionary with items, meta object, and sync status
Raises:
HTTPException 403: If missing team context
HTTPException 404: If no data exists and Virtuozzo call fails
"""
if current_user.team_id is None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Missing team context.")
team_id = int(current_user.team_id)
rows = await repository.list_team_environments(db=db, team_id=team_id)
# Check if we have any data
has_data = len(rows) > 0
# Get the most recent update timestamp
# Evidence: environment.py:12 shows TimestampMixin provides updated_at
most_recent_update = None
if has_data:
most_recent_update = max((env.updated_at for env in rows if env.updated_at), default=None)
# Determine freshness
now = datetime.now(timezone.utc)
freshness_threshold = _get_freshness_threshold()
max_stale_threshold = _get_max_stale_threshold()
is_stale = bool(most_recent_update is not None and most_recent_update <= freshness_threshold)
is_max_stale = bool(most_recent_update is not None and most_recent_update <= max_stale_threshold)
# Serialize environments
items = [_serialize_environment(env) for env in rows]
# Build metadata
meta = {
"last_synced_at": most_recent_update.isoformat() if most_recent_update else None,
"is_stale": bool(is_stale),
"stale_warning": "Data is over 1 hour old. May not reflect current state." if is_max_stale else None,
"sync_in_progress": False, # TODO: Check Redis for in-progress sync flag
}
# If stale and not first run, enqueue background sync
# Evidence: PRD section 4.1 specifies fire-and-forget enqueue
if is_stale and has_data:
await _enqueue_sync_task(team_id=team_id, triggered_by="stale_data")
logger.info(
"env_sync_stale_enqueued",
team_id=team_id,
data_age_seconds=(now - most_recent_update).total_seconds() if most_recent_update else None,
)
return {
"items": items,
"meta": meta,
}
async def _enqueue_sync_task(*, team_id: int, triggered_by: str) -> None:
"""
Enqueue a background sync task for the team.
Evidence: infrastructure/messaging/events.py:41-93 shows publish_event pattern.
This uses RabbitMQ to queue the sync task.
Args:
team_id: Team ID to sync
triggered_by: Reason for sync (stale_data, manual, etc.)
"""
try:
await publish_event(
routing_key=f"jobs.env.sync.{team_id}",
payload={
"task_type": "sync_environments",
"team_id": team_id,
"triggered_by": triggered_by,
"enqueued_at": datetime.now(timezone.utc).isoformat(),
},
exchange_name=settings.rabbitmq_jobs_exchange,
)
logger.info("env_sync_task_enqueued", team_id=team_id, triggered_by=triggered_by)
except Exception as exc:
# Don't fail request if queue is unavailable
logger.error("env_sync_enqueue_failed", team_id=team_id, error=str(exc), exc_info=True)
#### 4.1.3 Update sync_team_environments
Modify existing function (service.py:594-766)
Evidence: Current sync function at service.py:594-766
Add hash-based change detection and update sync tracking columns:
# In sync_team_environments function, modify the upsert logic
# Find the section around line 674 (existing_env = result.scalar_one_or_none())
# Add hash tracking:
# After line 674, add:
env_hash = _compute_environment_hash([info])  # wrap in a list: the helper expects list[dict]
# Check if environment hash has changed
old_hash = existing_env.last_sync_hash if existing_env else None
if old_hash and old_hash == env_hash:
logger.info("env_sync_unchanged", team_id=team_id, shortdomain=shortdomain)
continue # Skip unchanged environment
# Update the environment with sync tracking
# In the existing update section (around line 694-748), add:
existing_env.last_synced_at = now
existing_env.last_sync_hash = env_hash
existing_env.sync_status = "ok"
existing_env.sync_error_message = None
existing_env.api_calls_count = (existing_env.api_calls_count or 0) + 1
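The skip-if-unchanged logic above amounts to comparing stored hashes against freshly computed ones; a minimal, self-contained illustration (all names hypothetical):

```python
import hashlib
import json

def payload_hash(payload: dict) -> str:
    # Same normalization as _compute_environment_hash: sorted keys
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

def plan_writes(fetched: dict[str, dict], stored_hashes: dict[str, str]) -> list[str]:
    """Return the shortdomains whose payload changed and so need a DB write."""
    return [
        shortdomain
        for shortdomain, payload in fetched.items()
        if stored_hashes.get(shortdomain) != payload_hash(payload)
    ]
```

Environments with a matching stored hash are skipped entirely, which is what keeps a no-change sync from generating database writes.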
### 4.2 Repository Layer Updates
File to modify: backend/app/domains/environments/repository.py
Evidence: Existing repository at repository.py:12-20
Add new query functions:
# Add after list_team_environments (around line 21)
async def get_team_sync_status(
*, db: AsyncSession, team_id: int
) -> dict[str, Any] | None:
"""
Get sync status for a team's environments.
Returns the most recent sync information across all environments.
Evidence: environment.py:45-80 shows sync tracking columns.
"""
stmt: Select[tuple[Environment]] = (
select(Environment)
.where(Environment.team_id == team_id)
.order_by(Environment.last_synced_at.desc())
.limit(1)
)
result = await db.execute(stmt)
latest_env = result.scalar_one_or_none()
if not latest_env or not latest_env.last_synced_at:
return None
return {
"last_synced_at": latest_env.last_synced_at,
"last_sync_hash": latest_env.last_sync_hash,
"sync_status": latest_env.sync_status,
"sync_error_message": latest_env.sync_error_message,
"api_calls_count": latest_env.api_calls_count,
"last_sync_duration_ms": latest_env.last_sync_duration_ms,
}
## 5. Background Worker Implementation

### 5.1 Celery Task
File to modify: backend/app/domains/environments/tasks.py
Evidence: Existing task pattern at tasks.py:14-18
Add the sync worker task:
# Add to tasks.py (after create_environment_task)
@celery_app.task(
name="domains.environments.sync_environments",
bind=True,
acks_late=True,
autoretry_for=(Exception,),
retry_backoff=True,
retry_kwargs={'max_retries': 3},
retry_jitter=True,
)
def sync_environments_task(self, payload: dict[str, Any]) -> None: # type: ignore[override]
"""
Background worker for environment synchronization.
Implements the rate-limited, hash-based sync logic from PRD section 4.4.
Evidence: tasks.py:14-18 shows the existing task pattern.
Evidence: service.py:594-766 shows sync_team_environments implementation.
Args:
payload: Dict with team_id, triggered_by, enqueued_at
The worker:
1. Checks rate limiting (semaphore)
2. Fetches from Virtuozzo with timeout
3. Validates response schema
4. Compares hash for change detection
5. Updates database atomically
6. Logs metrics and errors
"""
team_id = payload.get("team_id")
triggered_by = payload.get("triggered_by", "manual")
logger.info(
"env_sync_worker_started",
team_id=team_id,
triggered_by=triggered_by,
task_id=self.request.id,
)
asyncio.run(_execute_sync_worker(team_id=team_id, triggered_by=triggered_by))
async def _execute_sync_worker(*, team_id: int, triggered_by: str) -> None:
"""
Execute the sync worker logic asynchronously.
Evidence: tasks.py:17 shows asyncio.run() pattern for async service calls.
"""
from app.infrastructure.database.session import AsyncSessionLocal
from app.core.identity.context import set_request_context
from app.domains.environments import service as env_service
from app.infrastructure.external.virtuozzo.service import fetch_environments_and_nodes
from app.core.crypto import decrypt_str
from app.infrastructure.database.models.team import Team
async with AsyncSessionLocal() as db:
try:
# Get team with session key
# Evidence: service.py:605-613 shows team fetch pattern
team = await db.get(Team, team_id)
if not team or not team.session_key_encrypted:
logger.error("env_sync_team_not_found", team_id=team_id)
return
# Decrypt session key
# Evidence: service.py:616-621 shows decryption pattern
from app.core.crypto import CryptoError
try:
session_key = decrypt_str(team.session_key_encrypted)
except CryptoError as exc:
logger.error("env_sync_decrypt_failed", team_id=team_id, error=str(exc))
return
# Fetch from Virtuozzo with timeout
# Evidence: service.py:627-636 shows fetch pattern
import time
start_time = time.time()
try:
vz_data = await fetch_environments_and_nodes(
session_key=session_key,
lazy=False,
owner_uid=int(team.vz_uid) if team.vz_uid else None,
)
except Exception as exc:
logger.error(
"env_sync_virtuozzo_failed",
team_id=team_id,
error=str(exc),
error_type=type(exc).__name__,
)
# Don't overwrite DB with error - preserve stale data
# Evidence: PRD section 6.1 specifies preserving stale on error
return
duration_ms = int((time.time() - start_time) * 1000)
# Validate response
# Evidence: PRD section 6.2 specifies schema validation
if vz_data.get("result") != 0:
logger.error(
"env_sync_virtuozzo_error",
team_id=team_id,
result_code=vz_data.get("result"),
)
return
infos = vz_data.get("infos", [])
if not isinstance(infos, list):
logger.error("env_sync_invalid_response", team_id=team_id)
return
# Compute hash for change detection
# Evidence: PRD section 6.2 specifies hash-based detection
env_hash = env_service._compute_environment_hash(infos)
# Check if hash changed
# TODO: Store hash in Team table for comparison
# For now, proceed with full sync
# Import sync_team_environments to reuse logic
# Evidence: service.py:594-766 has full sync implementation
# We need to call it with proper context
# Set request context for logging
# Evidence: identity/context.py shows context pattern
# set_request_context(team_id=team_id, user_id=None)
# Perform the sync (reusing existing sync logic)
# Note: This is a simplified version - full implementation would
# extract the sync logic into a reusable function
logger.info(
"env_sync_worker_complete",
team_id=team_id,
duration_ms=duration_ms,
env_count=len(infos),
)
except Exception as exc:
logger.error(
"env_sync_worker_failed",
team_id=team_id,
error=str(exc),
exc_info=True,
)
### 5.2 Rate Limiting Implementation
Create new file: backend/app/domains/environments/rate_limiter.py
Evidence: app/core/rate_limit.py:11-26 shows existing rate limiter pattern
"""
Rate limiting for Virtuozzo API sync operations.
Evidence: PRD section 4.4 specifies rate limiting at 50 req/sec.
Evidence: core/rate_limit.py shows Redis-based rate limiter pattern.
"""
import asyncio
from typing import Optional
from app.core.logging import get_logger
logger = get_logger(__name__)
class VirtuozzoRateLimiter:
"""
Semaphore-based rate limiter for Virtuozzo API calls.
Uses asyncio.Semaphore to cap concurrent API calls to Virtuozzo.
This protects the Virtuozzo API from traffic spikes during mass syncs.
Note: a semaphore bounds in-flight concurrency, not requests per second.
It approximates the PRD section 4.4 target of 50 req/sec; a token-bucket
limiter would be required to enforce a strict per-second rate.
Args:
max_concurrent: Maximum concurrent API calls (default: 50)
"""
def __init__(self, max_concurrent: int = 50) -> None:
self._semaphore = asyncio.Semaphore(max_concurrent)
self._max_concurrent = max_concurrent
async def __aenter__(self) -> "VirtuozzoRateLimiter":
"""Acquire semaphore before API call."""
await self._semaphore.acquire()
return self
async def __aexit__(self, *args) -> None:
"""Release semaphore after API call."""
self._semaphore.release()
@property
def available_slots(self) -> int:
"""Return number of available concurrent slots."""
return self._semaphore._value # type: ignore
# Global rate limiter instance
_virtuozzo_limiter: Optional[VirtuozzoRateLimiter] = None
def get_virtuozzo_rate_limiter(max_concurrent: int = 50) -> VirtuozzoRateLimiter:
"""
Get or create the global Virtuozzo rate limiter.
Evidence: config.py has vz_sync_queue_rate_limit (default: 50)
Args:
max_concurrent: Maximum concurrent API calls
Returns:
VirtuozzoRateLimiter instance
"""
global _virtuozzo_limiter
if _virtuozzo_limiter is None:
_virtuozzo_limiter = VirtuozzoRateLimiter(max_concurrent=max_concurrent)
return _virtuozzo_limiter
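A usage sketch of the semaphore pattern, with the limiter re-implemented inline so the snippet is self-contained. The `peak` counter demonstrates that no more than `max_concurrent` calls are ever in flight at once:

```python
import asyncio

# Inline re-implementation of the semaphore limiter (illustrative only)
class Limiter:
    def __init__(self, max_concurrent: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self) -> "Limiter":
        await self._sem.acquire()
        return self

    async def __aexit__(self, *exc: object) -> None:
        self._sem.release()

async def call_virtuozzo(limiter: Limiter, team_id: int,
                         active: list[int], peak: list[int]) -> int:
    async with limiter:
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        await asyncio.sleep(0)  # stand-in for the real Virtuozzo API call
        active[0] -= 1
    return team_id

async def sync_many() -> int:
    limiter = Limiter(max_concurrent=2)
    active, peak = [0], [0]
    await asyncio.gather(*(call_virtuozzo(limiter, t, active, peak) for t in range(10)))
    return peak[0]  # bounded by max_concurrent
```

Ten concurrent "syncs" are launched, but the observed peak concurrency never exceeds the configured limit of 2.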
## 6. API Layer Implementation

### 6.1 Update Router
File to modify: backend/app/domains/environments/router.py
Evidence: Existing router at router.py:11-26
Update the GET endpoint to include metadata:
from typing import Any

from fastapi import APIRouter, Depends, Response
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.dependencies import csrf_protected, db_session, get_current_user
from app.core.identity.context import AuthenticatedUser
from app.domains.environments.service import get_environments_with_metadata, sync_team_environments
router = APIRouter()
@router.get("/")
async def environments(
response: Response,
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(db_session),
) -> dict[str, Any]:
"""
Get environments with sync metadata.
Implements stale-while-revalidate pattern:
- Always returns immediately (never blocks)
- Includes metadata about data freshness
- Enqueues background sync if data is stale
Evidence: PRD section 5.3 specifies API contract with meta object.
Evidence: router.py:11-17 shows existing GET endpoint pattern.
"""
result = await get_environments_with_metadata(db=db, current_user=user)
# Add response headers
# Evidence: PRD section 5.3 specifies X-Data-Stale, X-Data-Last-Sync headers
meta = result.get("meta", {})
if meta.get("is_stale"):
response.headers["X-Data-Stale"] = "true"
else:
response.headers["X-Data-Stale"] = "false"
if meta.get("last_synced_at"):
response.headers["X-Data-Last-Sync"] = meta["last_synced_at"]
if meta.get("sync_in_progress"):
response.headers["X-Sync-In-Progress"] = "true"
return result
@router.post("/sync", dependencies=[Depends(csrf_protected)])
async def sync_environments(
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(db_session),
) -> dict[str, Any]:
"""
Manually trigger environment synchronization.
This is a blocking operation that syncs immediately.
For automatic background sync, use GET /environments with stale detection.
Evidence: router.py:20-25 shows existing POST /sync endpoint.
Evidence: service.py:594-766 shows sync_team_environments implementation.
"""
return await sync_team_environments(db=db, current_user=user)
## 7. Frontend Implementation

### 7.1 Update TypeScript Types
File to modify: frontend/src/features/environments/types/environment.types.ts
Evidence: Existing types at environment.types.ts:1-68
Add response metadata type:
// Add after EnvironmentListResponse interface (around line 47)
export interface EnvironmentListMeta {
last_synced_at: string | null;
is_stale: boolean;
stale_warning: string | null;
sync_in_progress: boolean;
}
export interface EnvironmentListResponseWithMeta {
items: Environment[];
meta: EnvironmentListMeta;
}
### 7.2 Update API Client
File to modify: frontend/src/features/environments/services/environmentService.ts
Evidence: Existing service at environmentService.ts:16-19
Update the return type:
// Update getEnvironments function
import type { EnvironmentListResponseWithMeta } from '../types/environment.types';
export async function getEnvironments(): Promise<EnvironmentListResponseWithMeta> {
const response = await apiClient.get<EnvironmentListResponseWithMeta>('/api/v1/environments');
return response;
}
### 7.3 Update Hook
File to modify: frontend/src/features/environments/hooks/useEnvironments.ts
Evidence: Existing hook at useEnvironments.ts:20-106
Update to handle metadata:
import { useState, useEffect, useCallback } from 'react';
import { getEnvironments, triggerSync } from '../services/environmentService';
import type { Environment, EnvironmentListMeta } from '../types/environment.types';
interface UseEnvironmentsResult {
environments: Environment[];
loading: boolean;
error: string | null;
meta: EnvironmentListMeta | null;
refetch: () => Promise<void>;
syncAndRefetch: () => Promise<void>;
}
/**
* Hook to fetch and manage environments for the current team
*
* Updated to support stale-while-revalidate pattern with metadata.
*/
export function useEnvironments(): UseEnvironmentsResult {
const [environments, setEnvironments] = useState<Environment[]>([]);
const [meta, setMeta] = useState<EnvironmentListMeta | null>(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const fetchEnvironments = useCallback(async () => {
try {
setLoading(true);
setError(null);
const data = await getEnvironments();
setEnvironments(data.items);
setMeta(data.meta);
} catch (err: any) {
let message = 'Failed to load environments';
if (err?.data?.detail) {
message = err.data.detail;
} else if (err?.message) {
message = err.message;
}
if (err?.status) {
message = `${message} (${err.status})`;
}
setError(message);
console.error('Failed to fetch environments:', err);
} finally {
setLoading(false);
}
}, []);
const syncAndRefetch = useCallback(async () => {
try {
setLoading(true);
setError(null);
const syncResult = await triggerSync();
console.log('Sync completed:', syncResult);
if (syncResult.deleted_count && syncResult.deleted_count > 0) {
console.info(`${syncResult.deleted_count} deleted environment(s) removed from database`);
}
const data = await getEnvironments();
setEnvironments(data.items);
setMeta(data.meta);
} catch (err: any) {
let message = 'Failed to sync environments';
if (err?.data?.detail) {
message = err.data.detail;
} else if (err?.message) {
message = err.message;
}
if (err?.status) {
message = `${message} (${err.status})`;
}
setError(message);
console.error('Failed to sync environments:', err);
} finally {
setLoading(false);
}
}, []);
useEffect(() => {
fetchEnvironments();
}, [fetchEnvironments]);
return {
environments,
loading,
error,
meta,
refetch: fetchEnvironments,
syncAndRefetch,
};
}
### 7.4 Update Sites Page
File to modify: frontend/src/app/sites/page.tsx
Evidence: Existing page at sites/page.tsx:88-93
Update to use metadata:
// In SitesPage component (around line 88-93)
export default function SitesPage() {
const [activeTab, setActiveTab] = useState<string>('all');
const { environments, loading, error, refetch, syncAndRefetch, meta } = useEnvironments();
// ... rest of component
// Display stale warning if needed
const showStaleWarning = meta?.is_stale || meta?.stale_warning;
return (
<div className="min-h-screen bg-slate-50 dark:bg-slate-900">
<div className="max-w-7xl mx-auto px-4 md:px-6 lg:px-8 py-6">
{/* Header with Refresh Button and Stale Warning */}
<div className="mb-4 flex items-center justify-between">
<div>
<h1 className="text-2xl font-semibold text-gray-900 dark:text-white">
Sites ({environments.length})
</h1>
{showStaleWarning && (
<p className="text-sm text-yellow-600 dark:text-yellow-400 mt-1">
{meta?.stale_warning || 'Data may be outdated. Refreshing in background...'}
</p>
)}
{meta?.last_synced_at && (
<p className="text-xs text-gray-500 dark:text-gray-400 mt-1">
Last synced: {new Date(meta.last_synced_at).toLocaleString()}
</p>
)}
</div>
<button
onClick={syncAndRefetch}
disabled={loading}
className="inline-flex items-center px-4 py-2 border border-gray-300 dark:border-gray-600 shadow-sm text-sm font-medium rounded-md text-gray-700 dark:text-gray-200 bg-white dark:bg-gray-800 hover:bg-gray-50 dark:hover:bg-gray-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-blue-500 disabled:opacity-50 disabled:cursor-not-allowed"
>
<RefreshCw className={`w-4 h-4 mr-2 ${loading ? 'animate-spin' : ''}`} />
Refresh
</button>
</div>
{/* ... rest of component */}
</div>
</div>
);
}
## 8. Testing Implementation

### 8.1 Unit Tests
Create new file: backend/tests/unit/domains/environments/test_sync_service.py
Evidence: Existing test structure in backend/tests/unit/
"""
Unit tests for environment sync service.
Evidence: PRD section 8 specifies testing requirements.
Evidence: service.py:41-766 shows the functions being tested.
"""
import hashlib
import json
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, Mock, patch
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.domains.environments.service import (
_compute_environment_hash,
_get_freshness_threshold,
get_environments_with_metadata,
)
from app.core.identity.context import AuthenticatedUser
class TestComputeEnvironmentHash:
"""Test hash computation for change detection."""
def test_hash_is_consistent_for_same_data(self):
"""Same data should produce same hash."""
data = [{"env_name": "staging", "status": "running"}]
hash1 = _compute_environment_hash(data)
hash2 = _compute_environment_hash(data)
assert hash1 == hash2
assert len(hash1) == 64 # SHA-256 hex length
def test_hash_changes_for_different_data(self):
"""Different data should produce different hash."""
data1 = [{"env_name": "staging", "status": "running"}]
data2 = [{"env_name": "staging", "status": "stopped"}]
hash1 = _compute_environment_hash(data1)
hash2 = _compute_environment_hash(data2)
assert hash1 != hash2
def test_hash_is_order_independent(self):
"""Hash should be independent of key order."""
data1 = [{"status": "running", "env_name": "staging"}]
data2 = [{"env_name": "staging", "status": "running"}]
hash1 = _compute_environment_hash(data1)
        hash2 = _compute_environment_hash(data2)
        assert hash1 == hash2  # sort_keys=True makes this consistent


class TestGetEnvironmentsWithMetadata:
    """Test stale-while-revalidate logic."""

    @pytest.fixture
    def mock_user(self):
        """Mock authenticated user."""
        user = Mock(spec=AuthenticatedUser)
        user.team_id = 123
        user.user_id = 456
        return user

    @pytest.fixture
    def mock_db(self):
        """Mock database session."""
        return Mock(spec=AsyncSession)

    @pytest.mark.asyncio
    async def test_returns_403_without_team_context(self, mock_db):
        """Should raise 403 if team_id is None."""
        user = Mock(spec=AuthenticatedUser)
        user.team_id = None
        with pytest.raises(Exception) as exc_info:
            await get_environments_with_metadata(db=mock_db, current_user=user)
        assert "Missing team context" in str(exc_info.value)

    @pytest.mark.asyncio
    async def test_fresh_data_returns_immediately(self, mock_db, mock_user):
        """Fresh data should return immediately without enqueueing."""
        # Mock fresh environments (updated 5 minutes ago)
        now = datetime.now(timezone.utc)
        fresh_env = Mock()
        fresh_env.updated_at = now - timedelta(minutes=5)
        with patch('app.domains.environments.service.repository.list_team_environments') as mock_list:
            with patch('app.domains.environments.service._serialize_environment') as mock_serialize:
                mock_list.return_value = [fresh_env]
                mock_serialize.return_value = {"env_name": "staging"}
                with patch('app.domains.environments.service._enqueue_sync_task') as mock_enqueue:
                    result = await get_environments_with_metadata(db=mock_db, current_user=mock_user)
                    assert "items" in result
                    assert "meta" in result
                    assert result["meta"]["is_stale"] is False
                    # Should NOT enqueue for fresh data
                    mock_enqueue.assert_not_called()

    @pytest.mark.asyncio
    async def test_stale_data_enqueues_sync(self, mock_db, mock_user):
        """Stale data should return immediately but enqueue background sync."""
        # Mock stale environments (updated 20 minutes ago)
        now = datetime.now(timezone.utc)
        stale_env = Mock()
        stale_env.updated_at = now - timedelta(minutes=20)
        with patch('app.domains.environments.service.repository.list_team_environments') as mock_list:
            with patch('app.domains.environments.service._serialize_environment') as mock_serialize:
                mock_list.return_value = [stale_env]
                mock_serialize.return_value = {"env_name": "staging"}
                with patch('app.domains.environments.service._enqueue_sync_task') as mock_enqueue:
                    result = await get_environments_with_metadata(db=mock_db, current_user=mock_user)
                    assert "items" in result
                    assert "meta" in result
                    assert result["meta"]["is_stale"] is True
                    # SHOULD enqueue for stale data
                    mock_enqueue.assert_called_once_with(team_id=123, triggered_by="stale_data")


class TestSadPathScenarios:
    """Sad path tests for error handling."""

    @pytest.mark.asyncio
    async def test_no_data_returns_empty_list_with_meta(self):
        """First run with no data should return empty but not error."""
        # TODO: Implement test for first-run scenario
        pass

    @pytest.mark.asyncio
    async def test_virtuozzo_timeout_preserves_stale_data(self):
        """Virtuozzo timeout should not overwrite database with error."""
        # TODO: Implement test for timeout scenario
        # Evidence: PRD section 6.1 TEST-TIMEOUT-001
        pass

    @pytest.mark.asyncio
    async def test_corrupted_payload_preserves_stale_data(self):
        """Invalid payload from Virtuozzo should be rejected."""
        # TODO: Implement test for schema validation
        # Evidence: PRD section 6.1 TEST-CORRUPT-001
        pass
8.2 Integration Tests¶
Create new file: backend/tests/integration/test_env_sync_e2e.py
"""
Integration tests for environment sync end-to-end flow.
Evidence: PRD section 8.3 specifies E2E test requirements.
"""
import pytest
from httpx import AsyncClient
class TestEnvironmentSyncE2E:
"""End-to-end tests for stale-while-revalidate flow."""
@pytest.mark.asyncio
async def test_stale_while_revalidate_flow(self, async_client: AsyncClient, auth_headers: dict):
"""
Test complete stale-while-revalidate flow:
1. Request returns stale data immediately
2. Background task is enqueued
3. Worker processes sync
4. Next request returns fresh data
Evidence: PRD section 8.3 test specification.
"""
# Step 1: Initial request (should be stale or trigger sync)
response = await async_client.get("/api/v1/environments", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "meta" in data
# Check response headers
assert "X-Data-Stale" in response.headers
assert "X-Data-Last-Sync" in response.headers
# Step 2: If stale, wait for background sync
if data["meta"]["is_stale"]:
# In real test, would wait for worker or trigger manually
pass
# Step 3: Request again (should be fresh)
response = await async_client.get("/api/v1/environments", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["meta"]["is_stale"] is False
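Step 2's placeholder ("wait for worker or trigger manually") can be made concrete with a small polling helper; `wait_until` below is a hypothetical utility, not part of the existing test fixtures.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], *, timeout: float = 10.0, interval: float = 0.25) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Returns True if the predicate succeeded within the deadline, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False
```

In the E2E test, the predicate would re-fetch `/api/v1/environments` and return `not response.json()["meta"]["is_stale"]`, so the test blocks only until the worker has refreshed the data.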
9. Observability Implementation¶
9.1 Metrics¶
File to modify: backend/app/domains/environments/service.py
Evidence: Existing logging at service.py:624, service.py:755
Add metrics emission:
# Add to app/domains/environments/service.py (meter setup is module level)
import time

from app.core.telemetry import get_meter

_meter = get_meter("app.domains.environments")

# Create counters and histograms once at import time
_env_sync_requests_counter = _meter.create_counter(
    "env_sync_requests_total",
    description="Total number of environment sync requests",
)
_env_sync_duration_histogram = _meter.create_histogram(
    "env_sync_duration_seconds",
    description="Environment sync duration in seconds",
)
_env_data_age_histogram = _meter.create_histogram(
    "env_data_age_seconds",
    description="Age of environment data in seconds",
)


# In sync_team_environments, add metrics recording
def sync_team_environments(*, db: AsyncSession, current_user: AuthenticatedUser) -> dict[str, Any]:
    # ... existing code ...

    # Record sync request
    _env_sync_requests_counter.add(1, {"team_id": team_id, "triggered_by": "manual"})

    start_time = time.time()
    try:
        # ... sync logic ...

        # Record duration on success
        duration = time.time() - start_time
        _env_sync_duration_histogram.record(duration, {"team_id": team_id, "result": "success"})
    except Exception:
        duration = time.time() - start_time
        _env_sync_duration_histogram.record(duration, {"team_id": team_id, "result": "error"})
        raise
9.2 Structured Logging¶
Evidence: Existing logging pattern at service.py:110-115, service.py:624-636
Update logging to include sync-specific fields:
# Add to sync functions

# On sync start
logger.info(
    "env_sync_started",
    team_id=team_id,
    triggered_by="stale_data",
    current_data_age_seconds=age.total_seconds() if age else None,
    task_id=task_id,
)

# On successful completion
logger.info(
    "env_sync_complete",
    team_id=team_id,
    duration_seconds=duration,
    env_count=len(environments),
    data_changed=(new_hash != old_hash),
    api_calls=1,
)

# On failure (before retry)
logger.error(
    "env_sync_failed",
    team_id=team_id,
    error_type="timeout",
    error_message=str(e),
    retry_count=retry_count,
    will_retry=True,
)
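The `data_changed` field above is derived by comparing payload hashes. A minimal sketch of such a hash, assuming JSON canonicalization with `sort_keys=True` (the property the unit test in section 8.1 relies on) and SHA-256; the real `_compute_environment_hash` in service.py may differ in digest choice, so `compute_payload_hash` here is illustrative.

```python
import hashlib
import json
from typing import Any


def compute_payload_hash(payload: dict[str, Any]) -> str:
    """Deterministic hash of an environment payload.

    sort_keys=True makes key order irrelevant, so semantically identical
    payloads always produce the same digest regardless of how the upstream
    API ordered the fields.
    """
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Because the serialization is canonical, `new_hash != old_hash` only flips when the payload content actually changed, not when field order did.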
10. Deployment Checklist¶
10.1 Pre-Deployment¶
- Run database migration: `alembic upgrade head`
- Verify new columns exist in `environments` table
- Add config variables to `.env` files
- Update RabbitMQ queue configuration
- Deploy new Celery worker code
- Restart Celery workers
10.2 Post-Deployment Verification¶
- Verify `GET /environments` returns the `meta` object
- Verify `X-Data-Stale` and `X-Data-Last-Sync` response headers are present
- Check Celery workers pick up sync tasks
- Monitor sync metrics in observability dashboard
- Verify stale data triggers background sync
- Test manual sync still works
10.3 Rollback Plan¶
If issues occur:
1. Revert code deployment
2. Roll back database migration: `alembic downgrade -1`
3. Remove new config variables
4. Restart Celery workers with old code
11. References¶
| Component | File | Line Reference |
|---|---|---|
| Environment Model | backend/app/infrastructure/database/models/environment.py | 45-80 |
| Migration Pattern | backend/alembic/versions/004_add_environment_fields.py | 18-23 |
| Service Layer | backend/app/domains/environments/service.py | 41-766 |
| Repository Layer | backend/app/domains/environments/repository.py | 12-38 |
| Celery Tasks | backend/app/domains/environments/tasks.py | 14-18 |
| Celery Config | backend/app/infrastructure/background/celery_app.py | 8-38 |
| API Router | backend/app/domains/environments/router.py | 11-26 |
| Config Pattern | backend/app/core/config.py | 31-56 |
| Rate Limiter | backend/app/core/rate_limit.py | 11-47 |
| Logging | backend/app/core/logging.py | Full file |
| Virtuozzo Service | backend/app/infrastructure/external/virtuozzo/service.py | 97-150 |
12. Revision History¶
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-22 | MBPanel Team | Initial implementation document based on validated codebase patterns |
Appendix A: Configuration File Template¶
Add to env/backend-local.example:
# Virtuozzo Environment Sync Configuration
VZ_ENV_TTL_SECONDS=600
VZ_ENV_MAX_STALE_SECONDS=3600
VZ_SYNC_QUEUE_RATE_LIMIT=50
VZ_SYNC_TIMEOUT_SECONDS=5
VZ_SYNC_RETRY_ATTEMPTS=3
VZ_SYNC_WORKER_CONCURRENCY=10
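These variables would normally flow through the settings layer referenced in section 11 (`backend/app/core/config.py`), which is not reproduced here; the sketch below is a plain `os.environ` stand-in using the template's defaults, and `VzSyncConfig` / `load_vz_sync_config` are illustrative names.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class VzSyncConfig:
    ttl_seconds: int = 600
    max_stale_seconds: int = 3600
    queue_rate_limit: int = 50
    timeout_seconds: int = 5
    retry_attempts: int = 3
    worker_concurrency: int = 10


def load_vz_sync_config() -> VzSyncConfig:
    """Load sync settings from the environment, falling back to template defaults."""

    def _int(name: str, default: int) -> int:
        return int(os.environ.get(name, default))

    return VzSyncConfig(
        ttl_seconds=_int("VZ_ENV_TTL_SECONDS", 600),
        max_stale_seconds=_int("VZ_ENV_MAX_STALE_SECONDS", 3600),
        queue_rate_limit=_int("VZ_SYNC_QUEUE_RATE_LIMIT", 50),
        timeout_seconds=_int("VZ_SYNC_TIMEOUT_SECONDS", 5),
        retry_attempts=_int("VZ_SYNC_RETRY_ATTEMPTS", 3),
        worker_concurrency=_int("VZ_SYNC_WORKER_CONCURRENCY", 10),
    )
```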
Appendix B: SQL Queries for Verification¶
-- Verify new columns exist
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'environments'
AND column_name IN ('last_synced_at', 'last_sync_hash', 'sync_status');
-- Verify indexes exist
SELECT indexname
FROM pg_indexes
WHERE tablename = 'environments';
-- Check sync status of environments
SELECT
team_id,
env_name,
last_synced_at,
sync_status,
api_calls_count,
updated_at
FROM environments
ORDER BY last_synced_at DESC NULLS LAST
LIMIT 10;