SSE Notifications — Delivery Phases (Official-Docs Anchored)
Scope: implement the notification system described in SSE-Notif.md using FastAPI SSE (StreamingResponse), RabbitMQ topic exchanges, aio-pika async consumers, Celery publishers, and Next.js EventSource clients.
Phase 1 — Analysis / Architecture
- Confirm topology (RabbitMQ topic exchange `events.topic`, routing key `team.{team_id}.job.{job_id}.status`, durable queue for SSE bridge).
- Define filters: per-connection filtering on team/job; DB remains source of truth.
- Confirm container DNS (use `rabbitmq`, not `localhost`) and env keys.
- References:
  - RabbitMQ exchanges/routing keys: https://www.rabbitmq.com/tutorials/amqp-concepts.html
  - FastAPI StreamingResponse (SSE-friendly streaming): https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
  - aio-pika async consumer pattern: https://aio-pika.readthedocs.io/en/latest/quick-start.html
  - EventSource protocol (client): https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
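
To make the topology above concrete, here is a minimal pure-Python sketch of how a topic exchange compares a binding key with a routing key (`*` matches exactly one dot-separated word, `#` matches zero or more). The function name `topic_matches` is hypothetical and only illustrates the matching rules; RabbitMQ performs this matching itself, so nothing like it is needed in application code.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Illustrative re-implementation of AMQP topic matching (not a RabbitMQ API)."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: match only if every word was consumed too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        # '*' matches exactly one word; anything else must match literally.
        return pattern[p] in ("*", words[w]) and match(p + 1, w + 1)

    return match(0, 0)


# The binding key used for the SSE bridge queue in this document.
BINDING_KEY = "team.*.job.*.status"
print(topic_matches(BINDING_KEY, "team.7.job.abc.status"))
```

One consequence worth noting: because words are separated by dots, a `team_id` or `job_id` that itself contains a dot would no longer match `team.*.job.*.status`.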
Phase 2 — Implementation
1) Backend infrastructure
- Add aio-pika dependency; pin versions (see list below).
- Create SSE broker: aio_pika.connect_robust, declare events.topic (topic, durable), declare/bind queue (e.g., events.sse) with binding key team.*.job.*.status, set QoS/prefetch.
- Fan-out in-process: one async subscriber per SSE connection; acknowledge each message once it has been processed.
- SSE endpoint: FastAPI StreamingResponse returning async generator that yields text/event-stream frames; optional filters (team_id, job_id) applied on routing key.
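
As a rough illustration of the SSE endpoint item above, the sketch below formats `text/event-stream` frames and applies the optional `team_id`/`job_id` filters to the routing key. It uses only the standard library. The names `format_sse_frame`, `wanted`, and `sse_frames`, the in-process queue of `(routing_key, payload)` tuples, and the `None` shutdown sentinel are all assumptions made for this example, not the project's actual code.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict, Optional


def format_sse_frame(payload: Dict[str, Any], event: Optional[str] = None,
                     event_id: Optional[str] = None) -> str:
    """Serialize one event as an SSE frame terminated by a blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    body = json.dumps(payload, separators=(",", ":"))
    # Each line of the body needs its own 'data:' prefix.
    for chunk in body.splitlines() or [""]:
        lines.append(f"data: {chunk}")
    return "\n".join(lines) + "\n\n"


def wanted(routing_key: str, team_id: Optional[str] = None,
           job_id: Optional[str] = None) -> bool:
    """Filter on routing keys shaped like team.{team_id}.job.{job_id}.status."""
    parts = routing_key.split(".")
    if len(parts) != 5 or parts[0] != "team" or parts[2] != "job":
        return False
    if team_id is not None and parts[1] != team_id:
        return False
    if job_id is not None and parts[3] != job_id:
        return False
    return True


async def sse_frames(queue: "asyncio.Queue", team_id: Optional[str] = None,
                     job_id: Optional[str] = None) -> AsyncIterator[str]:
    """Yield SSE frames for matching events until a None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:  # assumed shutdown signal for this sketch
            return
        routing_key, payload = item
        if wanted(routing_key, team_id, job_id):
            yield format_sse_frame(payload)
```

In FastAPI, a generator like this would typically be handed to `StreamingResponse(..., media_type="text/event-stream")`.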
2) Publishers
- Celery worker or FastAPI controller publishes status events to events.topic with routing key team.{team_id}.job.{job_id}.status using aio-pika publish.
- Ensure payload JSON matches contract in SSE-Notif.md.
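
The publisher step might look roughly like the sketch below. `build_status_message` and `publish_status` are hypothetical helper names, and the payload fields are placeholders, since the real contract lives in SSE-Notif.md. The aio-pika calls (`connect_robust`, `declare_exchange`, `Message`, `Exchange.publish`) are used as documented by the library, which is imported lazily so the pure helper can be exercised without a broker.

```python
import json
from typing import Any, Mapping, Tuple


def build_status_message(team_id: Any, job_id: Any,
                         payload: Mapping[str, Any]) -> Tuple[str, bytes]:
    """Return (routing_key, JSON body) for a job status event."""
    routing_key = f"team.{team_id}.job.{job_id}.status"
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    return routing_key, body


async def publish_status(url: str, team_id: Any, job_id: Any,
                         payload: Mapping[str, Any],
                         exchange_name: str = "events.topic") -> None:
    """Publish one status event to the topic exchange (sketch, one connection per call)."""
    import aio_pika  # lazy import: only needed when actually publishing

    routing_key, body = build_status_message(team_id, job_id, payload)
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        exchange = await channel.declare_exchange(
            exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
        )
        await exchange.publish(
            aio_pika.Message(
                body=body,
                content_type="application/json",
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            ),
            routing_key=routing_key,
        )
```

From a synchronous Celery task, one option is `asyncio.run(publish_status(...))`; a long-lived publisher would more likely reuse a single connection and channel rather than opening one per event.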
3) Client
- Next.js client component uses new EventSource("/api/v1/events?team_id=...&job_id=..."); handle onmessage/onerror; reconnect strategy per app needs.
4) Configuration
- Env keys: RABBITMQ_URL, RABBITMQ_EVENTS_EXCHANGE, RABBITMQ_EVENTS_QUEUE, RABBITMQ_EVENTS_BINDING_KEY, RABBITMQ_PREFETCH_COUNT.
- Use service DNS (rabbitmq) for URLs in containers.
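
For illustration, the env keys above could be read into a small settings object as sketched here. The class name `RabbitSettings` and every default value (including the prefetch count of 50 and the `events.sse` queue name taken from the earlier example) are assumptions for this sketch; the project's real settings module may be structured quite differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class RabbitSettings:
    url: str
    exchange: str
    queue: str
    binding_key: str
    prefetch_count: int

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "RabbitSettings":
        """Build settings from environment variables, with assumed defaults."""
        return cls(
            # Service DNS name 'rabbitmq' rather than localhost, per the note above.
            url=env.get("RABBITMQ_URL", "amqp://rabbitmq:5672/"),
            exchange=env.get("RABBITMQ_EVENTS_EXCHANGE", "events.topic"),
            queue=env.get("RABBITMQ_EVENTS_QUEUE", "events.sse"),
            binding_key=env.get("RABBITMQ_EVENTS_BINDING_KEY", "team.*.job.*.status"),
            # Environment values are strings, so the count is converted explicitly.
            prefetch_count=int(env.get("RABBITMQ_PREFETCH_COUNT", "50")),
        )
```

Passing a plain dict to `from_env` keeps the loader easy to test without touching the process environment.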
5) Ops/reliability
- DLQ/TTL/backoff for the events queue if needed.
- Metrics: queue depth, consumer lag, reconnects.
Phase 3 — Hardening / Auth / Observability
- AuthZ: add dependency to enforce tenant scoping once auth/team modules exist; reject cross-tenant access.
- Backpressure: bounded subscriber queues with drop-oldest or last-value.
- Persistence: optionally persist event states to the DB so that missed events can be replayed and gaps filled.
- Observability: log consumer reconnects, message counts; add tracing spans around consume/publish.
- Resilience: reconnect on channel/connection drops (aio-pika robust connection).
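
The drop-oldest backpressure policy mentioned above can be sketched with a bounded `asyncio.Queue` per subscriber. The helper name `offer_drop_oldest` is hypothetical, and the sketch assumes that a single event-loop task feeds each queue.

```python
import asyncio
from typing import Any


def offer_drop_oldest(queue: "asyncio.Queue", item: Any) -> bool:
    """Enqueue item without blocking; if the queue is full, discard the oldest entry.

    Returns True when an older item was dropped, so the caller can log or count it.
    """
    dropped = False
    if queue.full():
        try:
            queue.get_nowait()  # make room by discarding the oldest item
            dropped = True
        except asyncio.QueueEmpty:
            pass
    queue.put_nowait(item)
    return dropped
```

With this policy a slow client sees only the most recent events instead of stalling the consumer, which fits the note that the DB remains the source of truth.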
Version & Dependency References (official docs)
- FastAPI 0.124.2: https://fastapi.tiangolo.com/
- StreamingResponse docs: https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
- aio-pika (>= 9.4.0): https://aio-pika.readthedocs.io/en/latest/
- RabbitMQ (3.13) concepts: https://www.rabbitmq.com/tutorials/amqp-concepts.html
- Uvicorn 0.38.0: https://www.uvicorn.org/
- Python 3.11: https://docs.python.org/3/
- Next.js 16 (App Router): https://nextjs.org/docs
- React 19 / EventSource API: https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
Deliverables Checklist (2025-12-18 verification)
- Broker: aio-pika consumer bound to `events.topic` with correct binding key and QoS.
  - Implemented via `backend/app/infrastructure/messaging/sse.py::SSEBroker`; started/stopped inside the FastAPI lifespan hook (`backend/app/core/app_factory.py`).
  - Uses `connect_robust`, durable queue `settings.rabbitmq_events_queue`, QoS tunable via `settings.rabbitmq_prefetch_count`, plus bounded subscriber queues/backpressure logs.
- SSE endpoint: FastAPI streaming generator, filters applied, media type `text/event-stream`.
  - `backend/app/api/v1/events.py` exposes `GET /api/v1/events`, enforces `events:read`, tenant scoping, and active-team checks before delegating to `event_stream`.
  - Integration tests in `backend/tests/integration/api/test_events_sse_permissions.py` cover permission, tenant, and lifecycle constraints.
- Publisher helper: aio-pika publish with topic routing key format.
  - `backend/app/infrastructure/messaging/events.publish_event` reuses the same RabbitMQ settings, adds telemetry spans, and is already invoked from auth workflows (`backend/app/modules/auth/service.py`) and health checks.
- Env documented and wired (container DNS, no secrets in code).
  - Settings live in `backend/app/core/config.py`; `.env` template `env/backend-local.example` documents `RABBITMQ_URL`, `RABBITMQ_EVENTS_*`, and `RABBITMQ_PREFETCH_COUNT` with service-DNS defaults.
- Client example: EventSource usage documented.
  - `docs/development/WEB-NOTIF.md/sse-notif-scaffold.md` includes an officially sourced EventSource example plus reconnect guidance; architecture docs cite the WHATWG spec.
- Ops: notes on DLQ/TTL, monitoring, reconnect handling.
  - Phase 3 items (AuthZ, backpressure, observability) are implemented via `SSEMetrics`, telemetry spans, and log hooks; `docs/architecture/WEBSOCKET/SSE-Notif.md#Reliability and ops` captures DLQ/TTL guidance and reconnect strategy, and smoke playbooks under `backend/tests/smoke/sse/*` validate RabbitMQ + SSE end-to-end.
  - Worker smoke validation lives at `backend/tests/smoke/environments/smoke_worker_stub.py` (Celery eager mode + stubbed Virtuozzo) until infra for full dockerized end-to-end tests lands.