Issue
I'm new to Kafka and want to enable batch processing in the consumer.
I read through the documentation and found that batch processing is supported starting with version 3.0.
We are currently on Spring Boot 2.1.3.RELEASE
with the following Kafka dependencies:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
What changes do I need to make in pom.xml before starting on the properties and code changes? Do I need to upgrade the Spring Boot version?
Solution
You can consume records as a batch with @StreamListener; you just need to provide a deserializer. Example:
public class Person {

    private String name;
    private String surname;

    // getters, setters, and constructors omitted
}
@StreamListener(value = PersonStream.INPUT)
private void personBulkReceiver(List<Person> person) {
    System.out.println("personBulkReceiver : " + person.size());
}
spring:
  cloud:
    stream:
      binders:
        bulkKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        max.poll.records: 1500
                        fetch.min.bytes: 1000000
                        fetch.max.wait.ms: 10000
                        value.deserializer: tr.cloud.stream.examples.PersonDeserializer
      bindings:
        person-topic-in:
          binder: bulkKafka
          destination: person-topic
          contentType: application/person
          group: omercelik
          consumer:
            batch-mode: true
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class PersonDeserializer extends JsonDeserializer<Person> {

    public PersonDeserializer() {
        super(Person.class); // bind the target type explicitly
    }
}
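For reference, a message payload like the following (hypothetical example values, matching the name and surname fields of the Person class above) would be deserialized by PersonDeserializer:

```json
{
  "name": "Jane",
  "surname": "Doe"
}
```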
Answered By - omerstack