SSE Notifications
Overview¶
We use RabbitMQ (durable, ack/retry/DLQ) for job execution and SSE for browser notifications. FastAPI publishes jobs and exposes /events for streaming updates. Celery workers consume jobs, call Virtuozzo, persist status, and publish events back through RabbitMQ.
End-to-end flow (Create Environment)¶
+-----------------+ +------------------+ +------------------+
| Next.js UI | POST | FastAPI API | publish | RabbitMQ jobs |
| (App Router) |------->| /environments |-------->| queue: env.create|
| | | (returns job_id) | +---------+--------+
+--------+--------+ +---------+--------+ |
| | |
| SSE /events | |
|<---------------------------+ |
| | |
| v |
| +------------------+ |
| | Celery worker | consume job |
| +---------+--------+------------------+
| | call Virtuozzo (provision env)
| v
| +------------------+
| | Virtuozzo API |
| +------------------+
| |
| update job_logs/env row | (appId, status, etc.)
|<----------------------------+
| |
| publish status event ---> | RabbitMQ events exchange
|<----------------------------+
| SSE: job status (queued/started/polling/completed/failed)
+-------------------------------------------------------+
Components¶
- FastAPI API: validates, publishes jobs to RabbitMQ, returns `job_id`.
- Celery workers (RabbitMQ broker): consume jobs, call `app/infrastructure/external/virtuozzo.py`, update Postgres (`job_logs`, `environments`), publish status events.
- RabbitMQ:
  - Jobs queue: durable, with DLQ for failures.
  - Events exchange: fanout/topic for status broadcasts.
- SSE endpoint: subscribes to events exchange; filters by team/user/job_id; streams events.
- Next.js UI: opens `/events`, keeps `job_id`, updates UI on matching events.
Implementation status (2025-12-18)¶
- SSE broker — `backend/app/infrastructure/messaging/sse.py::SSEBroker` consumes `events.topic` via `aio_pika.connect_robust`, declares the durable queue `settings.rabbitmq_events_queue`, enforces QoS, tracks `SSEMetrics`, and performs bounded fan-out/backpressure logging. The broker is started/stopped inside the FastAPI lifespan hook (`backend/app/core/app_factory.py`), so every API instance maintains a resilient subscriber.
- FastAPI endpoint — `backend/app/api/v1/events.py` exposes `GET /api/v1/events` (StreamingResponse) guarded by `get_current_user`, `events:read` permission checks, tenant isolation, and active-team validation. The handler delegates to `event_stream`, which filters routing keys per connection (`team_id`, optional `job_id`) before yielding SSE frames.
- Publisher helper — `backend/app/infrastructure/messaging/events.publish_event` (aio-pika) shares the same RabbitMQ settings, adds OpenTelemetry spans, and is already exercised by auth flows (`backend/app/modules/auth/service.py`) and health probes for regression coverage.
- Configuration + env docs — RabbitMQ knobs live in `backend/app/core/config.py`, and `env/backend-local.example` documents `RABBITMQ_URL`, `RABBITMQ_EVENTS_*`, and `RABBITMQ_PREFETCH_COUNT` (container DNS defaults).
- Client example + guidance — `docs/development/WEB-NOTIF.md`/`sse-notif-scaffold.md` provides the EventSource snippet (following the WHATWG spec) plus reconnect notes for the Next.js App Router; architecture docs cite the official SSE references, and the `/create` flow (`frontend/src/app/create/page.tsx`) now opens a real EventSource to stream provisioning updates.
- Testing + smoke coverage — `backend/tests/integration/api/test_events_sse_permissions.py` covers auth/tenant gates, and smoke utilities under `backend/tests/smoke/sse/` publish sample events to `events.topic` and stream them back through the SSE endpoint to validate the RabbitMQ ↔ FastAPI ↔ EventSource round trip.
- Worker smoke — `backend/tests/smoke/environments/smoke_worker_stub.py` runs Celery in eager mode with stubbed Virtuozzo responses, exercising the full job log + environment persistence pipeline without external dependencies.
- Observability — `SSEMetrics`, structured logs, and tracing spans (`sse.broker.*`, `sse.stream.*`, `rabbitmq.publish`) emit the signals required by `docs/OBSERVABILITY/SIGNOZ-SMOKE-VERIFY.md`.
Message design¶
- Job (queue `env.create`): `job_id`, `team_id`, `user_id`, `session_key`, `display_name`, `shortdomain`, `owner_uid`, `payload` (Virtuozzo params).
- Status event (exchange `events.topic`, topic `team.{team_id}.job.{job_id}.status`): `job_id`, `team_id`, `status` (queued|started|polling|completed|failed), `env_name`, `app_id` (optional), `message`, `occurred_at`.
API surface (outline)¶
- `POST /api/v1/environments` → publish job, return `{job_id}`.
- `GET /api/v1/events` → SSE stream (auth required); optional filters `team_id`, `job_id`.
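The POST handler's contract can be sketched as a pure function (the name `accept_environment_job` is hypothetical; real validation and publishing live in the FastAPI layer): validate, mint a `job_id`, return it to the caller, and hand the full message to RabbitMQ.

```python
import uuid


def accept_environment_job(payload: dict) -> tuple[str, dict]:
    """Shape the message the API would publish to the jobs queue.

    Returns (job_id, message): job_id goes back to the browser so it
    can filter the SSE stream; message goes to RabbitMQ.
    """
    required = {"team_id", "user_id", "display_name", "shortdomain"}
    missing = required - payload.keys()
    if missing:
        # Fail fast before anything is queued.
        raise ValueError(f"missing fields: {sorted(missing)}")
    job_id = str(uuid.uuid4())
    return job_id, {"job_id": job_id, **payload}
```

The browser keeps `job_id` and uses it as the SSE filter described below.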
Worker behavior (mapped from legacy Laravel job)¶
- Validate required params; fail early if missing.
- Call Virtuozzo create; if `appId` is missing, poll (bounded retries) as in the legacy flow.
- Persist environment (`appId`, `shortdomain`, team, encrypted `session_key`) and `job_logs` (start/end/status).
- Publish status events at transitions (queued/started/polling/completed/failed).
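The bounded-polling step above can be sketched as follows; `fetch` stands in for the actual Virtuozzo client call, and the attempt/delay numbers are examples, not SLA-derived values:

```python
import time
from typing import Callable, Optional


def poll_for_app_id(fetch: Callable[[], Optional[str]],
                    max_attempts: int = 10,
                    delay_seconds: float = 0.0) -> str:
    """Poll until an appId appears, with a hard attempt cap.

    Mirrors the legacy flow: retry a bounded number of times, then
    fail the job so a `failed` status event can be published.
    """
    for attempt in range(1, max_attempts + 1):
        app_id = fetch()
        if app_id:
            return app_id
        if attempt < max_attempts:
            time.sleep(delay_seconds)
    raise TimeoutError("appId not available after bounded polling")
```

The worker would publish a `polling` status event around this loop and a `failed` event when `TimeoutError` propagates.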
SSE behavior¶
- Single SSE endpoint; server-side subscriber to RabbitMQ events exchange.
- Per-connection filters (team/user/job_id); send `event: message\ndata: {...}\n\n`.
- Clients reconnect on drop; the UI can also refetch job status from the DB to fill gaps.
Queue/exchange topology (proposed)¶
- Exchange: `jobs.direct` (type: direct) → Queue: `jobs.env.create` (durable, DLQ: `jobs.env.create.dlq`).
- Exchange: `events.topic` (type: topic) with routing keys `team.{team_id}.job.{job_id}.status`.
- TTL / DLQ: configure message TTL + dead-letter exchange for retries/backoff.
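The TTL/DLQ wiring comes down to the arguments passed when declaring the jobs queue; the `x-*` names below are standard RabbitMQ queue arguments, while the defaults are this doc's example values:

```python
def jobs_queue_arguments(dlx: str = "jobs.dlx",
                         dlx_routing_key: str = "jobs.env.create.dlq",
                         message_ttl_ms: int = 60_000) -> dict:
    """Arguments for declaring the durable jobs queue so expired or
    rejected messages are dead-lettered to the DLX."""
    return {
        "x-dead-letter-exchange": dlx,
        "x-dead-letter-routing-key": dlx_routing_key,
        "x-message-ttl": message_ttl_ms,
    }
```

With aio-pika or pika, this dict would be passed as the `arguments` parameter of the queue declaration.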
Persistence (Postgres)¶
- `job_logs`: job_id (uuid), team_id, user_id, job_name, env_name, app_id, status, started_at, ended_at, message, payload (jsonb).
- `environments`: app_id, env_name, shortdomain, team_id, session_key (encrypted), status, created_at, updated_at.
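A minimal relational sketch of the two tables (SQLite syntax for illustration only; the real schema is Postgres, where `job_id` is uuid and `payload` is jsonb):

```python
import sqlite3

DDL = """
CREATE TABLE job_logs (
    job_id     TEXT PRIMARY KEY,   -- uuid in Postgres
    team_id    TEXT NOT NULL,
    user_id    TEXT NOT NULL,
    job_name   TEXT NOT NULL,
    env_name   TEXT,
    app_id     TEXT,
    status     TEXT NOT NULL,
    started_at TEXT,
    ended_at   TEXT,
    message    TEXT,
    payload    TEXT                -- jsonb in Postgres
);
CREATE TABLE environments (
    app_id      TEXT PRIMARY KEY,
    env_name    TEXT NOT NULL,
    shortdomain TEXT NOT NULL,
    team_id     TEXT NOT NULL,
    session_key TEXT NOT NULL,     -- stored encrypted
    status      TEXT NOT NULL,
    created_at  TEXT,
    updated_at  TEXT
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
```

Because the DB is the source of truth, the UI can always reconcile against these rows after an SSE gap.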
Env/config (examples)¶
- `RABBITMQ_URL=amqp://user:pass@rabbitmq:5672/`
- `CELERY_BROKER_URL` = same value as `RABBITMQ_URL`
- `CELERY_RESULT_BACKEND` (if used) = `rpc://` or Redis (optional)
- Queue/exchange names/routing keys configurable in settings.
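A sketch of reading these knobs from the environment with the example defaults above (a plain dataclass here; the real project keeps settings in `backend/app/core/config.py`):

```python
import os
from dataclasses import dataclass, field


@dataclass
class RabbitSettings:
    """Environment-driven RabbitMQ settings (names mirror the
    example env vars in this doc)."""
    url: str = field(default_factory=lambda: os.getenv(
        "RABBITMQ_URL", "amqp://user:pass@rabbitmq:5672/"))
    jobs_queue: str = field(default_factory=lambda: os.getenv(
        "RABBITMQ_JOBS_QUEUE", "jobs.env.create"))
    events_exchange: str = field(default_factory=lambda: os.getenv(
        "RABBITMQ_EVENTS_EXCHANGE", "events.topic"))

    @property
    def celery_broker_url(self) -> str:
        # Celery reuses the same RabbitMQ URL as its broker.
        return self.url
```

Centralizing the names this way keeps the API, workers, and SSE broker agreeing on queue/exchange identifiers.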
Reliability and ops¶
- RabbitMQ: durable queues, acks, retries, DLQ; TLS/auth; internal network only.
- Events are lightweight; DB is source of truth for history. Status changes are persisted before/after emitting events.
- Monitor queue depth, consumer lag, retry counts; set alerts.
Client considerations¶
- SSE endpoint requires auth; include team filter to scope events.
- On reconnect, UI can refetch job status from API/DB to fill gaps.
- Handle `failed` status with user-facing error messaging.
References (official docs)¶
- FastAPI SSE: https://fastapi.tiangolo.com/advanced/custom-response/ (streaming), SSE patterns.
- Celery with RabbitMQ broker: https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html and broker setup: https://docs.celeryq.dev/en/stable/getting-started/brokers/rabbitmq.html
- RabbitMQ exchanges/queues: https://www.rabbitmq.com/tutorials/amqp-concepts.html
Next steps¶
- Integrate the real Virtuozzo service layer (today’s worker smoke uses a stub) and add negative-path retries/polling caps aligned with Virtuozzo SLAs.
- Expand the frontend to surface per-job history (persisted job logs) and deep links once the SSE stream reports `completed`/`failed`.
- Replace the eager Celery smoke harness with a dockerized worker + RabbitMQ pipeline in CI once infrastructure is available, and record the results in `docs/OBSERVABILITY/SIGNOZ-SMOKE-VERIFY.md`.
Central RabbitMQ config (names, bindings)¶
- Env vars (example): `RABBITMQ_URL=amqp://user:pass@rabbitmq:5672/`
- Jobs: `RABBITMQ_JOBS_EXCHANGE=jobs.direct` (type: direct), `RABBITMQ_JOBS_QUEUE=jobs.env.create` (durable), `RABBITMQ_JOBS_ROUTING_KEY=jobs.env.create`
- DLQ: `RABBITMQ_DLX=jobs.dlx` (direct), `RABBITMQ_DLX_QUEUE=jobs.env.create.dlq`, `RABBITMQ_DLX_ROUTING_KEY=jobs.env.create.dlq`
- Retry/TTL (example defaults): `RABBITMQ_JOBS_MESSAGE_TTL_MS=60000` (1 min), `RABBITMQ_JOBS_MAX_RETRIES=5` (implemented via republish/backoff to the DLX)
- Events: `RABBITMQ_EVENTS_EXCHANGE=events.topic` (type: topic), `RABBITMQ_EVENTS_QUEUE=events.sse` (shared fan-out to the SSE service), `RABBITMQ_EVENTS_BINDING_KEY=team.*.job.*.status`
- Bindings:
  - `jobs.env.create` → bind to `jobs.direct` with `jobs.env.create`.
  - `jobs.env.create.dlq` → bind to `jobs.dlx` with `jobs.env.create.dlq`.
  - `events.sse` → bind to `events.topic` with `team.*.job.*.status` (tighten routing if you add per-team queues).
- Operational notes:
  - Enable acks; set worker prefetch (e.g., 1–10). Configure message TTL and DLX for retries/backoff.
  - Per-team event queues (optional, for isolation): create `events.team.{team_id}` queues bound to `events.topic` with `team.{team_id}.job.*.status`; the SSE service can subscribe per team if needed.
  - Keep events lightweight; persist authoritative status to Postgres.
- RabbitMQ concepts: https://www.rabbitmq.com/tutorials/amqp-concepts.html