@InterfaceStability.Evolving public class TopologyTestDriver extends Object implements Closeable
Topology
or
StreamsBuilder
.
You can test simple topologies that have a single processor, or very complex topologies that have multiple sources,
processors, sinks, or sub-topologies.
Best of all, the class works without a real Kafka broker, so the tests execute very quickly with very little overhead.
Using the TopologyTestDriver
in tests is easy: simply instantiate the driver and provide a Topology
(cf. StreamsBuilder.build()
) and configs
, use the driver to supply an
input message to the topology, and then use the driver to read and verify any messages output by the topology.
Although the driver doesn't use a real Kafka broker, it does simulate Kafka consumers
and
producers
that read and write raw byte[]
messages.
You can either deal with messages that have byte[]
keys and values or you use ConsumerRecordFactory
and OutputVerifier
that work with regular Java objects instead of raw bytes.
TopologyTestDriver
instance, you need a Topology
and a config
.
The configuration needs to be representative of what you'd supply to the real topology, so that means including
several key properties (cf. StreamsConfig
).
For example, the following code fragment creates a configuration that specifies a local Kafka broker list (which is
needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Topology topology = ...
TopologyTestDriver driver = new TopologyTestDriver(topology, props);
Your test can supply new input records on any of the topics that the topology's sources consume.
This test driver simulates single-partitioned input topics.
Here's an example of an input message on the topic named input-topic
:
ConsumerRecordFactory factory = new ConsumerRecordFactory(strSerializer, strSerializer); driver.pipeInput(factory.create("input-topic","key1", "value1"));When
#pipeInput()
is called, the driver passes the input message through to the appropriate source that
consumes the named topic, and will invoke the processor(s) downstream of the source.
If your topology's processors forward messages to sinks, your test can then consume these output messages to verify
they match the expected outcome.
For example, if our topology should have generated 2 messages on output-topic-1
and 1 message on
output-topic-2
, then our test can obtain these messages using the
readOutput(String, Deserializer, Deserializer)
method:
ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
Again, our example topology generates messages with string keys and values, so we supply our string deserializer
instance for use on both the keys and values. Your test logic can then verify whether these output records are
correct.
Note, that calling ProducerRecord.equals(Object)
compares all attributes including key, value, timestamp,
topic, partition, and headers.
If you only want to compare key and value (and maybe timestamp), using OutputVerifier
instead of
ProducerRecord.equals(Object)
can simplify your code as you can ignore attributes you are not interested in.
Note, that calling pipeInput()
will also trigger event-time
base
punctuation
callbacks.
However, you won't trigger wall-clock
type punctuations that you must
trigger manually via advanceWallClockTime(long)
.
Finally, when completed, make sure your tests close()
the driver to release all resources and
processors
.
Some processors use Kafka state storage
, so this driver class provides the generic
getStateStore(String)
as well as store-type specific methods so that your tests can check the underlying
state store(s) used by your topology's processors.
In our previous example, after we supplied a single input message and checked the three output messages, our test
could also check the key value store to verify the processor correctly added, removed, or updated internal state.
Or, our test might have pre-populated some state before submitting the input message, and verified afterward
that the processor(s) correctly updated the state.
ConsumerRecordFactory
,
OutputVerifier
Constructor and Description |
---|
TopologyTestDriver(Topology topology,
Properties config)
Create a new test diver instance.
|
TopologyTestDriver(Topology topology,
Properties config,
long initialWallClockTimeMs)
Create a new test diver instance.
|
Modifier and Type | Method and Description |
---|---|
void |
advanceWallClockTime(long advanceMs)
Advances the internally mocked wall-clock time.
|
void |
close()
Close the driver, its topology, and all processors.
|
Map<String,StateStore> |
getAllStateStores()
Get all
StateStores from the topology. |
<K,V> KeyValueStore<K,V> |
getKeyValueStore(String name)
Get the
KeyValueStore or TimestampedKeyValueStore with the given name. |
<K,V> SessionStore<K,V> |
getSessionStore(String name)
Get the
SessionStore with the given name. |
StateStore |
getStateStore(String name)
Get the
StateStore with the given name. |
<K,V> KeyValueStore<K,ValueAndTimestamp<V>> |
getTimestampedKeyValueStore(String name)
Get the
TimestampedKeyValueStore with the given name. |
<K,V> WindowStore<K,ValueAndTimestamp<V>> |
getTimestampedWindowStore(String name)
Get the
TimestampedWindowStore with the given name. |
<K,V> WindowStore<K,V> |
getWindowStore(String name)
Get the
WindowStore or TimestampedWindowStore with the given name. |
Map<MetricName,? extends Metric> |
metrics()
Get read-only handle on global metrics registry.
|
void |
pipeInput(ConsumerRecord<byte[],byte[]> consumerRecord)
Send an input message with the given key, value, and timestamp on the specified topic to the topology and then
commit the messages.
|
void |
pipeInput(List<ConsumerRecord<byte[],byte[]>> records)
Send input messages to the topology and then commit each message individually.
|
ProducerRecord<byte[],byte[]> |
readOutput(String topic)
Read the next record from the given topic.
|
<K,V> ProducerRecord<K,V> |
readOutput(String topic,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
Read the next record from the given topic.
|
public TopologyTestDriver(Topology topology, Properties config)
current system time
.topology
- the topology to be testedconfig
- the configuration for the topologypublic TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs)
topology
- the topology to be testedconfig
- the configuration for the topologyinitialWallClockTimeMs
- the initial value of internally mocked wall-clock timepublic Map<MetricName,? extends Metric> metrics()
public void pipeInput(ConsumerRecord<byte[],byte[]> consumerRecord)
consumerRecord
- the record to be processedpublic void pipeInput(List<ConsumerRecord<byte[],byte[]>> records)
records
- a list of records to be processedpublic void advanceWallClockTime(long advanceMs)
wall-clock
type
punctuations
.advanceMs
- the amount of time to advance wall-clock time in millisecondspublic ProducerRecord<byte[],byte[]> readOutput(String topic)
pipeInput(ConsumerRecord)
.topic
- the name of the topicnull
if there is no record availablepublic <K,V> ProducerRecord<K,V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
pipeInput(ConsumerRecord)
.topic
- the name of the topickeyDeserializer
- the deserializer for the key typevalueDeserializer
- the deserializer for the value typenull
if there is no record availablepublic Map<String,StateStore> getAllStateStores()
StateStores
from the topology.
The stores can be a "regular" or global stores.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.
Note, that StateStore
might be null
if a store is added but not connected to any processor.
Caution: Using this method to access stores that are added by the DSL is unsafe as the store
types may change. Stores added by the DSL should only be accessed via the corresponding typed methods
like getKeyValueStore(String)
etc.
getStateStore(String)
,
getKeyValueStore(String)
,
getTimestampedKeyValueStore(String)
,
getWindowStore(String)
,
getTimestampedWindowStore(String)
,
getSessionStore(String)
public StateStore getStateStore(String name) throws IllegalArgumentException
StateStore
with the given name.
The store can be a "regular" or global store.
Should be used for custom stores only.
For built-in stores, the corresponding typed methods like getKeyValueStore(String)
should be used.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.
name
- the name of the storenull
if no store has been registered with the given nameIllegalArgumentException
- if the store is a built-in store like KeyValueStore
,
WindowStore
, or SessionStore
getAllStateStores()
,
getKeyValueStore(String)
,
getTimestampedKeyValueStore(String)
,
getWindowStore(String)
,
getTimestampedWindowStore(String)
,
getSessionStore(String)
public <K,V> KeyValueStore<K,V> getKeyValueStore(String name)
KeyValueStore
or TimestampedKeyValueStore
with the given name.
The store can be a "regular" or global store.
If the registered store is a TimestampedKeyValueStore
this method will return a value-only query
interface. It is highly recommended to update the code for this case to avoid bugs and to use
getTimestampedKeyValueStore(String)
for full store access instead.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.
name
- the name of the storenull
if no KeyValueStore
or TimestampedKeyValueStore
has been registered with the given namegetAllStateStores()
,
getStateStore(String)
,
getTimestampedKeyValueStore(String)
,
getWindowStore(String)
,
getTimestampedWindowStore(String)
,
getSessionStore(String)
public <K,V> KeyValueStore<K,ValueAndTimestamp<V>> getTimestampedKeyValueStore(String name)
TimestampedKeyValueStore
with the given name.
The store can be a "regular" or global store.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.
name
- the name of the storenull
if no TimestampedKeyValueStore
has been registered with the given namegetAllStateStores()
,
getStateStore(String)
,
getKeyValueStore(String)
,
getWindowStore(String)
,
getTimestampedWindowStore(String)
,
getSessionStore(String)
public <K,V> WindowStore<K,V> getWindowStore(String name)
WindowStore
or TimestampedWindowStore
with the given name.
The store can be a "regular" or global store.
If the registered store is a TimestampedWindowStore
this method will return a value-only query
interface. It is highly recommended to update the code for this case to avoid bugs and to use
getTimestampedWindowStore(String)
for full store access instead.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.
name
- the name of the storenull
if no WindowStore
or TimestampedWindowStore
has been registered with the given namegetAllStateStores()
,
getStateStore(String)
,
getKeyValueStore(String)
,
getTimestampedKeyValueStore(String)
,
getTimestampedWindowStore(String)
,
getSessionStore(String)
public <K,V> WindowStore<K,ValueAndTimestamp<V>> getTimestampedWindowStore(String name)
TimestampedWindowStore
with the given name.
The store can be a "regular" or global store.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.
name
- the name of the storenull
if no TimestampedWindowStore
has been registered with the given namegetAllStateStores()
,
getStateStore(String)
,
getKeyValueStore(String)
,
getTimestampedKeyValueStore(String)
,
getWindowStore(String)
,
getSessionStore(String)
public <K,V> SessionStore<K,V> getSessionStore(String name)
SessionStore
with the given name.
The store can be a "regular" or global store.
This is often useful in test cases to pre-populate the store before the test case instructs the topology to
process an input message
, and/or to check the store afterward.
name
- the name of the storenull
if no SessionStore
has been registered with the given namegetAllStateStores()
,
getStateStore(String)
,
getKeyValueStore(String)
,
getTimestampedKeyValueStore(String)
,
getWindowStore(String)
,
getTimestampedWindowStore(String)
public void close()
close
in interface Closeable
close
in interface AutoCloseable