Environment Sync Implementation Plan (Production Grade)
Guiding contracts
- Architecture & file layout: docs/architecture/001-hybrid-modular-ddd.md.
- Observability, logging, tracing: docs/AI-GUIDES/OBSERVABILITY-SIGNOZ-AI-GUIDE.md.
- Testing: docs/AI-GUIDES/FAST-TESTING-GUIDE.md, docs/AI-GUIDES/END-TO-END-TESTING-GUIDE.md.
- Linting: docs/AI-GUIDES/LINTING-GUIDE.md.
This backlog operationalizes docs/architecture/CACHING/environments-sync.md for the Virtuozzo → Postgres sync pipeline.
1. Schema & Metadata (Owner: Backend Platform)
- SQLAlchemy models (infrastructure layer)
  - File: backend/app/infrastructure/database/models/environment_sync.py.
  - Define VZSyncState and VZSyncJob, and extend Environment, Node, ArchivedSite, and JobLog with source_version, synced_at, stale, and needs_validation.
  - Update backend/app/infrastructure/database/models/__init__.py exports.
- Alembic migration
  - Path: backend/app/infrastructure/database/migrations/versions/2025_12_XX_vz_sync_core.py.
  - Create vz_sync_states and vz_sync_jobs; alter the resource tables (columns + indexes) per the architecture doc; include a downgrade.
- Repositories (domains layer)
  - Add to backend/app/domains/environments/repository.py:
    - upsert_environment_batch(payload: list[EnvironmentUpsert])
    - mark_environment_subset_stale(team_id: int, env_slugs: list[str])
  - Both rely on SQLAlchemy insert().on_conflict_do_update.
- Settings (core layer)
  - backend/app/core/config.py additions + env docs:
    - vz_sync_full_interval_minutes = 60
    - vz_sync_delta_interval_minutes = 10
    - vz_sync_job_lock_seconds = 180
    - vz_sync_max_parallel_jobs = 5
- Verification
  - Run alembic upgrade head, then update the ER diagram in docs/architecture/CACHING/environments-sync.md.
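The two repository methods listed above can be sketched as a plain upsert plus a targeted update. This is a minimal illustration, not the project's repository code: it swaps in the standard-library sqlite3 module and raw SQL for SQLAlchemy's insert().on_conflict_do_update so that it runs without a Postgres server, and the environments table layout and the dict-shaped payload are simplified assumptions.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical, simplified table; the real schema comes from the Alembic migration.
SCHEMA = """
CREATE TABLE environments (
    team_id        INTEGER NOT NULL,
    slug           TEXT    NOT NULL,
    source_version TEXT    NOT NULL,
    synced_at      TEXT    NOT NULL,
    stale          INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (team_id, slug)
)
"""

# SQLite's ON CONFLICT ... DO UPDATE stands in for the Postgres upsert here.
UPSERT = """
INSERT INTO environments (team_id, slug, source_version, synced_at, stale)
VALUES (:team_id, :slug, :source_version, :synced_at, 0)
ON CONFLICT (team_id, slug) DO UPDATE SET
    source_version = excluded.source_version,
    synced_at      = excluded.synced_at,
    stale          = 0
"""


def upsert_environment_batch(conn, payload):
    """Insert new rows and refresh existing ones in a single batch."""
    now = datetime.now(timezone.utc).isoformat()
    rows = [{**item, "synced_at": now} for item in payload]
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


def mark_environment_subset_stale(conn, team_id, env_slugs):
    """Flag the given slugs of one team as stale; returns the rows changed."""
    if not env_slugs:
        return 0
    placeholders = ", ".join("?" for _ in env_slugs)
    with conn:
        cur = conn.execute(
            f"UPDATE environments SET stale = 1 "
            f"WHERE team_id = ? AND slug IN ({placeholders})",
            [team_id, *env_slugs],
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
upsert_environment_batch(conn, [
    {"team_id": 1, "slug": "env-a", "source_version": "v1"},
    {"team_id": 1, "slug": "env-b", "source_version": "v1"},
])
# Second sync: env-a changed upstream, env-b is missing from the payload.
upsert_environment_batch(conn, [
    {"team_id": 1, "slug": "env-a", "source_version": "v2"},
])
changed = mark_environment_subset_stale(conn, 1, ["env-b"])
rows = conn.execute(
    "SELECT slug, source_version, stale FROM environments ORDER BY slug"
).fetchall()
```

After the second sync, env-a carries the new source_version with stale cleared, while env-b keeps its old version and is flagged stale.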
2. Scheduler Service (Owner: Background Services)
- Module skeleton
  - Create backend/app/modules/vz_sync/scheduler.py (modules layer) implementing SyncScheduler.
  - Responsibilities: poll vz_sync_states via repositories, compute freshness, enqueue jobs.
- Redis due queue
  - Key vz:sync:due with TTL 24h.
  - Every 30s:
    - SELECT ... FOR UPDATE SKIP LOCKED for states where next_due_at <= now().
    - Build the payload JSON.
    - ZADD vz:sync:due score payload_json.
- RabbitMQ dispatcher
  - File: backend/app/modules/vz_sync/dispatcher.py.
  - Pop jobs via ZRANGEBYSCORE + ZREM and publish to the jobs.vz.sync exchange with AMQP priority.
- Manual trigger API
  - Route: POST /api/v1/teams/{team_id}/vz-sync with require_permission("team.manage").
  - Handler: await scheduler.force_sync(...); log vz_sync_enqueue (SAFE_FIELDS only).
- Tests
  - backend/tests/unit/modules/vz_sync/test_scheduler.py covering due math, Redis dedupe, manual trigger (FAST-TESTING-GUIDE fixtures).
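The due-queue round trip (scheduler ZADD, dispatcher ZRANGEBYSCORE + ZREM) might look roughly like the sketch below. Several things here are assumptions rather than part of the plan: the payload fields (team_id, scope), the function names enqueue_due and claim_due, and the FakeSortedSet class, which is an in-memory stand-in that mimics the redis-py method names zadd, zrangebyscore, and zrem so the example runs without a Redis server.

```python
import json

DUE_KEY = "vz:sync:due"


class FakeSortedSet:
    """In-memory stand-in exposing the three redis-py calls used below."""

    def __init__(self):
        self._scores = {}

    def zadd(self, key, mapping):
        added = sum(1 for member in mapping if member not in self._scores)
        self._scores.update(mapping)
        return added  # like Redis, counts only newly added members

    def zrangebyscore(self, key, min_score, max_score):
        hits = [(s, m) for m, s in self._scores.items() if min_score <= s <= max_score]
        return [m for _, m in sorted(hits)]

    def zrem(self, key, *members):
        return sum(1 for m in members if self._scores.pop(m, None) is not None)


def enqueue_due(client, team_id, scope, due_at):
    """Add one job; identical (team_id, scope) pairs collapse into one member."""
    # sort_keys makes the JSON byte-identical for equal payloads, which is
    # what lets the sorted set deduplicate repeated enqueues.
    payload_json = json.dumps({"team_id": team_id, "scope": scope}, sort_keys=True)
    return client.zadd(DUE_KEY, {payload_json: due_at})


def claim_due(client, now):
    """Return the jobs due at or before `now` that this caller removed itself."""
    claimed = []
    for member in client.zrangebyscore(DUE_KEY, 0, now):
        # ZRANGEBYSCORE + ZREM is not atomic: publish only if our ZREM won.
        if client.zrem(DUE_KEY, member) == 1:
            claimed.append(json.loads(member))
    return claimed


client = FakeSortedSet()
first = enqueue_due(client, team_id=7, scope="delta", due_at=100.0)
second = enqueue_due(client, team_id=7, scope="delta", due_at=100.0)  # duplicate
enqueue_due(client, team_id=8, scope="full", due_at=500.0)  # not due yet
jobs = claim_due(client, now=200.0)
```

Checking the ZREM return value is one way to keep two dispatcher instances from publishing the same job; a Lua script or ZPOPMIN would be alternatives if strict atomicity is required.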
3. Worker Adaptation (Owner: Background Worker Team)
- Worker entrypoint
  - File: backend/app/modules/vz_sync/worker.py; consume jobs.vz.sync.
- Locking
  - Redis key vz:sync:lock:{team_id}:{scope} with TTL settings.vz_sync_job_lock_seconds. On collision, log vz_sync_skip_locked and requeue with a 30s delay.
- Virtuozzo service integration
  - Extend backend/app/infrastructure/external/virtuozzo/service.py with:
    - fetch_environments(session_key, modified_since=None)
    - fetch_environment_details(session_key, env_name)
    - fetch_nodes(session_key, env_name)
  - Use create_external_async_client() to inherit logging/OTEL instrumentation.
- Processing pipeline
  - Resolve the Owner-only encrypted session key, decrypt it, and wrap API calls with _telemetry_span("vz.sync.fetch").
  - Compute source_version = sha256(sorted_json_bytes) for each payload item.
  - Upsert records via repositories; mark rows absent from the payload as stale=true.
  - Update vz_sync_states, insert vz_sync_jobs, emit SSE/RabbitMQ sync.completed.
- Error handling & observability
  - Retryable errors (timeout/429/5xx): exponential backoff ≤5 min, log vz_sync_retry with exc_info=True, mark the OTEL span as an error.
  - Non-retryable (401): set status='error' and pending_reason, emit SSE sync.failed, call log_auth_violation if the request lacked Owner rights.
- Tests
  - backend/tests/unit/modules/vz_sync/test_worker.py (lock, upsert, retries).
  - backend/tests/integration/modules/vz_sync/test_worker_e2e.py with Virtuozzo stub + OTEL/log assertions.
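Two small pieces of the worker pipeline above lend themselves to a short sketch: the source_version hash and the capped retry delay. The code below is one possible reading of the plan, not its implementation. The helper names is_retryable and backoff_delay and the 2-second base delay are assumptions, and timeouts (which the plan also treats as retryable) would be handled by exception type rather than status code.

```python
import hashlib
import json

MAX_BACKOFF_SECONDS = 300  # the plan caps backoff at 5 minutes


def source_version(item):
    """Hash a payload item so that key order does not change the result."""
    # sort_keys plus fixed separators gives one canonical byte string per item.
    canonical = json.dumps(item, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def is_retryable(status_code):
    """429 and 5xx are retried; 401 and other 4xx fail the job immediately."""
    return status_code == 429 or 500 <= status_code <= 599


def backoff_delay(attempt, base_seconds=2.0):
    """Exponential delay for a zero-based attempt number, capped at 5 minutes."""
    return min(base_seconds * (2 ** attempt), MAX_BACKOFF_SECONDS)


a = source_version({"name": "env-a", "status": "running"})
b = source_version({"status": "running", "name": "env-a"})  # same data, new order
c = source_version({"name": "env-a", "status": "stopped"})  # changed upstream
delays = [backoff_delay(n) for n in range(4)]
```

Here a equals b and differs from c, so comparing the stored source_version with a freshly computed one is enough to detect an upstream change. A production retry loop would usually add random jitter to the delay so that workers do not retry in lockstep.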
4. API & UX Integration (Owner: API + Frontend)
- Domain response contract
  - Modify backend/app/domains/environments/service.py to add:
    - freshness, determined by now - synced_at >= max_allowed_staleness.
- Demand-driven enqueue
  - When freshness == "stale", schedule enqueue_delta(team_id, scope) via a FastAPI background task; log vz_sync_enqueue.
- Frontend contract
  - Update/design docs/frontend/API-environment-sync.md to document the stale badge + SSE sync.completed handling.
- Permissions
  - Owner/Manager can trigger manual sync; Developers are read-only.
- Smoke/E2E tests
  - Script backend/tests/smoke/vz_sync/smoke_refresh.sh: manual trigger → wait for SSE → assert freshness == "current".
  - End-to-end UI test per guide ensuring the stale banner clears after sync.
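The freshness rule in the domain response contract can be sketched as a small pure function. The function name compute_freshness, the injectable now parameter, and the choice to treat a never-synced resource as stale are assumptions; the two return values match the "stale" and "current" strings used elsewhere in this plan.

```python
from datetime import datetime, timedelta, timezone


def compute_freshness(synced_at, max_allowed_staleness, now=None):
    """Return "stale" or "current" for one resource."""
    now = now or datetime.now(timezone.utc)
    if synced_at is None:
        # Assumption: a resource that has never been synced counts as stale.
        return "stale"
    # Stale once the age reaches the limit, matching the >= in the contract.
    return "stale" if now - synced_at >= max_allowed_staleness else "current"


now = datetime(2025, 12, 1, 12, 0, tzinfo=timezone.utc)
limit = timedelta(minutes=10)
recent = compute_freshness(now - timedelta(minutes=3), limit, now)
boundary = compute_freshness(now - timedelta(minutes=10), limit, now)
never = compute_freshness(None, limit, now)
```

Passing now explicitly keeps the function deterministic in unit tests; the service layer would call it without now and enqueue a delta sync when the result is "stale".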
5. Observability & Rollout (Owner: SRE + Backend)
- Metrics
  - Add to backend/app/core/observability.py:
    - Counter vz_sync_jobs_total{state,reason}
    - Histogram vz_sync_duration_seconds{scope}
    - Gauge vz_sync_staleness_seconds{team_id,data_type}
- Logging & tracing
  - Use get_logger(__name__); emit events vz_sync_job_dispatched, vz_sync_job_completed, etc., with SAFE_FIELDS only.
  - Wrap long-running steps with OTEL spans per SigNoz guide.
- Alerts
  - Prometheus rules:
    - vz_sync_staleness_seconds > max_allowed_staleness * 2 for 10 minutes.
    - increase(vz_sync_jobs_total{state="failed"}[5m]) > 3 per team.
- Feature flag & rollout
  - Config ENABLE_VZ_SYNC_PIPELINE, default false.
  - Rollout: enable internal tenant → observe 24h → enable per cohort while monitoring metrics/logs.
- Documentation
  - Update architecture doc and create docs/operations/runbooks/vz-sync.md (manual trigger, dashboards, alert response).
- Acceptance checklist
  - python -m black backend and ruff check backend (lint guide).
  - pytest tests/unit/modules/vz_sync/ -q.
  - pytest tests/integration/modules/vz_sync/ -q.
  - mutmut run with CI thresholds.
  - make smoke-vz_sync.
  - python backend/tests/smoke/logging/log_sink_probe.py --log-dir logs.
  - alembic upgrade head on staging.
Delivering every item above completes the environment sync system with zero ambiguity, full observability, and production-grade coverage.