Issue
I have a simple REST API backed by an H2 database, so when I run multiple instances of the same app, each instance has its own in-memory database. Now I want to synchronize these databases between the instances. I thought Kafka would be a good solution: for example, when the instance on port 8080 receives a POST, the same data should also be posted to all the other instances. My app now acts as a producer and a consumer at the same time, and I do not know why only one instance receives the message. The code:
@EnableKafka
@Configuration
public class KafkaProducerConfigForDepartment {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, MessageEventForDepartment> producerFactoryForDepartment() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, MessageEventForDepartment> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactoryForDepartment());
    }
}

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, MessageEventForDepartment> consumerFactoryForDepartments() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new JsonDeserializer<>(MessageEventForDepartment.class));
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic12")
                .partitions(10)
                .replicas(10)
                .build();
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageEventForDepartment>
            kafkaListenerContainerFactoryForDepartments() {
        ConcurrentKafkaListenerContainerFactory<String, MessageEventForDepartment> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryForDepartments());
        return factory;
    }
}

@Component
@Slf4j
public class DepartmentKafkaService {

    @Autowired
    private DepartmentService departmentService;

    @KafkaListener(topics = "topic12", groupId = "groupId",
            containerFactory = "kafkaListenerContainerFactoryForDepartments")
    public void listenGroupFoo(MessageEventForDepartment message) {
        log.info(message.toString());
    }
}
Why is this happening? Or maybe my approach is not a good one. What are your thoughts?
Solution
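The reason only one instance of the application receives each message is that every instance uses the same ConsumerConfig.GROUP_ID_CONFIG. Kafka's consumer protocol delivers each message once per consumer group: within a group, each partition is assigned to a single consumer, so only one member of the group sees a given record (there is a lot more nuance to it, but this is basically how it works). If each instance uses its own group id, every instance receives every message.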
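As a rough illustration of the point above, here is a minimal sketch of consumer properties that give each application instance its own group id. The class name UniqueGroupConsumerProps, the "department-sync" prefix and the choice of "earliest" for auto.offset.reset are assumptions made for this example, not part of the original code; the string keys are the plain Kafka property names behind ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.GROUP_ID_CONFIG and ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper (not from the original post): builds consumer
// properties whose group.id is different for every application instance.
final class UniqueGroupConsumerProps {

    private UniqueGroupConsumerProps() {}

    static Map<String, Object> build(String bootstrapAddress, String groupIdPrefix) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapAddress);
        // A random suffix puts each instance in its own consumer group,
        // so each instance gets its own copy of every record.
        props.put("group.id", groupIdPrefix + "-" + UUID.randomUUID());
        // Assumption: a freshly started instance has an empty in-memory
        // database, so it reads the topic from the beginning.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = build("localhost:9092", "department-sync");
        System.out.println("group.id = " + props.get("group.id"));
    }
}
```

In the code from the question, a map like this could be merged into the props used by consumerFactoryForDepartments(). Note that the groupId attribute on @KafkaListener overrides the factory's group.id, so the hard-coded groupId = "groupId" would also have to be removed or made unique per instance for this to take effect.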
Pawel's suggestion to use Kafka Streams is a good one: a GlobalKTable would provide what you want, because every application instance holds a full copy of the topic's data.
Luca Pette wrote a great primer on Kafka Streams here: https://lucapette.me/writing/getting-started-with-kafka-streams/
Answered By - cmcnealy
Answer Checked By - Timothy Miller (JavaFixing Admin)