
Phase 6: SSE/RabbitMQ Observability - Implementation Summary

What was completed

Added structured logging and in-process metrics for the Server-Sent Events (SSE) and RabbitMQ messaging infrastructure, giving production visibility into connection health, message flow, backpressure, and tenant isolation.

Implementation details

Task 6.1: SSE Endpoint Lifecycle Logs ✅

Files modified:
  • backend/app/api/v1/events.py
  • backend/app/infrastructure/messaging/sse.py

Features:
  • SSE stream lifecycle logging:
      • Stream start: team_id, job_id, client host, user_id
      • Stream end: team_id, job_id, messages_sent count
      • Client disconnection detection
      • Error handling with exc_info
  • Security violation logging:
      • Missing permission (events:read)
      • Missing team context
      • Cross-tenant access attempts
      • All violations logged via log_auth_violation()
  • PII safety: never logs payload bodies or message content
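
A minimal sketch of the stream lifecycle logging above, assuming subscriber messages arrive on an asyncio.Queue and that the endpoint passes in an `is_disconnected` coroutine (Starlette's `Request.is_disconnected()` has that shape). The function name `sse_stream`, the `None` end-of-stream sentinel, and every log message except "SSE stream started" are hypothetical; the point is that start and end records carry identifiers and counts, never payloads.

```python
import asyncio
import logging
from typing import AsyncIterator, Awaitable, Callable

logger = logging.getLogger("sse")


async def sse_stream(
    queue: asyncio.Queue,
    team_id: str,
    job_id: str,
    client_host: str,
    user_id: int,
    is_disconnected: Callable[[], Awaitable[bool]],
) -> AsyncIterator[str]:
    """Yield SSE frames from `queue`; log lifecycle events, never payloads."""
    ctx = {"team_id": team_id, "job_id": job_id}
    messages_sent = 0
    logger.info("SSE stream started", extra={**ctx, "client": client_host, "user_id": user_id})
    try:
        while True:
            # Checked between messages only; a blocked get() notices a disconnect late.
            if await is_disconnected():
                logger.info("SSE client disconnected", extra=ctx)
                break
            payload = await queue.get()
            if payload is None:  # hypothetical end-of-stream sentinel
                break
            messages_sent += 1
            # Assumes single-line payloads (e.g. compact JSON); sent, never logged.
            yield f"data: {payload}\n\n"
    except asyncio.CancelledError:
        logger.info("SSE stream cancelled", extra=ctx)
        raise
    except Exception:
        logger.error("SSE stream error", extra=ctx, exc_info=True)
        raise
    finally:
        logger.info("SSE stream ended", extra={**ctx, "messages_sent": messages_sent})
```

Putting the end-of-stream record in `finally` means it is written whether the stream ends normally, on disconnect, on cancellation, or on error.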

Task 6.2: Broker Connection/Retry Visibility ✅

Files modified:
  • backend/app/infrastructure/messaging/sse.py

Features:
  • Start/stop logging:
      • Broker start: exchange, queue, and binding_key configuration
      • Broker stop: full metrics summary (subscribers, messages consumed/forwarded/dropped)
  • Connection lifecycle:
      • Connection attempts with sanitized URL (credentials hidden)
      • Successful connection: exchange, queue, binding_key, prefetch_count
      • Connection failures: error type, attempt count, failure count, exc_info
      • Retry logic: fixed 5-second delay between attempts
  • Cancellation handling: clean shutdown on asyncio.CancelledError
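
The connection lifecycle above can be sketched as a retry loop with a fixed delay. This assumes a `connect` coroutine (for example, a wrapper around the AMQP client's connect call) and a plain dict of counters; `sanitize_url`, `connect_with_retry`, and the log messages are hypothetical names, not the module's actual API.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Dict, TypeVar
from urllib.parse import urlsplit, urlunsplit

logger = logging.getLogger("sse.broker")
T = TypeVar("T")

RETRY_DELAY_SECONDS = 5.0  # fixed delay between attempts, per the summary above


def sanitize_url(url: str) -> str:
    """Return `url` with any password replaced by '***' so it is safe to log."""
    parts = urlsplit(url)
    if parts.password is None:
        return url
    host = parts.hostname or ""
    if ":" in host:  # IPv6 literal: restore the brackets urlsplit strips
        host = f"[{host}]"
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    netloc = f"{parts.username}:***@{host}"
    return urlunsplit(parts._replace(netloc=netloc))


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    url: str,
    metrics: Dict[str, int],
    delay: float = RETRY_DELAY_SECONDS,
) -> T:
    """Await `connect()` until it succeeds, logging each attempt and failure."""
    try:
        while True:
            metrics["connection_attempts"] += 1
            logger.info("Connecting to RabbitMQ", extra={"url": sanitize_url(url)})
            try:
                return await connect()
            except Exception as exc:  # CancelledError is a BaseException (3.8+), not caught here
                metrics["connection_failures"] += 1
                logger.warning(
                    "RabbitMQ connection failed",
                    extra={
                        "error_type": type(exc).__name__,
                        "attempt": metrics["connection_attempts"],
                        "failures": metrics["connection_failures"],
                    },
                    exc_info=True,
                )
            await asyncio.sleep(delay)
    except asyncio.CancelledError:
        logger.info("RabbitMQ connection loop cancelled")
        raise
```

Because the outer handler wraps both the connect call and the sleep, cancelling the broker task during a 5-second wait is logged once and then re-raised, so shutdown is not swallowed.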

Task 6.3: Message Flow Metrics ✅

Files modified:
  • backend/app/infrastructure/messaging/sse.py

New components:
  • SSEMetrics dataclass:

from dataclasses import dataclass

@dataclass
class SSEMetrics:
    subscribers_active: int = 0          # Current SSE clients (gauge)
    messages_consumed_total: int = 0     # RabbitMQ messages received
    messages_forwarded_total: int = 0    # Messages delivered to clients
    messages_dropped_total: int = 0      # Backpressure drops
    connection_attempts: int = 0         # RabbitMQ connect tries
    connection_failures: int = 0         # Failed connections

  • get_metrics() function:
      • Exposes current metrics for monitoring/dashboards
      • Includes connection state and last_error
      • Can be integrated with a /metrics endpoint
  • Metric updates:
      • Subscribe/unsubscribe: updates subscribers_active
      • Message handling: increments consumed/forwarded/dropped
      • Connection attempts/failures tracked
      • Backpressure detection with detailed logging

Task 6.4: Backpressure Strategy Observability ✅

Implementation:
  • Queue-full detection: logs when a subscriber queue reaches capacity
  • Drop-oldest strategy: documented and logged
  • Backpressure warnings include:
      • routing_key
      • queue_size
      • dropped/delivered counts
      • subscribers_active
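
A minimal sketch of drop-oldest fan-out over bounded asyncio.Queue subscriber queues. The function name, the counters dict, and the accounting are assumptions: `dropped` counts subscribers whose oldest buffered message had to be evicted and `delivered` counts subscribers that had room, so the two add up to subscribers_active as in the Backpressure Warning example under Log Examples. The newest message is enqueued for every subscriber either way.

```python
import asyncio
import logging
from typing import Dict, List, Tuple

logger = logging.getLogger("sse.broker")


def fan_out(
    subscribers: List[asyncio.Queue],
    payload: str,
    routing_key: str,
    counters: Dict[str, int],
) -> Tuple[int, int]:
    """Enqueue `payload` for every subscriber, evicting the oldest item from full queues.

    Must be called from the event loop thread; asyncio queues are not thread-safe.
    """
    delivered = dropped = 0
    queue_size = 0
    for queue in subscribers:
        if queue.full():
            queue.get_nowait()  # drop-oldest: evict the oldest buffered message
            dropped += 1
            queue_size = queue.maxsize
        else:
            delivered += 1
        queue.put_nowait(payload)  # there is always room at this point
    counters["messages_dropped_total"] += dropped
    if dropped:
        logger.warning(
            "Messages dropped due to backpressure",
            extra={
                "routing_key": routing_key,
                "queue_size": queue_size,
                "dropped": dropped,
                "delivered": delivered,
                "subscribers_active": len(subscribers),
            },
        )
    return delivered, dropped
```

Dropping the oldest entry keeps each client's view current at the cost of gaps, which tends to suit status streams where the latest state matters more than every intermediate update.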

Task 6.5: Subscriber Management Logging ✅

Features:
  • Subscribe events: log max_queue_size and subscribers_active
  • Unsubscribe events: log subscribers_active
  • Stale queue cleanup: closed queues are removed automatically
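
The subscribe/unsubscribe logging above can be sketched as a small registry. `SubscriberRegistry`, its default `max_queue_size` of 100, and the log messages are hypothetical; stale-queue cleanup is left out because the source does not say how a queue is detected as closed.

```python
import asyncio
import logging
from typing import Set

logger = logging.getLogger("sse.broker")


class SubscriberRegistry:
    """Hypothetical registry that tracks SSE subscriber queues and logs changes."""

    def __init__(self, max_queue_size: int = 100) -> None:  # default is an assumption
        self.max_queue_size = max_queue_size
        self._queues: Set[asyncio.Queue] = set()

    @property
    def subscribers_active(self) -> int:
        return len(self._queues)

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue_size)
        self._queues.add(queue)
        logger.info(
            "SSE subscriber added",
            extra={"max_queue_size": self.max_queue_size, "subscribers_active": self.subscribers_active},
        )
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._queues.discard(queue)  # discard() makes a repeated unsubscribe harmless
        logger.info("SSE subscriber removed", extra={"subscribers_active": self.subscribers_active})
```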

Files created/modified

Modified:
  • backend/app/infrastructure/messaging/sse.py: core observability implementation
  • backend/app/api/v1/events.py: security violation logging

No new files (all changes integrated into existing SSE infrastructure)

Log Examples

Connection Success

{
  "event": "RabbitMQ connection established",
  "exchange": "events.topic",
  "queue": "events.sse",
  "binding_key": "team.*.job.*.status",
  "prefetch_count": 10,
  "timestamp": "2025-12-16T..."
}

SSE Stream Lifecycle

{
  "event": "SSE stream started",
  "team_id": "42",
  "job_id": "job-123",
  "client": "192.168.1.10",
  "correlation_id": "...",
  "user_id": 100
}

Backpressure Warning

{
  "level": "warning",
  "event": "Messages dropped due to backpressure",
  "routing_key": "team.42.job.123.status",
  "dropped": 1,
  "delivered": 5,
  "subscribers_active": 6
}

Cross-Tenant Violation

{
  "level": "warning",
  "event": "auth_violation",
  "violation": "cross_tenant_sse_access",
  "http_status": 403,
  "detail": "Cross-tenant SSE access attempt",
  "team_id": 42,
  "user_id": 100,
  "requested_team_id": "99"
}
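
A sketch of the tenant check that could produce this record. The real code routes through log_auth_violation(), whose signature is not shown here, so this version calls the standard logging module directly with the field names from the example; `ensure_same_team` and `Forbidden` are hypothetical. The comparison goes through `str()` because the example logs team_id as an integer but requested_team_id as a string (presumably parsed from the request path).

```python
import logging

logger = logging.getLogger("auth")


class Forbidden(Exception):
    """Hypothetical stand-in for the framework's 403 error."""


def ensure_same_team(user_team_id: int, user_id: int, requested_team_id: str) -> None:
    """Allow access to the caller's own team stream; log and reject anything else."""
    if str(user_team_id) == requested_team_id:
        return
    logger.warning(
        "auth_violation",
        extra={
            "violation": "cross_tenant_sse_access",
            "http_status": 403,
            "detail": "Cross-tenant SSE access attempt",
            "team_id": user_team_id,
            "user_id": user_id,
            "requested_team_id": requested_team_id,
        },
    )
    raise Forbidden("Cross-tenant SSE access attempt")
```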

Production Benefits

✅ Troubleshooting: Immediate visibility into connection failures
✅ Performance: Track message throughput and backpressure
✅ Security: Log all cross-tenant access attempts
✅ Capacity planning: Monitor subscribers_active and queue depth
✅ Reliability: Connection retry visibility with error context

Next Steps

  • Phase 7: Auth/Security logging (partially complete)
  • Phase 8: External API logging (partially complete)
  • Future: Export metrics to Prometheus/DataDog/CloudWatch

Metrics Integration

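The get_metrics() function can be exposed over HTTP, for example at /admin/sse-metrics:

from fastapi import APIRouter
from app.infrastructure.messaging.sse import get_metrics  # import path assumed from the file layout

router = APIRouter()

@router.get("/admin/sse-metrics")
async def sse_metrics():
    return get_metrics()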
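
For the Prometheus export listed under Next Steps, one option is to render the get_metrics() dict in the Prometheus text exposition format without extra dependencies. This sketch assumes get_metrics() returns flat numeric counters plus non-numeric fields such as last_error (skipped) and possibly a boolean connection flag (exported as 0/1); the `sse_` prefix and the gauge/counter split are assumptions.

```python
from typing import Mapping

GAUGES = {"subscribers_active"}  # assumed: every other numeric field is a counter


def to_prometheus_text(metrics: Mapping[str, object], prefix: str = "sse_") -> str:
    """Render a get_metrics()-style dict in the Prometheus text exposition format."""
    lines = []
    for name, value in sorted(metrics.items()):
        if isinstance(value, bool):  # e.g. a connection flag, exported as 0/1
            kind, value = "gauge", int(value)
        elif isinstance(value, (int, float)):
            kind = "gauge" if name in GAUGES else "counter"
        else:
            continue  # non-numeric fields such as last_error are skipped
        lines.append(f"# TYPE {prefix}{name} {kind}")
        lines.append(f"{prefix}{name} {value}")
    return "\n".join(lines) + "\n"
```

Prometheus naming conventions recommend a `_total` suffix on counters, so connection_attempts and connection_failures might be renamed on export. A maintained client such as prometheus_client is the sturdier choice once the export goes beyond a prototype.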

Verification

Test the observability:

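# Monitor logs while running SSE smoke tests (case-insensitive, so backpressure
# and auth_violation records are not filtered out)
tail -f logs/app.jsonl | grep -iE "sse|rabbitmq|backpressure|auth_violation"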

# Run SSE smoke tests
bash backend/tests/smoke/sse/smoke_sse_stream.sh
bash backend/tests/smoke/sse/smoke_rabbitmq_roundtrip.sh


Status: Production-ready
Breaking changes: None
Dependencies: No new dependencies required