Issue
I am using spring-cloud-stream-kafka-binder-3.0.4 to consume messages in batch mode. After consuming, I convert each Message into an object, but I get the above exception.
Here is the code:
@StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
        @Header(name = "deliveryAttempt", defaultValue = "1") int deliveryAttempt,
        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)
{
    try
    {
        log.info("Received activity message with message length {} attempt {}",
                messages.size(), deliveryAttempt);
        List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
        nodeConfigActivityBatchProcessor.processNodeConfigActivity(eventList);
        acknowledgment.acknowledge();
        log.debug("Processed activity message {} successfully!!", messages.size());
    }
    catch (Exception e)
    {
        throw e;
    }
}
Configuration:
spring.cloud.stream.bindings.activity-input-channel.destination=TOPIC.FEED.NAME
spring.cloud.stream.bindings.activity-input-channel.contentType=application/json
spring.cloud.stream.bindings.activity-input-channel.consumer.batch-mode=true
spring.cloud.stream.bindings.activity-input-channel.consumer.max-attempts=1
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.auto-commit-offset=false
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.reset-offsets=true
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.start-offset=latest
spring.kafka.consumer.max-poll-records=5
spring.kafka.consumer.fetch-max-wait=60000
spring.kafka.consumer.fetch-min-size=500
I get the above error at this line:
List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
specifically at .collect(Collectors.toList()), and I cannot figure out why.
Even if I just access Message<Event> eventMessage = messages.get(0), I get the same exception (messages is the List<Message<Event>> variable).
If batch mode is false, the listener consumes a single message at a time (handleActivity(Message message)) and works fine with no exception.
Does a deserializer need to be added when using batch mode?
Solution
I managed to solve this exception by adding a deserializer.
Below is my batch listener. Instead of consuming List<Message<Event>> messages as in the question, it consumes List<Event> messages.
@StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
public void handleActivity(List<Event> messages,
        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)
{
    try
    {
        // The deliveryAttempt header parameter was dropped, so it is no longer logged.
        log.info("Received activity message with message length {}", messages.size());
        nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
        acknowledgment.acknowledge();
        log.debug("Processed activity message {} successfully!!", messages.size());
    }
    catch (Exception e)
    {
        throw e;
    }
}
Added the below deserializer, an empty subclass of Spring Kafka's JsonDeserializer:
// The empty subclass pins the target type (Event) via its generic parameter,
// so the deserializer knows which class to map the raw bytes to.
public class EventDeserializer extends JsonDeserializer<Event> {
}
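As background (a self-contained sketch, not part of the original answer): an empty subclass works because Java retains the generic type argument that a subclass supplies to its superclass, and JsonDeserializer reads it via reflection to determine the target type. The class names Box, EventBox, and TypeResolutionDemo below are hypothetical stand-ins used only to demonstrate the mechanism with the standard library:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Box<T> stands in for JsonDeserializer<T>; EventBox stands in for EventDeserializer.
class Box<T> {}

// The subclass "bakes in" its type argument, which survives erasure.
class EventBox extends Box<String> {}

public class TypeResolutionDemo {
    public static void main(String[] args) {
        // Read the type argument the subclass supplied to its generic superclass.
        Type superType = EventBox.class.getGenericSuperclass();
        Type target = ((ParameterizedType) superType).getActualTypeArguments()[0];
        System.out.println(target.getTypeName()); // prints java.lang.String
    }
}
```

A plain field or variable of type Box&lt;String&gt; would not carry this information at runtime; only the subclass declaration does, which is why the answer defines EventDeserializer rather than using JsonDeserializer directly.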
Added the below value.deserializer property to the properties file:
spring.cloud.stream.kafka.bindings.input-channel.consumer.configuration.value.deserializer=com.sample.messaging.util.EventDeserializer
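As an alternative (an assumption, not verified against the original setup), the stock JsonDeserializer can be used without writing a subclass by supplying the target type through its standard configuration properties. The class names com.example.Event and com.example are hypothetical placeholders for the actual Event class and its package:

```properties
# Use Spring Kafka's stock JsonDeserializer directly.
spring.cloud.stream.kafka.bindings.input-channel.consumer.configuration.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Hypothetical: replace com.example.Event with the fully-qualified name of your Event class.
spring.cloud.stream.kafka.bindings.input-channel.consumer.configuration.spring.json.value.default.type=com.example.Event
# Trust the package so the deserializer is allowed to instantiate the type.
spring.cloud.stream.kafka.bindings.input-channel.consumer.configuration.spring.json.trusted.packages=com.example
```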
With this, I got the list of events in my batch listener.
Answered By - APK
Answer Checked By - Candace Johnson (JavaFixing Volunteer)