---
title: "RabbitMQ + SSE Delivery Guide"
status: "AI-AGENT FRIENDLY / ZERO-GUESSING"
last_verified: "2025-12-18"
---

# 0) Purpose & scope
- Source of truth is the current FastAPI codebase (`backend/`), not the legacy Laravel jobs or speculative docs.
- This guide explains how Server-Sent Events (SSE) and RabbitMQ actually work in production code so an AI-agent or developer can extend or troubleshoot the system safely.

# 1) System snapshot (code-level)

## 1.1 High-level flow (Create Environment job)
1. **Next.js UI** calls `POST /api/v1/environments` with a shortdomain + metadata.
2. **FastAPI route** (`backend/app/api/v1/environments.py`) validates auth/tenant, inserts an initial `job_logs` row, publishes an initial status event, and dispatches a Celery task via the RabbitMQ jobs queue (direct exchange).
3. **Celery worker** (`backend/app/domains/environments/tasks.py`) consumes the task, runs `execute_environment_job`, calls Virtuozzo, persists job + environment rows, and publishes status events through the RabbitMQ topic exchange (`events.topic`).
4. **RabbitMQ events exchange** routes matching messages to the shared queue the SSE broker listens on (`settings.rabbitmq_events_queue`, default `events.sse`).
5. **SSE broker** (`backend/app/infrastructure/messaging/sse.py`) keeps an aio-pika consumer alive, buffers messages per connected client, and yields them over HTTP streaming.
6. **FastAPI `/api/v1/events`** (`backend/app/api/v1/events.py`) enforces `events:read`, tenant scope, and active-team checks before attaching the HTTP client to the broker.
7. **Next.js EventSource** (see `docs/development/WEB-NOTIF.md/sse-notif-scaffold.md`) filters messages client-side using the same job_id/team_id it already holds from the POST response.

### ASCII overview
```text
+---------------+      POST /api/v1/environments      +-----------------------------+
| Next.js UI    | ----------------------------------> | FastAPI API (service layer) |
| (EventSource) | <================ SSE /api/v1/events |  validates + enqueues job   |
+-------+-------+                                     +---------------+-------------+
        |                                                               |
        | publish job payload (Celery)                                  | job_id returned
        |                                                               v
        |        +--------------------------+    consume job     +------+------+
        +------> | RabbitMQ jobs.direct     | -----------------> | Celery      |
                 | queue: jobs.env.create   |                    | worker      |
                 +--------------------------+                    +------+------+
                                                                   |  Virtuozzo API
                                                                   v
                 +--------------------------+   status events   +--+-----------------+
                 | RabbitMQ events.topic    | <---------------- | Domain service    |
                 | queue: events.sse        |                   | publishes SSE msg |
                 +------------+-------------+                   +--+----------------+
                              |                                        |
                              | aio-pika consumer                      |
                              v                                        |
                       +------+------------------------+               |
                       | FastAPI SSE broker            | fan-out       |
                       | (SSEBroker + event_stream)    |  messages     |
                       +------+------------------------+               |
                              | per-connection queues                  |
                              v                                        |
                       +------+------------------------+               |
                       | Browser EventSource clients   | <-------------+
                       +-------------------------------+
```

## 1.2 Component map

| Concern | Code reference | Notes |
| --- | --- | --- |
| Settings / env knobs | `backend/app/core/config.py` | `RABBITMQ_URL`, jobs/events exchange + queue names, `RABBITMQ_PREFETCH_COUNT`, `ENABLE_SSE_BROKER`, `CELERY_TASK_ALWAYS_EAGER`. |
| Celery app | `backend/app/infrastructure/background/celery_app.py` | Uses `rabbitmq_url` for broker, direct exchange for jobs, durable queue, `task_acks_late=True`, `worker_prefetch_multiplier=1`. |
| API routes | `backend/app/api/v1/environments.py`, `backend/app/api/v1/events.py` | HTTP surface for enqueue + SSE streaming. |
| Domain service | `backend/app/domains/environments/service.py` | Business logic, job log persistence, `CreateEnvironmentJobPayload`, `_publish_job_status_event`. |
| Worker task | `backend/app/domains/environments/tasks.py` | Celery task wrapper calling `service.execute_environment_job`. |
| RabbitMQ publisher helper | `backend/app/infrastructure/messaging/events.py` | `aio_pika` publisher shared by services + observability endpoints. |
| SSE broker | `backend/app/infrastructure/messaging/sse.py` | `SSEBroker`, `event_stream`, metrics, filters. |
| Observability | `backend/app/core/app_factory.py` (lifespan), `backend/app/api/v1/health.py::observability_smoke`, smoke scripts under `backend/tests/smoke/sse/*` | |
| Auth events | `backend/app/modules/auth/service.py` | Uses `publish_event` for concurrent-login + team status notifications, confirming SSE isn’t limited to provisioning. |

# 2) Message & queue contracts (actual code)

## 2.1 Jobs exchange (Celery)

- Exchange: `settings.rabbitmq_jobs_exchange` (default `jobs.direct`), type `direct`, declared in `celery_app`.
- Queue: `settings.rabbitmq_jobs_queue` (default `jobs.env.create`), bound with routing key `settings.rabbitmq_jobs_routing_key`.
- Payload: `CreateEnvironmentJobPayload` (`backend/app/domains/environments/schemas.py`), a strict Pydantic v2 model containing IDs, Virtuozzo session info, and env/node specs.
- Dispatch: `_dispatch_job` in `service.py` calls `celery_app.send_task("domains.environments.create_environment", kwargs={"payload": ...})`.

## 2.2 Status / notification events exchange (SSE)

- Exchange: `settings.rabbitmq_events_exchange` (default `events.topic`), type `topic`.
- Routing key format for jobs: `team.{team_id}.job.{job_id}.status`. The code uses `_publish_job_status_event` for provisioning and similar team-scoped keys for auth events (e.g., `team.{team_id}.user.{user_id}.auth.concurrent_login_requested`).
- Message body: JSON with `job_id`, `team_id`, `status`, `message`, `env_name`, optional `app_id`, and `occurred_at` in epoch seconds (see `_publish_job_status_event`).
- SSE broker binding: `settings.rabbitmq_events_binding_key` (default `team.*.job.*.status`), bound to queue `settings.rabbitmq_events_queue` (default `events.sse`).
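
To make the routing contract concrete, here is a minimal sketch that builds a job-status routing key and body, then checks keys against a binding using the standard AMQP topic rules (`*` matches exactly one dot-separated word, `#` matches zero or more). `topic_matches` and `build_job_status_event` are hypothetical helper names, not functions from the codebase; the integer `occurred_at` and the omission of `app_id` when unset are assumptions.

```python
import json
import time
from typing import Optional, Tuple


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under AMQP topic rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' absorbs zero or more words, so try every possible split.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] in ("*", words[w]):
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def build_job_status_event(
    team_id: int,
    job_id: int,
    status: str,
    message: str,
    env_name: str,
    app_id: Optional[str] = None,
) -> Tuple[str, bytes]:
    """Build (routing_key, JSON body) following the documented field list."""
    routing_key = f"team.{team_id}.job.{job_id}.status"
    body = {
        "job_id": job_id,
        "team_id": team_id,
        "status": status,
        "message": message,
        "env_name": env_name,
        "occurred_at": int(time.time()),  # assumption: whole epoch seconds
    }
    if app_id is not None:  # assumption: optional field is omitted when unset
        body["app_id"] = app_id
    return routing_key, json.dumps(body).encode("utf-8")


routing_key, body = build_job_status_event(7, 42, "queued", "Job accepted", "demo-env")
auth_key = "team.7.user.3.auth.concurrent_login_requested"
print(routing_key)
print(topic_matches("team.*.job.*.status", routing_key))
print(topic_matches("team.*.job.*.status", auth_key))
print(topic_matches("team.#", auth_key))
```

Under these rules the default binding `team.*.job.*.status` does not match the user-scoped auth keys, so those events would only reach `events.sse` if the configured binding is broader (for example `team.#`). Check `settings.rabbitmq_events_binding_key` in your environment before relying on auth events over SSE.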

## 2.3 HTTP SSE contract

- Endpoint: `GET /api/v1/events` (registered under the `/api/v1` router). Returns a `StreamingResponse` with media type `text/event-stream`.
- Query params: `team_id` (forced to the authenticated team), optional `job_id`.
- Output frames: `event: message` followed by `data: {json}`, per the SSE spec.
- Connection guardrails: requires the `events:read` permission and team status `TRIALING` or `ACTIVE`; rejects cross-tenant access.
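
The frame format above can be illustrated with a small encoder and a matching line-based parser, which is handy when asserting on `curl` output in smoke tests. This is a sketch only: `format_sse_frame` and `parse_sse_frames` are hypothetical names, and the real `event_stream` generator may add fields such as `id:` or keep-alive comments.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def format_sse_frame(payload: Dict[str, Any], event: str = "message") -> str:
    """Encode one SSE frame; json.dumps emits a single line, so one data: line suffices."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


def parse_sse_frames(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield {'event', 'data'} for each frame terminated by a blank line."""
    event = "message"
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data_lines:
                yield {"event": event, "data": json.loads("\n".join(data_lines))}
            event, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips a single leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)


frame = format_sse_frame({"job_id": 42, "team_id": 7, "status": "queued"})
print(frame, end="")
for parsed in parse_sse_frames(frame.splitlines(keepends=True)):
    print(parsed["event"], parsed["data"]["status"])
```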

# 3) Components in depth

## 3.1 FastAPI HTTP surface

- `POST /api/v1/environments` calls `service.enqueue_environment_creation` and returns `{job_id, status}` per `EnvironmentCreateResponse`.
- `GET /api/v1/events` checks auth (`get_current_user`), looks up the team in the DB (`db_session`), and streams via the `event_stream` generator.
- Lifespan (`backend/app/core/app_factory.py`) starts/stops the SSE broker exactly once per process if `settings.enable_sse_broker` is `True`.

## 3.2 Domain service + persistence

- `enqueue_environment_creation` enforces tenant checks, ensures Virtuozzo metadata exists on the team row, records a `JobLog` row, publishes an initial `queued` event, and dispatches the job to Celery.
- `_transition_job_status` updates `job_logs`, sets `ended_at` when done, and immediately calls `_publish_job_status_event`.
- `_publish_job_status_event` constructs routing keys + payload, then awaits `publish_event`.

## 3.3 Worker responsibilities

- `execute_environment_job` decrypts the stored Virtuozzo session key, emits `started`/`polling`/`completed`/`failed` transitions, calls `virtuozzo_service.create_environment` with fallback polling, persists the `environments` row with the encrypted session key, and publishes SSE events through `_transition_job_status`.
- The Celery task (`create_environment_task`) runs `asyncio.run(service.execute_environment_job(payload))` and logs receipt.
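
The worker pattern described above (a synchronous task entry point driving an async service, with a `failed` transition recorded before the exception propagates) can be sketched with the standard library alone. Everything here is a stand-in: `execute_job`, `run_task`, and the `transition` callback are hypothetical, and the real service also writes DB rows and awaits `publish_event` on each transition.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Work = Callable[[Dict[str, Any]], Awaitable[None]]
Transition = Callable[[str, str], Awaitable[None]]


async def execute_job(payload: Dict[str, Any], transition: Transition, do_work: Work) -> None:
    """Run the job, emitting a status transition at each stage."""
    await transition("started", "Worker picked up the job")
    try:
        await do_work(payload)
    except Exception as exc:
        # Record FAILED first, then re-raise so the task runner sees the failure.
        await transition("failed", str(exc))
        raise
    await transition("completed", "Job finished")


def run_task(payload: Dict[str, Any], do_work: Work, log: List[str]) -> None:
    """Synchronous entry point, shaped like a Celery task body."""

    async def transition(status: str, message: str) -> None:
        log.append(status)  # stand-in for the DB update + event publish

    # asyncio.run creates and closes a fresh event loop for each call.
    asyncio.run(execute_job(payload, transition, do_work))


async def ok(payload: Dict[str, Any]) -> None:
    await asyncio.sleep(0)


async def boom(payload: Dict[str, Any]) -> None:
    raise RuntimeError("provider call failed")


success_log: List[str] = []
run_task({"job_id": 42}, ok, success_log)
print(success_log)  # ['started', 'completed']

failure_log: List[str] = []
try:
    run_task({"job_id": 43}, boom, failure_log)
except RuntimeError as exc:
    print(failure_log, "->", exc)
```

`asyncio.run` raises if an event loop is already running in the calling thread, so this bridge suits synchronous worker processes rather than code that is already async.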

## 3.4 RabbitMQ publisher helper

- `publish_event` encapsulates aio-pika connection pooling, declares the topic exchange, publishes JSON payloads, and records OpenTelemetry spans named `rabbitmq.publish`.
- It short-circuits when `settings.enable_sse_broker` is `False` (useful for unit tests without RabbitMQ).
- Connections inject telemetry attributes `messaging.system`, `messaging.destination`, and routing key metadata.

## 3.5 SSE broker + stream

- `SSEBroker` uses `aio_pika.connect_robust`, declares the topic exchange + durable queue, sets QoS via `settings.rabbitmq_prefetch_count`, and runs an infinite consume loop with reconnect/backoff (5 seconds).
- Each HTTP client gets an in-memory bounded `asyncio.Queue[SSEMessage]` (default max 100). On overflow the broker drops the oldest entry and logs a warning to limit backpressure.
- `event_stream` checks `request.is_disconnected()` to tear down cleanly, filters by `team_id` and optional `job_id` via `_matches_filters`, yields SSE frames, and emits telemetry spans (`sse.stream.*`).
- Metrics: `get_metrics()` exposes `subscribers_active`, message counters, connection attempts/failures, and last error (import and call inside a FastAPI dependency or a REPL when debugging).
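
The per-client buffering and filtering described above can be sketched as follows. This is an illustration under assumptions, not the broker's code: `offer_drop_oldest` and `matches_filters` are hypothetical helpers, and the real `_matches_filters` may inspect different fields. The filter here works on routing key segments only.

```python
import asyncio
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger("sse.sketch")


def offer_drop_oldest(queue: asyncio.Queue, message: Dict[str, Any]) -> bool:
    """Enqueue without blocking; when full, evict the oldest entry first."""
    dropped = False
    if queue.full():
        try:
            queue.get_nowait()
            dropped = True
            logger.warning("SSE subscriber queue full, dropped oldest message")
        except asyncio.QueueEmpty:
            pass
    queue.put_nowait(message)
    return dropped


def matches_filters(routing_key: str, team_id: int, job_id: Optional[int] = None) -> bool:
    """Tenant-safe filter based on routing key segments only."""
    parts = routing_key.split(".")
    if len(parts) < 2 or parts[0] != "team" or parts[1] != str(team_id):
        return False
    if job_id is None:
        return True
    return len(parts) >= 4 and parts[2] == "job" and parts[3] == str(job_id)


async def demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=3)
    for seq in range(5):
        key = "team.7.job.42.status"
        if matches_filters(key, team_id=7, job_id=42):
            offer_drop_oldest(queue, {"routing_key": key, "seq": seq})
    kept = []
    while not queue.empty():
        kept.append(queue.get_nowait()["seq"])
    return kept


print(asyncio.run(demo()))  # [2, 3, 4]
```

Dropping the oldest entry keeps a slow client from stalling the shared consumer, at the cost of gaps that the client has to fill from persisted state.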

## 3.6 Client contract

- The official EventSource snippet + reconnect strategy lives at `docs/development/WEB-NOTIF.md/sse-notif-scaffold.md`.
- Clients must authenticate (browser cookies via `/api/v1/auth/login` + session-exchange), then open `/api/v1/events?team_id={team_id}&job_id={job_id}` with `Accept: text/event-stream`.
- On reconnect, the UI should re-fetch persisted job status (`job_logs` / `environments`) to fill gaps because SSE is best-effort.

## 3.7 Observability & tests

- `backend/app/api/v1/health.py::observability_smoke` publishes a safe RabbitMQ event (team 0/job 0) and exercises DB + Redis instrumentation.
- Integration tests (`backend/tests/integration/api/test_events_sse_permissions.py`) cover permissions, tenant scope, and SSE handler lifecycles.
- Smoke scripts:
  - `backend/tests/smoke/sse/smoke_sse_stream.sh`: logs in via API, opens a real SSE stream, publishes sample events through RabbitMQ, and asserts the curl output.
  - `backend/tests/smoke/sse/publish_status_event.py`: CLI helper to publish arbitrary status events.
  - `backend/tests/smoke/sse/rabbitmq_roundtrip.py` + `.sh`: ensures events exchange routing works without involving the SSE service.
- Worker smoke (`backend/tests/smoke/environments/smoke_worker_stub.py`) forces Celery eager mode, seeds fake Virtuozzo responses, and validates the full job lifecycle + DB persistence.

# 4) Wiring a new RabbitMQ+SSE job/process

Follow these exact steps to stay consistent with the existing system:

1. Decide queue/exchange scope
   - Reuse `settings.rabbitmq_jobs_*` if the job shares the environment provisioning queue, or add new settings/env vars for a dedicated exchange + queue (mirror how `Settings` defines jobs/events).
   - Always prefer service DNS (e.g., `rabbitmq`) inside containers; update `env/backend-local.example`.
2. Define the job payload
   - Create a strict Pydantic v2 model (`ConfigDict(extra="forbid")`) to describe the task input.
   - Store references (`job_id`, `team_id`, `user_id`) so you can publish SSE events later.
3. Enqueue from the HTTP/service layer
   - Validate tenant + permissions in the FastAPI route/service (see `enqueue_environment_creation` as the pattern).
   - Insert or update persistence records (e.g., `job_logs`) before dispatch to guarantee the DB is the source of truth.
   - Publish an initial SSE event via `await publish_event(...)` so clients get immediate feedback (status `queued`).
   - Dispatch with `celery_app.send_task(..., queue=settings.rabbitmq_jobs_queue, routing_key=settings.rabbitmq_jobs_routing_key)` or the new queue key you introduced.
4. Implement the worker
   - In the Celery task, call an async service (`asyncio.run(...)` if necessary) that performs the work, writes DB state, and calls a helper like `_transition_job_status`.
   - Ensure failures raise exceptions after writing `FAILED` status so Celery’s acks + retries behave correctly (`task_acks_late=True` means the message is re-queued if the worker crashes before ack).
5. Publish SSE events
   - Use team-scoped routing keys so `event_stream` filtering keeps multi-tenancy safe:
     - Job status: `team.{team_id}.job.{job_id}.status`.
     - User-specific flows: `team.{team_id}.user.{user_id}.auth.{event}` (already supported by `_matches_filters`).
   - Payloads must be JSON-serializable; include `occurred_at` timestamps for ordering and `message` for UI text.
   - Call `await publish_event(...)` inside transitions. Do not block the main loop; `publish_event` already handles connection reuse and telemetry.
6. Expose client filters
   - When adding new SSE consumers, extend UI docs/examples under `docs/development/WEB-NOTIF.md` so EventSource clients know which query params (`team_id`, `job_id`, plus any new filters) to send.
   - Keep SSE route logic free of business logic: all filtering should be by routing key segments only.
7. Update docs & smoke tests
   - Document new routing keys + payloads here and in architecture docs.
   - Duplicate or adapt smoke scripts to assert the new routing works (publish + consume through RabbitMQ, watch SSE output).
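
As an illustration of the payload step, a strict Pydantic v2 model might look like the sketch below. `ExampleJobPayload`, its integer ID types, the `env_name` field, and the `status_routing_key` helper are all hypothetical; the real `CreateEnvironmentJobPayload` carries more fields and may use different types. The example requires Pydantic v2 to be installed.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class ExampleJobPayload(BaseModel):
    """Hypothetical task input; unknown fields are rejected rather than ignored."""

    model_config = ConfigDict(extra="forbid")

    job_id: int   # assumption: integer IDs
    team_id: int
    user_id: int
    env_name: str

    def status_routing_key(self) -> str:
        # Team-scoped key, so downstream filtering stays tenant-safe.
        return f"team.{self.team_id}.job.{self.job_id}.status"


payload = ExampleJobPayload(job_id=42, team_id=7, user_id=3, env_name="demo-env")
print(payload.status_routing_key())
print(payload.model_dump(mode="json"))  # JSON-safe dict, suitable as task kwargs

try:
    ExampleJobPayload(job_id=1, team_id=1, user_id=1, env_name="x", unexpected=True)
except ValidationError as exc:
    print(f"rejected: {len(exc.errors())} error(s)")
```

Carrying `job_id`, `team_id`, and `user_id` in the payload lets the worker derive routing keys without another lookup.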

# 5) Troubleshooting & smoke checklist

| Symptom | Checks (in order) |
| --- | --- |
| SSE endpoint 403 | Ensure the authenticated user has `events:read` and belongs to an `ACTIVE`/`TRIALING` team (`backend/app/api/v1/events.py`). |
| No messages in browser | Confirm `settings.enable_sse_broker` is `True`, broker logs show “RabbitMQ connection established”, and `get_metrics()` reports `messages_consumed_total > 0`. |
| RabbitMQ publish errors | Run `backend/tests/smoke/sse/rabbitmq_roundtrip.py --team-id ... --job-id ...` from inside the backend container to verify exchange + credentials. |
| SSE stream connects but never receives events | Use `backend/tests/smoke/sse/publish_status_event.py --team-id X --job-id Y --count 3` to publish manual events, then tail the SSE curl output (see `smoke_sse_stream.sh`). |
| Job stuck in `queued` | Check Celery worker logs (`domains.environments.create_environment` task). The worker uses `task_acks_late`, so tasks from a crashed worker are re-queued; verify there’s an active worker consuming `jobs.env.create`. |
| Slow or dropped SSE updates | Look for “SSE subscriber queue full, dropped oldest message” warnings. Increase per-subscriber queue size by calling `broker.subscribe(max_queue_size=...)` (if exposing new API) or reduce event volume. |
| Observability gaps | Call `POST /api/v1/health/observability-smoke` locally; it emits DB + Redis + RabbitMQ spans. Verify traces (`sse.broker.*`, `rabbitmq.publish`) inside SigNoz per `docs/OBSERVABILITY/SIGNOZ-SMOKE-VERIFY.md`. |

Smoke workflow summary:

1. `make up` (or `docker compose up`) to boot services.
2. `backend/tests/smoke/sse/smoke_sse_stream.sh`: end-to-end SSE check.
3. `backend/tests/smoke/environments/smoke_worker_stub.py`: job lifecycle without real Virtuozzo.
4. Inspect broker metrics from inside the backend container/venv: `python -c "from app.infrastructure.messaging.sse import get_metrics; print(get_metrics())"`.

# 6) Official references

- FastAPI StreamingResponse (SSE-friendly streaming): https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
- aio-pika async consumer / connect_robust: https://aio-pika.readthedocs.io/en/latest/quick-start.html
- RabbitMQ topic/direct exchanges & routing: https://www.rabbitmq.com/tutorials/amqp-concepts.html
- Celery with RabbitMQ broker + reliability guidance: https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html
- WHATWG EventSource spec (browser SSE client semantics): https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface