Issue
I'm a former ActiveMQ user learning Kafka, and I have a question.
With ActiveMQ you can do this:
- Submit 100 messages into a queue
- Wait however long you want
- Consume those 100 messages from that queue, with each message guaranteed to go to a single consumer.
I tried to do the same thing in Kafka:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
public class KafkaTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
    public static final String MY_GROUP_ID = "my-group-id";
    public static final String TOPIC = "topic";
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Before
    public void before() {
        kafka.start();
    }

    @After
    public void after() {
        kafka.close();
    }

    @Test
    public void testPipes() throws ExecutionException, InterruptedException {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, MY_GROUP_ID);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        ExecutorService es = Executors.newCachedThreadPool();
        // Consumer task: subscribes to the topic and logs every record it receives
        Future<?> consumerFuture = es.submit(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList(TOPIC));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        LOG.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}",
                                Thread.currentThread().getName(), record.topic(), record.partition(),
                                record.offset(), record.key(), record.value().toUpperCase());
                    }
                }
            } catch (Exception e) {
                LOG.error("Consumer error", e);
            }
        });
        // NOTICE: without this sleep the consumer does not receive the messages,
        // because it has not finished registering before the messages are produced.
        Thread.sleep(10000);
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Producer task: sends 100 messages, numbered 0 to 99
        Future<?> producerFuture = es.submit(() -> {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                for (int counter = 0; counter < 100; counter++) {
                    System.out.println("Sent " + counter);
                    String msg = "Message " + counter;
                    producer.send(new ProducerRecord<>(TOPIC, msg));
                }
            } catch (Exception e) {
                LOG.error("Failed to send message by the producer", e);
            }
        });
        producerFuture.get();
        // Blocks indefinitely: the consumer loop above never exits on its own
        consumerFuture.get();
    }
}
This example only works if you start the consumer first, wait for it to register, and then run the producer.
Can anyone show me how to alter my example program so that the messages wait in the topic until they are consumed?
Solution
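In your consumer config, you need to add auto.offset.reset=earliest, or call seekToBeginning after subscribing (once partitions have actually been assigned to the consumer, for example from a ConsumerRebalanceListener).
Otherwise, the consumer starts reading from the end of the topic, because auto.offset.reset defaults to latest. In other words, if you start the consumer after the producer, it begins reading after all the existing data.
Note that auto.offset.reset only applies when the consumer group has no committed offset for a partition; a group that has already committed offsets resumes from them.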
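As a rough sketch of the first option, the consumer settings could be built like this. It uses only java.util.Properties and Kafka's config keys written as plain strings, so it compiles without the Kafka client on the classpath. The class name EarliestConsumerConfig, the build helper, and the localhost:9092 address are hypothetical placeholders, not part of the original question.

```java
import java.util.Properties;

final class EarliestConsumerConfig {

    // Builds consumer settings; the keys are Kafka's consumer config names as plain strings.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // When the group has no committed offset, start from the oldest retained record
        // instead of the default "latest".
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address; the test in the question would pass kafka.getBootstrapServers().
        Properties props = build("localhost:9092", "my-group-id");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties can be handed to new KafkaConsumer<>(props). In the test from the question, the equivalent change should be a single extra line: consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest").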
Answered By - OneCricketeer
Answer Checked By - Katrina (JavaFixing Volunteer)