Issue
While using TopologyTestDriver, I want to test my stream and make assertions on the intermediate state between incoming messages. But after calling TestOutputTopic.readKeyValuesToMap(), the tested topic is cleared. How can I "peek" at the output and make assertions between messages?
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Properties;
public class AggregationTest {

    private static TestInputTopic<String, String> inputTopic;
    private static TestOutputTopic<String, String> outputTopic;
    private static TopologyTestDriver testDriver;

    @BeforeAll
    public static void setup() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("inputTopic", Consumed.with(Serdes.String(), Serdes.String()))
            .toTable(Materialized.with(Serdes.String(), Serdes.String()))
            .groupBy(
                KeyValue::pair,
                Grouped.with("group-by-internal", Serdes.String(), Serdes.String()))
            .aggregate(
                () -> "",
                (key, incomingMessage, existingMessage) -> incomingMessage + " " + existingMessage,
                (key, incomingMessage, existingMessage) -> existingMessage
            ).toStream().to("outputTopic", Produced.with(Serdes.String(), Serdes.String()));

        testDriver = new TopologyTestDriver(builder.build(), new Properties() {{
            put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
            put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().getClass().getName());
            put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
        }});

        inputTopic = testDriver.createInputTopic("inputTopic", Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = testDriver.createOutputTopic("outputTopic", Serdes.String().deserializer(), Serdes.String().deserializer());
    }

    @Test
    public void testAggregation() {
        TestRecord<String, String> message1 = new TestRecord<>("Key1", "Value1");
        TestRecord<String, String> message2 = new TestRecord<>("Key2", "Value2");
        TestRecord<String, String> message3 = new TestRecord<>("Key1", "Value3");

        inputTopic.pipeInput(message1);
        inputTopic.pipeInput(message2);
        var outputMap = outputTopic.readKeyValuesToMap();
        System.out.println(outputMap); // {Key2=Value2 , Key1=Value1 }
        // Assert that message1 and message2 were not affected

        inputTopic.pipeInput(message3);
        var outputMap2 = outputTopic.readKeyValuesToMap();
        System.out.println(outputMap2); // {Key1=Value3 Value1 } // where did the message with Key2 go?
        // How to assert that message3 was merged with message1, but message2 was not affected?
    }

    @AfterAll
    public static void tearDown() {
        testDriver.close();
    }
}
Solution
The JavaDoc indicates it should return the full, latest state of the topic (are you sure a tombstone event wasn't introduced, somehow?), so I am not sure why it would disappear.
If you want the combined state of both maps, you can merge the second map into the first rather than assigning it to a new reference, but that would fix the test, not necessarily the actual runtime behavior...
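The merge idea can be sketched in plain Java without any Kafka classes. This is only an illustration under stated assumptions: drainToMap is a hypothetical stand-in for TestOutputTopic.readKeyValuesToMap(), and it assumes, as the question reports, that each read hands back only the records not yet consumed. The record values are copied from the output shown in the question.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Plain-Java sketch: no Kafka classes are used here.
class AccumulatedOutput {

    // Hypothetical stand-in for TestOutputTopic.readKeyValuesToMap():
    // removes every pending record and keeps the last value seen per key.
    static Map<String, String> drainToMap(Queue<Map.Entry<String, String>> pending) {
        Map<String, String> batch = new LinkedHashMap<>();
        while (!pending.isEmpty()) {
            Map.Entry<String, String> record = pending.poll();
            batch.put(record.getKey(), record.getValue());
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<Map.Entry<String, String>> pending = new ArrayDeque<>();
        Map<String, String> latest = new LinkedHashMap<>();

        // Records emitted after message1 and message2 were piped in.
        pending.add(Map.entry("Key1", "Value1 "));
        pending.add(Map.entry("Key2", "Value2 "));
        latest.putAll(drainToMap(pending));
        System.out.println(latest);

        // After message3 only Key1 is emitted again; merging keeps Key2.
        pending.add(Map.entry("Key1", "Value3 Value1 "));
        latest.putAll(drainToMap(pending));
        System.out.println(latest);
    }
}
```

In the test from the question, the equivalent would presumably be calling outputMap.putAll(outputTopic.readKeyValuesToMap()) after piping message3, then asserting on outputMap, so that Key2 keeps its earlier value while Key1 is overwritten.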
You may want to revisit your aggregate function. Key2 has no existingMessage when it first arrives, so you've returned null there, and it would not appear in the map output. The only value you'd have is therefore Value3 Value1.
Try this for cases where you only expect one value:
    (key, incomingMessage, existingMessage) -> existingMessage == null ? incomingMessage : existingMessage
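To see what that null-safe lambda does in isolation, here is a minimal plain-Java sketch. Aggregator3 is a hypothetical interface introduced only for illustration; it mirrors the three-argument shape of the lambdas passed to aggregate() in the question and is not the Kafka Streams type itself.

```java
// Plain-Java sketch; Aggregator3 is a hypothetical stand-in with the same
// three-argument shape as the lambdas passed to aggregate() in the question.
class KeepFirstValue {

    @FunctionalInterface
    interface Aggregator3<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Keeps the existing aggregate when there is one,
    // otherwise falls back to the incoming value.
    static final Aggregator3<String, String, String> KEEP_FIRST =
        (key, incomingMessage, existingMessage) ->
            existingMessage == null ? incomingMessage : existingMessage;

    public static void main(String[] args) {
        // No existing aggregate: the incoming value is used.
        System.out.println(KEEP_FIRST.apply("Key2", "Value2", null));     // prints "Value2"
        // Existing aggregate present: it is kept unchanged.
        System.out.println(KEEP_FIRST.apply("Key1", "Value3", "Value1")); // prints "Value1"
    }
}
```

The null check only matters if existingMessage can actually be null. With an initializer such as () -> "" from the question, the first call would likely receive an empty string instead, so an isEmpty() check may be the closer fit in that topology.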
Answered By - OneCricketeer
Answer Checked By - Timothy Miller (JavaFixing Admin)