
SSE Notifications

Overview

We use RabbitMQ (durable, ack/retry/DLQ) for job execution and SSE for browser notifications. FastAPI publishes jobs and exposes /events for streaming updates. Celery workers consume jobs, call Virtuozzo, persist status, and publish events back through RabbitMQ.

End-to-end flow (Create Environment)

+-----------------+        +------------------+         +------------------+
| Next.js UI      |  POST  | FastAPI API      | publish | RabbitMQ jobs    |
| (App Router)    |------->| /environments    |-------->| queue: env.create|
|                 |        | (returns job_id) |         +---------+--------+
+--------+--------+        +---------+--------+                   |
         |                            |                           |
         |   SSE /events              |                           |
         |<---------------------------+                           |
         |                            |                           |
         |                            v                           |
         |                   +------------------+                 |
         |                   | Celery worker    | consume job     |
         |                   +---------+--------+-----------------+
         |                             | call Virtuozzo (provision env)
         |                             v
         |                   +------------------+
         |                   | Virtuozzo API    |
         |                   +------------------+
         |                             |
         |     update job_logs/env row | (appId, status, etc.)
         |<----------------------------+
         |                             |
         |   publish status event ---> | RabbitMQ events exchange
         |<----------------------------+
         |   SSE: job status (queued/started/polling/completed/failed)
         +-------------------------------------------------------+

Components

  • FastAPI API: validates requests, publishes jobs to RabbitMQ, and returns a job_id.
  • Celery workers (RabbitMQ broker): consume jobs, call app/infrastructure/external/virtuozzo.py, update Postgres (job_logs, environments), and publish status events.
  • RabbitMQ:
    • Jobs queue: durable, with a DLQ for failures.
    • Events exchange: topic exchange (events.topic) for status broadcasts.
  • SSE endpoint: subscribes to the events exchange, filters by team/user/job_id, and streams matching events.
  • Next.js UI: opens /events, keeps the job_id, and updates the UI on matching events.

Implementation status (2025-12-18)

  • SSE broker: backend/app/infrastructure/messaging/sse.py::SSEBroker consumes events.topic via aio_pika.connect_robust, declares the durable queue settings.rabbitmq_events_queue, enforces QoS, tracks SSEMetrics, and performs bounded fan-out with backpressure logging. The broker is started and stopped inside the FastAPI lifespan hook (backend/app/core/app_factory.py), so every API instance maintains a resilient subscriber.
  • FastAPI endpoint: backend/app/api/v1/events.py exposes GET /api/v1/events (StreamingResponse), guarded by get_current_user, events:read permission checks, tenant isolation, and active-team validation. The handler delegates to event_stream, which filters routing keys per connection (team_id, optional job_id) before yielding SSE frames.
  • Publisher helper: backend/app/infrastructure/messaging/events.publish_event (aio-pika) shares the same RabbitMQ settings, adds OpenTelemetry spans, and is already exercised by auth flows (backend/app/modules/auth/service.py) and health probes for regression coverage.
  • Configuration + env docs: RabbitMQ knobs live in backend/app/core/config.py, and env/backend-local.example documents RABBITMQ_URL, RABBITMQ_EVENTS_*, and RABBITMQ_PREFETCH_COUNT (with container DNS defaults).
  • Client example + guidance: docs/development/WEB-NOTIF.md/sse-notif-scaffold.md provides the EventSource snippet (following the WHATWG HTML spec) plus reconnect notes for the Next.js App Router; the architecture docs cite the official SSE references, and the /create flow (frontend/src/app/create/page.tsx) now opens a real EventSource to stream provisioning updates.
  • Testing + smoke coverage: backend/tests/integration/api/test_events_sse_permissions.py covers the auth/tenant gates, and smoke utilities under backend/tests/smoke/sse/ publish sample events to events.topic and stream them back through the SSE endpoint to validate the RabbitMQ↔FastAPI↔EventSource round trip.
  • Worker smoke: backend/tests/smoke/environments/smoke_worker_stub.py runs Celery in eager mode with stubbed Virtuozzo responses, exercising the full job log and environment persistence pipeline without external dependencies.
  • Observability: SSEMetrics, structured logs, and tracing spans (sse.broker.*, sse.stream.*, rabbitmq.publish) emit the signals required by docs/OBSERVABILITY/SIGNOZ-SMOKE-VERIFY.md.
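The broker's "bounded fan-out with backpressure" can be pictured with the stdlib-only sketch below. It is hypothetical (BoundedFanOut and FanOutStats are illustrative names, not the SSEBroker API): each SSE connection gets its own bounded asyncio.Queue, and an event that would overflow a slow client's queue is dropped for that client and counted, where the real broker would log it, instead of blocking the RabbitMQ consumer.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FanOutStats:
    delivered: int = 0
    dropped: int = 0


@dataclass
class BoundedFanOut:
    """Hypothetical fan-out hub: one bounded asyncio.Queue per SSE connection."""

    maxsize: int = 100
    subscribers: set = field(default_factory=set)
    stats: FanOutStats = field(default_factory=FanOutStats)

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.maxsize)
        self.subscribers.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self.subscribers.discard(queue)

    def publish(self, message: dict) -> None:
        # Non-blocking on purpose: a slow client must never stall the
        # RabbitMQ consumer that feeds every other connection.
        for queue in list(self.subscribers):
            try:
                queue.put_nowait(message)
                self.stats.delivered += 1
            except asyncio.QueueFull:
                # Backpressure: drop this event for this client only; the real
                # broker would log here, and the UI can refetch status from the DB.
                self.stats.dropped += 1


async def demo() -> FanOutStats:
    hub = BoundedFanOut(maxsize=1)
    fast, slow = hub.subscribe(), hub.subscribe()
    hub.publish({"status": "started"})    # both queues now hold one event
    await fast.get()                      # the fast client drains its queue
    hub.publish({"status": "completed"})  # the slow client's queue is full
    return hub.stats
```

Dropping rather than blocking is acceptable here only because the DB, not the event stream, is the source of truth for job status.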

Message design

  • Job (queue jobs.env.create):
    • job_id, team_id, user_id, session_key, display_name, shortdomain, owner_uid, payload (Virtuozzo params).
  • Status event (exchange events.topic, routing key team.{team_id}.job.{job_id}.status):
    • job_id, team_id, status (queued|started|polling|completed|failed), env_name, app_id (optional), message, occurred_at.
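As a sketch of the status event contract, the hypothetical dataclass below models the fields listed above and derives the topic routing key from them. The class name, the empty-string default for message, and the ISO-8601 UTC timestamp format are assumptions, not taken from the codebase.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Optional

STATUSES = frozenset({"queued", "started", "polling", "completed", "failed"})


@dataclass(frozen=True)
class StatusEvent:
    """Hypothetical model of the status event payload described above."""

    job_id: str
    team_id: str
    status: str
    env_name: str
    message: str = ""
    app_id: Optional[str] = None
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def __post_init__(self) -> None:
        if self.status not in STATUSES:
            raise ValueError(f"unknown status: {self.status!r}")
        # '.' separates words in topic routing keys, so ids must not contain it.
        if "." in self.team_id or "." in self.job_id:
            raise ValueError("team_id/job_id must not contain '.'")

    @property
    def routing_key(self) -> str:
        return f"team.{self.team_id}.job.{self.job_id}.status"

    def to_json(self) -> str:
        return json.dumps(asdict(self))
```

UUID job ids contain hyphens but no dots, so they are safe as routing-key words.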

API surface (outline)

  • POST /api/v1/environments → publish job, return {job_id}.
  • GET /api/v1/events → SSE stream (auth required); optional filters team_id, job_id.

Worker behavior (mapped from legacy Laravel job)

  • Validate required params; fail early if missing.
  • Call Virtuozzo create; if the response has no appId, poll (with bounded retries) as in the legacy flow.
  • Persist environment (appId, shortdomain, team, encrypted session_key) and job_logs (start/end/status).
  • Publish status events at transitions (queued/started/polling/completed/failed).
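The worker steps above can be sketched as a plain function with its side effects injected, which is roughly what a Celery task body would call. Everything here is hypothetical: create_env and fetch_app_id stand in for the Virtuozzo client, persist and publish stand in for the Postgres writes and the event publisher, and "queued" is assumed to be emitted by the API when it publishes the job, so the worker starts at "started".

```python
import time
from typing import Callable, Optional

# Hypothetical list mirroring the job message fields in "Message design".
REQUIRED_FIELDS = (
    "job_id", "team_id", "user_id", "session_key",
    "display_name", "shortdomain", "owner_uid",
)

Persist = Callable[[dict, str, Optional[str]], None]  # (job, status, app_id)
Publish = Callable[[dict, str, str], None]            # (job, status, message)


def run_env_create(
    job: dict,
    create_env: Callable[[dict], dict],
    fetch_app_id: Callable[[dict], Optional[str]],
    persist: Persist,
    publish: Publish,
    max_polls: int = 10,
    poll_interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Drive one env.create job and return its terminal status."""

    def transition(status: str, message: str, app_id: Optional[str] = None) -> None:
        # Persist first so the DB is never behind what SSE clients saw.
        persist(job, status, app_id)
        publish(job, status, message)

    missing = [f for f in REQUIRED_FIELDS if not job.get(f)]
    if missing:
        transition("failed", f"missing fields: {', '.join(missing)}")
        return "failed"

    transition("started", "calling Virtuozzo")
    try:
        app_id = create_env(job.get("payload", {})).get("appId")
        if not app_id:
            transition("polling", "waiting for appId")
            for _ in range(max_polls):  # bounded polling, as in the legacy job
                sleep(poll_interval_s)
                app_id = fetch_app_id(job)
                if app_id:
                    break
    except Exception as exc:
        transition("failed", f"Virtuozzo error: {exc}")
        raise  # let the broker-level retry/DLQ policy decide what happens next

    if not app_id:
        transition("failed", f"no appId after {max_polls} polls")
        return "failed"

    transition("completed", "environment ready", app_id)
    return "completed"
```

Injecting sleep and the Virtuozzo calls keeps the flow testable without a broker, in the same spirit as the eager-mode worker smoke test.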

SSE behavior

  • Single SSE endpoint; server-side subscriber to RabbitMQ events exchange.
  • Per-connection filters (team/user/job_id); send event: message\ndata: {...}\n\n.
  • Clients reconnect on drop; UI can also refetch job status from DB for gaps.
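A minimal sketch of the two per-connection pieces above: serializing one SSE frame and deciding whether a routing key belongs to a connection. The helper names format_sse and wants are hypothetical; this sketch filters only on team and job, because the routing key carries no user id. The optional id field is standard SSE (the browser echoes it back as Last-Event-ID on reconnect), but the design above does not say whether ids are sent.

```python
import json
from typing import Optional


def format_sse(data: dict, event: str = "message", event_id: Optional[str] = None) -> str:
    """Serialize one SSE frame: field lines followed by a blank line."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    # json.dumps without indent escapes newlines, so one data: line suffices.
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"


def wants(routing_key: str, team_id: str, job_id: Optional[str] = None) -> bool:
    """Per-connection filter on keys shaped team.{team_id}.job.{job_id}.status."""
    parts = routing_key.split(".")
    if len(parts) != 5 or parts[0] != "team" or parts[2] != "job" or parts[4] != "status":
        return False
    return parts[1] == team_id and (job_id is None or parts[3] == job_id)
```

Because "message" is the default SSE event type, an EventSource client receives these frames through its onmessage handler.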

Queue/exchange topology (proposed)

  • Exchange: jobs.direct (type: direct) → Queue: jobs.env.create (durable, DLQ: jobs.env.create.dlq).
  • Exchange: events.topic (type: topic) with routing keys team.{team_id}.job.{job_id}.status.
  • TTL / DLQ: configure message TTL + dead-letter exchange for retries/backoff.
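RabbitMQ evaluates topic bindings server-side: routing keys are dot-separated words, * matches exactly one word, and # matches zero or more words. The hypothetical helper below re-implements those rules in plain Python only to sanity-check binding keys such as team.*.job.*.status (or a per-team team.{team_id}.job.*.status) in unit tests; it is not part of any RabbitMQ client library.

```python
from typing import List


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """True if routing_key would be routed by a topic binding on binding_key."""
    return _match(binding_key.split("."), routing_key.split("."))


def _match(pattern: List[str], words: List[str]) -> bool:
    if not pattern:
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # '#' may absorb zero or more words; try every split point.
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    # '*' matches exactly one word; anything else must match literally.
    return (head == "*" or head == words[0]) and _match(rest, words[1:])
```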

Persistence (Postgres)

  • job_logs: job_id (uuid), team_id, user_id, job_name, env_name, app_id, status, started_at, ended_at, message, payload (jsonb).
  • environments: app_id, env_name, shortdomain, team_id, session_key (encrypted), status, created_at, updated_at.
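To make the job_logs shape and the "persist, then emit" ordering concrete, here is a sketch that uses an in-memory SQLite database as a stand-in for Postgres (uuid and jsonb become TEXT). The function names are hypothetical and the environments table is omitted; job_status is the kind of lookup a client could use to fill gaps after an SSE reconnect.

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Callable, Optional

# SQLite stand-in for the Postgres table: uuid -> TEXT, jsonb -> TEXT (JSON).
SCHEMA = """
CREATE TABLE job_logs (
    job_id TEXT PRIMARY KEY, team_id TEXT NOT NULL, user_id TEXT NOT NULL,
    job_name TEXT NOT NULL, env_name TEXT, app_id TEXT, status TEXT NOT NULL,
    started_at TEXT, ended_at TEXT, message TEXT, payload TEXT
)
"""


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def init_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    return conn


def create_job_log(conn: sqlite3.Connection, team_id: str, user_id: str,
                   job_name: str, env_name: str, payload: dict) -> str:
    job_id = str(uuid.uuid4())
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO job_logs (job_id, team_id, user_id, job_name, env_name, status, payload)"
            " VALUES (?, ?, ?, ?, ?, 'queued', ?)",
            (job_id, team_id, user_id, job_name, env_name, json.dumps(payload)),
        )
    return job_id


def record_status(conn: sqlite3.Connection, job_id: str, status: str, message: str = "",
                  app_id: Optional[str] = None,
                  publish: Optional[Callable[[str, str], None]] = None) -> None:
    """Persist a status transition, then (and only then) emit the event."""
    ts = utc_now()
    with conn:
        conn.execute(
            "UPDATE job_logs SET status = ?, message = ?, app_id = COALESCE(?, app_id)"
            " WHERE job_id = ?",
            (status, message, app_id, job_id),
        )
        if status == "started":
            conn.execute("UPDATE job_logs SET started_at = ? WHERE job_id = ?", (ts, job_id))
        if status in ("completed", "failed"):
            conn.execute("UPDATE job_logs SET ended_at = ? WHERE job_id = ?", (ts, job_id))
    if publish is not None:
        publish(job_id, status)


def job_status(conn: sqlite3.Connection, job_id: str) -> Optional[dict]:
    """Current row for a job, e.g. to refetch after an SSE reconnect."""
    cur = conn.execute("SELECT * FROM job_logs WHERE job_id = ?", (job_id,))
    row = cur.fetchone()
    if row is None:
        return None
    return dict(zip([d[0] for d in cur.description], row))
```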

Env/config (examples)

  • RABBITMQ_URL=amqp://user:pass@rabbitmq:5672/
  • CELERY_BROKER_URL: same value as RABBITMQ_URL
  • CELERY_RESULT_BACKEND (optional): rpc:// or a Redis URL
  • Queue names, exchange names, and routing keys are configurable in settings.
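A stdlib-only sketch of how these variables might resolve, with Celery falling back to the RabbitMQ URL. BrokerSettings is a hypothetical stand-in; the real knobs live in backend/app/core/config.py, and the defaults below are copied from the names used elsewhere in this document.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class BrokerSettings:
    """Hypothetical stand-in for the messaging part of the app settings."""

    rabbitmq_url: str
    celery_broker_url: str
    celery_result_backend: Optional[str]
    jobs_queue: str
    events_exchange: str

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "BrokerSettings":
        url = env["RABBITMQ_URL"]  # required: fail fast if the broker URL is missing
        return cls(
            rabbitmq_url=url,
            # Celery talks to the same RabbitMQ unless explicitly overridden.
            celery_broker_url=env.get("CELERY_BROKER_URL", url),
            # None means no result backend is configured.
            celery_result_backend=env.get("CELERY_RESULT_BACKEND"),
            jobs_queue=env.get("RABBITMQ_JOBS_QUEUE", "jobs.env.create"),
            events_exchange=env.get("RABBITMQ_EVENTS_EXCHANGE", "events.topic"),
        )
```

The resolved values would then feed something like Celery("worker", broker=s.celery_broker_url, backend=s.celery_result_backend).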

Reliability and ops

  • RabbitMQ: durable queues, acks, retries, DLQ; TLS/auth; internal network only.
  • Events are lightweight; the DB is the source of truth for history. Each status change is persisted before its event is emitted, so clients can recover missed events from the DB.
  • Monitor queue depth, consumer lag, retry counts; set alerts.
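The retry policy is only named here ("retries, DLQ"), so the sketch below shows one possible shape of it under stated assumptions: exponential backoff capped at 60 s, dead-lettering once RABBITMQ_JOBS_MAX_RETRIES (example default 5) is exhausted, and a hypothetical x-retry-count header that the publisher increments on each republish. None of these names come from the codebase.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional

RETRY_HEADER = "x-retry-count"  # hypothetical header incremented on republish


@dataclass(frozen=True)
class RetryDecision:
    action: str              # "retry" or "dead_letter"
    delay_ms: Optional[int]  # set only for "retry"


def attempts_so_far(headers: Optional[Mapping[str, Any]]) -> int:
    """Previous retries recorded on the message (0 for a fresh job)."""
    return int((headers or {}).get(RETRY_HEADER, 0))


def decide_retry(
    failed_attempts: int,
    max_retries: int = 5,
    base_delay_ms: int = 1_000,
    cap_ms: int = 60_000,
) -> RetryDecision:
    """Exponential backoff; dead-letter once max_retries is exhausted.

    failed_attempts counts failures including the current one (1 after the first).
    """
    if failed_attempts < 1:
        raise ValueError("failed_attempts must be >= 1")
    if failed_attempts > max_retries:
        return RetryDecision("dead_letter", None)
    delay = min(cap_ms, base_delay_ms * 2 ** (failed_attempts - 1))
    return RetryDecision("retry", delay)
```

If retries are handled inside Celery instead, its task options autoretry_for, retry_backoff, and max_retries cover similar ground.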

Client considerations

  • The SSE endpoint requires auth; include a team filter to scope events.
  • On reconnect, UI can refetch job status from API/DB to fill gaps.
  • Handle failed status with user-facing error messaging.
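Browsers parse the stream through EventSource, but a non-browser client (for example a Python smoke check against /api/v1/events) has to parse it itself. The hypothetical parser below follows the core WHATWG rules: a blank line dispatches, lines starting with ":" are comments, multiple data lines join with "\n", and the last id persists across events. It ignores retry and is not the code used by the existing smoke utilities.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional


@dataclass
class SSEEvent:
    event: str
    data: str
    id: Optional[str]


def parse_sse(lines: Iterable[str]) -> Iterator[SSEEvent]:
    """Minimal SSE parser: dispatches an event at each blank line."""
    event_type = ""
    data_lines: List[str] = []
    last_id: Optional[str] = None
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data_lines:  # frames without data are not dispatched
                yield SSEEvent(event_type or "message", "\n".join(data_lines), last_id)
            event_type, data_lines = "", []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip exactly one leading space
        if name == "event":
            event_type = value
        elif name == "data":
            data_lines.append(value)
        elif name == "id":
            last_id = value
        # 'retry' and unknown fields are ignored in this sketch
```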

References (official docs)

  • FastAPI custom responses (StreamingResponse, used for the SSE stream): https://fastapi.tiangolo.com/advanced/custom-response/
  • Celery with RabbitMQ broker: https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html and broker setup: https://docs.celeryq.dev/en/stable/getting-started/brokers/rabbitmq.html
  • RabbitMQ exchanges/queues: https://www.rabbitmq.com/tutorials/amqp-concepts.html

Next steps

  • Integrate the real Virtuozzo service layer (today’s worker smoke uses a stub) and add negative-path retries/polling caps aligned with Virtuozzo SLAs.
  • Expand the frontend to surface per-job history (persisted job logs) and deep links once the SSE stream reports completed/failed.
  • Replace the eager Celery smoke harness with a dockerized worker + RabbitMQ pipeline in CI once infrastructure is available, and record the results in docs/OBSERVABILITY/SIGNOZ-SMOKE-VERIFY.md.

Central RabbitMQ config (names, bindings)

  • Env vars (example):
    • RABBITMQ_URL=amqp://user:pass@rabbitmq:5672/
    • Jobs:
      • RABBITMQ_JOBS_EXCHANGE=jobs.direct (type: direct)
      • RABBITMQ_JOBS_QUEUE=jobs.env.create (durable)
      • RABBITMQ_JOBS_ROUTING_KEY=jobs.env.create
      • DLQ: RABBITMQ_DLX=jobs.dlx (direct), RABBITMQ_DLX_QUEUE=jobs.env.create.dlq, RABBITMQ_DLX_ROUTING_KEY=jobs.env.create.dlq
      • Retry/TTL (example defaults): RABBITMQ_JOBS_MESSAGE_TTL_MS=60000 (1 min), RABBITMQ_JOBS_MAX_RETRIES=5 (implemented via republish/backoff to the DLX)
    • Events:
      • RABBITMQ_EVENTS_EXCHANGE=events.topic (type: topic)
      • RABBITMQ_EVENTS_QUEUE=events.sse (consumed by the SSE service; consumers of one queue compete for its messages, so if several API instances each run an SSE broker, each needs its own queue bound to events.topic to see every event)
      • RABBITMQ_EVENTS_BINDING_KEY=team.*.job.*.status
  • Bindings:
    • jobs.env.create → bind to jobs.direct with routing key jobs.env.create.
    • jobs.env.create.dlq → bind to jobs.dlx with routing key jobs.env.create.dlq.
    • events.sse → bind to events.topic with team.*.job.*.status (tighten routing if you add per-team queues).
  • Operational notes:
    • Enable acks; set worker prefetch (e.g., 1–10). Configure message TTL and DLX for retries/backoff.
    • Per-team event queues (optional, for isolation): create events.team.{team_id} queues bound to events.topic with team.{team_id}.job.*.status; the SSE service can subscribe per team if needed.
    • Keep events lightweight; persist authoritative status to Postgres.
  • RabbitMQ concepts: https://www.rabbitmq.com/tutorials/amqp-concepts.html
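The names and bindings above can be expressed as plain data, as in the hypothetical sketch below. The x-dead-letter-exchange, x-dead-letter-routing-key, and x-message-ttl keys are standard RabbitMQ queue arguments; the function names and the choice to make the TTL opt-in are assumptions. A TTL on the work queue itself dead-letters jobs that merely wait too long, so a TTL meant as a retry delay usually belongs on a separate wait queue.

```python
from typing import Mapping


def jobs_queue_arguments(env: Mapping[str, str], include_ttl: bool = False) -> dict:
    """x-arguments for declaring the jobs queue (names from the env vars above)."""
    args = {
        "x-dead-letter-exchange": env.get("RABBITMQ_DLX", "jobs.dlx"),
        "x-dead-letter-routing-key": env.get("RABBITMQ_DLX_ROUTING_KEY", "jobs.env.create.dlq"),
    }
    if include_ttl:
        # Caution: on the work queue this expires jobs that wait unconsumed.
        args["x-message-ttl"] = int(env.get("RABBITMQ_JOBS_MESSAGE_TTL_MS", "60000"))
    return args


def topology(env: Mapping[str, str]) -> dict:
    """Exchanges, queues, and (queue, exchange, key) bindings as plain data."""
    jobs_ex = env.get("RABBITMQ_JOBS_EXCHANGE", "jobs.direct")
    jobs_q = env.get("RABBITMQ_JOBS_QUEUE", "jobs.env.create")
    jobs_rk = env.get("RABBITMQ_JOBS_ROUTING_KEY", "jobs.env.create")
    dlx = env.get("RABBITMQ_DLX", "jobs.dlx")
    dlq = env.get("RABBITMQ_DLX_QUEUE", "jobs.env.create.dlq")
    dlx_rk = env.get("RABBITMQ_DLX_ROUTING_KEY", "jobs.env.create.dlq")
    events_ex = env.get("RABBITMQ_EVENTS_EXCHANGE", "events.topic")
    events_q = env.get("RABBITMQ_EVENTS_QUEUE", "events.sse")
    events_bk = env.get("RABBITMQ_EVENTS_BINDING_KEY", "team.*.job.*.status")
    return {
        "exchanges": [(jobs_ex, "direct"), (dlx, "direct"), (events_ex, "topic")],
        "queues": {jobs_q: jobs_queue_arguments(env), dlq: {}, events_q: {}},
        "bindings": [
            (jobs_q, jobs_ex, jobs_rk),
            (dlq, dlx, dlx_rk),
            (events_q, events_ex, events_bk),
        ],
    }
```

With aio-pika, these entries would roughly map to channel.declare_exchange(name, aio_pika.ExchangeType.DIRECT, durable=True), channel.declare_queue(name, durable=True, arguments=args), and queue.bind(exchange, routing_key=key). RabbitMQ rejects redeclaring an existing queue with different arguments (PRECONDITION_FAILED), so changing them later means a new queue or a policy.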