Issue
I am using kafka to publish both async and sync messages to the broker .One listener would listen to the topic and respond for both sync and async calls. I am using same request topic for both the templates .. When using fire and forget(Async) I don't see any issues since listener would listen to the messages randomly from topic.When using synchronous call I am getting timeout exception.
- Do I need to maintain multiple listeners for different templates ?
- With same topic for both synchronous and async operations would there be any issues?
KafkaConfig.java
//Template for synchornous call
@Bean
public ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate (
ProducerFactory<String, Model> pf,
ConcurrentMessageListenerContainer<String, Model> repliesContainer)
{
ReplyingKafkaTemplate<String, Model, Model> replyTemplate =
new ReplyingKafkaTemplate<>(pf, repliesContainer);
replyTemplate.setSharedReplyTopic(true);
return replyTemplate;
}
@Bean //register ConcurrentMessageListenerContainer bean
public ConcurrentMessageListenerContainer<String, Model> repliesContainer (
ConcurrentKafkaListenerContainerFactory<String, Model> containerFactory)
{
ConcurrentMessageListenerContainer<String, Model> repliesContainer =
containerFactory.createContainer("responseTopic");
repliesContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
//Template for asynchronous call
@Bean
@Qualifier("kafkaTemplate")
public KafkaTemplate<String, Model> kafkaTemplate (
ProducerFactory<String, Model> pf,
ConcurrentKafkaListenerContainerFactory<String, Model> factory)
{
KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(pf);
factory.setReplyTemplate(kafkaTemplate);
return kafkaTemplate;
}
Here is service class
@Service
public class KafkaService
{
@Autowired
private ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate;
@Autowired
private KafkaTemplate<String, Model> kafkaTemplate;
@Autowired
private KafkaConfig config;
public Object sendAndReceive (Model model)
{
ProducerRecord<String, Model> producerRecord =
new ProducerRecord("requestTopic", model);
producerRecord.headers()
.add(
new RecordHeader(KafkaHeaders.REPLY_TOPIC, "replyTopic"));
RequestReplyFuture<String, Model, Model> replyFuture =
replyingKafkaTemplate.sendAndReceive(producerRecord, Duration.ofSeconds(timeout));
ConsumerRecord<String, Model> consumerRecord =
replyFuture.get(timeout, TimeUnit.SECONDS);
return consumerRecord.value();
}
public ResponseEntity<Object> send (final Model model)
{
final ProducerRecord<String, Model> producerRecord =
new ProducerRecord("requestTopic", model);
final ListenableFuture<SendResult<String, Model>> future =
kafkaTemplate.send(producerRecord);
final SendResult<String, Model> sendResult = future.get(timeout, TimeUnit.SECONDS);
return new ResponseEntity<>(sendResult, HttpStatus.ACCEPTED);
}
}
Here is the listener class.
@Slf4j
@Service
public class MessageListener
{
@KafkaListener(groupId = "${group.id}", topics = "requestTopic", containerFactory = "kafkaListenerContainerFactory")
@SendTo
public Model consumer (Model model)
{
switch (model.getType()) {
case "async":
System.out.println("Async messages are retrieved");
case "sync":
System.out.println("Sync messages are retrieved");
return model;
}
return model;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory (
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory)
{
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
}
Solution
This works exactly as I expected...
@SpringBootApplication
public class So73657031Application {
public static void main(String[] args) {
SpringApplication.run(So73657031Application.class, args);
}
@Bean
ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
KafkaTemplate<String, String> template) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so73657031-replies");
container.getContainerProperties().setGroupId("so73657031-replies");
return new ReplyingKafkaTemplate<>(pf, container);
}
@Bean
KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
NewTopic topic1() {
return TopicBuilder.name("so73657031").partitions(1).replicas(1).build();
}
@Bean
NewTopic topic2() {
return TopicBuilder.name("so73657031-replies").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> rTemplate,
KafkaTemplate<String, String> template) {
return args -> {
RequestReplyFuture<String, String, String> future =
rTemplate.sendAndReceive(new ProducerRecord<String, String>("so73657031", 0, null, "test"),
Duration.ofSeconds(30));
System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
System.out.println(future.get(30, TimeUnit.SECONDS).value());
ListenableFuture<SendResult<String, String>> future2 = template.send("so73657031", "oneWay");
System.out.println(future2.get(10, TimeUnit.SECONDS).getRecordMetadata());
};
}
}
@Component
class Listener {
@KafkaListener(id = "so73657031", topics = "so73657031")
@SendTo
String listen(String in) {
System.out.println(in);
return in.toUpperCase();
}
}
logging.level.root=warn
logging.level.org.springframework.kafka.listener.adapter=debug
so73657031-0@2
2022-09-15 15:36:34.496 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=test, headers={kafka_offset=2, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1582e8e4, kafka_correlationId=[B@2a266829, kafka_timestampType=CREATE_TIME, kafka_deliveryAttempt=1, kafka_replyTopic=[B@3dad3e81, kafka_receivedPartitionId=0, kafka_receivedTopic=so73657031, kafka_receivedTimestamp=1663270594381, kafka_groupId=so73657031}]]
test
2022-09-15 15:36:34.499 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [TEST] - generating response message for it
TEST
so73657031-0@3
2022-09-15 15:36:34.519 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=oneWay, headers={kafka_offset=3, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1582e8e4, kafka_timestampType=CREATE_TIME, kafka_deliveryAttempt=1, kafka_receivedPartitionId=0, kafka_receivedTopic=so73657031, kafka_receivedTimestamp=1663270594514, kafka_groupId=so73657031}]]
oneWay
2022-09-15 15:36:34.519 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [ONEWAY] - generating response message for it
2022-09-15 15:36:34.519 DEBUG 71184 --- [o73657031-0-C-1] .a.RecordMessagingMessageListenerAdapter : No replyTopic to handle the reply: ONEWAY
Answered By - Gary Russell
Answer Checked By - Dawn Plyler (JavaFixing Volunteer)