Issue
I'm using Spring-Kafka 2.7.1 in a spring boot project.
When I connect it to a SSL-configured Kafka Broker it gives a "OutofMemory" Error as below even though I have increased Heap Size multiple times to no avail.
Log Below :
java.lang.OutOfMemoryError: Java heap space\
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) ~[na:na]\
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) ~[na:na]\
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar!/:na]\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414) ~[spring-kafka-2.7.7.jar!/:2.7.7]\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.7.7.jar!/:2.7.7]\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163) ~[spring-kafka-2.7.7.jar!/:2.7.7]\
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]\
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]\
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]\
My Current YAML configuration is as below:
spring:
kafka:
bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
security:
protocol: "SSL"
consumer:
auto-offset-reset: earliest
producer:
topic: TOPIC
bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
consumer:
topic: TOPIC
bootstrap-servers: KAFKA_BOOTSTRAP_SERVERS
It works as expected when connected to a NON-SSL Kafka Broker.
I have tested all other possiblities and singled out that it's related to the SSL configuration of the client.
Solution
It is possible to run into out of memory errors when trying to use Kafka secured endpoint in a non-secure way. (It is a known issue when wrong security protocol is used or required authentication properties are not passed; OOM error is totally unrelated but it is what it is)
In case of Kafka CLI commands, usually, a property file path is passed with the command to provide security related properties.
For example:
kafka-topics --command-config <String: filename>
kafka-console-producer --producer.config <String: filename>
kafka-console-consumer --consumer.config <String: filename>
Generally contains,
security.protocol=<kafka_security_protocol>
ssl.truststore.location=<ssl_truststore_filename>
ssl.truststore.password=<truststore_password>
ssl.keystore.location=<client_keystore.jks>
ssl.keystore.password=<password>
ssl.key.password=<password>
From the question, I assumed, both producer and consumer components are connecting to the same broker(s) and declared all the required properties to connect to secured broker under spring.kafka section in the following example.
spring:
kafka:
bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
security:
protocol: "SSL"
ssl:
trust-store-location: "truststore.jks"
trust-store-password: "<password>"
key-store-location: "keystore.jks"
key-store-password: "<password>"
key-password: "<password>"
If the producer and consumer are connecting to different broker(s), these properties should be specified under spring.kafka.producer and spring.kafka.consumer sections respectively.
spring:
kafka:
bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
security:
protocol: "SSL"
producer:
topic: TOPIC
bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
ssl.protocol: "SSL"
ssl.endpoint.identification.algorithm: "https"
ssl:
keystore-location: "<keystore.jks>"
keystore-password: "<password>"
consumer:
topic: TOPIC
auto-offset-reset: "earliest"
bootstrap-servers: KAFKA_BOOTSTRAP_SERVERS
ssl.protocol: "SSL"
ssl.endpoint.identification.algorithm: "https"
ssl:
keystore-location: "<keystore.jks>"
keystore-password: "<password>"
If there is no client authentication required from the broker side, then the following is a minimal configuration example:
security.protocol=SSL
ssl.truststore.location=<kafka.client.truststore.jks>
ssl.truststore.password=<password>
If client authentication is required, following properties are also needs to be included.
ssl.keystore.location=<kafka.client.keystore.jks>
ssl.keystore.password=<password>
ssl.key.password=<password>
Please note that the property naming convention might differ in Spring Kafka configuration.
More details on Kafka security - Official Doc
Answered By - arunkvelu