Class TopologyTestDriver
- java.lang.Object
-
- org.apache.kafka.streams.TopologyTestDriver
-
- All Implemented Interfaces:
Closeable
,AutoCloseable
public class TopologyTestDriver extends Object implements Closeable
This class makes it easier to write tests to verify the behavior of topologies created withTopology
orStreamsBuilder
. You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors, sinks, or sub-topologies. Best of all, the class works without a real Kafka broker, so the tests execute very quickly with very little overhead.Using the
TopologyTestDriver
in tests is easy: simply instantiate the driver and provide aTopology
(cf.StreamsBuilder.build()
) andconfig
,create
and use aTestInputTopic
to supply an input records to the topology, and thencreate
and use aTestOutputTopic
to read and verify any output records by the topology.Although the driver doesn't use a real Kafka broker, it does simulate Kafka
consumers
andproducers
that read and write rawbyte[]
messages. You can letTestInputTopic
andTestOutputTopic
to handle conversion form regular Java objects to raw bytes.Driver setup
In order to create aTopologyTestDriver
instance, you need aTopology
and aconfig
. The configuration needs to be representative of what you'd supply to the real topology, so that means including several key properties (cf.StreamsConfig
). For example, the following code fragment creates a configuration that specifies a timestamp extractor, default serializers and deserializers for string keys and values:Properties props = new Properties(); props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); Topology topology = ... TopologyTestDriver driver = new TopologyTestDriver(topology, props);
Note that the
TopologyTestDriver
processes input records synchronously. This implies thatcommit.interval.ms
andcache.max.bytes.buffering
configuration have no effect. The driver behaves as if both configs would be set to zero, i.e., as if a "commit" (and thus "flush") would happen after each input record.Processing messages
Your test can supply new input records on any of the topics that the topology's sources consume. This test driver simulates single-partitioned input topics. Here's an example of an input message on the topic named
input-topic
:
WhenTestInputTopic<String, String> inputTopic = driver.createInputTopic("input-topic", stringSerdeSerializer, stringSerializer); inputTopic.pipeInput("key1", "value1");
TestInputTopic.pipeInput(Object, Object)
is called, the driver passes the input message through to the appropriate source that consumes the named topic, and will invoke the processor(s) downstream of the source. If your topology's processors forward messages to sinks, your test can then consume these output messages to verify they match the expected outcome. For example, if our topology should have generated 2 messages onoutput-topic-1
and 1 message onoutput-topic-2
, then our test can obtain these messages using theTestOutputTopic.readKeyValue()
method:
Again, our example topology generates messages with string keys and values, so we supply our string deserializer instance for use on both the keys and values. Your test logic can then verify whether these output records are correct.TestOutputTopic<String, String> outputTopic1 = driver.createOutputTopic("output-topic-1", stringDeserializer, stringDeserializer); TestOutputTopic<String, String> outputTopic2 = driver.createOutputTopic("output-topic-2", stringDeserializer, stringDeserializer); KeyValue<String, String> record1 = outputTopic1.readKeyValue(); KeyValue<String, String> record2 = outputTopic2.readKeyValue(); KeyValue<String, String> record3 = outputTopic1.readKeyValue();
Note, that calling
pipeInput()
will also triggerevent-time
basepunctuation
callbacks. However, you won't triggerwall-clock
type punctuations that you must trigger manually viaadvanceWallClockTime(long)
.Finally, when completed, make sure your tests
close()
the driver to release all resources andprocessors
.Processor state
Some processors use Kafka
state storage
, so this driver class provides the genericgetStateStore(String)
as well as store-type specific methods so that your tests can check the underlying state store(s) used by your topology's processors. In our previous example, after we supplied a single input message and checked the three output messages, our test could also check the key value store to verify the processor correctly added, removed, or updated internal state. Or, our test might have pre-populated some state before submitting the input message, and verified afterward that the processor(s) correctly updated the state.- See Also:
TestInputTopic
,TestOutputTopic
-
-
Constructor Summary
Constructors Constructor Description TopologyTestDriver(Topology topology)
Create a new test diver instance.TopologyTestDriver(Topology topology, Instant initialWallClockTimeMs)
Create a new test diver instance.TopologyTestDriver(Topology topology, Properties config)
Create a new test diver instance.TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs)
Deprecated.Since 2.4 useTopologyTestDriver(Topology, Properties, Instant)
TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime)
Create a new test diver instance.
-
Method Summary
All Methods Instance Methods Concrete Methods Deprecated Methods Modifier and Type Method Description void
advanceWallClockTime(long advanceMs)
Deprecated.Since 2.4 useadvanceWallClockTime(Duration)
insteadvoid
advanceWallClockTime(Duration advance)
Advances the internally mocked wall-clock time.void
close()
Close the driver, its topology, and all processors.<K,V>
TestInputTopic<K,V>createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer)
CreateTestInputTopic
to be used for piping records to topic Uses current system time as start timestamp for records.<K,V>
TestInputTopic<K,V>createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant startTimestamp, Duration autoAdvance)
CreateTestInputTopic
to be used for piping records to topic Uses provided start timestamp and autoAdvance parameter for records<K,V>
TestOutputTopic<K,V>createOutputTopic(String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
CreateTestOutputTopic
to be used for reading records from topicMap<String,StateStore>
getAllStateStores()
Get allStateStores
from the topology.<K,V>
KeyValueStore<K,V>getKeyValueStore(String name)
Get theKeyValueStore
orTimestampedKeyValueStore
with the given name.<K,V>
SessionStore<K,V>getSessionStore(String name)
Get theSessionStore
with the given name.StateStore
getStateStore(String name)
Get theStateStore
with the given name.<K,V>
KeyValueStore<K,ValueAndTimestamp<V>>getTimestampedKeyValueStore(String name)
Get theTimestampedKeyValueStore
with the given name.<K,V>
WindowStore<K,ValueAndTimestamp<V>>getTimestampedWindowStore(String name)
Get theTimestampedWindowStore
with the given name.<K,V>
WindowStore<K,V>getWindowStore(String name)
Get theWindowStore
orTimestampedWindowStore
with the given name.Map<MetricName,? extends Metric>
metrics()
Get read-only handle on global metrics registry.void
pipeInput(List<ConsumerRecord<byte[],byte[]>> records)
Deprecated.Since 2.4 use methods ofTestInputTopic
insteadvoid
pipeInput(ConsumerRecord<byte[],byte[]> consumerRecord)
Deprecated.Since 2.4 use methods ofTestInputTopic
insteadSet<String>
producedTopicNames()
Get all the names of all the topics to which records have been produced during the test run.ProducerRecord<byte[],byte[]>
readOutput(String topic)
Deprecated.Since 2.4 use methods ofTestOutputTopic
instead<K,V>
ProducerRecord<K,V>readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
Deprecated.Since 2.4 use methods ofTestOutputTopic
instead
-
-
-
Constructor Detail
-
TopologyTestDriver
public TopologyTestDriver(Topology topology)
Create a new test diver instance. Default test properties are used to initialize the driver instance- Parameters:
topology
- the topology to be tested
-
TopologyTestDriver
public TopologyTestDriver(Topology topology, Properties config)
Create a new test diver instance. Initialized the internally mocked wall-clock time withcurrent system time
.- Parameters:
topology
- the topology to be testedconfig
- the configuration for the topology
-
TopologyTestDriver
public TopologyTestDriver(Topology topology, Instant initialWallClockTimeMs)
Create a new test diver instance.- Parameters:
topology
- the topology to be testedinitialWallClockTimeMs
- the initial value of internally mocked wall-clock time
-
TopologyTestDriver
@Deprecated public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs)
Deprecated.Since 2.4 useTopologyTestDriver(Topology, Properties, Instant)
Create a new test diver instance.- Parameters:
topology
- the topology to be testedconfig
- the configuration for the topologyinitialWallClockTimeMs
- the initial value of internally mocked wall-clock time
-
TopologyTestDriver
public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime)
Create a new test diver instance.- Parameters:
topology
- the topology to be testedconfig
- the configuration for the topologyinitialWallClockTime
- the initial value of internally mocked wall-clock time
-
-
Method Detail
-
metrics
public Map<MetricName,? extends Metric> metrics()
Get read-only handle on global metrics registry.- Returns:
- Map of all metrics.
-
pipeInput
@Deprecated public void pipeInput(ConsumerRecord<byte[],byte[]> consumerRecord)
Deprecated.Since 2.4 use methods ofTestInputTopic
insteadSend an input message with the given key, value, and timestamp on the specified topic to the topology and then commit the messages.- Parameters:
consumerRecord
- the record to be processed
-
pipeInput
@Deprecated public void pipeInput(List<ConsumerRecord<byte[],byte[]>> records)
Deprecated.Since 2.4 use methods ofTestInputTopic
insteadSend input messages to the topology and then commit each message individually.- Parameters:
records
- a list of records to be processed
-
advanceWallClockTime
@Deprecated public void advanceWallClockTime(long advanceMs)
Deprecated.Since 2.4 useadvanceWallClockTime(Duration)
insteadAdvances the internally mocked wall-clock time. This might trigger awall-clock
typepunctuations
.- Parameters:
advanceMs
- the amount of time to advance wall-clock time in milliseconds
-
advanceWallClockTime
public void advanceWallClockTime(Duration advance)
Advances the internally mocked wall-clock time. This might trigger awall-clock
typepunctuations
.- Parameters:
advance
- the amount of time to advance wall-clock time
-
readOutput
@Deprecated public ProducerRecord<byte[],byte[]> readOutput(String topic)
Deprecated.Since 2.4 use methods ofTestOutputTopic
insteadRead the next record from the given topic. These records were output by the topology during the previous calls topipeInput(ConsumerRecord)
.- Parameters:
topic
- the name of the topic- Returns:
- the next record on that topic, or
null
if there is no record available
-
readOutput
@Deprecated public <K,V> ProducerRecord<K,V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
Deprecated.Since 2.4 use methods ofTestOutputTopic
insteadRead the next record from the given topic. These records were output by the topology during the previous calls topipeInput(ConsumerRecord)
.- Parameters:
topic
- the name of the topickeyDeserializer
- the deserializer for the key typevalueDeserializer
- the deserializer for the value type- Returns:
- the next record on that topic, or
null
if there is no record available
-
createInputTopic
public final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer)
CreateTestInputTopic
to be used for piping records to topic Uses current system time as start timestamp for records. Auto-advance is disabled.- Type Parameters:
K
- the key typeV
- the value type- Parameters:
topicName
- the name of the topickeySerializer
- the Serializer for the key typevalueSerializer
- the Serializer for the value type- Returns:
TestInputTopic
object
-
createInputTopic
public final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant startTimestamp, Duration autoAdvance)
CreateTestInputTopic
to be used for piping records to topic Uses provided start timestamp and autoAdvance parameter for records- Type Parameters:
K
- the key typeV
- the value type- Parameters:
topicName
- the name of the topickeySerializer
- the Serializer for the key typevalueSerializer
- the Serializer for the value typestartTimestamp
- Start timestamp for auto-generated record timeautoAdvance
- autoAdvance duration for auto-generated record time- Returns:
TestInputTopic
object
-
createOutputTopic
public final <K,V> TestOutputTopic<K,V> createOutputTopic(String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
CreateTestOutputTopic
to be used for reading records from topic- Type Parameters:
K
- the key typeV
- the value type- Parameters:
topicName
- the name of the topickeyDeserializer
- the Deserializer for the key typevalueDeserializer
- the Deserializer for the value type- Returns:
TestOutputTopic
object
-
producedTopicNames
public final Set<String> producedTopicNames()
Get all the names of all the topics to which records have been produced during the test run.Call this method after piping the input into the test driver to retrieve the full set of topic names the topology produced records to.
The returned set of topic names may include user (e.g., output) and internal (e.g., changelog, repartition) topic names.
- Returns:
- the set of topic names the topology has produced to
-
getAllStateStores
public Map<String,StateStore> getAllStateStores()
Get allStateStores
from the topology. The stores can be a "regular" or global stores.This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.Note, that
StateStore
might benull
if a store is added but not connected to any processor.Caution: Using this method to access stores that are added by the DSL is unsafe as the store types may change. Stores added by the DSL should only be accessed via the corresponding typed methods like
getKeyValueStore(String)
etc.- Returns:
- all stores my name
- See Also:
getStateStore(String)
,getKeyValueStore(String)
,getTimestampedKeyValueStore(String)
,getWindowStore(String)
,getTimestampedWindowStore(String)
,getSessionStore(String)
-
getStateStore
public StateStore getStateStore(String name) throws IllegalArgumentException
Get theStateStore
with the given name. The store can be a "regular" or global store.Should be used for custom stores only. For built-in stores, the corresponding typed methods like
getKeyValueStore(String)
should be used.This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.- Parameters:
name
- the name of the store- Returns:
- the state store, or
null
if no store has been registered with the given name - Throws:
IllegalArgumentException
- if the store is a built-in store likeKeyValueStore
,WindowStore
, orSessionStore
- See Also:
getAllStateStores()
,getKeyValueStore(String)
,getTimestampedKeyValueStore(String)
,getWindowStore(String)
,getTimestampedWindowStore(String)
,getSessionStore(String)
-
getKeyValueStore
public <K,V> KeyValueStore<K,V> getKeyValueStore(String name)
Get theKeyValueStore
orTimestampedKeyValueStore
with the given name. The store can be a "regular" or global store.If the registered store is a
TimestampedKeyValueStore
this method will return a value-only query interface. It is highly recommended to update the code for this case to avoid bugs and to usegetTimestampedKeyValueStore(String)
for full store access instead.This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.- Parameters:
name
- the name of the store- Returns:
- the key value store, or
null
if noKeyValueStore
orTimestampedKeyValueStore
has been registered with the given name - See Also:
getAllStateStores()
,getStateStore(String)
,getTimestampedKeyValueStore(String)
,getWindowStore(String)
,getTimestampedWindowStore(String)
,getSessionStore(String)
-
getTimestampedKeyValueStore
public <K,V> KeyValueStore<K,ValueAndTimestamp<V>> getTimestampedKeyValueStore(String name)
Get theTimestampedKeyValueStore
with the given name. The store can be a "regular" or global store.This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.- Parameters:
name
- the name of the store- Returns:
- the key value store, or
null
if noTimestampedKeyValueStore
has been registered with the given name - See Also:
getAllStateStores()
,getStateStore(String)
,getKeyValueStore(String)
,getWindowStore(String)
,getTimestampedWindowStore(String)
,getSessionStore(String)
-
getWindowStore
public <K,V> WindowStore<K,V> getWindowStore(String name)
Get theWindowStore
orTimestampedWindowStore
with the given name. The store can be a "regular" or global store.If the registered store is a
TimestampedWindowStore
this method will return a value-only query interface. It is highly recommended to update the code for this case to avoid bugs and to usegetTimestampedWindowStore(String)
for full store access instead.This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.- Parameters:
name
- the name of the store- Returns:
- the key value store, or
null
if noWindowStore
orTimestampedWindowStore
has been registered with the given name - See Also:
getAllStateStores()
,getStateStore(String)
,getKeyValueStore(String)
,getTimestampedKeyValueStore(String)
,getTimestampedWindowStore(String)
,getSessionStore(String)
-
getTimestampedWindowStore
public <K,V> WindowStore<K,ValueAndTimestamp<V>> getTimestampedWindowStore(String name)
Get theTimestampedWindowStore
with the given name. The store can be a "regular" or global store.This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.- Parameters:
name
- the name of the store- Returns:
- the key value store, or
null
if noTimestampedWindowStore
has been registered with the given name - See Also:
getAllStateStores()
,getStateStore(String)
,getKeyValueStore(String)
,getTimestampedKeyValueStore(String)
,getWindowStore(String)
,getSessionStore(String)
-
getSessionStore
public <K,V> SessionStore<K,V> getSessionStore(String name)
Get theSessionStore
with the given name. The store can be a "regular" or global store.This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.- Parameters:
name
- the name of the store- Returns:
- the key value store, or
null
if noSessionStore
has been registered with the given name - See Also:
getAllStateStores()
,getStateStore(String)
,getKeyValueStore(String)
,getTimestampedKeyValueStore(String)
,getWindowStore(String)
,getTimestampedWindowStore(String)
-
close
public void close()
Close the driver, its topology, and all processors.- Specified by:
close
in interfaceAutoCloseable
- Specified by:
close
in interfaceCloseable
-
-