SSE Notifications — Delivery Phases (Official-Docs Anchored)

Scope: implement the notification system described in SSE-Notif.md using FastAPI SSE (StreamingResponse), RabbitMQ topic exchanges, aio-pika async consumers, Celery publishers, and Next.js EventSource clients.

Phase 1 — Analysis / Architecture

  • Confirm topology (RabbitMQ topic exchange events.topic, routing key team.{team_id}.job.{job_id}.status, durable queue for SSE bridge).
  • Define filters: per-connection filtering on team/job; DB remains source of truth.
  • Confirm container DNS (use rabbitmq, not localhost) and env keys.
  • References:
      • RabbitMQ exchanges/routing keys: https://www.rabbitmq.com/tutorials/amqp-concepts.html
      • FastAPI StreamingResponse (SSE-friendly streaming): https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
      • aio-pika async consumer pattern: https://aio-pika.readthedocs.io/en/latest/quick-start.html
      • EventSource protocol (client): https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
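The routing-key convention and per-connection filters above can be sketched in plain Python. This is a minimal illustration, not the project's implementation: the helper names (`build_routing_key`, `parse_routing_key`, `matches_filters`) are hypothetical, and the sketch assumes team and job identifiers never contain a dot.

```python
import re
from typing import Dict, Optional

# Routing key format taken from the topology bullet above.
ROUTING_KEY_TEMPLATE = "team.{team_id}.job.{job_id}.status"

# Assumption: identifiers contain no dots, so each one is a single topic word.
_ROUTING_KEY_RE = re.compile(
    r"^team\.(?P<team_id>[^.]+)\.job\.(?P<job_id>[^.]+)\.status$"
)


def build_routing_key(team_id: str, job_id: str) -> str:
    """Build the routing key a publisher would attach to a status event."""
    return ROUTING_KEY_TEMPLATE.format(team_id=team_id, job_id=job_id)


def parse_routing_key(routing_key: str) -> Optional[Dict[str, str]]:
    """Return the team_id/job_id encoded in a key, or None if it does not match."""
    match = _ROUTING_KEY_RE.match(routing_key)
    return match.groupdict() if match else None


def matches_filters(
    routing_key: str,
    team_id: Optional[str] = None,
    job_id: Optional[str] = None,
) -> bool:
    """Apply optional per-connection filters; a filter left as None matches anything."""
    parsed = parse_routing_key(routing_key)
    if parsed is None:
        return False
    if team_id is not None and parsed["team_id"] != team_id:
        return False
    if job_id is not None and parsed["job_id"] != job_id:
        return False
    return True
```

Filtering on the routing key keeps each SSE connection cheap, but it is only a delivery filter: authorization and the authoritative job state still come from the database.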

Phase 2 — Implementation

1) Backend infrastructure
  • Add aio-pika dependency; pin versions (see list below).
  • Create SSE broker: aio_pika.connect_robust, declare events.topic (topic, durable), declare/bind queue (e.g., events.sse) with binding key team.*.job.*.status, set QoS/prefetch.
  • Fan-out in-process: async subscribers per connection; acknowledge messages on processing.
  • SSE endpoint: FastAPI StreamingResponse returning async generator that yields text/event-stream frames; optional filters (team_id, job_id) applied on routing key.

2) Publishers
  • Celery worker or FastAPI controller publishes status events to events.topic with routing key team.{team_id}.job.{job_id}.status using aio-pika publish.
  • Ensure payload JSON matches contract in SSE-Notif.md.

3) Client
  • Next.js client component uses new EventSource("/api/v1/events?team_id=...&job_id=..."); handle onmessage/onerror; reconnect strategy per app needs.

4) Configuration
  • Env keys: RABBITMQ_URL, RABBITMQ_EVENTS_EXCHANGE, RABBITMQ_EVENTS_QUEUE, RABBITMQ_EVENTS_BINDING_KEY, RABBITMQ_PREFETCH_COUNT.
  • Use service DNS (rabbitmq) for URLs in containers.

5) Ops/reliability
  • DLQ/TTL/backoff for the events queue if needed.
  • Metrics: queue depth, consumer lag, reconnects.
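As an illustration of the SSE endpoint step, the sketch below shows how an async generator might turn queued events into text/event-stream frames. It uses only the standard library. The names `format_sse` and `sse_frames`, the `None` end-of-stream sentinel, and the 15-second keepalive interval are assumptions made for the example, not part of the project or of any library.

```python
import asyncio
import json
from typing import AsyncIterator, Optional


def format_sse(
    data: dict,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
) -> str:
    """Encode one event as an SSE frame: optional id/event fields, then data."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # json.dumps escapes newlines inside strings, so a single data line suffices.
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


async def sse_frames(
    queue: asyncio.Queue,
    keepalive_seconds: float = 15.0,  # assumed interval, tune per deployment
) -> AsyncIterator[str]:
    """Yield SSE frames from a per-connection queue until a None sentinel arrives."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=keepalive_seconds)
        except asyncio.TimeoutError:
            # Lines starting with ':' are comments; clients ignore them,
            # but they keep idle connections from being closed by proxies.
            yield ": keepalive\n\n"
            continue
        if item is None:
            return
        yield format_sse(item)
```

In a FastAPI handler, a generator like this would typically be wrapped as `StreamingResponse(sse_frames(queue), media_type="text/event-stream")`, with the queue fed by the in-process fan-out from the broker consumer.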

Phase 3 — Hardening / Auth / Observability

  • AuthZ: add dependency to enforce tenant scoping once auth/team modules exist; reject cross-tenant access.
  • Backpressure: bounded subscriber queues with drop-oldest or last-value.
  • Persistence: optionally persist event states to DB for replay/fill gaps.
  • Observability: log consumer reconnects, message counts; add tracing spans around consume/publish.
  • Resilience: reconnect on channel/connection drops (aio-pika robust connection).
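The drop-oldest backpressure policy mentioned above could look roughly like the following. This is a sketch under stated assumptions: `BoundedSubscriber` is a hypothetical class, not the project's `SSEBroker`, and the default capacity of 100 is arbitrary.

```python
import asyncio
from typing import Any


class BoundedSubscriber:
    """Per-connection buffer that evicts the oldest event when full."""

    def __init__(self, maxsize: int = 100) -> None:  # assumed default capacity
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0  # counter a metrics hook could export

    def offer(self, event: Any) -> None:
        """Enqueue without blocking the broker consumer.

        This method is synchronous, so on a single event loop the full-check,
        eviction, and put cannot be interleaved with other coroutines.
        """
        if self._queue.full():
            self._queue.get_nowait()  # discard the oldest pending event
            self.dropped += 1
        self._queue.put_nowait(event)

    async def get(self) -> Any:
        """Wait for the next event destined for this connection."""
        return await self._queue.get()
```

Drop-oldest suits status streams because a slow client mostly cares about the latest state; a last-value policy would go further and keep only the newest event per job. Either way the client can recover skipped states from the database.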

Version & Dependency References (official docs)

  • FastAPI 0.124.2: https://fastapi.tiangolo.com/
  • StreamingResponse docs: https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
  • aio-pika (>= 9.4.0): https://aio-pika.readthedocs.io/en/latest/
  • RabbitMQ (3.13) concepts: https://www.rabbitmq.com/tutorials/amqp-concepts.html
  • Uvicorn 0.38.0: https://www.uvicorn.org/
  • Python 3.11: https://docs.python.org/3/
  • Next.js 16 (App Router): https://nextjs.org/docs
  • React 19 / EventSource API: https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface

Deliverables Checklist (2025-12-18 verification)

  • Broker: aio-pika consumer bound to events.topic with correct binding key and QoS.
      • Implemented via backend/app/infrastructure/messaging/sse.py::SSEBroker; started/stopped inside the FastAPI lifespan hook (backend/app/core/app_factory.py).
      • Uses connect_robust, durable queue settings.rabbitmq_events_queue, QoS tunable via settings.rabbitmq_prefetch_count, plus bounded subscriber queues/backpressure logs.
  • SSE endpoint: FastAPI streaming generator, filters applied, media type text/event-stream.
      • backend/app/api/v1/events.py exposes GET /api/v1/events, enforces events:read, tenant scoping, and active-team checks before delegating to event_stream.
      • Integration tests in backend/tests/integration/api/test_events_sse_permissions.py cover permission, tenant, and lifecycle constraints.
  • Publisher helper: aio-pika publish with topic routing key format.
      • backend/app/infrastructure/messaging/events.publish_event reuses the same RabbitMQ settings, adds telemetry spans, and is already invoked from auth workflows (backend/app/modules/auth/service.py) and health checks.
  • Env documented and wired (container DNS, no secrets in code).
      • Settings live in backend/app/core/config.py; .env template env/backend-local.example documents RABBITMQ_URL, RABBITMQ_EVENTS_*, and RABBITMQ_PREFETCH_COUNT with service-DNS defaults.
  • Client example: EventSource usage documented.
      • docs/development/WEB-NOTIF.md/sse-notif-scaffold.md includes an officially sourced EventSource example plus reconnect guidance; architecture docs cite the WHATWG spec.
  • Ops: notes on DLQ/TTL, monitoring, reconnect handling.
      • Phase 3 items (AuthZ, backpressure, observability) are implemented via SSEMetrics, telemetry spans, and log hooks; docs/architecture/WEBSOCKET/SSE-Notif.md#Reliability and ops captures DLQ/TTL guidance and reconnect strategy, and smoke playbooks under backend/tests/smoke/sse/* validate RabbitMQ + SSE end-to-end.
      • Worker smoke validation lives at backend/tests/smoke/environments/smoke_worker_stub.py (Celery eager mode + stubbed Virtuozzo) until infra for full dockerized end-to-end tests lands.
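For the environment keys named in the checklist, a minimal loader might look like the sketch below. It is not the contents of backend/app/core/config.py: `RabbitSettings`, `load_rabbit_settings`, and `broker_host` are hypothetical names, the default prefetch of 50 is an assumed value, and the default URL deliberately carries no credentials so that secrets stay in the environment.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional
from urllib.parse import urlsplit


@dataclass(frozen=True)
class RabbitSettings:
    url: str
    events_exchange: str
    events_queue: str
    events_binding_key: str
    prefetch_count: int


def load_rabbit_settings(env: Mapping[str, str] = os.environ) -> RabbitSettings:
    """Read the RABBITMQ_* keys, falling back to service-DNS style defaults."""
    prefetch = int(env.get("RABBITMQ_PREFETCH_COUNT", "50"))  # 50 is an assumption
    if prefetch <= 0:
        raise ValueError("RABBITMQ_PREFETCH_COUNT must be a positive integer")
    return RabbitSettings(
        # Host is the container service name, not localhost; no credentials here.
        url=env.get("RABBITMQ_URL", "amqp://rabbitmq:5672/"),
        events_exchange=env.get("RABBITMQ_EVENTS_EXCHANGE", "events.topic"),
        events_queue=env.get("RABBITMQ_EVENTS_QUEUE", "events.sse"),
        events_binding_key=env.get(
            "RABBITMQ_EVENTS_BINDING_KEY", "team.*.job.*.status"
        ),
        prefetch_count=prefetch,
    )


def broker_host(url: str) -> Optional[str]:
    """Extract the broker hostname, e.g. to warn when a container points at localhost."""
    return urlsplit(url).hostname
```

A startup check such as `broker_host(settings.url) in {"localhost", "127.0.0.1"}` can flag the container DNS mistake called out in Phase 1 before the consumer tries to connect.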