
Observability for Streams

Ravinder · 6 min read
Event-Driven Architecture · Distributed Systems · Architecture · Observability · Kafka · Distributed Tracing

In a request-driven system, a slow response is immediately visible—the caller times out, the error appears in logs, and the trace shows exactly where latency accumulated. In an event-driven system, slowness is invisible to the producer. A consumer can fall 10 million messages behind and the producer has no idea. By the time someone notices, inventory counts are hours stale, notifications are backlogged, and the support queue is growing.

Observability in event-driven systems requires a different mental model: you are not tracking request latency, you are tracking throughput, lag, and event age across every stage of the pipeline.

The Three Signals

Consumer lag is the most critical metric in Kafka-based systems. It measures how far behind a consumer group is from the latest produced offset—the number of messages yet to be processed.

Throughput (events per second) tells you whether the system is keeping up with demand. A consumer processing 1,000 events/s against a producer emitting 1,200 events/s will fall behind indefinitely.

Event age (end-to-end latency) measures the elapsed time from when an event was produced to when it was fully processed by a consumer. This is the metric that maps to actual business impact.

graph LR
  P[Producer] -->|publishes| B[Broker]
  B --> C[Consumer]
  subgraph "Metrics to track"
    M1[Lag = end_offset - consumer_offset]
    M2[Throughput = events/second]
    M3[Age = now() - event.occurredAt]
  end
  C -.->|exposes| M1
  C -.->|exposes| M2
  C -.->|exposes| M3
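
These signals only make sense together: lag tells you how far behind you are, throughput tells you whether you will ever catch up. A back-of-the-envelope sketch (function name is illustrative) of how long a backlog takes to drain at steady rates:

def time_to_drain_seconds(lag: int, consume_rate: float, produce_rate: float) -> float:
    """Estimate how long a backlog takes to clear at steady rates."""
    net_rate = consume_rate - produce_rate
    if net_rate <= 0:
        return float("inf")  # consumer never catches up; lag grows or holds steady
    return lag / net_rate

# 1M-message backlog, consuming 1,500/s against 1,200/s produced -> ~55 minutes
print(time_to_drain_seconds(1_000_000, 1_500, 1_200))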

Consumer Lag Monitoring

Kafka exposes consumer group offsets and end offsets. Lag = end offset - committed offset.

# CLI view of consumer group lag
kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --describe \
  --group inventory-service
 
# Output
GROUP              TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
inventory-service  orders.placed   0          2450100         2451023         923
inventory-service  orders.placed   1          2448750         2450100         1350
inventory-service  orders.placed   2          2451200         2451200         0
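
If you need lag inside application code (say, for a health endpoint or a custom exporter), the same arithmetic can be done via the client. A minimal sketch, assuming the confluent_kafka library and offsets committed to Kafka:

from confluent_kafka import Consumer, TopicPartition

def consumer_group_lag(bootstrap: str, group: str, topic: str) -> dict:
    consumer = Consumer({"bootstrap.servers": bootstrap, "group.id": group})
    partitions = consumer.list_topics(topic).topics[topic].partitions
    committed = consumer.committed(
        [TopicPartition(topic, p) for p in partitions], timeout=10
    )

    lag = {}
    for tp in committed:
        _low, high = consumer.get_watermark_offsets(tp, timeout=10)
        current = tp.offset if tp.offset >= 0 else 0  # no committed offset yet
        lag[tp.partition] = high - current
    consumer.close()
    return lag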

Automate this with Prometheus-compatible exporters (kafka-exporter, JMX exporter) and alert on lag thresholds:

# Prometheus alert rule
groups:
  - name: kafka-consumer-lag
    rules:
      - alert: ConsumerLagHigh
        expr: |
          kafka_consumergroup_lag{group="inventory-service"} > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer group {{ $labels.group }} is {{ $value }} messages behind"
 
      - alert: ConsumerLagCritical
        expr: |
          kafka_consumergroup_lag{group="inventory-service"} > 100000
        for: 2m
        labels:
          severity: critical

Lag alone does not tell you if the situation is improving or worsening. Pair it with lag rate of change:

# kafka_consumergroup_lag is a gauge, so use deriv() rather than rate() (which is for counters)
# Lag increasing? Consumer is slower than producer
deriv(kafka_consumergroup_lag[5m]) > 0
 
# Lag decreasing? Consumer is catching up
deriv(kafka_consumergroup_lag[5m]) < 0

Throughput and Saturation

Consumer throughput should be measured at the application level, not just at the broker level:

import prometheus_client as prom
 
events_processed = prom.Counter(
    "events_processed_total",
    "Total events processed",
    ["event_type", "consumer_group", "status"]
)
 
processing_duration = prom.Histogram(
    "event_processing_duration_seconds",
    "Time to process a single event",
    ["event_type"],
    buckets=[0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
 
def handle_order_placed(event: dict) -> None:
    with processing_duration.labels(event_type="OrderPlaced").time():
        try:
            process_order(event)
            events_processed.labels(
                event_type="OrderPlaced",
                consumer_group="inventory-service",
                status="success"
            ).inc()
        except Exception as e:
            events_processed.labels(
                event_type="OrderPlaced",
                consumer_group="inventory-service",
                status="error"
            ).inc()
            raise
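
End-to-end event age can be exported the same way. A sketch, assuming events carry the producer-set occurredAt timestamp used elsewhere in this article:

from datetime import datetime, timezone

event_age = prom.Histogram(
    "event_age_seconds",
    "Seconds from event production (occurredAt) to processing completion",
    ["event_type"],
    buckets=[0.1, 0.5, 1, 5, 15, 30, 60, 300]
)

def record_event_age(event: dict) -> None:
    # occurredAt is assumed to be an ISO-8601 UTC timestamp set by the producer
    produced_at = datetime.fromisoformat(event["occurredAt"].replace("Z", "+00:00"))
    age = (datetime.now(timezone.utc) - produced_at).total_seconds()
    event_age.labels(event_type=event["type"]).observe(age)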

Dead-Letter Queues (DLQ)

Events that fail processing after retries should be routed to a DLQ—a dedicated topic for events that could not be processed successfully.

sequenceDiagram
  participant B as Broker
  participant C as Consumer
  participant DLQ as Dead Letter Topic
  participant Alert as Alert Manager
  B->>C: Deliver event
  C->>C: Process (attempt 1 - fails)
  C->>C: Process (attempt 2 - fails)
  C->>C: Process (attempt 3 - fails)
  C->>DLQ: Write to orders.placed.dlq
  DLQ-->>Alert: DLQ has messages → page on-call

DLQ event structure should include failure metadata:

{
  "originalEvent": {
    "id": "evt-abc123",
    "type": "OrderPlaced",
    "data": { "orderId": "ord-456" }
  },
  "failureMetadata": {
    "consumer": "inventory-service",
    "consumerGroup": "inventory-service-v2",
    "attemptCount": 3,
    "lastError": "NullPointerException: item.sku was null",
    "lastAttemptAt": "2025-10-27T14:32:00Z",
    "originalTopic": "orders.placed",
    "originalPartition": 2,
    "originalOffset": 2451200
  }
}
The consumer implements the retry-then-DLQ flow with a bounded retry count:

import json

MAX_RETRIES = 3
 
def consume_with_dlq(consumer, retry_producer, dlq_producer):
    while True:
        message = consumer.poll(timeout=1.0)
        if message is None or message.error():
            continue

        event = json.loads(message.value())
        headers = dict(message.headers() or [])
        retry_count = int(headers.get("retry-count", b"0"))
        
        try:
            process_event(event)
            consumer.commit(message=message)
        except Exception as e:
            if retry_count >= MAX_RETRIES:
                send_to_dlq(dlq_producer, message, event, str(e))
                consumer.commit(message=message)  # Don't reprocess
            else:
                requeue_with_backoff(retry_producer, message, retry_count + 1)
                consumer.commit(message=message)
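
send_to_dlq and requeue_with_backoff are left as helpers above. A minimal send_to_dlq sketch that builds the failure-metadata envelope shown earlier, assuming a confluent-kafka-style producer and message object:

import json
from datetime import datetime, timezone

def send_to_dlq(dlq_producer, message, event: dict, error: str) -> None:
    # Wrap the original event with the failure metadata operators need to diagnose and replay it
    dlq_record = {
        "originalEvent": event,
        "failureMetadata": {
            "consumer": "inventory-service",
            "attemptCount": MAX_RETRIES,
            "lastError": error,
            "lastAttemptAt": datetime.now(timezone.utc).isoformat(),
            "originalTopic": message.topic(),
            "originalPartition": message.partition(),
            "originalOffset": message.offset(),
        },
    }
    dlq_producer.produce(
        topic=f"{message.topic()}.dlq",
        key=message.key(),
        value=json.dumps(dlq_record).encode(),
    )
    dlq_producer.flush()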

Alert immediately when the DLQ receives messages. A growing DLQ is a silent data loss event.

End-to-End Distributed Tracing

Events flow across multiple services. A trace that only covers one service does not help you debug latency in the pipeline. You need to propagate trace context from producer to consumer.

Use the W3C traceparent header standard:

import json

from opentelemetry import trace
from opentelemetry.propagate import inject, extract
 
tracer = trace.get_tracer("order-service")
 
def publish_event(event: dict, tenant_id: str) -> None:
    headers = {
        "tenant-id": tenant_id.encode(),
        "event-type": event["type"].encode()
    }
    
    # Inject trace context into headers
    with tracer.start_as_current_span("publish.OrderPlaced") as span:
        span.set_attribute("event.id", event["id"])
        span.set_attribute("event.type", event["type"])
        inject(headers)  # Adds traceparent, tracestate headers
        
        producer.produce(
            topic="orders.placed",
            key=event["data"]["orderId"].encode(),
            value=json.dumps(event).encode(),
            headers=list(headers.items())
        )

Consumer extracts context and creates a child span:

from datetime import datetime, timezone

def handle_order_placed(message) -> None:
    # Header values arrive as bytes; decode them so the propagator can read traceparent
    headers = {k: v.decode() for k, v in (message.headers() or [])}
    ctx = extract(headers)  # Restore trace context from headers
    event = json.loads(message.value())
    
    with tracer.start_as_current_span(
        "consume.OrderPlaced",
        context=ctx,
        kind=trace.SpanKind.CONSUMER
    ) as span:
        span.set_attribute("kafka.partition", message.partition())
        span.set_attribute("kafka.offset", message.offset())
        
        event_time = datetime.fromisoformat(event["occurredAt"].replace("Z", "+00:00"))
        end_to_end_latency = (datetime.now(timezone.utc) - event_time).total_seconds()
        span.set_attribute("event.end_to_end_latency_s", end_to_end_latency)
        
        process_order(event)

Dashboards That Matter

A useful stream observability dashboard has six panels:

graph TD
  subgraph "Stream Health Dashboard"
    P1[Consumer Lag per Group/Partition]
    P2[Events Processed/s by Type]
    P3[Processing Error Rate %]
    P4[DLQ Message Count]
    P5[End-to-End Event Age p50/p99]
    P6[Consumer Group Active Members]
  end

Set SLO-based alerts on end-to-end event age. If your SLO is "orders are reflected in inventory within 30 seconds," alert when p99 event age exceeds 25 seconds—before the SLO is breached.

Key Takeaways

  • Consumer lag is the primary health signal in event-driven systems—monitor it per consumer group and per partition, not just in aggregate.
  • Pair lag monitoring with rate-of-change metrics to distinguish between a stable backlog and a growing one.
  • Dead-letter queues must be treated as paging events, not background noise—a message in the DLQ means data was not processed.
  • Propagate W3C traceparent headers from producer to consumer to enable end-to-end distributed tracing across asynchronous service boundaries.
  • End-to-end event age (production time to processing completion) is the business-relevant latency metric; use it to set SLOs.
  • Alert before SLO breaches, not after—buffer your alert thresholds to give operators time to react.