public class TopologyTestDriver extends Object implements Closeable
Topology or
 StreamsBuilder.
 You can test simple topologies that have a single processor, or very complex topologies that have multiple sources,
 processors, sinks, or sub-topologies.
 Best of all, the class works without a real Kafka broker, so the tests execute very quickly with very little overhead.
 
 Using the TopologyTestDriver in tests is easy: simply instantiate the driver and provide a Topology
 (cf. StreamsBuilder.build()) and configs, create
 and use a TestInputTopic to supply an input records to the topology,
 and then create and use a TestOutputTopic to read and
 verify any output records by the topology.
 
 Although the driver doesn't use a real Kafka broker, it does simulate Kafka consumers and
 producers that read and write raw byte[] messages.
 You can let TestInputTopic and TestOutputTopic to handle conversion
 form regular Java objects to raw bytes.
 
TopologyTestDriver instance, you need a Topology and a config.
 The configuration needs to be representative of what you'd supply to the real topology, so that means including
 several key properties (cf. StreamsConfig).
 For example, the following code fragment creates a configuration that specifies a local Kafka broker list (which is
 needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
 
 Properties props = new Properties();
 props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
 props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
 props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 Topology topology = ...
 TopologyTestDriver driver = new TopologyTestDriver(topology, props);
 
 Your test can supply new input records on any of the topics that the topology's sources consume.
 This test driver simulates single-partitioned input topics.
 Here's an example of an input message on the topic named input-topic:
 
 TestInputTopic<String, String> inputTopic = driver.createInputTopic("input-topic", stringSerdeSerializer, stringSerializer);
 inputTopic.pipeInput("key1", "value1");
 TestInputTopic.pipeInput(Object, Object) is called, the driver passes the input message through to the appropriate source that
 consumes the named topic, and will invoke the processor(s) downstream of the source.
 If your topology's processors forward messages to sinks, your test can then consume these output messages to verify
 they match the expected outcome.
 For example, if our topology should have generated 2 messages on output-topic-1 and 1 message on
 output-topic-2, then our test can obtain these messages using the
 TestOutputTopic.readKeyValue()  method:
 
 TestOutputTopic<String, String> outputTopic1 = driver.createOutputTopic("output-topic-1", stringDeserializer, stringDeserializer);
 TestOutputTopic<String, String> outputTopic2 = driver.createOutputTopic("output-topic-2", stringDeserializer, stringDeserializer);
 KeyValue<String, String> record1 = outputTopic1.readKeyValue();
 KeyValue<String, String> record2 = outputTopic2.readKeyValue();
 KeyValue<String, String> record3 = outputTopic1.readKeyValue();
 
 Note, that calling pipeInput() will also trigger event-time base
 punctuation callbacks.
 However, you won't trigger wall-clock type punctuations that you must
 trigger manually via advanceWallClockTime(long).
 
 Finally, when completed, make sure your tests close() the driver to release all resources and
 processors.
 
 Some processors use Kafka state storage, so this driver class provides the generic
 getStateStore(String) as well as store-type specific methods so that your tests can check the underlying
 state store(s) used by your topology's processors.
 In our previous example, after we supplied a single input message and checked the three output messages, our test
 could also check the key value store to verify the processor correctly added, removed, or updated internal state.
 Or, our test might have pre-populated some state before submitting the input message, and verified afterward
 that the processor(s) correctly updated the state.
TestInputTopic, 
TestOutputTopic| Constructor and Description | 
|---|
| TopologyTestDriver(Topology topology,
                  Properties config)Create a new test diver instance. | 
| TopologyTestDriver(Topology topology,
                  Properties config,
                  Instant initialWallClockTime)Create a new test diver instance. | 
| TopologyTestDriver(Topology topology,
                  Properties config,
                  long initialWallClockTimeMs)Deprecated. 
 Since 2.4 use  TopologyTestDriver(Topology, Properties, Instant) | 
| Modifier and Type | Method and Description | 
|---|---|
| void | advanceWallClockTime(Duration advance)Advances the internally mocked wall-clock time. | 
| void | advanceWallClockTime(long advanceMs)Deprecated. 
 Since 2.4 use  advanceWallClockTime(Duration)instead | 
| void | close()Close the driver, its topology, and all processors. | 
| <K,V> TestInputTopic<K,V> | createInputTopic(String topicName,
                Serializer<K> keySerializer,
                Serializer<V> valueSerializer)Create  TestInputTopicto be used for piping records to topic
 Uses current system time as start timestamp for records. | 
| <K,V> TestInputTopic<K,V> | createInputTopic(String topicName,
                Serializer<K> keySerializer,
                Serializer<V> valueSerializer,
                Instant startTimestamp,
                Duration autoAdvance)Create  TestInputTopicto be used for piping records to topic
 Uses provided start timestamp and autoAdvance parameter for records | 
| <K,V> TestOutputTopic<K,V> | createOutputTopic(String topicName,
                 Deserializer<K> keyDeserializer,
                 Deserializer<V> valueDeserializer)Create  TestOutputTopicto be used for reading records from topic | 
| Map<String,StateStore> | getAllStateStores()Get all  StateStoresfrom the topology. | 
| <K,V> KeyValueStore<K,V> | getKeyValueStore(String name)Get the  KeyValueStoreorTimestampedKeyValueStorewith the given name. | 
| <K,V> SessionStore<K,V> | getSessionStore(String name)Get the  SessionStorewith the given name. | 
| StateStore | getStateStore(String name)Get the  StateStorewith the given name. | 
| <K,V> KeyValueStore<K,ValueAndTimestamp<V>> | getTimestampedKeyValueStore(String name)Get the  TimestampedKeyValueStorewith the given name. | 
| <K,V> WindowStore<K,ValueAndTimestamp<V>> | getTimestampedWindowStore(String name)Get the  TimestampedWindowStorewith the given name. | 
| <K,V> WindowStore<K,V> | getWindowStore(String name)Get the  WindowStoreorTimestampedWindowStorewith the given name. | 
| Map<MetricName,? extends Metric> | metrics()Get read-only handle on global metrics registry. | 
| void | pipeInput(ConsumerRecord<byte[],byte[]> consumerRecord)Deprecated. 
 Since 2.4 use methods of  TestInputTopicinstead | 
| void | pipeInput(List<ConsumerRecord<byte[],byte[]>> records)Deprecated. 
 Since 2.4 use methods of  TestInputTopicinstead | 
| ProducerRecord<byte[],byte[]> | readOutput(String topic)Deprecated. 
 Since 2.4 use methods of  TestOutputTopicinstead | 
| <K,V> ProducerRecord<K,V> | readOutput(String topic,
          Deserializer<K> keyDeserializer,
          Deserializer<V> valueDeserializer)Deprecated. 
 Since 2.4 use methods of  TestOutputTopicinstead | 
public TopologyTestDriver(Topology topology, Properties config)
current system time.topology - the topology to be testedconfig - the configuration for the topology@Deprecated public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs)
TopologyTestDriver(Topology, Properties, Instant)topology - the topology to be testedconfig - the configuration for the topologyinitialWallClockTimeMs - the initial value of internally mocked wall-clock timepublic TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime)
topology - the topology to be testedconfig - the configuration for the topologyinitialWallClockTime - the initial value of internally mocked wall-clock timepublic Map<MetricName,? extends Metric> metrics()
@Deprecated public void pipeInput(ConsumerRecord<byte[],byte[]> consumerRecord)
TestInputTopic insteadconsumerRecord - the record to be processed@Deprecated public void pipeInput(List<ConsumerRecord<byte[],byte[]>> records)
TestInputTopic insteadrecords - a list of records to be processed@Deprecated public void advanceWallClockTime(long advanceMs)
advanceWallClockTime(Duration) insteadwall-clock type
 punctuations.advanceMs - the amount of time to advance wall-clock time in millisecondspublic void advanceWallClockTime(Duration advance)
wall-clock type
 punctuations.advance - the amount of time to advance wall-clock time@Deprecated public ProducerRecord<byte[],byte[]> readOutput(String topic)
TestOutputTopic insteadpipeInput(ConsumerRecord).topic - the name of the topicnull if there is no record available@Deprecated public <K,V> ProducerRecord<K,V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
TestOutputTopic insteadpipeInput(ConsumerRecord).topic - the name of the topickeyDeserializer - the deserializer for the key typevalueDeserializer - the deserializer for the value typenull if there is no record availablepublic final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer)
TestInputTopic to be used for piping records to topic
 Uses current system time as start timestamp for records.
 Auto-advance is disabled.K - the key typeV - the value typetopicName - the name of the topickeySerializer - the Serializer for the key typevalueSerializer - the Serializer for the value typeTestInputTopic objectpublic final <K,V> TestInputTopic<K,V> createInputTopic(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Instant startTimestamp, Duration autoAdvance)
TestInputTopic to be used for piping records to topic
 Uses provided start timestamp and autoAdvance parameter for recordsK - the key typeV - the value typetopicName - the name of the topickeySerializer - the Serializer for the key typevalueSerializer - the Serializer for the value typestartTimestamp - Start timestamp for auto-generated record timeautoAdvance - autoAdvance duration for auto-generated record timeTestInputTopic objectpublic final <K,V> TestOutputTopic<K,V> createOutputTopic(String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
TestOutputTopic to be used for reading records from topicK - the key typeV - the value typetopicName - the name of the topickeyDeserializer - the Deserializer for the key typevalueDeserializer - the Deserializer for the value typeTestOutputTopic objectpublic Map<String,StateStore> getAllStateStores()
StateStores from the topology.
 The stores can be a "regular" or global stores.
 
 This is often useful in test cases to pre-populate the store before the test case instructs the topology to
 process an input message, and/or to check the store afterward.
 
 Note, that StateStore might be null if a store is added but not connected to any processor.
 
 Caution: Using this method to access stores that are added by the DSL is unsafe as the store
 types may change. Stores added by the DSL should only be accessed via the corresponding typed methods
 like getKeyValueStore(String) etc.
getStateStore(String), 
getKeyValueStore(String), 
getTimestampedKeyValueStore(String), 
getWindowStore(String), 
getTimestampedWindowStore(String), 
getSessionStore(String)public StateStore getStateStore(String name) throws IllegalArgumentException
StateStore with the given name.
 The store can be a "regular" or global store.
 
 Should be used for custom stores only.
 For built-in stores, the corresponding typed methods like getKeyValueStore(String) should be used.
 
 This is often useful in test cases to pre-populate the store before the test case instructs the topology to
 process an input message, and/or to check the store afterward.
name - the name of the storenull if no store has been registered with the given nameIllegalArgumentException - if the store is a built-in store like KeyValueStore,
 WindowStore, or SessionStoregetAllStateStores(), 
getKeyValueStore(String), 
getTimestampedKeyValueStore(String), 
getWindowStore(String), 
getTimestampedWindowStore(String), 
getSessionStore(String)public <K,V> KeyValueStore<K,V> getKeyValueStore(String name)
KeyValueStore or TimestampedKeyValueStore with the given name.
 The store can be a "regular" or global store.
 
 If the registered store is a TimestampedKeyValueStore this method will return a value-only query
 interface. It is highly recommended to update the code for this case to avoid bugs and to use
 getTimestampedKeyValueStore(String) for full store access instead.
 
 This is often useful in test cases to pre-populate the store before the test case instructs the topology to
 process an input message, and/or to check the store afterward.
name - the name of the storenull if no KeyValueStore or TimestampedKeyValueStore
 has been registered with the given namegetAllStateStores(), 
getStateStore(String), 
getTimestampedKeyValueStore(String), 
getWindowStore(String), 
getTimestampedWindowStore(String), 
getSessionStore(String)public <K,V> KeyValueStore<K,ValueAndTimestamp<V>> getTimestampedKeyValueStore(String name)
TimestampedKeyValueStore with the given name.
 The store can be a "regular" or global store.
 
 This is often useful in test cases to pre-populate the store before the test case instructs the topology to
 process an input message, and/or to check the store afterward.
name - the name of the storenull if no TimestampedKeyValueStore has been registered with the given namegetAllStateStores(), 
getStateStore(String), 
getKeyValueStore(String), 
getWindowStore(String), 
getTimestampedWindowStore(String), 
getSessionStore(String)public <K,V> WindowStore<K,V> getWindowStore(String name)
WindowStore or TimestampedWindowStore with the given name.
 The store can be a "regular" or global store.
 
 If the registered store is a TimestampedWindowStore this method will return a value-only query
 interface. It is highly recommended to update the code for this case to avoid bugs and to use
 getTimestampedWindowStore(String) for full store access instead.
 
 This is often useful in test cases to pre-populate the store before the test case instructs the topology to
 process an input message, and/or to check the store afterward.
name - the name of the storenull if no WindowStore or TimestampedWindowStore
 has been registered with the given namegetAllStateStores(), 
getStateStore(String), 
getKeyValueStore(String), 
getTimestampedKeyValueStore(String), 
getTimestampedWindowStore(String), 
getSessionStore(String)public <K,V> WindowStore<K,ValueAndTimestamp<V>> getTimestampedWindowStore(String name)
TimestampedWindowStore with the given name.
 The store can be a "regular" or global store.
 
 This is often useful in test cases to pre-populate the store before the test case instructs the topology to
 process an input message, and/or to check the store afterward.
name - the name of the storenull if no TimestampedWindowStore has been registered with the given namegetAllStateStores(), 
getStateStore(String), 
getKeyValueStore(String), 
getTimestampedKeyValueStore(String), 
getWindowStore(String), 
getSessionStore(String)public <K,V> SessionStore<K,V> getSessionStore(String name)
SessionStore with the given name.
 The store can be a "regular" or global store.
 
 This is often useful in test cases to pre-populate the store before the test case instructs the topology to
 process an input message, and/or to check the store afterward.
name - the name of the storenull if no SessionStore has been registered with the given namegetAllStateStores(), 
getStateStore(String), 
getKeyValueStore(String), 
getTimestampedKeyValueStore(String), 
getWindowStore(String), 
getTimestampedWindowStore(String)public void close()
close in interface Closeableclose in interface AutoCloseable