Observability for Streams
In a request-driven system, a slow response is immediately visible—the caller times out, the error appears in logs, and the trace shows exactly where latency accumulated. In an event-driven system, slowness is invisible to the producer. A consumer can fall 10 million messages behind and the producer has no idea. By the time someone notices, inventory counts are hours stale, notifications are backlogged, and the support queue is growing.
Observability in event-driven systems requires a different mental model: you are not tracking request latency, you are tracking throughput, lag, and event age across every stage of the pipeline.
The Three Signals
Consumer lag is the most critical metric in Kafka-based systems. It measures how far behind a consumer group is from the latest produced offset—the number of messages yet to be processed.
Throughput (events per second) tells you whether the system is keeping up with demand. A consumer processing 1,000 events/s against a producer emitting 1,200 events/s will fall behind indefinitely, accumulating a backlog of 200 events per second (720,000 messages per hour).
Event age (end-to-end latency) measures the elapsed time from when an event was produced to when it was fully processed by a consumer. This is the metric that maps to actual business impact.
Consumer Lag Monitoring
Kafka exposes consumer group offsets and end offsets. Per partition, lag = log-end offset - committed offset.
# CLI view of consumer group lag
kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --describe \
  --group inventory-service

# Output
GROUP              TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
inventory-service  orders.placed  0          2450100         2451023         923
inventory-service  orders.placed  1          2448750         2450100         1350
inventory-service  orders.placed  2          2451200         2451200         0

Automate this with Prometheus-compatible exporters (kafka-exporter, JMX exporter) and alert on lag thresholds:
# Prometheus alert rules
groups:
  - name: kafka-consumer-lag
    rules:
      - alert: ConsumerLagHigh
        expr: |
          kafka_consumergroup_lag{group="inventory-service"} > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer group {{ $labels.group }} is {{ $value }} messages behind"
      - alert: ConsumerLagCritical
        expr: |
          kafka_consumergroup_lag{group="inventory-service"} > 100000
        for: 2m
        labels:
          severity: critical

Lag alone does not tell you whether the situation is improving or worsening. Pair it with its rate of change:
# Lag increasing? Consumer is slower than producer
# (kafka_consumergroup_lag is a gauge, so use deriv() rather than rate(),
# which is meant for counters)
deriv(kafka_consumergroup_lag[5m]) > 0

# Lag decreasing? Consumer is catching up
deriv(kafka_consumergroup_lag[5m]) < 0

Throughput and Saturation
Consumer throughput should be measured at the application level, not just at the broker level:
import prometheus_client as prom

events_processed = prom.Counter(
    "events_processed_total",
    "Total events processed",
    ["event_type", "consumer_group", "status"]
)

processing_duration = prom.Histogram(
    "event_processing_duration_seconds",
    "Time to process a single event",
    ["event_type"],
    buckets=[0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

def handle_order_placed(event: dict) -> None:
    with processing_duration.labels(event_type="OrderPlaced").time():
        try:
            process_order(event)
            events_processed.labels(
                event_type="OrderPlaced",
                consumer_group="inventory-service",
                status="success"
            ).inc()
        except Exception:
            events_processed.labels(
                event_type="OrderPlaced",
                consumer_group="inventory-service",
                status="error"
            ).inc()
            raise
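With these metrics exported, throughput and error rate are a PromQL query away:

# Consumer throughput in events per second
sum(rate(events_processed_total{consumer_group="inventory-service"}[5m]))

# Fraction of events failing over the last five minutes
sum(rate(events_processed_total{consumer_group="inventory-service", status="error"}[5m]))
  / sum(rate(events_processed_total{consumer_group="inventory-service"}[5m]))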
Dead-Letter Queues (DLQ)
Events that fail processing after retries should be routed to a DLQ—a dedicated topic for events that could not be processed successfully.
DLQ event structure should include failure metadata:
{
  "originalEvent": {
    "id": "evt-abc123",
    "type": "OrderPlaced",
    "data": { "orderId": "ord-456" }
  },
  "failureMetadata": {
    "consumer": "inventory-service",
    "consumerGroup": "inventory-service-v2",
    "attemptCount": 3,
    "lastError": "NullPointerException: item.sku was null",
    "lastAttemptAt": "2025-10-27T14:32:00Z",
    "originalTopic": "orders.placed",
    "originalPartition": 2,
    "originalOffset": 2451200
  }
}

A consumer loop that retries with backoff and routes exhausted messages to the DLQ (process_event, send_to_dlq, and requeue_with_backoff are application helpers):

import json

MAX_RETRIES = 3
def consume_with_dlq(consumer, producer, dlq_producer):
    while True:
        message = consumer.poll(timeout=1.0)
        if message is None or message.error():
            continue
        event = json.loads(message.value())
        # Kafka headers arrive as a list of (key, value) tuples; values are bytes
        headers = dict(message.headers() or [])
        retry_count = int(headers.get("retry-count", b"0"))
        try:
            process_event(event)
            consumer.commit(message)
        except Exception as e:
            if retry_count >= MAX_RETRIES:
                send_to_dlq(dlq_producer, message, event, str(e))
                consumer.commit(message)  # Don't reprocess
            else:
                requeue_with_backoff(producer, message, retry_count + 1)
                consumer.commit(message)

Alert immediately when the DLQ receives messages. A growing DLQ is a silent data loss event.
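A minimal alert sketch, assuming kafka-exporter metrics and a DLQ topic named orders.placed.dlq (an illustrative name; adjust to your convention):

# The log-end offset of a DLQ topic should never advance
- alert: DLQMessagesReceived
  expr: |
    sum by (topic) (delta(kafka_topic_partition_current_offset{topic="orders.placed.dlq"}[5m])) > 0
  labels:
    severity: critical
  annotations:
    summary: "DLQ {{ $labels.topic }} received new messages"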
End-to-End Distributed Tracing
Events flow across multiple services. A trace that only covers one service does not help you debug latency in the pipeline. You need to propagate trace context from producer to consumer.
Use the W3C traceparent header standard:
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer("order-service")

def publish_event(event: dict, tenant_id: str) -> None:
    headers = {
        "tenant-id": tenant_id.encode(),
        "event-type": event["type"].encode()
    }
    # Inject trace context into the headers
    with tracer.start_as_current_span("publish.OrderPlaced") as span:
        span.set_attribute("event.id", event["id"])
        span.set_attribute("event.type", event["type"])
        inject(headers)  # Adds traceparent, tracestate keys
        producer.produce(
            topic="orders.placed",
            key=event["data"]["orderId"].encode(),
            value=json.dumps(event).encode(),
            headers=list(headers.items())
        )

The consumer extracts the context and creates a child span:
from datetime import datetime, timezone

def handle_order_placed(message) -> None:
    # Decode header values so the propagator can read traceparent
    headers = {k: v.decode() for k, v in (message.headers() or [])}
    ctx = extract(headers)  # Restore trace context from headers
    with tracer.start_as_current_span(
        "consume.OrderPlaced",
        context=ctx,
        kind=trace.SpanKind.CONSUMER
    ) as span:
        span.set_attribute("kafka.partition", message.partition())
        span.set_attribute("kafka.offset", message.offset())
        event = json.loads(message.value())
        # occurredAt is ISO 8601; Python 3.11+ parses the trailing "Z"
        event_time = datetime.fromisoformat(event["occurredAt"])
        end_to_end_latency = (datetime.now(timezone.utc) - event_time).total_seconds()
        span.set_attribute("event.end_to_end_latency_s", end_to_end_latency)
        process_order(event)

Dashboards That Matter
A useful stream observability dashboard has four panels:
- Consumer lag per consumer group and per partition, with its rate of change
- Throughput: events produced vs. events consumed per second
- End-to-end event age (p50 and p99), from production to processing completion
- DLQ depth and arrival rate
Set SLO-based alerts on end-to-end event age. If your SLO is "orders are reflected in inventory within 30 seconds," alert when p99 event age exceeds 25 seconds—before the SLO is breached.
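As a sketch, assuming the consumer exports end-to-end age as a Prometheus histogram (event_age_seconds is an illustrative name, not a standard metric):

# Alert rule: page at 25s against a 30s SLO
- alert: EventAgeSLOAtRisk
  expr: |
    histogram_quantile(0.99, sum by (le) (rate(event_age_seconds_bucket[5m]))) > 25
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "p99 end-to-end event age is {{ $value }}s against a 30s SLO"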
Key Takeaways
- Consumer lag is the primary health signal in event-driven systems—monitor it per consumer group and per partition, not just in aggregate.
- Pair lag monitoring with rate-of-change metrics to distinguish between a stable backlog and a growing one.
- Dead-letter queues must be treated as paging events, not background noise—a message in the DLQ means data was not processed.
- Propagate W3C traceparent headers from producer to consumer to enable end-to-end distributed tracing across asynchronous service boundaries.
- End-to-end event age (production time to processing completion) is the business-relevant latency metric; use it to set SLOs.
- Alert before SLO breaches, not after—buffer your alert thresholds to give operators time to react.