Issue
I want to create a Kafka Streams application with Spring Cloud Stream that integrates two different Kafka clusters/setups. I tried to implement it using a multi-binder configuration, as described in the documentation and similar to the examples here: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples
Given a simple function like this:
@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}
In the configuration, I'm trying to bind the input and the output to different binders.
spring.cloud:
  stream:
    bindings:
      analyticsEventProcessor-in-0:
        destination: analytics-events
        binder: cluster1-kstream
      analyticsEventProcessor-out-0:
        destination: update-events
        binder: cluster2-kstream
    binders:
      cluster1-kstream:
        type: kstream
        environment:
          spring:
            cloud:
              stream:
                kafka:
                  binder:
                    brokers: <url cluster1>:9093
                    configuration:
                      security.protocol: SSL
                      schema.registry.url: <schema-registry-url-cluster1>
                      schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                      schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                      schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                      schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                      ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                      ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                      ssl.truststore.type: JKS
                      ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                      ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                      ssl.keystore.type: JKS
                      ssl.enabled.protocols: TLSv1.2
                  streams:
                    binder:
                      brokers: <url cluster1>:9093
                      configuration:
                        security.protocol: SSL
                        schema.registry.url: <schema-registry-url-cluster1>
                        schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                        schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                        schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                        schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                        ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                        ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                        ssl.truststore.type: JKS
                        ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                        ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                        ssl.keystore.type: JKS
                        ssl.enabled.protocols: TLSv1.2
      cluster2-kstream:
        type: kstream
        environment:
          spring:
            cloud:
              stream:
                kafka:
                  binder:
                    brokers: <url cluster2>:9093
                    configuration:
                      security.protocol: SSL
                      schema.registry.url: <schema-registry-url-cluster2>
                      schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                      schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                      schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                      schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                      ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                      ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                      ssl.truststore.type: JKS
                      ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                      ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                      ssl.keystore.type: JKS
                      ssl.enabled.protocols: TLSv1.2
                  streams:
                    binder:
                      brokers: <url cluster2>:9093
                      configuration:
                        security.protocol: SSL
                        schema.registry.url: <schema-registry-url-cluster2>
                        schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                        schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                        schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                        schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                        ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                        ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                        ssl.truststore.type: JKS
                        ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                        ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                        ssl.keystore.type: JKS
                        ssl.enabled.protocols: TLSv1.2
I first ran the application entirely against a single cluster, which worked well. With the multi-binder configuration above, I always get this error:
2022-08-10 15:28:42.892 WARN 1 --- [-StreamThread-2] org.apache.kafka.clients.NetworkClient : [Consumer clientId=<clientid>-StreamThread-2-consumer, groupId=<group-id>] Error while fetching metadata with correlation id 2 : {analytics-events=TOPIC_AUTHORIZATION_FAILED}
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata : [Consumer clientId=<client-id>, groupId=<group-id>] Topic authorization failed for topics [analytics-events]
2022-08-10 15:28:42.893 INFO 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata : [Consumer clientId=<client-id>, groupId=<group-id>] Cluster ID: <cluster-id>
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] c.s.a.a.e.UncaughtExceptionHandler : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.streams.KafkaStreams : stream-client [<client-id>] Replacing thread in the streams uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642) ~[kafka-streams-3.1.1.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576) ~[kafka-streams-3.1.1.jar!/:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
I verified the Kafka client certificates with keytool and they appear to be correct; the password environment variables are set correctly as well, and the logged ConsumerConfig uses the correct broker URL.
Is it possible, with a multi-binder setup, to use different Kafka clusters for the input and the output of a single KStream function, or does this only work with binders of type kafka?
Solution
You cannot connect to two different clusters from a single Kafka Streams application. This means that a Spring Cloud Stream Kafka Streams function cannot receive from one cluster on the inbound and write to another cluster on the outbound. See this SO [thread][1] for more details.
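The underlying reason is that a Kafka Streams topology is configured with a single bootstrap.servers value; every source and sink topic is resolved against that one cluster, and there is no per-topic cluster override. Here is a minimal plain Kafka Streams sketch illustrating where that single-cluster assumption lives (topic names are taken from your configuration; the broker address and serdes are placeholder assumptions):
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class SingleClusterTopologyDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "single-cluster-demo");
        // A KafkaStreams instance has exactly ONE bootstrap.servers setting;
        // all topics in the topology must live on this cluster.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9093");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Both the input and the output topic are therefore on cluster1.
        builder.stream("analytics-events").to("update-events");

        new KafkaStreams(builder.build(), props).start();
    }
}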
As a workaround, you can have your Kafka Streams function receive from and write to the same cluster, and then bridge the output topic to the second cluster using a regular Kafka binder-based function: regular (non-Kafka Streams) functions can consume from and publish to different clusters.
@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}
This function needs to receive from and write to the same cluster. Then you can have another function, as below.
@Bean
public Function<?, ?> bridgeFunction() {
    ....
}
For this function, the input comes from cluster-1 and the output goes to cluster-2.
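As a rough sketch of what that bridge could look like (assuming the UpdateEvent type from your processor; a simple pass-through is enough, since which cluster each side talks to is decided entirely by the binding configuration, not by the code):
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BridgeConfiguration {

    // Pass-through function: consumes UpdateEvent records from the
    // update-events topic on cluster 1 and republishes them unchanged
    // to cluster 2 via the output binding.
    @Bean
    public Function<UpdateEvent, UpdateEvent> bridgeFunction() {
        return event -> event;
    }
}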
When using this workaround, make sure to also include the regular Kafka binder as a dependency: spring-cloud-stream-binder-kafka.
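The corresponding bindings could then look roughly like this; cluster1-kafka and cluster2-kafka are assumed names for two additional binders of type kafka, configured with brokers (and SSL settings) analogous to the kstream binders above, and spring.cloud.function.definition declares both functions:
spring.cloud:
  function:
    definition: analyticsEventProcessor;bridgeFunction
  stream:
    bindings:
      bridgeFunction-in-0:
        destination: update-events   # output topic of the KStream function, on cluster 1
        binder: cluster1-kafka
      bridgeFunction-out-0:
        destination: update-events   # mirrored topic on cluster 2
        binder: cluster2-kafka
    binders:
      cluster1-kafka:
        type: kafka
        environment:
          spring.cloud.stream.kafka.binder.brokers: <url cluster1>:9093
      cluster2-kafka:
        type: kafka
        environment:
          spring.cloud.stream.kafka.binder.brokers: <url cluster2>:9093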
Keep in mind that this approach has disadvantages, such as the overhead and extra latency of an additional topic. However, it is a potential workaround for this use case. For more options, see the SO thread mentioned above.
[1]: https://stackoverflow.com/questions/45847690/how-to-connect-to-multiple-clusters-in-a-single-kafka-streams-application
Answered By - sobychacko