Observability in Depth

Tracing: End-to-End, Including the Queue

Ravinder · 6 min read
Observability · Telemetry · Distributed Tracing · OpenTelemetry · Kafka

Distributed traces look clean on slides: service A calls service B calls service C, waterfall rendered beautifully. In production, most interesting latency problems live in the gaps — the message queue between the order service and the fulfillment worker, the async job scheduler that fires three steps later, the webhook callback that arrives minutes after the original request returned 202.

These async hops are where traces break. Context propagation fails at queue boundaries not because the tools don't support it, but because nobody wired it up. This post is the wiring guide.

Where Traces Break

sequenceDiagram
    participant C as Client
    participant API as Order API
    participant Q as Kafka Topic
    participant W as Fulfillment Worker
    participant N as Notification Service
    C->>API: POST /orders (trace: A1B2)
    API->>Q: produce message
    Note over Q,W: ❌ trace context missing<br/>from message headers
    W->>W: consume message (new root span: C3D4)
    W->>N: trigger notification (trace: C3D4)
    Note over C,N: Two disconnected traces<br/>instead of one end-to-end

The fix is injecting the W3C traceparent header into the message before it enters the queue, and extracting it on the consumer side to create a child span.
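Concretely, what crosses the queue boundary is a single traceparent header value. A minimal sketch of its shape in plain Python (`parse_traceparent` is an illustrative helper, not part of the OTel API):

```python
# The W3C traceparent value is "version-traceid-spanid-flags", lowercase hex.
def parse_traceparent(header: str) -> dict:
    version, trace_id, span_id, flags = header.split("-")
    if len(trace_id) != 32 or len(span_id) != 16:
        raise ValueError("malformed traceparent")
    return {
        "version": version,
        "trace_id": trace_id,
        "parent_span_id": span_id,
        "sampled": bool(int(flags, 16) & 0x01),  # bit 0 is the sampled flag
    }

print(parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"))
```

Whatever parses this on the consumer side recovers the trace ID and the parent span ID — which is exactly what the propagator's extract step does for you.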

W3C Trace Context in Kafka Messages

Kafka message headers are the propagation vehicle. OTel's TextMapPropagator works against any string key-value carrier — including Kafka headers.

Producer side (Go):

import (
    "context"

    "github.com/IBM/sarama"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)
 
// sarama has no RecordHeaders type; the carrier is a plain header slice.
type kafkaHeaderCarrier []sarama.RecordHeader
 
func (c kafkaHeaderCarrier) Get(key string) string {
    for _, h := range c {
        if string(h.Key) == key {
            return string(h.Value)
        }
    }
    return ""
}
 
func (c *kafkaHeaderCarrier) Set(key, val string) {
    *c = append(*c, sarama.RecordHeader{
        Key:   []byte(key),
        Value: []byte(val),
    })
}
 
func (c kafkaHeaderCarrier) Keys() []string {
    keys := make([]string, 0, len(c))
    for _, h := range c {
        keys = append(keys, string(h.Key))
    }
    return keys
}
 
func publishOrder(ctx context.Context, producer sarama.SyncProducer, order Order) error {
    tracer := otel.Tracer("order-api")
    ctx, span := tracer.Start(ctx, "kafka.produce order.placed",
        trace.WithSpanKind(trace.SpanKindProducer),
    )
    defer span.End()
 
    var carrier kafkaHeaderCarrier
    otel.GetTextMapPropagator().Inject(ctx, &carrier)
 
    msg := &sarama.ProducerMessage{
        Topic:   "orders",
        Value:   sarama.ByteEncoder(orderJSON(order)),
        Headers: []sarama.RecordHeader(carrier),
    }
    _, _, err := producer.SendMessage(msg)
    return err
}

Consumer side (Go):

func processOrder(msg *sarama.ConsumerMessage) {
    // ConsumerMessage headers are []*sarama.RecordHeader — copy the values
    // into the carrier before extracting the context
    var carrier kafkaHeaderCarrier
    for _, h := range msg.Headers {
        carrier = append(carrier, *h)
    }
    ctx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)
 
    tracer := otel.Tracer("fulfillment-worker")
    ctx, span := tracer.Start(ctx, "kafka.consume order.placed",
        trace.WithSpanKind(trace.SpanKindConsumer),
        trace.WithAttributes(
            semconv.MessagingSystemKafka,
            semconv.MessagingKafkaConsumerGroup("fulfillment-workers"),
            semconv.MessagingDestinationName("orders"),
        ),
    )
    defer span.End()
 
    // All downstream work is now a child of the original request span
    fulfillOrder(ctx, msg)
}

After this wiring, the waterfall in Tempo/Jaeger shows a single trace from the HTTP request through Kafka through the worker — queue latency visible as a span gap.
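The queue wait itself can then be read straight off span timestamps. A sketch of the arithmetic (timestamps in Unix nanoseconds, as OTLP exports them; the function name is illustrative):

```python
# Queue latency = gap between the producer span ending (message published)
# and the consumer span starting (message picked up by a worker).
def queue_latency_ms(producer_end_ns: int, consumer_start_ns: int) -> float:
    return (consumer_start_ns - producer_end_ns) / 1e6

# e.g. a message that sat 450 ms in the topic:
print(queue_latency_ms(1_700_000_000_000_000_000, 1_700_000_000_450_000_000))
```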

Python: Confluent Kafka with OTel

from opentelemetry import trace, propagate
from opentelemetry.semconv.trace import SpanAttributes
from confluent_kafka import Producer, Consumer
 
TRACER = trace.get_tracer("fulfillment-worker")
 
class KafkaCarrier(dict):
    """Carrier that maps OTel TextMap reads onto Kafka headers (a dict of bytes)."""
    def get(self, key, default=None):
        val = super().get(key)
        if val is None:
            return default
        return val.decode() if isinstance(val, bytes) else val
 
# Producer
def produce_with_context(ctx, producer: Producer, topic: str, value: bytes):
    with TRACER.start_as_current_span("kafka.produce",
            context=ctx, kind=trace.SpanKind.PRODUCER) as span:
        span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
        # Inject inside the span so the producer span is the propagated parent
        carrier = {}
        propagate.inject(carrier)
        headers = [(k, v.encode()) for k, v in carrier.items()]
        producer.produce(topic, value, headers=headers)
 
# Consumer
def consume_loop(consumer: Consumer):
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        carrier = KafkaCarrier(dict(msg.headers() or []))  # headers() may be None
        ctx = propagate.extract(carrier)
 
        with TRACER.start_as_current_span("kafka.consume",
                context=ctx, kind=trace.SpanKind.CONSUMER) as span:
            span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
            process_message(ctx, msg)

Async Jobs and Scheduled Work

For tasks fired by a scheduler (cron, Celery beat, Temporal), there is no inbound HTTP request carrying a traceparent. The pattern here is a link rather than a parent-child relationship:

// Celery-equivalent in Go: job triggered by a scheduler
func runDailyReconciliation(ctx context.Context, triggerTraceID string) {
    // Parse the trace ID that was stored when the job was enqueued
    remoteCtx := traceIDToContext(triggerTraceID)
    link := trace.Link{SpanContext: trace.SpanContextFromContext(remoteCtx)}
 
    tracer := otel.Tracer("reconciliation-worker")
    _, span := tracer.Start(ctx, "daily.reconciliation",
        trace.WithLinks(link),
        trace.WithSpanKind(trace.SpanKindInternal),
        trace.WithAttributes(
            attribute.String("job.trigger", "scheduler"),
            attribute.String("job.type", "daily_reconciliation"),
        ),
    )
    defer span.End()
    // ...
}

Links appear in Tempo/Jaeger as clickable references — you can jump from the job's trace to the enqueuing trace even though they are separate root spans.
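The counterpart on the enqueue side is to persist the current traceparent with the job record — this is where a value like `triggerTraceID` in the Go snippet above would be read back from. A minimal sketch in plain Python (the job-store shape is hypothetical):

```python
import json

def enqueue_job(job_type: str, traceparent: str, store: list) -> None:
    # `store` stands in for whatever durable table or queue the scheduler
    # reads; the worker later parses the stored traceparent to build a link.
    store.append(json.dumps({"type": job_type, "traceparent": traceparent}))

store = []
enqueue_job("daily_reconciliation",
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", store)
print(json.loads(store[0])["type"])
```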

The OTel Collector as a Trace-Aware Router

Configure the Collector to tail-sample based on trace completeness — only keep traces that include at least one error or exceed a latency threshold:

# otelcol — tail sampling processor
processors:
  tail_sampling:
    decision_wait: 30s
    num_traces: 50000
    expected_new_traces_per_sec: 1000
    policies:
      - name: errors-policy
        type: status_code
        status_code:
          status_codes: [ERROR]
 
      - name: slow-traces
        type: latency
        latency:
          threshold_ms: 2000
 
      - name: probabilistic-baseline
        type: probabilistic
        probabilistic:
          sampling_percentage: 5
 
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [tail_sampling, batch]
      exporters: [otlphttp/tempo]

Tail sampling requires all spans of a trace to arrive at the same Collector instance. Use a load-balancing Collector tier in front:

# Front-tier: route by trace ID to same back-tier instance
exporters:
  loadbalancing:
    protocol:
      otlp:
        tls:
          insecure: true
    resolver:
      dns:
        hostname: otelcol-backend
        port: 4317

Verifying Propagation End-to-End

A quick smoke test before shipping to production:

# 1. Make a request and capture the trace ID from the response header
TRACE_ID=$(curl -s -D - -o /dev/null -X POST http://localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{"sku":"ABC","qty":1}' | grep -i "x-trace-id" | awk '{print $2}' | tr -d '\r')
 
# 2. Wait for async processing
sleep 5
 
# 3. Query Tempo for all spans in this trace
curl -s "http://tempo:3200/api/traces/$TRACE_ID" | \
  jq '.batches[].scopeSpans[].spans[]
      | {name, kind, duration_ms:
         (((.endTimeUnixNano|tonumber) - (.startTimeUnixNano|tonumber))/1e6)}'

If the Kafka consumer and notification service spans appear in the output, context propagation is working across the async boundary.
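The same check can live in an integration test. A sketch against the Tempo JSON shape queried above (field names match the jq path; the sample payload is illustrative):

```python
def span_names(trace_json: dict) -> list:
    """Flatten every span name out of a Tempo /api/traces response body."""
    return [span["name"]
            for batch in trace_json.get("batches", [])
            for scope in batch.get("scopeSpans", [])
            for span in scope.get("spans", [])]

sample = {"batches": [{"scopeSpans": [{"spans": [
    {"name": "POST /orders"},
    {"name": "kafka.consume order.placed"}]}]}]}
print(span_names(sample))
```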

Key Takeaways

  • Traces break at async boundaries because Kafka messages (and most queue protocols) do not carry context by default — you must inject and extract traceparent headers manually or via an OTel instrumentation library.
  • W3C Trace Context headers in Kafka message metadata are the standard mechanism; the OTel TextMapPropagator interface works against any string key-value carrier.
  • For scheduler-triggered jobs with no inbound context, use span links to connect the job trace back to the enqueuing trace without forcing an artificial parent-child hierarchy.
  • Tail sampling at the OTel Collector layer is necessary at scale — but requires a load-balancing tier to ensure all spans of a trace land on the same back-tier Collector.
  • Async hops appear as span gaps in the waterfall, making queue latency visible as a first-class latency signal rather than a blind spot.
  • Validate end-to-end propagation with a smoke test that checks for consumer and worker spans in the same trace ID as the originating HTTP request.