Issue
Versions:
- Spring boot: 2.4.1
- Kafka-clients: 2.6.0
Due to some system limitations, the Kafka Producer is configured to send each message separately. Specifically, batches are disabled by setting linger.ms = 0 and max.batch.size = 0.
Therefore, each message is a new batch from the Kafka client's perspective. I want to distribute messages evenly across all available partitions, which is why I configured the client with RoundRobinPartitioner.
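For reference, a minimal sketch of such a configuration with a plain KafkaProducer might look like the following (the broker address and serializers are placeholders, not the actual setup, and the standard batch.size property stands in for the batch limit mentioned above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);    // send each record immediately
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);   // effectively disable batching
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}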
However, while testing a configuration in which one node is down and the topic has an even number of partitions, I found that messages were distributed across only half of the available partitions. The reason for this behaviour is the double invocation of partitioner.partition(..) in KafkaProducer.doSend(..). Since RoundRobinPartitioner.partition() internally increments a counter and returns its remainder modulo the number of available partitions, calling it twice for a single record publish causes every second partition to be skipped.
For instance, suppose availablePartitions contains 6 partitions (1-6). Calling partition(..) twice per record through KafkaProducer.doSend(..) always skips partitions 1, 3 and 5.
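To make the skipping concrete, here is a rough simulation of the counter arithmetic, assuming the counter starts at 0 and is bumped once per partition(..) call (this is not the Kafka source, just the math):

import java.util.concurrent.atomic.AtomicInteger;

public class SkipDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        int numPartitions = 6;
        for (int send = 1; send <= 6; send++) {
            int first = counter.getAndIncrement() % numPartitions;  // first partition(..) call in doSend(..)
            int chosen = counter.getAndIncrement() % numPartitions; // second call, the one actually used
            System.out.println("send " + send + ": first=" + first + ", chosen=" + chosen);
        }
        // Only the 0-based indices 1, 3 and 5 are ever chosen; indices 0, 2 and 4
        // (partitions 1, 3 and 5 in the 1-based numbering above) are always skipped.
    }
}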
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ..
    int partition = partition(record, serializedKey, serializedValue, cluster);
    ..
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
    // Since I disabled batches, this if is always true
    if (result.abortForNewBatch) {
        int prevPartition = partition;
        partitioner.onNewBatch(record.topic(), cluster, prevPartition);
        partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        if (log.isTraceEnabled()) {
            log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
        result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
    }
- While it's easy to reimplement, I wonder whether this behaviour is intended?
- What is the reason for choosing the partition again when the current batch is closed?
- I'm sure I'm not the first to run into this. How does the community work around this design?
Solution
As a workaround, UniformStickyPartitioner can be used. It does exactly what's needed: it selects a new partition only on an onNewBatch(..) call, whereas multiple calls to partition(..) in between return the same result. The only difference from a true round-robin mechanism is that UniformStickyPartitioner picks the partition as a random int modulo the number of available partitions rather than cycling through them in order; in practice the distribution is still roughly even.
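If strict round-robin order is required, a counter-based variant in the spirit of UniformStickyPartitioner is easy to sketch. The class below is illustrative only (its name, fields and minimal concurrency handling are assumptions, not part of the original answer); it advances the counter solely in onNewBatch(..), so the repeated partition(..) call inside doSend(..) no longer skips partitions:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class StickyRoundRobinPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Integer> current = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Reuse the partition chosen for the current batch, so the second
        // partition(..) call in doSend(..) returns the same value as the first.
        return current.computeIfAbsent(topic, t -> nextPartition(t, cluster));
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // Advance the round-robin counter exactly once per new batch.
        current.put(topic, nextPartition(topic, cluster));
    }

    private int nextPartition(String topic, Cluster cluster) {
        List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
        List<PartitionInfo> candidates = available.isEmpty() ? cluster.partitionsForTopic(topic) : available;
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        return candidates.get((next & Integer.MAX_VALUE) % candidates.size()).partition();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

It would be wired in through the partitioner.class property, exactly like RoundRobinPartitioner or UniformStickyPartitioner.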
Answered By - quento