Issue
I'm looking for a way to perform a callback using StreamBridge. I want to do something similar to KafkaTemplate.send, which returns a ListenableFuture.
Is it possible with Spring Cloud Stream to publish events using the Kafka binder and use a callback such as onSuccess and onFailure?
Example: producer.send(record, new Callback() { ... })
Solution
You can either set sync on the producer binding, in which case the send waits internally for the future to complete, or you can configure a recordMetadataChannel to receive the results of the send asynchronously.
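For the first option, a minimal configuration sketch, assuming the same binding name (output-out-0) that the example further down uses. The sync property belongs to the Kafka binder's producer properties; with it enabled, a failed send is expected to surface as an exception to the caller of bridge.send(...) rather than through a callback, so wrapping the call in try/catch is a reasonable precaution.

```properties
# Block inside the send until the Kafka client's future completes
spring.cloud.stream.kafka.bindings.output-out-0.producer.sync=true
```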
recordMetadataChannel
The bean name of a MessageChannel to which successful send results should be sent; the bean must exist in the application context. The message sent to the channel is the sent message (after conversion, if any) with an additional header KafkaHeaders.RECORD_METADATA. The header contains a RecordMetadata object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.
RecordMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class);
Failed sends go to the producer error channel (if configured); see Error Channels.
EDIT
Here's an example:
spring.cloud.stream.bindings.output-out-0.destination=dest1
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
spring.cloud.stream.kafka.bindings.output-out-0.producer.record-metadata-channel=meta
spring.cloud.stream.kafka.bindings.output-out-0.producer.configuration.[max.block.ms]=5000
spring.cloud.stream.kafka.bindings.output-out-0.producer.configuration.[request.timeout.ms]=5000
spring.cloud.stream.kafka.bindings.output-out-0.producer.configuration.[retries]=0
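import org.apache.kafka.clients.producer.RecordMetadata;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So72900966Application {

    public static void main(String[] args) {
        SpringApplication.run(So72900966Application.class, args);
    }

    @Bean
    ApplicationRunner runner(StreamBridge bridge) {
        return args -> {
            // First send succeeds; its RecordMetadata arrives on the "meta" channel.
            bridge.send("output-out-0", "foo");
            System.out.println("Delete topic dest1 from broker; then hit Enter");
            System.in.read();
            // Second send fails because the topic is gone; the failure goes to the error channel.
            bridge.send("output-out-0", "foo");
            Thread.sleep(2_000);
        };
    }

}

@Component
class ResultHandler {

    // Receives successful send results (record-metadata-channel=meta).
    @ServiceActivator(inputChannel = "meta")
    void meta(Message<?> result) {
        System.out.println(result.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
    }

    // Receives failed sends as error messages.
    @ServiceActivator(inputChannel = "errorChannel")
    void errors(Message<?> error) {
        System.out.println(error);
    }

}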
After the first result is received:
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic dest1
Then hit enter.
Result:
Delete topic dest1 from broker; then hit Enter
dest1-0@0
...
ErrorMessage [payload=org.springframework.integration.kafka.support.KafkaSendFailureException: ...
2022-07-07 13:36:19.185 ERROR 11735 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='byte[3]' to topic dest1:
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
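The two channels report results separately from the send call, so getting per-send onSuccess/onFailure behaviour means correlating each result with the send that produced it. The following is only a sketch of one way that might be done, using plain JDK classes. PendingSends and its methods are hypothetical names, not part of Spring Cloud Stream. It assumes the application adds its own correlation id header to each outgoing message and reads it back in the two handlers. The record metadata is modelled as a String to keep the sketch self-contained.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring Cloud Stream class): tracks in-flight sends
// by an application-chosen correlation id so that results arriving on the
// record-metadata channel or the error channel can complete a per-send future.
class PendingSends {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before bridge.send(...); the id is assumed to travel as a message header.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call from the handler on the record-metadata channel.
    // Returns false if no send is pending under that id.
    boolean succeeded(String correlationId, String recordMetadata) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(recordMetadata);
    }

    // Call from the handler on the error channel.
    // Returns false if no send is pending under that id.
    boolean failed(String correlationId, Throwable cause) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.completeExceptionally(cause);
    }

    // Number of sends that have not yet reported a result.
    int inFlight() {
        return pending.size();
    }
}

class PendingSendsDemo {
    public static void main(String[] args) {
        PendingSends sends = new PendingSends();
        // Attach the callback first, then simulate the success handler firing.
        sends.register("id-1").whenComplete((meta, ex) ->
                System.out.println(ex == null ? "onSuccess: " + meta : "onFailure: " + ex.getMessage()));
        sends.succeeded("id-1", "dest1-0@0");
    }
}
```

In the example application, the meta handler would call succeeded(...) and the errors handler would call failed(...), each with the id taken from the message headers. Entries for sends that never report a result would stay in the map, so a real implementation would probably also want a timeout.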
Answered By - Gary Russell
Answer Checked By - Mary Flores (JavaFixing Volunteer)