---
title: "RabbitMQ + SSE Delivery Guide"
status: "AI-AGENT FRIENDLY / ZERO-GUESSING"
last_verified: "2025-12-18"
---
# 0) Purpose & scope
- Source of truth is the current FastAPI codebase (`backend/`), not the legacy Laravel jobs or speculative docs.
- This guide explains how Server-Sent Events (SSE) and RabbitMQ actually work in production code so an AI-agent or developer can extend or troubleshoot the system safely.
# 1) System snapshot (code-level)
## 1.1 High-level flow (Create Environment job)
1. **Next.js UI** calls `POST /api/v1/environments` with a shortdomain + metadata.
2. **FastAPI route** (`backend/app/api/v1/environments.py`) validates auth/tenant, inserts an initial `job_logs` row, publishes an initial status event, and dispatches a Celery task via the RabbitMQ jobs queue (direct exchange).
3. **Celery worker** (`backend/app/domains/environments/tasks.py`) consumes the task, runs `execute_environment_job`, calls Virtuozzo, persists job + environment rows, and publishes status events through the RabbitMQ topic exchange (`events.topic`).
4. **RabbitMQ events exchange** routes messages to the shared queue the SSE broker listens on (`settings.rabbitmq_events_queue`, default `events.sse`).
5. **SSE broker** (`backend/app/infrastructure/messaging/sse.py`) keeps an aio-pika consumer alive, buffers messages per connected client, and yields them over HTTP streaming.
6. **FastAPI `/api/v1/events`** (`backend/app/api/v1/events.py`) enforces `events:read`, tenant scope, and active-team checks before attaching the HTTP client to the broker.
7. **Next.js EventSource** (see `docs/development/WEB-NOTIF.md/sse-notif-scaffold.md`) filters messages client-side using the same job_id/team_id it already holds from the POST response.
### ASCII overview
```text
+---------------+      POST /api/v1/environments      +-----------------------------+
| Next.js UI    | ----------------------------------> | FastAPI API (service layer) |
| (EventSource) | <=============== SSE /api/v1/events | validates + enqueues job    |
+-------+-------+                                     +---------------+-------------+
        |                                                             |
        |   publish job payload (Celery)                              | job_id returned
        |                                                             v
        |        +--------------------------+   consume job    +------+------+
        +------> | RabbitMQ jobs.direct     | ---------------> | Celery      |
                 | queue: jobs.env.create   |                  | worker      |
                 +--------------------------+                  +------+------+
                                                                      | Virtuozzo API
                                                                      v
                 +--------------------------+  status events   +------+------------+
                 | RabbitMQ events.topic    | <--------------- | Domain service    |
                 | queue: events.sse        |                  | publishes SSE msg |
                 +------------+-------------+                  +------+------------+
                              |                                       |
                              | aio-pika consumer                     |
                              v                                       |
                 +------------+------------------+                    |
                 | FastAPI SSE broker            |  fan-out           |
                 | (SSEBroker + event_stream)    |  messages          |
                 +------------+------------------+                    |
                              | per-connection queues                 |
                              v                                       |
                 +------------+------------------+                    |
                 | Browser EventSource clients   | <------------------+
                 +-------------------------------+
```
## 1.2 Component map
| Concern | Code reference | Notes |
|---|---|---|
| Settings / env knobs | `backend/app/core/config.py` | `RABBITMQ_URL`, jobs/events exchange + queue names, `RABBITMQ_PREFETCH_COUNT`, `ENABLE_SSE_BROKER`, `CELERY_TASK_ALWAYS_EAGER`. |
| Celery app | `backend/app/infrastructure/background/celery_app.py` | Uses `rabbitmq_url` for broker, direct exchange for jobs, durable queue, `task_acks_late=True`, `worker_prefetch_multiplier=1`. |
| API routes | `backend/app/api/v1/environments.py`, `backend/app/api/v1/events.py` | HTTP surface for enqueue + SSE streaming. |
| Domain service | `backend/app/domains/environments/service.py` | Business logic, job log persistence, `CreateEnvironmentJobPayload`, `_publish_job_status_event`. |
| Worker task | `backend/app/domains/environments/tasks.py` | Celery task wrapper calling `service.execute_environment_job`. |
| RabbitMQ publisher helper | `backend/app/infrastructure/messaging/events.py` | `aio_pika` publisher shared by services + observability endpoints. |
| SSE broker | `backend/app/infrastructure/messaging/sse.py` | `SSEBroker`, `event_stream`, metrics, filters. |
| Observability | `backend/app/core/app_factory.py` (lifespan), `backend/app/api/v1/health.py::observability_smoke`, smoke scripts under `backend/tests/smoke/sse/*` | |
| Auth events | `backend/app/modules/auth/service.py` | Uses `publish_event` for concurrent-login + team status notifications, confirming SSE isn't limited to provisioning. |
# 2) Message & queue contracts (actual code)
## 2.1 Jobs exchange (Celery)
- Exchange: `settings.rabbitmq_jobs_exchange` (default `jobs.direct`), type `direct`, declared in `celery_app`.
- Queue: `settings.rabbitmq_jobs_queue` (default `jobs.env.create`), bound with routing key `settings.rabbitmq_jobs_routing_key`.
- Payload: `CreateEnvironmentJobPayload` (`backend/app/domains/environments/schemas.py`), a strict Pydantic v2 model containing IDs, Virtuozzo session info, and env/node specs.
- Dispatch: `_dispatch_job` in `service.py` calls `celery_app.send_task("domains.environments.create_environment", kwargs={"payload": ...})`.
## 2.2 Status / notification events exchange (SSE)
- Exchange: `settings.rabbitmq_events_exchange` (default `events.topic`), type `topic`.
- Routing key format for jobs: `team.{team_id}.job.{job_id}.status`. The code uses `_publish_job_status_event` for provisioning and similar team-scoped keys for auth events (e.g., `team.{team_id}.user.{user_id}.auth.concurrent_login_requested`).
- Message body: JSON with `job_id`, `team_id`, `status`, `message`, `env_name`, optional `app_id`, and `occurred_at` in epoch seconds (see `_publish_job_status_event`).
- SSE broker binding: `settings.rabbitmq_events_binding_key` (default `team.*.job.*.status`), queue `settings.rabbitmq_events_queue` (default `events.sse`).
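The routing key and message body described above can be sketched with the standard library alone. This is an illustration, not the code in `service.py`: the helper names `job_status_routing_key`, `topic_matches`, and `build_status_body` are hypothetical, and `topic_matches` only mimics standard AMQP topic semantics (`*` matches exactly one word, `#` matches zero or more) so a key can be checked against a binding without a running broker.

```python
import json
import time
from typing import List, Optional


def job_status_routing_key(team_id: int, job_id: int) -> str:
    """Build the team-scoped routing key format used for job status events."""
    return f"team.{team_id}.job.{job_id}.status"


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Mimic AMQP topic matching: '*' is exactly one word, '#' is zero or more."""

    def match(pattern: List[str], words: List[str]) -> bool:
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb any number of words, including none.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        return (head == "*" or head == words[0]) and match(rest, words[1:])

    return match(binding_key.split("."), routing_key.split("."))


def build_status_body(
    team_id: int,
    job_id: int,
    status: str,
    message: str,
    env_name: str,
    app_id: Optional[str] = None,
    occurred_at: Optional[float] = None,
) -> bytes:
    """Serialize a status event with the fields listed in section 2.2."""
    body = {
        "job_id": job_id,
        "team_id": team_id,
        "status": status,
        "message": message,
        "env_name": env_name,
        # Epoch seconds; callers may pass a fixed value for deterministic tests.
        "occurred_at": time.time() if occurred_at is None else occurred_at,
    }
    if app_id is not None:
        body["app_id"] = app_id
    return json.dumps(body).encode("utf-8")
```

Under these semantics the default binding `team.*.job.*.status` matches `team.7.job.42.status` but would not match an auth-style key such as `team.7.user.3.auth.concurrent_login_requested`. How auth events reach the SSE queue is not shown in this guide, so verify the configured binding key in `config.py` before relying on it.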
## 2.3 HTTP SSE contract
- Endpoint: `GET /api/v1/events` (registered under the `/api/v1` router). Returns a `StreamingResponse` with media type `text/event-stream`.
- Query params: `team_id` (forced to the authenticated team), optional `job_id`.
- Output frames: `event: message` + `data: {json}` per the SSE spec.
- Connection guardrails: requires the `events:read` permission and team status `TRIALING|ACTIVE`; rejects cross-tenant access.
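As a rough illustration of the frame format and query-param filtering above, the sketch below serializes one event into an `event:`/`data:` frame and checks a routing key against `team_id` and an optional `job_id`. Both function names are hypothetical, and the real `event_stream` and `_matches_filters` may differ in detail.

```python
import json
from typing import Any, Mapping, Optional


def format_sse_frame(payload: Mapping[str, Any], event: str = "message") -> str:
    """Render one SSE frame: an event line, a data line, and a blank line."""
    # json.dumps escapes embedded newlines, so the payload stays on one data line.
    data = json.dumps(payload, separators=(",", ":"))
    return f"event: {event}\ndata: {data}\n\n"


def matches_routing_key(routing_key: str, team_id: int, job_id: Optional[int] = None) -> bool:
    """Filter by routing key segments: team.{team_id}[.job.{job_id}...]."""
    parts = routing_key.split(".")
    if len(parts) < 2 or parts[0] != "team" or parts[1] != str(team_id):
        return False
    if job_id is None:
        return True
    return len(parts) >= 4 and parts[2] == "job" and parts[3] == str(job_id)
```

Filtering on key segments rather than payload fields keeps the check cheap and independent of the message body, which matches the guidance in section 4 to keep business logic out of the SSE route.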
# 3) Components in depth
## 3.1 FastAPI HTTP surface
- `POST /api/v1/environments` calls `service.enqueue_environment_creation` and returns `{job_id, status}` per `EnvironmentCreateResponse`.
- `GET /api/v1/events` checks auth (`get_current_user`), performs the DB team lookup (`db_session`), and uses the `event_stream` generator.
- Lifespan (`backend/app/core/app_factory.py`) starts/stops the SSE broker exactly once per process if `settings.enable_sse_broker` is `True`.
## 3.2 Domain service + persistence
- `enqueue_environment_creation` enforces tenant checks, ensures Virtuozzo metadata exists on the team row, records a `JobLog` row, publishes an initial `queued` event, and dispatches the job to Celery.
- `_transition_job_status` updates `job_logs`, sets `ended_at` when done, and immediately calls `_publish_job_status_event`.
- `_publish_job_status_event` constructs routing keys + payload, then awaits `publish_event`.
## 3.3 Worker responsibilities
- `execute_environment_job` decrypts the stored Virtuozzo session key, emits `started`/`polling`/`completed`/`failed` transitions, calls `virtuozzo_service.create_environment` and fallback polling, persists the `environments` row with encrypted session key, and publishes SSE events through `_transition_job_status`.
- Celery task (`create_environment_task`) runs `asyncio.run(service.execute_environment_job(payload))` and logs receipt.
## 3.4 RabbitMQ publisher helper
- `publish_event` encapsulates aio-pika connection pooling, declares the topic exchange, publishes JSON payloads, and records OpenTelemetry spans named `rabbitmq.publish`.
- It short-circuits when `settings.enable_sse_broker` is `False` (useful for unit tests without RabbitMQ).
- Connections inject telemetry attributes `messaging.system`, `messaging.destination`, and routing key metadata.
## 3.5 SSE broker + stream
- `SSEBroker` uses `aio_pika.connect_robust`, declares the topic exchange + durable queue, sets QoS via `settings.rabbitmq_prefetch_count`, and runs an infinite consume loop with reconnect/backoff (5 seconds).
- Each HTTP client gets an in-memory bounded `asyncio.Queue[SSEMessage]` (default max 100). On overflow the broker drops the oldest entry and logs a warning to limit backpressure.
- `event_stream` checks `request.is_disconnected()` to tear down cleanly, filters by `team_id` and optional `job_id` via `_matches_filters`, yields SSE frames, and emits telemetry spans (`sse.stream.*`).
- Metrics: `get_metrics()` exposes `subscribers_active`, message counters, connection attempts/failures, and last error (import and call inside a FastAPI dependency or a REPL when debugging).
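The drop-oldest overflow policy described above can be sketched with a plain `asyncio.Queue`. This is a minimal illustration under the assumption that each subscriber owns one bounded queue; the helper name `offer_drop_oldest` and the logger name are hypothetical, and the real `SSEBroker` may implement the policy differently.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger("sse.sketch")  # hypothetical logger name


def offer_drop_oldest(queue: "asyncio.Queue[Any]", item: Any) -> bool:
    """Enqueue without blocking; when full, evict the oldest entry first.

    Returns True if an older message was dropped to make room.
    """
    dropped = False
    if queue.full():
        try:
            queue.get_nowait()  # discard the oldest buffered message
            dropped = True
            logger.warning("SSE subscriber queue full, dropped oldest message")
        except asyncio.QueueEmpty:
            # A consumer drained the queue in the meantime; nothing to drop.
            pass
    queue.put_nowait(item)
    return dropped
```

With a queue of size 3, offering the values 0 through 4 leaves 2, 3, and 4 buffered. A slow client therefore sees the most recent events rather than stalling the consumer loop, which is why the UI should re-fetch persisted status after gaps.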
## 3.6 Client contract
- Official EventSource snippet + reconnect strategy lives at `docs/development/WEB-NOTIF.md/sse-notif-scaffold.md`.
- Clients must authenticate (browser cookies via `/api/v1/auth/login` + session-exchange), then open `/api/v1/events?team_id={team_id}&job_id={job_id}` with `Accept: text/event-stream`.
- On reconnect, the UI should re-fetch persisted job status (`job_logs`/`environments`) to fill gaps because SSE is best-effort.
## 3.7 Observability & tests
- `backend/app/api/v1/health.py::observability_smoke` publishes a safe RabbitMQ event (team 0/job 0) and exercises DB+Redis instrumentation.
- Integration tests (`backend/tests/integration/api/test_events_sse_permissions.py`) cover permissions, tenant scope, and SSE handler lifecycles.
- Smoke scripts:
  - `backend/tests/smoke/sse/smoke_sse_stream.sh`: logs in via API, opens a real SSE stream, publishes sample events through RabbitMQ, and asserts the curl output.
  - `backend/tests/smoke/sse/publish_status_event.py`: CLI helper to publish arbitrary status events.
  - `backend/tests/smoke/sse/rabbitmq_roundtrip.py` + `.sh`: ensures events exchange routing works without involving the SSE service.
- Worker smoke (`backend/tests/smoke/environments/smoke_worker_stub.py`) forces Celery eager mode, seeds fake Virtuozzo responses, and validates the full job lifecycle + DB persistence.
# 4) Wiring a new RabbitMQ+SSE job/process
Follow these exact steps to stay consistent with the existing system:
1. Decide queue/exchange scope
   - Reuse `settings.rabbitmq_jobs_*` if the job shares the environment provisioning queue, or add new settings/env vars for a dedicated exchange + queue (mirror how `Settings` defines jobs/events).
   - Always prefer service DNS (e.g., `rabbitmq`) inside containers; update `env/backend-local.example`.
2. Define the job payload
   - Create a strict Pydantic v2 model (`ConfigDict(extra="forbid")`) to describe the task input.
   - Store references (`job_id`, `team_id`, `user_id`) so you can publish SSE events later.
3. Enqueue from the HTTP/service layer
   - Validate tenant + permissions in the FastAPI route/service (see `enqueue_environment_creation` as the pattern).
   - Insert or update persistence records (e.g., `job_logs`) before dispatch so the DB remains the source of truth.
   - Publish an initial SSE event via `await publish_event(...)` so clients get immediate feedback (status `queued`).
   - Dispatch with `celery_app.send_task(..., queue=settings.rabbitmq_jobs_queue, routing_key=settings.rabbitmq_jobs_routing_key)` or the new queue key you introduced.
4. Implement the worker
   - In the Celery task, call an async service (`asyncio.run(...)` if necessary) that performs the work, writes DB state, and calls a helper like `_transition_job_status`.
   - Ensure failures raise exceptions after writing `FAILED` status so Celery's acks + retries behave correctly (`task_acks_late=True` means the message is re-queued if the worker crashes before ack).
5. Publish SSE events
   - Use team-scoped routing keys so `event_stream` filtering keeps multi-tenancy safe:
     - Job status: `team.{team_id}.job.{job_id}.status`.
     - User-specific flows: `team.{team_id}.user.{user_id}.auth.{event}` (already supported by `_matches_filters`).
   - Payloads must be JSON-serializable; include `occurred_at` timestamps for ordering and `message` for UI text.
   - Call `await publish_event(...)` inside transitions. Do not block the main loop; `publish_event` already handles connection reuse and telemetry.
6. Expose client filters
   - When adding new SSE consumers, extend UI docs/examples under `docs/development/WEB-NOTIF.md` so EventSource clients know which query params (`team_id`, `job_id`, plus any new filters) to send.
   - Keep SSE route logic free of business logic: all filtering should be by routing key segments only.
7. Update docs & smoke tests
   - Document new routing keys + payloads here and in architecture docs.
   - Duplicate or adapt smoke scripts to assert the new routing works (publish + consume through RabbitMQ, watch SSE output).
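
The worker step above (record `FAILED`, then re-raise so Celery sees the failure) can be sketched without Celery or RabbitMQ. Everything here is an assumption for illustration: `run_job`, `task_entrypoint`, and the `transition` callback are hypothetical stand-ins for a Celery task body and a helper like `_transition_job_status`.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical callback shape: (job_id, status, message) -> None.
# In the real service this role is played by a helper that writes the DB row
# and publishes the SSE event.
Transition = Callable[[int, str, str], Awaitable[None]]


async def run_job(
    job_id: int,
    work: Callable[[], Awaitable[None]],
    transition: Transition,
) -> None:
    """Run the work, emitting a status transition at each stage."""
    await transition(job_id, "started", "Job started")
    try:
        await work()
    except Exception as exc:
        # Persist and publish the failure first, then re-raise so the task
        # runner marks the task as failed and can apply its retry policy.
        await transition(job_id, "failed", str(exc))
        raise
    await transition(job_id, "completed", "Job completed")


def task_entrypoint(
    job_id: int,
    work: Callable[[], Awaitable[None]],
    transition: Transition,
) -> None:
    """Synchronous wrapper, the shape a Celery task body would take."""
    asyncio.run(run_job(job_id, work, transition))
```

Writing the terminal status before re-raising means the database and SSE clients learn about the failure even if the task is later retried or re-queued.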
# 5) Troubleshooting & smoke checklist
| Symptom | Checks (in order) |
|---|---|
| SSE endpoint 403 | Ensure the authenticated user has `events:read` and belongs to an `ACTIVE`/`TRIALING` team (`backend/app/api/v1/events.py`). |
| No messages in browser | Confirm `settings.enable_sse_broker` is `True`, broker logs show “RabbitMQ connection established”, and `get_metrics()` reports `messages_consumed_total > 0`. |
| RabbitMQ publish errors | Run `backend/tests/smoke/sse/rabbitmq_roundtrip.py --team-id ... --job-id ...` from inside the backend container to verify exchange + credentials. |
| SSE stream connects but never receives events | Use `backend/tests/smoke/sse/publish_status_event.py --team-id X --job-id Y --count 3` to publish manual events, then tail the SSE curl output (see `smoke_sse_stream.sh`). |
| Job stuck in `queued` | Check Celery worker logs (`domains.environments.create_environment` task). The worker uses `task_acks_late`, so crashed tasks will be re-queued; verify there is an active worker consuming `jobs.env.create`. |
| Slow or dropped SSE updates | Look for “SSE subscriber queue full, dropped oldest message” warnings. Increase per-subscriber queue size by calling `broker.subscribe(max_queue_size=...)` (if exposing new API) or reduce event volume. |
| Observability gaps | Call `POST /api/v1/health/observability-smoke` locally; it emits DB + Redis + RabbitMQ spans. Verify traces (`sse.broker.*`, `rabbitmq.publish`) inside SigNoz per `docs/OBSERVABILITY/SIGNOZ-SMOKE-VERIFY.md`. |

Smoke workflow summary:
1. `make up` (or `docker compose up`) to boot services.
2. `backend/tests/smoke/sse/smoke_sse_stream.sh`: end-to-end SSE check.
3. `backend/tests/smoke/environments/smoke_worker_stub.py`: job lifecycle without real Virtuozzo.
4. Inspect broker metrics (run inside the backend container/venv): `python -c "from app.infrastructure.messaging.sse import get_metrics; print(get_metrics())"`.
# 6) Official references
- FastAPI StreamingResponse (SSE-friendly streaming): https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
- aio-pika async consumer / `connect_robust`: https://aio-pika.readthedocs.io/en/latest/quick-start.html
- RabbitMQ topic/direct exchanges & routing: https://www.rabbitmq.com/tutorials/amqp-concepts.html
- Celery with RabbitMQ broker + reliability guidance: https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html
- WHATWG EventSource spec (browser SSE client semantics): https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface