Issue
I am using Spring Kafka's EmbeddedKafkaBroker together with the Spring Cloud Stream Kafka Streams binder.
@Configuration
@Profile({"dev", "test"})
@Slf4j
public class EmbeddedKafkaBrokerConfig {

    private static final String TMP_EMBEDDED_KAFKA_LOGS =
            String.format("/tmp/embedded-kafka-logs-%1$s/", UUID.randomUUID());
    private static final String PORT = "port";
    private static final String LOG_DIRS = "log.dirs";
    private static final String LISTENERS = "listeners";
    private static final Integer KAFKA_PORT = 9092;
    private static final String LISTENERS_VALUE = "PLAINTEXT://localhost:" + KAFKA_PORT;
    private static final Integer ZOOKEEPER_PORT = 2181;

    private EmbeddedKafkaBroker embeddedKafkaBroker;

    /**
     * bean for the embeddedKafkaBroker.
     *
     * @return local embeddedKafkaBroker
     */
    @Bean
    @Qualifier("embeddedKafkaBroker")
    public EmbeddedKafkaBroker embeddedKafkaBroker() {
        Map<String, String> brokerProperties = new HashMap<>();
        brokerProperties.put(LISTENERS, LISTENERS_VALUE);
        brokerProperties.put(PORT, KAFKA_PORT.toString());
        brokerProperties.put(LOG_DIRS, TMP_EMBEDDED_KAFKA_LOGS);
        this.embeddedKafkaBroker =
                new EmbeddedKafkaBroker(1, true, 2)
                        .kafkaPorts(KAFKA_PORT)
                        .zkPort(ZOOKEEPER_PORT)
                        .brokerProperties(brokerProperties);
        return embeddedKafkaBroker;
    }

    /** close the embeddedKafkaBroker on destroy. */
    @PreDestroy
    public void preDestroy() {
        if (embeddedKafkaBroker != null) {
            log.warn("[EmbeddedKafkaBrokerConfig] destroying kafka broker {}", embeddedKafkaBroker);
            embeddedKafkaBroker.destroy();
        }
    }
}
A REST controller triggers publishing data to the topic:
@RestController
@RequestMapping("/v1/demo/")
public class DemoController {

    @Autowired
    DemoSupplier demoSupplier;

    @GetMapping("hello")
    public String helloController() {
        demoSupplier.supply();
        return "Hello World!";
    }
}
DemoSupplier class:
@Component
public class DemoSupplier {

    @Autowired
    @Qualifier("embeddedKafkaBroker")
    public EmbeddedKafkaBroker kafkaBroker;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${demo.topic}")
    private String topicName;

    @Bean
    public KafkaTemplate<String, String> stringKafkaTemplate() {
        Map<String, Object> producerConfigs = new HashMap<>();
        producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs));
    }

    public void supply() {
        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send(topicName, "Message:" + i * 2);
        }
    }
}
Consumers
@Component
public class DemoConsumer {

    @Bean
    @Qualifier("demoConsumerProcessor")
    public Consumer<KStream<String, String>> demoConsumerProcessor() {
        return input -> input.foreach((key, value) -> System.out.println(value));
    }

    @Bean
    @Qualifier("demoConsumerProcessor2")
    public Consumer<KStream<String, String>> demoConsumerProcessor2() {
        return input -> input.foreach(
                (key, value) -> System.out.println("This is second consumer 2: " + value));
    }
}
application.properties:
# ===============================
# = Profiles
# ===============================
spring.profiles.active=dev
server.port=8181
# ===============================
# = Kafka Topics
# ===============================
demo.topic=demoTopic
object.demo.topic=objectDemoTopic
# ===============================
# = SPRING CLOUD STREAM
# ===============================
spring.cloud.stream.bindings.demoConsumerProcessor-in-0.destination=demoTopic
spring.cloud.stream.bindings.demoConsumerProcessor2-in-0.destination=demoTopic
spring.cloud.stream.function.definition=demoConsumerProcessor,demoConsumerProcessor2
spring.cloud.stream.kafka.streams.binder.functions.demoConsumerProcessor.applicationId=group_id
spring.cloud.stream.kafka.streams.binder.functions.demoConsumerProcessor2.applicationId=group_id
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Note: of the beans listed in spring.cloud.stream.function.definition, the one named first consumes the messages published to the topic, and the other receives nothing. Both consumers have the same group id, which as far as I know is set through applicationId; the logs show the same group id as well.
My deduction so far:
The embedded Kafka broker always seems to create the topic with 1 partition. I tried changing this to 2 when creating the bean (see its constructor arguments: count: 1, controlledShutdown: true, partitions: 2), but I think something is still not in place.
Relevant logs:
[Consumer clientId=group_id-359878ed-1b41-4cf0-b9b8-6e21e5e1f0fe-StreamThread-1-consumer, groupId=group_id] Updating assignment with
Assigned partitions: [demoTopic-0]
Current owned partitions: []
Added partitions (assigned - owned): [demoTopic-0]
Revoked partitions (owned - assigned): []
[Consumer clientId=group_id-4dce1ba5-7d97-4c18-92c3-cb79dab271b5-StreamThread-1-consumer, groupId=group_id] Updating assignment with
Assigned partitions: []
Current owned partitions: []
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): []
According to the logs, it looks like only one partition (demoTopic-0) is created for the topic.
The "Updating assignment" lines also confuse me. Is there another property I have to set in order to use multiple consumers, or is this a problem with the embedded Kafka broker? Please also consider other angles; I don't want this to be an XY problem. The full logs are too big, but I will share them if needed.
Solution
The solution I found is not ideal, but it did the job.
I ask the broker to create the topic (with 2 partitions) when its bean is created:
this.embeddedKafkaBroker =
        new EmbeddedKafkaBroker(1, true, 2, "demoTopic")
                .kafkaPorts(KAFKA_PORT)
                .zkPort(ZOOKEEPER_PORT)
                .brokerProperties(brokerProperties);
return embeddedKafkaBroker;
Now both consumers receive records.
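As a rough illustration of why the partition count matters: within a single consumer group, each partition is assigned to only one member, so with one partition the second consumer has nothing to read. The sketch below is plain Java and only mimics a simple round-robin assignment; it is not Kafka's actual assignor, and the class and method names (PartitionAssignmentSketch, assign) are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a simplified round-robin assignment, not Kafka's real assignor.
class PartitionAssignmentSketch {

    // Distributes partitions 0..partitionCount-1 over the members of ONE consumer group.
    // Every partition gets exactly one owner, so surplus members end up with an empty list.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Both functions share the same applicationId, i.e. the same group.
        List<String> members = List.of("demoConsumerProcessor", "demoConsumerProcessor2");
        System.out.println("1 partition:  " + assign(members, 1));
        System.out.println("2 partitions: " + assign(members, 2));
    }
}
```

With one partition the second member stays idle, which matches the empty "Assigned partitions: []" line in the logs above. With two partitions each member owns one, so each consumer sees only the records of its own partition rather than a copy of every record. If both consumers should see every record, giving each function its own applicationId is probably the more direct route.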
Answered By - DEEPAK MALIK
Answer Checked By - Cary Denson (JavaFixing Admin)