Kafka in Production

Consumer Group Patterns

Ravinder··6 min read
Tags: Kafka, Streaming, Distributed Systems, Consumer Groups, Offset Management

Consumer groups are where most Kafka operational pain actually lives. Producers are relatively stateless—they send and move on. Consumers carry state: which partition they own, where they left off, what they're currently processing. That state is the source of every rebalancing storm, every duplicate-on-restart, and every mysterious consumer lag that only appears under load.

This post covers how group membership and rebalancing actually work, the offset commit strategies and their failure modes, and patterns for maximizing consumer parallelism without thrashing.

How Consumer Group Rebalancing Works

When a consumer joins or leaves a group, or when partition ownership needs to change, Kafka triggers a rebalance. During a rebalance, all consumers in the group stop processing until partition assignments are resolved.

The default protocol (eager rebalancing) is blunt:

  1. All consumers revoke all partitions.
  2. The group coordinator (a broker) picks one consumer as group leader, and that leader runs the assignment algorithm.
  3. All consumers receive new assignments and resume.
sequenceDiagram
    participant C1 as Consumer 1
    participant GC as Group Coordinator
    participant C2 as Consumer 2 (new)
    C2->>GC: JoinGroup request
    GC->>C1: Rebalance triggered, revoke all
    GC->>C2: Rebalance triggered
    C1->>GC: SyncGroup (empty assignment)
    C2->>GC: SyncGroup (empty assignment)
    GC->>C1: New assignment: P0, P1
    GC->>C2: New assignment: P2, P3
    note over C1,C2: Stop-the-world: zero throughput for duration of rebalance

For a group with 50 consumers processing high-volume topics, a single pod restart triggers a full stop-the-world rebalance across all 50. That can mean 30–60 seconds of zero throughput for the entire group.

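Cooperative (incremental) rebalancing fixes this. It was introduced in Kafka 2.4 with CooperativeStickyAssignor and only revokes the partitions that need to move. Since Kafka 3.0 the default partition.assignment.strategy lists RangeAssignor first and CooperativeStickyAssignor second, so a consumer still uses the eager RangeAssignor unless you opt in explicitly: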

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    CooperativeStickyAssignor.class.getName());

With CooperativeStickyAssignor, consumers keep their existing partitions during a rebalance. Only the partitions that need to be transferred are paused and moved. This turns a group-wide pause into a localized handoff.
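To make the difference concrete, here is a toy model of how many partitions get revoked when one new consumer joins. It is a sketch only, not Kafka's actual assignor: the class name RebalanceSketch and both methods are hypothetical, and it assumes the partition count divides evenly among the members after the join.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of partition movement when a consumer joins; not Kafka's real assignor. */
final class RebalanceSketch {

    /** Eager protocol: every owned partition is revoked before reassignment. */
    static int revokedEager(Map<String, List<Integer>> current) {
        int revoked = 0;
        for (List<Integer> owned : current.values()) {
            revoked += owned.size();
        }
        return revoked;
    }

    /** Cooperative protocol: a member gives up only what exceeds its fair share. */
    static int revokedCooperative(Map<String, List<Integer>> current, int membersAfterJoin) {
        int total = revokedEager(current); // total partitions currently owned
        if (membersAfterJoin <= 0 || total % membersAfterJoin != 0) {
            throw new IllegalArgumentException("sketch assumes an even split");
        }
        int fairShare = total / membersAfterJoin;
        int revoked = 0;
        for (List<Integer> owned : current.values()) {
            revoked += Math.max(0, owned.size() - fairShare);
        }
        return revoked;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> current = new LinkedHashMap<>();
        current.put("consumer-1", List.of(0, 1, 2));
        current.put("consumer-2", List.of(3, 4, 5));
        // A third consumer joins a group that owns six partitions.
        System.out.println("eager revokes:       " + revokedEager(current));
        System.out.println("cooperative revokes: " + revokedCooperative(current, 3));
    }
}
```

In this example the eager model revokes all six partitions, while the cooperative model revokes two, one from each existing consumer.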

Offset Commit Strategies

The offset you commit is the position the consumer will resume from after a crash or rebalance. Commit too late and you replay. Commit too early and you lose.

Auto-Commit (the default trap)

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

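Auto-commit commits the offsets returned by the most recent poll() once auto.commit.interval.ms (5 seconds here) has elapsed, regardless of whether your application has finished processing those records. The commit happens inside a later poll() call (or close()), so a loop that fully processes each batch before polling again still gets at-least-once delivery. The trap appears when processing is handed off to another thread or failures are swallowed:

  1. Consumer polls records 0–99 and hands them to a worker thread.
  2. The worker processes records 0–49.
  3. The consumer calls poll() again; the interval has elapsed, so auto-commit commits offset 100.
  4. The process crashes while the worker is on record 50.
  5. Consumer restarts and resumes from 100. Records 50–99 are lost.

Auto-commit is appropriate only when every record is fully processed before the next poll(), or for best-effort pipelines where occasionally losing records is acceptable.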
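The arithmetic behind "commit too early and you lose, commit too late and you replay" can be sketched as follows. This is an illustrative helper, not part of the Kafka client: the class CommitGapSketch and its method names are hypothetical.

```java
/** Toy arithmetic for what a restart costs, given where the commit and the processing stood. */
final class CommitGapSketch {

    /** Records skipped forever: the commit moved past them before they were processed. */
    static long lostOnRestart(long committedOffset, long nextUnprocessedOffset) {
        return Math.max(0, committedOffset - nextUnprocessedOffset);
    }

    /** Records delivered again: they were processed but the commit had not caught up. */
    static long replayedOnRestart(long committedOffset, long nextUnprocessedOffset) {
        return Math.max(0, nextUnprocessedOffset - committedOffset);
    }

    public static void main(String[] args) {
        // Scenario from the list above: offset 100 committed, crash while on record 50.
        System.out.println("lost:     " + lostOnRestart(100, 50));
        // Opposite case: nothing committed yet, records 0-49 already processed.
        System.out.println("replayed: " + replayedOnRestart(0, 50));
    }
}
```

The first case loses 50 records (50 through 99); the second replays 50 records, which is the at-least-once side of the trade-off.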

Manual Synchronous Commit

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    // Commit after entire batch is processed
    consumer.commitSync();
}

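commitSync() blocks until the broker acknowledges the commit, retrying on retriable errors. Committing only after the whole batch has been processed gives at-least-once delivery: a crash mid-batch replays the batch from the last committed offset. The downside is throughput: every poll cycle includes a synchronous network round trip for the commit.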

Manual Asynchronous Commit with Per-Partition Tracking

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
 
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 
    for (ConsumerRecord<String, String> record : records) {
        process(record);
        currentOffsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }
 
    consumer.commitAsync(currentOffsets, (offsets, exception) -> {
        if (exception != null) {
            log.error("Commit failed for offsets {}", offsets, exception);
            // Do NOT retry here — a later commitAsync will cover these offsets
        }
    });
}

commitAsync() does not block. The callback handles failures, but retrying a failed async commit is dangerous—a later commit may have already moved the offset forward, and retrying the older offset would roll it back.
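The rollback hazard is easy to see in a toy model where the stored offset for a partition is simply the last commit that arrived. This is a sketch under that assumption; AsyncCommitOrderSketch and its methods are hypothetical names, not Kafka classes.

```java
/** Toy model of one partition's committed offset: the last commit to arrive wins. */
final class AsyncCommitOrderSketch {
    private long committedOffset = 0;

    /** Models a commit request reaching the broker; it overwrites whatever was stored. */
    void commitArrives(long offset) {
        committedOffset = offset;
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        AsyncCommitOrderSketch partition = new AsyncCommitOrderSketch();
        // An async commit of offset 100 fails in flight and never arrives.
        partition.commitArrives(200); // the next async commit succeeds
        partition.commitArrives(100); // a naive retry of the failed commit lands afterwards
        System.out.println("committed offset: " + partition.committedOffset());
    }
}
```

After the retry lands, the stored offset is back at 100, so a restart would reprocess records 100 through 199 even though they were already handled.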

The correct pattern on shutdown is to flush with commitSync() as the final act:

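try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        process(records);
        consumer.commitAsync();
    }
    // Reached only on an orderly exit, so every polled record has been processed.
    // If process() throws, this commit is skipped and unprocessed records are not committed.
    consumer.commitSync();  // Final blocking commit before close
} finally {
    consumer.close();       // Always release the consumer, even if the commit fails
}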

Session Timeout and Heartbeat Tuning

The group coordinator considers a consumer dead if it misses heartbeats for session.timeout.ms. The consumer sends heartbeats on a background thread at heartbeat.interval.ms.

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);      // 30s
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);   // 10s
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);   // 5 min
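A quick sanity check for these two values might look like the sketch below. It encodes the commonly documented guideline that heartbeat.interval.ms should be no more than one third of session.timeout.ms; the class and method names are hypothetical.

```java
/** Sanity check for heartbeat vs. session timeout settings; a sketch, not a Kafka API. */
final class HeartbeatCheckSketch {

    /** True when the heartbeat interval is at most one third of the session timeout. */
    static boolean heartbeatWithinGuideline(long sessionTimeoutMs, long heartbeatIntervalMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 30s session timeout, 10s heartbeat.
        System.out.println(heartbeatWithinGuideline(30_000, 10_000));
        // A 15s heartbeat leaves room for only two heartbeats per session window.
        System.out.println(heartbeatWithinGuideline(30_000, 15_000));
    }
}
```

The configuration above sits exactly at the one-third boundary, so it passes; a 15-second heartbeat would not.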

max.poll.interval.ms is separate from the heartbeat. It bounds the time between successive poll() calls. If your application takes longer than this to get back to poll() (e.g., because processing is slow), the consumer is considered failed, leaves the group, and a rebalance is triggered, even though the heartbeat thread is still healthy.

flowchart TD
    A[consumer.poll called] --> B[Records returned]
    B --> C[Application processes records]
    C --> D{Time since last poll > max.poll.interval.ms?}
    D -->|Yes| E[Kafka declares consumer dead]
    E --> F[Rebalance triggered]
    D -->|No| A

If your processing is slow, either reduce max.poll.records to process fewer records per poll, or increase max.poll.interval.ms. Don't just increase the timeout blindly—first understand why processing is slow.

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);  // Default 500 — reduce if processing is slow
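One way to pick a value is to work backwards from a worst-case per-record processing time. The sketch below does that arithmetic; the class PollBudgetSketch, its methods, and the headroom parameter are hypothetical, and the one-second per-record figure is an assumption for illustration only.

```java
import java.time.Duration;

/** Back-of-the-envelope sizing for max.poll.records; a sketch, not a Kafka API. */
final class PollBudgetSketch {

    /** Worst-case time to work through one full batch sequentially. */
    static Duration worstCaseBatchTime(int maxPollRecords, Duration perRecord) {
        return perRecord.multipliedBy(maxPollRecords);
    }

    /** Largest batch size that fits in the given fraction of max.poll.interval.ms. */
    static int safeMaxPollRecords(Duration maxPollInterval, Duration perRecord, double headroom) {
        if (perRecord.toMillis() <= 0) {
            throw new IllegalArgumentException("perRecord must be at least 1 ms");
        }
        long budgetMs = (long) (maxPollInterval.toMillis() * headroom);
        return (int) Math.max(1, budgetMs / perRecord.toMillis());
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(5);   // max.poll.interval.ms from above
        Duration perRecord = Duration.ofSeconds(1);  // assumed worst-case processing time
        // The default of 500 records would need 500 s, more than the 300 s interval.
        System.out.println(worstCaseBatchTime(500, perRecord).getSeconds() + " s");
        // Using half the interval as budget leaves room for GC pauses and retries.
        System.out.println(safeMaxPollRecords(interval, perRecord, 0.5));
    }
}
```

With these assumed numbers, half of the five-minute interval allows 150 records per poll.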

Parallelism Patterns

Pattern 1: Scale consumers to partition count

Max parallelism = partition count. If you have 24 partitions, run 24 consumers in the group. More consumers than partitions sit idle.
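The ceiling can be illustrated with a small model that spreads the partitions of a single topic evenly over the group. This is a sketch only: real assignors can produce different (and less even) layouts, and the class and method names are hypothetical.

```java
/** Toy model of how many partitions each consumer ends up with; not a Kafka assignor. */
final class ParallelismSketch {

    /** Spreads partitions round-robin and returns the count owned by each consumer. */
    static int[] spread(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        int[] counts = new int[consumers];
        for (int p = 0; p < partitions; p++) {
            counts[p % consumers]++;
        }
        return counts;
    }

    /** Consumers left without any partition. */
    static int idleConsumers(int partitions, int consumers) {
        return Math.max(0, consumers - partitions);
    }

    public static void main(String[] args) {
        // 24 partitions, 30 consumers: six consumers have nothing to do.
        System.out.println("idle: " + idleConsumers(24, 30));
        // 24 partitions, 10 consumers: some own three partitions, the rest own two.
        System.out.println(java.util.Arrays.toString(spread(24, 10)));
    }
}
```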

Pattern 2: Thread-per-consumer with shared executor

ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_COUNT);
for (int i = 0; i < CONSUMER_COUNT; i++) {
    executor.submit(new ConsumerRunnable(bootstrapServers, groupId, topic));
}

Each ConsumerRunnable creates its own KafkaConsumer. KafkaConsumer is not thread-safe—never share one instance across threads.

Pattern 3: Consumer with async processing pipeline

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    List<CompletableFuture<Void>> futures = new ArrayList<>();
 
    for (ConsumerRecord<String, String> record : records) {
        futures.add(CompletableFuture.runAsync(() -> process(record), executor));
    }
 
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    consumer.commitSync();
}

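Process records concurrently within a poll batch, then commit after all complete. This increases throughput when per-record processing is slow (blocking I/O, or CPU-bound work with spare cores), but it gives up per-partition ordering within the batch and adds complexity to error handling: if any future fails, join() throws and the batch must not be committed.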
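If ordering within a partition matters, one variation is to run partitions in parallel but keep each partition's records sequential. The sketch below shows the idea with a stand-in Rec type instead of ConsumerRecord so it runs without a broker; the class and method names are hypothetical. With the real client, the same grouping is available through records.partitions() and records.records(partition).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: parallel across partitions, sequential within each partition. */
final class PerPartitionPipelineSketch {

    /** Stand-in for ConsumerRecord with only the fields this sketch needs. */
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Returns the offsets handled per partition, in the order they were handled. */
    static Map<Integer, List<Long>> processBatch(List<Rec> batch, ExecutorService executor) {
        // Group the batch by partition, preserving arrival order inside each group.
        Map<Integer, List<Rec>> byPartition = new TreeMap<>();
        for (Rec r : batch) {
            byPartition.computeIfAbsent(r.partition, k -> new ArrayList<>()).add(r);
        }

        Map<Integer, List<Long>> handled = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Map.Entry<Integer, List<Rec>> entry : byPartition.entrySet()) {
            // One task per partition: its records run one after another on a single thread.
            futures.add(CompletableFuture.runAsync(() -> {
                List<Long> done = new ArrayList<>();
                for (Rec r : entry.getValue()) {
                    done.add(r.offset); // placeholder for real processing
                }
                handled.put(entry.getKey(), done);
            }, executor));
        }
        // Wait for every partition before the caller commits.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return handled;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Rec> batch = List.of(new Rec(0, 10), new Rec(1, 7), new Rec(0, 11), new Rec(1, 8));
        System.out.println(processBatch(batch, executor));
        executor.shutdown();
    }
}
```

The trade-off is that parallelism within one consumer is now capped at the number of partitions in the batch rather than the number of records.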

Key Takeaways

  • Eager rebalancing pauses the entire consumer group—switch to CooperativeStickyAssignor to limit the blast radius to only the partitions that need to move.
  • Auto-commit does not guarantee at-least-once delivery: it can commit offsets before processing completes (for example, when records are handed to worker threads), which silently drops records on crash.
  • Never retry a failed commitAsync()—a later commit may have already advanced the offset; retrying the older one rolls it back and causes duplicates.
  • max.poll.interval.ms is not the same as the heartbeat timeout—a slow processing loop triggers a rebalance even when the heartbeat thread is healthy; fix it by reducing max.poll.records first.
  • Consumer count beyond partition count adds zero parallelism—extra consumers sit idle; the ceiling is the partition count of the subscribed topics.
  • Call commitSync() on shutdown: async commits in flight during an orderly shutdown may not complete; a final synchronous commit prevents an unnecessary replay on the next startup.