Issue
We are trying to publish a notification to an external Kafka topic by sending an Avro message to a Spring Cloud Stream message channel.
Sending the message to the channel fails with the following exception:
Failed to send Message to channel 'DemoChannel';
nested exception is java.lang.IllegalStateException:
Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]' to outbound message.
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'DemoChannel'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
Below is the configuration for the Kafka topic and the message channel in the application.yml file of the Spring Boot service:
cloud:
  stream:
    bindings:
      DemoChannel:
        destination: demoTest
        content-type: avro/bytes
    kafka:
      binder:
        replication-factor: 1
        brokers: ${broker-ip-and-port}
        zkNodes: ${zookeeper-ip-and-port}
        autoCreateTopics: false
        zkConnectionTimeout: 36000
Below is the Message Channel class file:
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CustomDemoChannel {
    @Output("DemoChannel")
    MessageChannel customDemoChannel();
}
Below is the producer code that tries to send the Avro message to the message channel:
// Obtained from the autowired CustomDemoChannel
MessageChannel messageChannel;

// DemoChannel is the class generated by Avro from the schema file;
// avroSchemaObject is built with the generated class's inner Builder
DemoChannel avroSchemaObject;

// Build the message to publish, with the Avro object as its payload
Message<DemoChannel> message = MessageBuilder.withPayload(avroSchemaObject).build();

// Send the message to the message channel
messageChannel.send(message);
How can I resolve this exception?
Solution
Thanks Soby!
The issue was resolved by making one additional change: defining an AvroSchemaMessageConverter bean and injecting it as a resource into the existing Kafka producer class.
Step 1: Define your AvroSchemaMessageConverter
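import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.MimeType;
// Also import AvroSchemaMessageConverter; its package depends on the Spring Cloud Stream version in use.

@Configuration
public class AvroNoSchemaRegistryConfiguration {

    public static final String CONVERTER_DEMO = "DemoConverter";

    // Converter for the "avro/bytes" content type configured on the DemoChannel binding
    @Bean(name = CONVERTER_DEMO)
    public AvroSchemaMessageConverter demoConverter() {
        return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
    }
}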
Step 2: Inject the AvroSchemaMessageConverter as a resource in your producer class, like below:
@Resource(name = AvroNoSchemaRegistryConfiguration.CONVERTER_DEMO)
private AvroSchemaMessageConverter converterDemo;
Spring injects the converter bean into the producer, and the message is converted into the Avro format at runtime, which allows it to be published to the topic.
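To illustrate why registering a converter for the avro/bytes content type makes the "Failed to convert message" error go away, here is a minimal plain-Java sketch of content-type-based converter lookup. It is a simplified model and not Spring Cloud Stream's actual implementation: the names ConverterLookupSketch, PayloadConverter, ConverterChain and converterFor are hypothetical, and the placeholder converter only encodes the payload's string form as UTF-8 rather than producing real Avro bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ConverterLookupSketch {

    /** Hypothetical stand-in for a message converter bound to one content type. */
    public interface PayloadConverter {
        boolean supports(String contentType);
        byte[] convert(Object payload);
    }

    /** Placeholder converter: encodes the payload's string form as UTF-8 (not real Avro). */
    public static PayloadConverter converterFor(String supportedType) {
        return new PayloadConverter() {
            @Override
            public boolean supports(String contentType) {
                return supportedType.equals(contentType);
            }

            @Override
            public byte[] convert(Object payload) {
                return String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
            }
        };
    }

    /** Asks each registered converter in turn; fails if none supports the content type. */
    public static final class ConverterChain {
        private final List<PayloadConverter> converters = new ArrayList<>();

        public void register(PayloadConverter converter) {
            converters.add(converter);
        }

        public byte[] toOutbound(Object payload, String contentType) {
            for (PayloadConverter converter : converters) {
                if (converter.supports(contentType)) {
                    return converter.convert(payload);
                }
            }
            throw new IllegalStateException(
                    "Failed to convert message with contentType=" + contentType
                            + " to outbound message.");
        }
    }

    public static void main(String[] args) {
        ConverterChain chain = new ConverterChain();
        chain.register(converterFor("application/json"));
        String payload = "{\"location\":\"US\"}";

        // No converter handles avro/bytes yet, so the lookup fails.
        try {
            chain.toOutbound(payload, "avro/bytes");
        } catch (IllegalStateException e) {
            System.out.println("Before registration: " + e.getMessage());
        }

        // Once a matching converter is registered, the same call succeeds.
        chain.register(converterFor("avro/bytes"));
        byte[] bytes = chain.toOutbound(payload, "avro/bytes");
        System.out.println("After registration: " + bytes.length + " bytes");
    }
}
```

In this model, the demoConverter bean from Step 1 plays the role of the second register call: it adds a converter that claims the avro/bytes content type set on the DemoChannel binding.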
Answered By - durgapmenon
Answer Checked By - Pedro (JavaFixing Volunteer)