public class MockProcessorContext extends Object implements ProcessorContext, org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
MockProcessorContext is a mock of ProcessorContext for users to test their Processor,
Transformer, and ValueTransformer implementations.
The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral tests that serve as example usage.
Note that this class does not take any automated actions (such as firing scheduled punctuators).
It simply captures any data it witnesses.
If you require more automated tests, we recommend wrapping your Processor in a minimal source-processor-sink
Topology and using the TopologyTestDriver.
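The typical flow is to hand the mock to the processor's init(...), call process(...) directly, and then assert on what the mock captured. The following sketch is illustrative only: the tiny UppercaseProcessor and the JUnit 4 assertions are not part of this API.

```java
import static org.junit.Assert.assertEquals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.Test;

public class UppercaseProcessorTest {

    // A trivial processor used only for this sketch: it forwards the value upper-cased.
    static class UppercaseProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(final String key, final String value) {
            context.forward(key, value.toUpperCase());
        }

        @Override
        public void close() {}
    }

    @Test
    public void shouldForwardUppercasedValues() {
        final MockProcessorContext context = new MockProcessorContext();
        final Processor<String, String> processor = new UppercaseProcessor();
        processor.init(context);

        processor.process("k", "hello");

        // The mock captured the forward instead of sending it to a downstream node.
        final MockProcessorContext.CapturedForward forward = context.forwarded().get(0);
        assertEquals(new KeyValue<>("k", "HELLO"), forward.keyValue());
    }
}
```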
| Modifier and Type | Class and Description |
|---|---|
| static class | MockProcessorContext.CapturedForward |
| static class | MockProcessorContext.CapturedPunctuator MockProcessorContext.CapturedPunctuator holds captured punctuators, along with their scheduling information. |
| Constructor and Description |
|---|
| MockProcessorContext() Create a MockProcessorContext with dummy config and taskId and null stateDir. |
| MockProcessorContext(Properties config) Create a MockProcessorContext with a dummy taskId and null stateDir. |
| MockProcessorContext(Properties config, TaskId taskId, File stateDir) Create a MockProcessorContext with a specified config, taskId, and stateDir. |
| Modifier and Type | Method and Description |
|---|---|
| Map<String,Object> | appConfigs() Return all the application config properties as key/value pairs. |
| Map<String,Object> | appConfigsWithPrefix(String prefix) Return all the application config properties with the given key prefix, as key/value pairs stripping the prefix. |
| String | applicationId() Return the application id. |
| void | commit() Request a commit. |
| boolean | committed() Whether ProcessorContext.commit() has been called in this context. |
| long | currentStreamTimeMs() Return the current stream-time in milliseconds. |
| long | currentSystemTimeMs() Return the current system timestamp (also called wall-clock time) in milliseconds. |
| <K,V> void | forward(K key, V value) Forward a key/value pair to all downstream processors. |
| <K,V> void | forward(K key, V value, To to) Forward a key/value pair to the specified downstream processors. |
| List<MockProcessorContext.CapturedForward> | forwarded() Get all the forwarded data this context has observed. |
| List<MockProcessorContext.CapturedForward> | forwarded(String childName) Get all the forwarded data this context has observed for a specific child by name. |
| <S extends StateStore> S | getStateStore(String name) Get the state store given the store name. |
| Headers | headers() Return the headers of the current input record; could be null if it is not available. |
| Serde<?> | keySerde() Return the default key serde. |
| StreamsMetrics | metrics() Return the Metrics instance. |
| long | offset() Return the offset of the current input record; could be -1 if it is not available. |
| int | partition() Return the partition id of the current input record; could be -1 if it is not available. |
| org.apache.kafka.streams.processor.internals.RecordCollector | recordCollector() |
| void | register(StateStore store, StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) Register and possibly restore the specified storage engine. |
| void | resetCommit() Reset the commit capture to false (whether or not it was previously true). |
| void | resetForwards() Clear the captured forwarded data. |
| Cancellable | schedule(Duration interval, PunctuationType type, Punctuator callback) Schedule a periodic operation for processors. |
| List<MockProcessorContext.CapturedPunctuator> | scheduledPunctuators() Get the punctuators scheduled so far. |
| void | setCurrentStreamTimeMs(long currentStreamTimeMs) |
| void | setCurrentSystemTimeMs(long currentSystemTimeMs) |
| void | setHeaders(Headers headers) The context exposes this metadata for use in the processor. |
| void | setOffset(long offset) The context exposes this metadata for use in the processor. |
| void | setPartition(int partition) The context exposes this metadata for use in the processor. |
| void | setRecordMetadata(String topic, int partition, long offset, Headers headers, long timestamp) The context exposes these metadata for use in the processor. |
| void | setRecordTimestamp(long recordTimestamp) The context exposes this metadata for use in the processor. |
| void | setTimestamp(long timestamp) Deprecated. Since 3.0.0; use setRecordTimestamp(long) instead. |
| void | setTopic(String topic) The context exposes this metadata for use in the processor. |
| File | stateDir() Return the state directory for the partition. |
| TaskId | taskId() Return the task id. |
| long | timestamp() Return the current timestamp. |
| String | topic() Return the topic name of the current input record; could be null if it is not available. |
| Serde<?> | valueSerde() Return the default value serde. |
public MockProcessorContext()
Create a MockProcessorContext with dummy config and taskId and null stateDir.
Most unit tests using this mock won't need to know the taskId, and most unit tests should be able to get by with the InMemoryKeyValueStore, so the stateDir won't matter.

public MockProcessorContext(Properties config)
Create a MockProcessorContext with a dummy taskId and null stateDir.
Most unit tests using this mock won't need to know the taskId, and most unit tests should be able to get by with the InMemoryKeyValueStore, so the stateDir won't matter.
Parameters:
config - a Properties object, used to configure the context and the processor.

public MockProcessorContext(Properties config, TaskId taskId, File stateDir)
Create a MockProcessorContext with a specified config, taskId, and stateDir.
Parameters:
config - a Properties object, used to configure the context and the processor.
taskId - a TaskId, which the context makes available via taskId().
stateDir - a File, which the context makes available via stateDir().
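For instance, a test that cares about the application id or the default serdes can pass them in through the Properties argument. A minimal sketch, assuming the same JUnit setup as the earlier example plus org.apache.kafka.common.serialization.Serdes and org.apache.kafka.streams.StreamsConfig; the property values are illustrative only:

```java
@Test
public void shouldExposeConfiguredValues() {
    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test-app");    // illustrative value
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");    // never contacted by the mock
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

    final MockProcessorContext context = new MockProcessorContext(config);

    // The processor under test could read the same values via the context.
    assertEquals("unit-test-app", context.applicationId());
    assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
    assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
}
```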
public String applicationId()
Return the application id.
Specified by: applicationId in interface ProcessorContext

public TaskId taskId()
Return the task id.
Specified by: taskId in interface ProcessorContext
public Map<String,Object> appConfigs()
Return all the application config properties as key/value pairs.
The config properties are defined in the StreamsConfig object and associated to the ProcessorContext.
The type of the values is dependent on the type of the property (e.g. the value of DEFAULT_KEY_SERDE_CLASS_CONFIG will be of type Class, even if it was specified as a String to StreamsConfig(Map)).
Specified by: appConfigs in interface ProcessorContext
public Map<String,Object> appConfigsWithPrefix(String prefix)
Return all the application config properties with the given key prefix, as key/value pairs stripping the prefix.
The config properties are defined in the StreamsConfig object and associated to the ProcessorContext.
Specified by: appConfigsWithPrefix in interface ProcessorContext
Parameters:
prefix - the properties prefix
public long currentSystemTimeMs()
Return the current system timestamp (also called wall-clock time) in milliseconds.
Note: this method returns the internally cached system timestamp from the Kafka Streams runtime. Thus, it may return a different value compared to System.currentTimeMillis().
Specified by: currentSystemTimeMs in interface ProcessorContext
public long currentStreamTimeMs()
Return the current stream-time in milliseconds.
Stream-time is the maximum observed record timestamp so far (including the currently processed record), i.e., it can be considered a high-watermark. Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
Note: this method is not supported for global processors (cf. Topology.addGlobalStore(org.apache.kafka.streams.state.StoreBuilder<?>, java.lang.String, org.apache.kafka.common.serialization.Deserializer<K>, org.apache.kafka.common.serialization.Deserializer<V>, java.lang.String, java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier<K, V>) and StreamsBuilder.addGlobalStore(org.apache.kafka.streams.state.StoreBuilder<?>, java.lang.String, org.apache.kafka.streams.kstream.Consumed<K, V>, org.apache.kafka.streams.processor.ProcessorSupplier<K, V>)), because there is no concept of stream-time for this case. Calling this method in a global processor will result in an UnsupportedOperationException.
Specified by: currentStreamTimeMs in interface ProcessorContext
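Because this mock is not attached to a running task, neither stream-time nor system time advances on its own; a test sets them explicitly. A minimal sketch, using the same JUnit setup as the earlier examples:

```java
@Test
public void shouldUseTheTimesSetByTheTest() {
    final MockProcessorContext context = new MockProcessorContext();

    // Neither clock moves unless the test moves it.
    context.setCurrentStreamTimeMs(10L);
    context.setCurrentSystemTimeMs(60_000L);

    assertEquals(10L, context.currentStreamTimeMs());
    assertEquals(60_000L, context.currentSystemTimeMs());
}
```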
public Serde<?> keySerde()
Return the default key serde.
Specified by: keySerde in interface ProcessorContext

public Serde<?> valueSerde()
Return the default value serde.
Specified by: valueSerde in interface ProcessorContext

public File stateDir()
Return the state directory for the partition.
Specified by: stateDir in interface ProcessorContext

public StreamsMetrics metrics()
Return the Metrics instance.
Specified by: metrics in interface ProcessorContext
public void setRecordMetadata(String topic, int partition, long offset, Headers headers, long timestamp)
The context exposes these metadata for use in the processor.
Parameters:
topic - A topic name
partition - A partition number
offset - A record offset
headers - Record headers
timestamp - A record timestamp

public void setTopic(String topic)
The context exposes this metadata for use in the processor.
Parameters:
topic - A topic name

public void setPartition(int partition)
The context exposes this metadata for use in the processor.
Parameters:
partition - A partition number

public void setOffset(long offset)
The context exposes this metadata for use in the processor.
Parameters:
offset - A record offset

public void setHeaders(Headers headers)
The context exposes this metadata for use in the processor.
Parameters:
headers - Record headers

@Deprecated
public void setTimestamp(long timestamp)
Deprecated. Since 3.0.0; use setRecordTimestamp(long) instead.
Parameters:
timestamp - A record timestamp

public void setRecordTimestamp(long recordTimestamp)
The context exposes this metadata for use in the processor.
Parameters:
recordTimestamp - A record timestamp

public void setCurrentSystemTimeMs(long currentSystemTimeMs)

public void setCurrentStreamTimeMs(long currentStreamTimeMs)
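Since the mock never consumes real records, tests that exercise processors reading record metadata (topic(), partition(), offset(), headers(), timestamp()) have to supply that metadata first. A minimal sketch, assuming the same imports as above plus org.apache.kafka.common.header.internals.RecordHeaders:

```java
@Test
public void shouldExposeRecordMetadataSetByTheTest() {
    final MockProcessorContext context = new MockProcessorContext();

    // Pretend the "current" record came from input-topic, partition 0, offset 42, at t=1000ms.
    context.setRecordMetadata("input-topic", 0, 42L, new RecordHeaders(), 1_000L);

    // A processor's process() call would now observe exactly these values.
    assertEquals("input-topic", context.topic());
    assertEquals(0, context.partition());
    assertEquals(42L, context.offset());
    assertEquals(1_000L, context.timestamp());
}
```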
public String topic()
Return the topic name of the current input record; could be null if it is not available.
For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated topic. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), that do not always guarantee to provide a valid topic name, as they might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
Specified by: topic in interface ProcessorContext
public int partition()
Return the partition id of the current input record; could be -1 if it is not available.
For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated partition id. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), that do not always guarantee to provide a valid partition id, as they might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
Specified by: partition in interface ProcessorContext
public long offset()
Return the offset of the current input record; could be -1 if it is not available.
For example, if this method is invoked within a punctuation callback, or while processing a record that was forwarded by a punctuation callback, the record won't have an associated offset. Another example is KTable.transformValues(ValueTransformerWithKeySupplier, String...) (and siblings), that do not always guarantee to provide a valid offset, as they might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
Specified by: offset in interface ProcessorContext
public Headers headers()
Return the headers of the current input record; could be null if it is not available.
Note that headers should never be null in the actual Kafka Streams runtime, even if they could be empty. However, this mock does not guarantee non-null headers. Thus, you either need to add a null check to your production code to use this mock for testing, or you always need to set headers manually via setHeaders(Headers) to avoid a NullPointerException from your Processor implementation.
Specified by: headers in interface ProcessorContext
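In other words, a test either sets headers up front or the processor under test tolerates null. A short sketch of the first option, assuming org.apache.kafka.common.header.Headers, RecordHeaders, and the JUnit assertions from the earlier examples:

```java
@Test
public void shouldReturnHeadersOnlyOnceSet() {
    final MockProcessorContext context = new MockProcessorContext();

    // Unlike the real runtime, the mock starts out with no headers at all.
    assertNull(context.headers());

    final Headers headers = new RecordHeaders().add("trace-id", "abc".getBytes());
    context.setHeaders(headers);

    assertEquals(headers, context.headers());
}
```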
public long timestamp()
Return the current timestamp.
If it is triggered while processing a record streamed from the source processor, the timestamp is defined as the timestamp of the current input record; the timestamp is extracted from ConsumerRecord by TimestampExtractor. Note that an upstream Processor might have set a new timestamp by calling forward(..., To.all().withTimestamp(...)). In particular, some Kafka Streams DSL operators set result record timestamps explicitly, to guarantee deterministic results.
If it is triggered while processing a record generated not from the source processor (for example, if this method is invoked from the punctuate call), the timestamp is defined as the current task's stream time, which is defined as the largest timestamp of any record processed by the task.
Specified by: timestamp in interface ProcessorContext
public void register(StateStore store, StateRestoreCallback stateRestoreCallbackIsIgnoredInMock)
Register and possibly restore the specified storage engine.
Specified by: register in interface ProcessorContext
Parameters:
store - the storage engine
stateRestoreCallbackIsIgnoredInMock - the restoration callback logic for log-backed state stores upon restart

public <S extends StateStore> S getStateStore(String name)
Get the state store given the store name.
Specified by: getStateStore in interface ProcessorContext
Type Parameters:
S - The type or interface of the store to return
Parameters:
name - The store name
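State stores are not created by the mock: the test builds one, initializes it against the mock context (which registers it), and the processor under test can then look it up by name. A sketch following the pattern from the Kafka Streams testing documentation, assuming org.apache.kafka.streams.state.Stores and KeyValueStore are imported; changelogging is disabled because the mock cannot write to a changelog topic:

```java
@Test
public void shouldRegisterAndServeAStateStore() {
    final MockProcessorContext context = new MockProcessorContext();

    final KeyValueStore<String, Long> store =
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("counts"),
                Serdes.String(),
                Serdes.Long())
            .withLoggingDisabled()   // the mock does not support changelogging
            .build();

    // init() registers the store with the context, just as the runtime would.
    store.init(context, store);

    // The processor under test would typically do this lookup in its own init().
    final KeyValueStore<String, Long> sameStore = context.getStateStore("counts");
    sameStore.put("a", 1L);
    assertEquals(Long.valueOf(1L), store.get("a"));
}
```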
public Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback) throws IllegalArgumentException
Schedule a periodic operation for processors. A processor may call this method during initialization or processing to schedule a periodic callback (called a punctuation) to Punctuator.punctuate(long). The type parameter controls what notion of time is used for punctuation:
- PunctuationType.STREAM_TIME - uses "stream time", which is advanced by the processing of messages in accordance with the timestamp as extracted by the TimestampExtractor in use. The first punctuation will be triggered by the first record that is processed. NOTE: Only advanced if messages arrive.
- PunctuationType.WALL_CLOCK_TIME - uses system time (the wall-clock time), which is advanced independent of whether new messages arrive. The first punctuation will be triggered after interval has elapsed. NOTE: This is best effort only, as its granularity is limited by how long an iteration of the processing loop takes to complete.
A punctuation can be missed with PunctuationType.STREAM_TIME when stream time advances by more than interval, and with PunctuationType.WALL_CLOCK_TIME on GC pause, too short interval, ...
Specified by: schedule in interface ProcessorContext
Parameters:
interval - the time interval between punctuations (supported minimum is 1 millisecond)
type - one of: PunctuationType.STREAM_TIME, PunctuationType.WALL_CLOCK_TIME
callback - a function consuming timestamps representing the current stream or system time
Throws:
IllegalArgumentException - if the interval is not representable in milliseconds

public List<MockProcessorContext.CapturedPunctuator> scheduledPunctuators()
Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to schedule(...).
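Since the mock never fires punctuators itself, the usual pattern (as in the Kafka Streams testing documentation) is to fetch the captured punctuator and invoke it from the test. A sketch assuming the imports from the earlier examples plus java.time.Duration and PunctuationType; here the punctuator is scheduled directly on the context rather than from a processor's init():

```java
@Test
public void shouldLetTheTestFirePunctuators() {
    final MockProcessorContext context = new MockProcessorContext();

    context.schedule(
        Duration.ofSeconds(30),
        PunctuationType.WALL_CLOCK_TIME,
        timestamp -> context.forward("tick", timestamp));

    final MockProcessorContext.CapturedPunctuator punctuator = context.scheduledPunctuators().get(0);
    assertEquals(30_000L, punctuator.getIntervalMs());

    // Fire it manually, as the mock takes no automated actions.
    punctuator.getPunctuator().punctuate(0L);

    assertEquals(new KeyValue<>("tick", 0L), context.forwarded().get(0).keyValue());
}
```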
public <K,V> void forward(K key, V value)
Forward a key/value pair to all downstream processors.
If this method is called within Punctuator.punctuate(long), the record that is sent downstream won't have any associated record metadata like topic, partition, or offset.
Specified by: forward in interface ProcessorContext
Parameters:
key - key
value - value

public <K,V> void forward(K key, V value, To to)
Forward a key/value pair to the specified downstream processors.
If this method is called within Punctuator.punctuate(long), the record that is sent downstream won't have any associated record metadata like topic, partition, or offset.
Specified by: forward in interface ProcessorContext
Parameters:
key - key
value - value
to - the options to use when forwarding

public List<MockProcessorContext.CapturedForward> forwarded()
Get all the forwarded data this context has observed. The returned list will not be affected by subsequent interactions with the context (i.e., forward(...)).

public List<MockProcessorContext.CapturedForward> forwarded(String childName)
Get all the forwarded data this context has observed for a specific child by name. The returned list will not be affected by subsequent interactions with the context (i.e., forward(...)).
Parameters:
childName - The child name to retrieve forwards for

public void resetForwards()
Clear the captured forwarded data.
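CapturedForward also records which child, if any, a record was routed to, so tests can assert per-child forwarding. A brief sketch, assuming org.apache.kafka.streams.processor.To is imported and the records are forwarded directly through the context:

```java
@Test
public void shouldCaptureForwardsPerChild() {
    final MockProcessorContext context = new MockProcessorContext();

    context.forward("a", 1, To.child("left"));
    context.forward("b", 2, To.child("right"));

    assertEquals(2, context.forwarded().size());
    assertEquals(1, context.forwarded("left").size());
    assertEquals(new KeyValue<>("a", 1), context.forwarded("left").get(0).keyValue());

    // Clear the captured forwards between test phases.
    context.resetForwards();
    assertTrue(context.forwarded().isEmpty());
}
```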
public void commit()
Request a commit.
Specified by: commit in interface ProcessorContext

public boolean committed()
Whether ProcessorContext.commit() has been called in this context.
Returns:
true iff ProcessorContext.commit() has been called in this context since construction or reset.

public void resetCommit()
Reset the commit capture to false (whether or not it was previously true).

public org.apache.kafka.streams.processor.internals.RecordCollector recordCollector()
Specified by: recordCollector in interface org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
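Commit requests are captured in the same fashion; committed() and resetCommit() let a test check and clear the flag between phases. A minimal sketch, using the same JUnit setup as the earlier examples:

```java
@Test
public void shouldCaptureCommitRequests() {
    final MockProcessorContext context = new MockProcessorContext();

    assertFalse(context.committed());

    // A processor under test would typically call this from process() or a punctuator.
    context.commit();
    assertTrue(context.committed());

    context.resetCommit();
    assertFalse(context.committed());
}
```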