Caching Env Sync

Environment Sync Implementation Plan (Production Grade)

Guiding contracts
  • Architecture & file layout: docs/architecture/001-hybrid-modular-ddd.md.
  • Observability, logging, tracing: docs/AI-GUIDES/OBSERVABILITY-SIGNOZ-AI-GUIDE.md.
  • Testing: docs/AI-GUIDES/FAST-TESTING-GUIDE.md, docs/AI-GUIDES/END-TO-END-TESTING-GUIDE.md.
  • Linting: docs/AI-GUIDES/LINTING-GUIDE.md.

This backlog operationalizes docs/architecture/CACHING/environments-sync.md for the Virtuozzo → Postgres sync pipeline.


1. Schema & Metadata (Owner: Backend Platform)

  1. SQLAlchemy models (infrastructure layer)
  2. File: backend/app/infrastructure/database/models/environment_sync.py.
  3. Define VZSyncState, VZSyncJob, and extend Environment, Node, ArchivedSite, JobLog with source_version, synced_at, stale, needs_validation.
  4. Update backend/app/infrastructure/database/models/__init__.py exports.
  5. Alembic migration
  6. Path: backend/app/infrastructure/database/migrations/versions/2025_12_XX_vz_sync_core.py.
  7. Create vz_sync_states, vz_sync_jobs; alter resource tables (columns + indexes) per architecture doc, include downgrade.
  8. Repositories (domains layer)
  9. Add to backend/app/domains/environments/repository.py:
    • upsert_environment_batch(payload: list[EnvironmentUpsert])
    • mark_environment_subset_stale(team_id: int, env_slugs: list[str])
  10. Both rely on SQLAlchemy insert().on_conflict_do_update.
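The two repository methods can be sketched as follows. This is an illustration only: it uses the standard-library sqlite3 module (SQLite 3.24+ supports ON CONFLICT ... DO UPDATE) as a stand-in for Postgres, whereas the plan itself calls for SQLAlchemy's insert().on_conflict_do_update. The table columns and the (team_id, slug) conflict key are assumptions, not taken from the architecture doc.

```python
import sqlite3

# Stand-in schema with only the columns needed to show the upsert.
# The real table layout and unique key are defined by the architecture doc.
SCHEMA = """
CREATE TABLE environments (
    team_id        INTEGER NOT NULL,
    slug           TEXT    NOT NULL,
    source_version TEXT    NOT NULL,
    synced_at      TEXT    NOT NULL,
    stale          INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (team_id, slug)
)
"""

UPSERT = """
INSERT INTO environments (team_id, slug, source_version, synced_at, stale)
VALUES (:team_id, :slug, :source_version, :synced_at, 0)
ON CONFLICT (team_id, slug) DO UPDATE SET
    source_version = excluded.source_version,
    synced_at      = excluded.synced_at,
    stale          = 0
"""


def upsert_environment_batch(conn, payload):
    """Insert new rows and overwrite existing ones in a single batch."""
    conn.executemany(UPSERT, payload)


def mark_environment_subset_stale(conn, team_id, env_slugs):
    """Flag the given slugs as stale; returns the number of rows touched."""
    if not env_slugs:
        return 0
    placeholders = ",".join("?" for _ in env_slugs)
    cur = conn.execute(
        "UPDATE environments SET stale = 1 "
        f"WHERE team_id = ? AND slug IN ({placeholders})",
        [team_id, *env_slugs],
    )
    return cur.rowcount
```

An upsert that resets stale to 0 means a row reappearing in a later payload is automatically un-flagged; whether that is the desired behavior is a design choice the plan leaves open.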
  11. Settings (core layer)
  12. backend/app/core/config.py additions + env docs:
    • vz_sync_full_interval_minutes = 60
    • vz_sync_delta_interval_minutes = 10
    • vz_sync_job_lock_seconds = 180
    • vz_sync_max_parallel_jobs = 5
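A minimal sketch of how these four settings could be grouped, assuming plain dataclasses and upper-cased environment variable names (for example VZ_SYNC_MAX_PARALLEL_JOBS). The class name VZSyncSettings and the from_env helper are hypothetical; the actual config.py may use a different settings mechanism.

```python
import os
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class VZSyncSettings:
    # Defaults mirror the values listed in the plan.
    vz_sync_full_interval_minutes: int = 60
    vz_sync_delta_interval_minutes: int = 10
    vz_sync_job_lock_seconds: int = 180
    vz_sync_max_parallel_jobs: int = 5

    @classmethod
    def from_env(cls, environ=None):
        """Build settings, letting UPPER_CASE env vars override defaults."""
        env = os.environ if environ is None else environ
        values = {}
        for f in fields(cls):
            raw = env.get(f.name.upper())
            values[f.name] = int(raw) if raw is not None else f.default
        return cls(**values)
```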
  13. Verification
  14. Run alembic upgrade head, update ER diagram in docs/architecture/CACHING/environments-sync.md.

2. Scheduler Service (Owner: Background Services)

  1. Module skeleton
  2. Create backend/app/modules/vz_sync/scheduler.py (modules layer) implementing SyncScheduler.
  3. Responsibilities: poll vz_sync_states via repositories, compute freshness, enqueue jobs.
  4. Redis due queue
  5. Key vz:sync:due with TTL 24h.
  6. Every 30s:
    1. SELECT ... FOR UPDATE SKIP LOCKED for states where next_due_at <= now().
    2. Build payload JSON:
      {
        "job_id": "<uuid4>",
        "team_id": <int>,
        "scope": "<team|environment:<slug>|node:<uuid>>",
        "data_type": "<environment|node|archive>",
        "job_kind": "<full|delta|single>",
        "priority": <1-100>,
        "reason": "<sla_exceeded|demand|manual>"
      }
      
    3. ZADD vz:sync:due score payload_json.
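The payload construction and ZADD step might look like the sketch below. The function names build_due_payload and enqueue_due are hypothetical, and the plan does not say what the sorted-set score represents, so the score is passed in by the caller. The redis_client argument is assumed to be a redis-py style client whose zadd accepts a {member: score} mapping.

```python
import json
import uuid

DUE_KEY = "vz:sync:due"
DATA_TYPES = frozenset({"environment", "node", "archive"})
JOB_KINDS = frozenset({"full", "delta", "single"})
REASONS = frozenset({"sla_exceeded", "demand", "manual"})


def build_due_payload(team_id, scope, data_type, job_kind, priority, reason):
    """Return the due-queue payload as a JSON string, validating enum fields."""
    if data_type not in DATA_TYPES:
        raise ValueError(f"unknown data_type: {data_type!r}")
    if job_kind not in JOB_KINDS:
        raise ValueError(f"unknown job_kind: {job_kind!r}")
    if reason not in REASONS:
        raise ValueError(f"unknown reason: {reason!r}")
    if not 1 <= priority <= 100:
        raise ValueError("priority must be between 1 and 100")
    return json.dumps({
        "job_id": str(uuid.uuid4()),
        "team_id": team_id,
        "scope": scope,
        "data_type": data_type,
        "job_kind": job_kind,
        "priority": priority,
        "reason": reason,
    })


def enqueue_due(redis_client, payload_json, score):
    """ZADD the payload under the due key; score semantics are the caller's."""
    return redis_client.zadd(DUE_KEY, {payload_json: score})
```

Because job_id is a fresh uuid4, two payloads for the same team and scope are distinct sorted-set members, so any deduplication has to happen before this step rather than relying on ZADD itself.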
  7. RabbitMQ dispatcher
  8. File: backend/app/modules/vz_sync/dispatcher.py.
  9. Pop jobs via ZRANGEBYSCORE + ZREM and publish to jobs.vz.sync exchange with AMQP priority.
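ZRANGEBYSCORE followed by ZREM is not atomic, so two dispatchers can read the same member. One common way to stay safe is to treat the ZREM return value as the claim: only the caller that actually removed the member publishes it. The sketch below assumes the score is the due time in epoch seconds (not stated in the plan) and a redis-py style client; pop_due_jobs is a hypothetical name, and the RabbitMQ publish is left to the caller.

```python
import json
import time

DUE_KEY = "vz:sync:due"


def pop_due_jobs(redis_client, now=None, limit=50):
    """Claim up to `limit` due jobs and return them as decoded dicts."""
    now = time.time() if now is None else now
    members = redis_client.zrangebyscore(DUE_KEY, "-inf", now, start=0, num=limit)
    claimed = []
    for member in members:
        # ZREM returns how many members were removed. A result of 1 means
        # this dispatcher won the race; 0 means another one already took it.
        if redis_client.zrem(DUE_KEY, member) == 1:
            claimed.append(json.loads(member))
    return claimed
```

Each claimed dict can then be published to the jobs.vz.sync exchange with its priority field mapped to the AMQP message priority.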
  10. Manual trigger API
  11. Route: POST /api/v1/teams/{team_id}/vz-sync with require_permission("team.manage").
  12. Handler: await scheduler.force_sync(...); log vz_sync_enqueue (SAFE_FIELDS only).
  13. Tests
  14. backend/tests/unit/modules/vz_sync/test_scheduler.py covering due math, Redis dedupe, manual trigger (FAST-TESTING-GUIDE fixtures).

3. Worker Adaptation (Owner: Background Worker Team)

  1. Worker entrypoint
  2. File: backend/app/modules/vz_sync/worker.py; consume jobs.vz.sync.
  3. Locking
  4. Redis key vz:sync:lock:{team_id}:{scope} with TTL settings.vz_sync_job_lock_seconds. On collision log vz_sync_skip_locked and requeue with 30s delay.
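A minimal sketch of the lock acquisition, assuming a redis-py style client whose set method accepts nx and ex keyword arguments. The helper names are hypothetical, and storing a random token as the value is an assumption that allows the owner to be verified on release.

```python
import uuid


def lock_key(team_id, scope):
    """Build the lock key in the format given by the plan."""
    return f"vz:sync:lock:{team_id}:{scope}"


def try_acquire_lock(redis_client, team_id, scope, ttl_seconds):
    """Return a lock token on success, or None if the lock is already held."""
    token = uuid.uuid4().hex
    # SET key value NX EX ttl: succeeds only if the key does not exist yet,
    # and expires on its own if the worker dies before releasing it.
    acquired = redis_client.set(
        lock_key(team_id, scope), token, nx=True, ex=ttl_seconds
    )
    return token if acquired else None
```

When try_acquire_lock returns None, the worker would log vz_sync_skip_locked and requeue the job with the 30s delay described above.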
  5. Virtuozzo service integration
  6. Extend backend/app/infrastructure/external/virtuozzo/service.py with:
    • fetch_environments(session_key, modified_since=None)
    • fetch_environment_details(session_key, env_name)
    • fetch_nodes(session_key, env_name)
  7. Use create_external_async_client() to inherit logging/OTEL instrumentation.
  8. Processing pipeline
  9. Resolve Owner-only encrypted session key, decrypt, and wrap API calls with _telemetry_span("vz.sync.fetch").
  10. Compute source_version = sha256(sorted_json_bytes) for each payload item.
  11. Upsert records via repositories; mark rows absent in payload as stale=true.
  12. Update vz_sync_states, insert vz_sync_jobs, emit SSE/RabbitMQ sync.completed.
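The source_version and stale-detection steps can be sketched with the standard library alone. The sketch assumes that "sorted_json_bytes" means UTF-8 JSON with sorted keys and compact separators, and that each payload item carries a "slug" field; both are assumptions, as is the helper name split_payload.

```python
import hashlib
import json


def compute_source_version(item):
    """SHA-256 hex digest of the item's canonical (key-sorted) JSON bytes."""
    canonical = json.dumps(
        item, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    ).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()


def split_payload(items, cached_slugs, slug_field="slug"):
    """Return ({slug: source_version}, [slugs cached but absent from payload])."""
    versions = {item[slug_field]: compute_source_version(item) for item in items}
    stale = sorted(set(cached_slugs) - set(versions))
    return versions, stale
```

Sorting keys makes the hash independent of the field order returned by the upstream API, so a row is only seen as changed when its content changes.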
  13. Error handling & observability
  14. Retryable errors (timeout/429/5xx): exponential backoff ≤5 min, log vz_sync_retry with exc_info=True, mark OTEL span error.
  15. Non-retryable (401): set status='error', pending_reason, emit SSE sync.failed, call log_auth_violation if request lacked Owner rights.
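The retry classification and capped backoff could be sketched as below. The 5-minute cap and the retryable categories come from the plan; the 2-second base, the doubling factor, and the optional full jitter are assumptions chosen for illustration.

```python
import random

MAX_BACKOFF_SECONDS = 300  # the plan caps backoff at 5 minutes


def is_retryable(status_code=None, timed_out=False):
    """Timeouts, HTTP 429, and HTTP 5xx are retryable; everything else is not."""
    if timed_out:
        return True
    if status_code is None:
        return False
    return status_code == 429 or 500 <= status_code <= 599


def backoff_delay(attempt, base_seconds=2.0, jitter=True):
    """Delay before retry number `attempt` (0-based), never above the cap."""
    # Clamp the exponent so very large attempt counts cannot overflow.
    exponent = min(max(attempt, 0), 16)
    delay = min(MAX_BACKOFF_SECONDS, base_seconds * (2 ** exponent))
    if jitter:
        # Full jitter (an assumption): spread retries across [0, delay].
        delay = random.uniform(0, delay)
    return delay
```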
  16. Tests
  17. backend/tests/unit/modules/vz_sync/test_worker.py (lock, upsert, retries).
  18. backend/tests/integration/modules/vz_sync/test_worker_e2e.py with Virtuozzo stub + OTEL/log assertions.

4. API & UX Integration (Owner: API + Frontend)

  1. Domain response contract
  2. Modify backend/app/domains/environments/service.py to add:
    "sync_status": {
        "freshness": "<current|stale|unknown>",
        "lastSyncedAt": synced_at,
        "nextSyncAt": state.next_due_at,
        "source": "vz_cached"
    }
    
  3. freshness is "stale" when now - synced_at >= max_allowed_staleness and "current" otherwise.
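A minimal sketch of the freshness rule and the sync_status block. Returning "unknown" when synced_at is missing is an assumption, since the plan lists the value but does not say when it applies; the function names are hypothetical, and timestamps are assumed to be timezone-aware datetimes serialized as ISO 8601 strings.

```python
from datetime import datetime, timedelta, timezone


def compute_freshness(synced_at, max_allowed_staleness, now=None):
    """Classify a cached row as 'current', 'stale', or 'unknown'."""
    if synced_at is None:
        return "unknown"  # assumption: never-synced rows have no freshness
    now = datetime.now(timezone.utc) if now is None else now
    return "stale" if now - synced_at >= max_allowed_staleness else "current"


def build_sync_status(synced_at, next_due_at, max_allowed_staleness, now=None):
    """Assemble the sync_status object added to the environment response."""
    return {
        "freshness": compute_freshness(synced_at, max_allowed_staleness, now),
        "lastSyncedAt": synced_at.isoformat() if synced_at else None,
        "nextSyncAt": next_due_at.isoformat() if next_due_at else None,
        "source": "vz_cached",
    }
```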
  4. Demand-driven enqueue
  5. When freshness == "stale", schedule enqueue_delta(team_id, scope) via FastAPI background task; log vz_sync_enqueue.
  6. Frontend contract
  7. Create or update docs/frontend/API-environment-sync.md to document the stale badge and SSE sync.completed handling.
  8. Permissions
  9. Owner/Manager can trigger manual sync; Developers read-only.
  10. Smoke/E2E tests
  11. Script backend/tests/smoke/vz_sync/smoke_refresh.sh: manual trigger → wait for SSE → assert freshness == "current".
  12. End-to-end UI test per guide ensuring stale banner clears after sync.

5. Observability & Rollout (Owner: SRE + Backend)

  1. Metrics
  2. Add to backend/app/core/observability.py:
    • Counter vz_sync_jobs_total{state,reason}
    • Histogram vz_sync_duration_seconds{scope}
    • Gauge vz_sync_staleness_seconds{team_id,data_type}
  3. Logging & tracing
  4. Use get_logger(__name__); emit events vz_sync_job_dispatched, vz_sync_job_completed, etc., with SAFE_FIELDS only.
  5. Wrap long-running steps with OTEL spans per SigNoz guide.
  6. Alerts
  7. Prometheus rules:
    • vz_sync_staleness_seconds > max_allowed_staleness * 2 for 10 minutes.
    • increase(vz_sync_jobs_total{state="failed"}[5m]) > 3 per team.
  8. Feature flag & rollout
  9. Config ENABLE_VZ_SYNC_PIPELINE default false.
  10. Rollout: enable internal tenant → observe 24h → enable per cohort while monitoring metrics/logs.
  11. Documentation
  12. Update architecture doc and create docs/operations/runbooks/vz-sync.md (manual trigger, dashboards, alert response).
  13. Acceptance checklist
  14. python -m black backend && ruff check backend (lint guide).
  15. pytest tests/unit/modules/vz_sync/ -q.
  16. pytest tests/integration/modules/vz_sync/ -q.
  17. mutmut run with CI thresholds.
  18. make smoke-vz_sync.
  19. python backend/tests/smoke/logging/log_sink_probe.py --log-dir logs.
  20. alembic upgrade head on staging.

Completing every item above delivers the environment sync pipeline end to end: schema, scheduler, worker, API integration, observability, and rollout.