All Implemented Interfaces: org.apache.kafka.streams.processor.internals.RecordCollector.Supplier, ProcessorContext
@Evolving
public class MockProcessorContext extends java.lang.Object implements ProcessorContext, org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
MockProcessorContext is a mock of ProcessorContext for users to test their Processor,
Transformer, and ValueTransformer implementations.
The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral tests that serve as example usage.
Note that this class does not take any automated actions (such as firing scheduled punctuators).
It simply captures any data it witnesses.
If you require more automated tests, we recommend wrapping your Processor in a minimal source-processor-sink
Topology and using the TopologyTestDriver.
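Because the mock only records what it sees, a test has to trigger any scheduled punctuator itself. The sketch below models that capture-then-fire pattern with plain JDK types so that it runs without Kafka on the classpath. `CapturingScheduler` and `Scheduled` are hypothetical stand-ins invented for this illustration, not Kafka classes; with the real mock, the analogous steps would presumably be to read scheduledPunctuators() and invoke the captured punctuator from the test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class CaptureOnlyDemo {

    /** Hypothetical record of one schedule call: the interval and the callback. */
    public static final class Scheduled {
        public final long intervalMs;
        public final LongConsumer callback;

        Scheduled(long intervalMs, LongConsumer callback) {
            this.intervalMs = intervalMs;
            this.callback = callback;
        }
    }

    /** Hypothetical capture-only scheduler: it stores callbacks and never fires them. */
    public static final class CapturingScheduler {
        private final List<Scheduled> captured = new ArrayList<>();

        public void schedule(long intervalMs, LongConsumer callback) {
            captured.add(new Scheduled(intervalMs, callback));
        }

        public List<Scheduled> scheduled() {
            // Return a copy so callers cannot alter what was captured.
            return new ArrayList<>(captured);
        }
    }

    /** Schedules one callback, fires it by hand, and returns the timestamps it saw. */
    public static List<Long> run() {
        CapturingScheduler scheduler = new CapturingScheduler();
        List<Long> seen = new ArrayList<>();
        scheduler.schedule(1000L, timestamp -> seen.add(timestamp));

        // Capturing alone triggers nothing.
        if (!seen.isEmpty()) {
            throw new IllegalStateException("callback fired without the test asking for it");
        }

        // The test picks the timestamp and invokes the captured callback itself.
        for (Scheduled s : scheduler.scheduled()) {
            s.callback.accept(42L);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the pattern is that the test stays in control of time: nothing happens between scheduling and the explicit call, which keeps assertions deterministic.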
| Modifier and Type | Class | Description |
|---|---|---|
| static class | MockProcessorContext.CapturedForward | |
| static class | MockProcessorContext.CapturedPunctuator | MockProcessorContext.CapturedPunctuator holds captured punctuators, along with their scheduling information. |
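| Constructor | Description |
|---|---|
| MockProcessorContext() | Create a MockProcessorContext with dummy config and taskId and null stateDir. |
| MockProcessorContext(java.util.Properties config) | Create a MockProcessorContext with dummy taskId and null stateDir. |
| MockProcessorContext(java.util.Properties config, TaskId taskId, java.io.File stateDir) | Create a MockProcessorContext with a specified taskId and stateDir. |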
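The two config-taking constructors accept an ordinary `java.util.Properties` object. As a minimal sketch, the helper below builds one using only the JDK; the class name `MockContextConfig`, the method `testConfig`, and the placeholder values are assumptions made for illustration. The keys are written as string literals (`application.id`, `bootstrap.servers`) so the snippet compiles without Kafka.

```java
import java.util.Properties;

public class MockContextConfig {

    /** Builds the kind of Properties object the config-taking constructors accept. */
    public static Properties testConfig(String applicationId) {
        Properties config = new Properties();
        // Standard Kafka Streams configuration keys, spelled out as literals.
        config.setProperty("application.id", applicationId);
        // Placeholder address, chosen only for illustration.
        config.setProperty("bootstrap.servers", "localhost:9092");
        return config;
    }

    public static void main(String[] args) {
        Properties config = testConfig("my-test-app");
        System.out.println(config.getProperty("application.id"));
        // With the Kafka Streams test utilities on the classpath, the object
        // would then be handed to the mock, for example:
        //   MockProcessorContext context = new MockProcessorContext(config);
    }
}
```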
| Modifier and Type | Method | Description |
|---|---|---|
| java.util.Map<java.lang.String,java.lang.Object> | appConfigs() | Returns all the application config properties as key/value pairs. |
| java.util.Map<java.lang.String,java.lang.Object> | appConfigsWithPrefix(java.lang.String prefix) | Returns all the application config properties with the given key prefix, as key/value pairs stripping the prefix. |
| java.lang.String | applicationId() | Returns the application id |
| void | commit() | Requests a commit |
| boolean | committed() | Whether ProcessorContext.commit() has been called in this context. |
| <K,V> void | forward(K key, V value) | Forwards a key/value pair to all downstream processors. |
| <K,V> void | forward(K key, V value, int childIndex) | Forwards a key/value pair to one of the downstream processors designated by childIndex |
| <K,V> void | forward(K key, V value, java.lang.String childName) | Forwards a key/value pair to one of the downstream processors designated by the downstream processor name |
| <K,V> void | forward(K key, V value, To to) | Forwards a key/value pair to the specified downstream processors. |
| java.util.List<MockProcessorContext.CapturedForward> | forwarded() | Get all the forwarded data this context has observed. |
| java.util.List<MockProcessorContext.CapturedForward> | forwarded(java.lang.String childName) | Get all the forwarded data this context has observed for a specific child by name. |
| StateStore | getStateStore(java.lang.String name) | Get the state store given the store name. |
| Headers | headers() | Returns the headers of the current input record; could be null if it is not available |
| Serde<?> | keySerde() | Returns the default key serde |
| StreamsMetrics | metrics() | Returns Metrics instance |
| long | offset() | Returns the offset of the current input record; could be -1 if it is not available (for example, if this method is invoked from the punctuate call) |
| int | partition() | Returns the partition id of the current input record; could be -1 if it is not available (for example, if this method is invoked from the punctuate call) |
| org.apache.kafka.streams.processor.internals.RecordCollector | recordCollector() | |
| void | register(StateStore store, StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) | Registers and possibly restores the specified storage engine. |
| void | resetCommit() | Reset the commit capture to false (whether or not it was previously true). |
| void | resetForwards() | Clear the captured forwarded data. |
| Cancellable | schedule(long intervalMs, PunctuationType type, Punctuator callback) | Schedules a periodic operation for processors. |
| java.util.List<MockProcessorContext.CapturedPunctuator> | scheduledPunctuators() | Get the punctuators scheduled so far. |
| void | setHeaders(Headers headers) | The context exposes this metadata for use in the processor. |
| void | setOffset(long offset) | The context exposes this metadata for use in the processor. |
| void | setPartition(int partition) | The context exposes this metadata for use in the processor. |
| void | setRecordMetadata(java.lang.String topic, int partition, long offset, Headers headers, long timestamp) | The context exposes these metadata for use in the processor. |
| void | setTimestamp(long timestamp) | The context exposes this metadata for use in the processor. |
| void | setTopic(java.lang.String topic) | The context exposes this metadata for use in the processor. |
| java.io.File | stateDir() | Returns the state directory for the partition. |
| TaskId | taskId() | Returns the task id |
| long | timestamp() | Returns the current timestamp. |
| java.lang.String | topic() | Returns the topic name of the current input record; could be null if it is not available (for example, if this method is invoked from the punctuate call) |
| Serde<?> | valueSerde() | Returns the default value serde |
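The phrase "with the given key prefix, as key/value pairs stripping the prefix" in the appConfigsWithPrefix(prefix) entry can be read as a filter followed by a key rewrite. The sketch below models that reading with plain JDK maps. The helper `withPrefix` and the sample keys are hypothetical, and this is an illustration of the described behavior, not the library's implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixConfigDemo {

    /** Hypothetical helper: keeps entries whose key starts with the prefix and strips it. */
    public static Map<String, Object> withPrefix(Map<String, Object> configs, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : configs.entrySet()) {
            String key = entry.getKey();
            // Assumption: a key equal to the bare prefix is skipped,
            // because stripping the prefix would leave an empty key.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("my.processor.batch.size", 10);
        configs.put("my.processor.name", "counter");
        configs.put("application.id", "test-app");

        // Only the two "my.processor." entries remain, keyed by "batch.size" and "name".
        System.out.println(withPrefix(configs, "my.processor."));
    }
}
```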
public MockProcessorContext()
Create a MockProcessorContext with dummy config and taskId and null stateDir.
Most unit tests using this mock won't need to know the taskId,
and most unit tests should be able to get by with the
InMemoryKeyValueStore, so the stateDir won't matter.

public MockProcessorContext(java.util.Properties config)
Create a MockProcessorContext with dummy taskId and null stateDir.
Most unit tests using this mock won't need to know the taskId,
and most unit tests should be able to get by with the
InMemoryKeyValueStore, so the stateDir won't matter.
Parameters:
config - a Properties object, used to configure the context and the processor.

public MockProcessorContext(java.util.Properties config, TaskId taskId, java.io.File stateDir)
Create a MockProcessorContext with a specified taskId and stateDir.
Parameters:
config - a Properties object, used to configure the context and the processor.
taskId - a TaskId, which the context makes available via taskId().
stateDir - a File, which the context makes available via stateDir().

public java.lang.String applicationId()
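Description copied from interface: ProcessorContext
Returns the application id
Specified by: applicationId in interface ProcessorContext

public TaskId taskId()
Description copied from interface: ProcessorContext
Returns the task id
Specified by: taskId in interface ProcessorContext

public java.util.Map<java.lang.String,java.lang.Object> appConfigs()
Description copied from interface: ProcessorContext
Returns all the application config properties as key/value pairs.
The config properties are defined in the StreamsConfig
object and associated with the ProcessorContext.
The type of the values is dependent on the type of the property
(e.g. the value of DEFAULT_KEY_SERDE_CLASS_CONFIG
will be of type Class, even if it was specified as a String to
StreamsConfig(Map)).
Specified by: appConfigs in interface ProcessorContext

public java.util.Map<java.lang.String,java.lang.Object> appConfigsWithPrefix(java.lang.String prefix)
Description copied from interface: ProcessorContext
Returns all the application config properties with the given key prefix, as key/value pairs
stripping the prefix. The config properties are defined in the StreamsConfig
object and associated with the ProcessorContext.
Specified by: appConfigsWithPrefix in interface ProcessorContext
Parameters:
prefix - the properties prefix

public Serde<?> keySerde()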
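Description copied from interface: ProcessorContext
Returns the default key serde
Specified by: keySerde in interface ProcessorContext

public Serde<?> valueSerde()
Description copied from interface: ProcessorContext
Returns the default value serde
Specified by: valueSerde in interface ProcessorContext

public java.io.File stateDir()
Description copied from interface: ProcessorContext
Returns the state directory for the partition.
Specified by: stateDir in interface ProcessorContext

public StreamsMetrics metrics()
Description copied from interface: ProcessorContext
Returns Metrics instance
Specified by: metrics in interface ProcessorContext

public void setRecordMetadata(java.lang.String topic, int partition, long offset, Headers headers, long timestamp)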
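The context exposes these metadata for use in the processor.
Parameters:
topic - A topic name
partition - A partition number
offset - A record offset
headers - Record headers
timestamp - A record timestamp

public void setTopic(java.lang.String topic)
The context exposes this metadata for use in the processor.
Parameters:
topic - A topic name

public void setPartition(int partition)
The context exposes this metadata for use in the processor.
Parameters:
partition - A partition number

public void setOffset(long offset)
The context exposes this metadata for use in the processor.
Parameters:
offset - A record offset

public void setHeaders(Headers headers)
The context exposes this metadata for use in the processor.
Parameters:
headers - Record headers

public void setTimestamp(long timestamp)
The context exposes this metadata for use in the processor.
Parameters:
timestamp - A record timestamp

public java.lang.String topic()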
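Description copied from interface: ProcessorContext
Returns the topic name of the current input record; could be null if it is not
available (for example, if this method is invoked from the punctuate call)
Specified by: topic in interface ProcessorContext

public int partition()
Description copied from interface: ProcessorContext
Returns the partition id of the current input record; could be -1 if it is not
available (for example, if this method is invoked from the punctuate call)
Specified by: partition in interface ProcessorContext

public long offset()
Description copied from interface: ProcessorContext
Returns the offset of the current input record; could be -1 if it is not
available (for example, if this method is invoked from the punctuate call)
Specified by: offset in interface ProcessorContext

public Headers headers()
Description copied from interface: ProcessorContext
Returns the headers of the current input record; could be null if it is not available
Specified by: headers in interface ProcessorContext

public long timestamp()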
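Description copied from interface: ProcessorContext
Returns the current timestamp.
If it is triggered while processing a record streamed from the source processor, timestamp is defined
as the timestamp of the current input record; the timestamp is extracted from
ConsumerRecord by TimestampExtractor.
If it is triggered while processing a record generated not from the source processor (for example,
if this method is invoked from the punctuate call), timestamp is defined as the current
task's stream time, which is defined as the smallest among all its input stream partition timestamps.
Specified by: timestamp in interface ProcessorContext

public void register(StateStore store, StateRestoreCallback stateRestoreCallbackIsIgnoredInMock)
Description copied from interface: ProcessorContext
Registers and possibly restores the specified storage engine.
Specified by: register in interface ProcessorContext
Parameters:
store - the storage engine
stateRestoreCallbackIsIgnoredInMock - the restoration callback logic for log-backed state stores upon restart

public StateStore getStateStore(java.lang.String name)
Description copied from interface: ProcessorContext
Get the state store given the store name.
Specified by: getStateStore in interface ProcessorContext
Parameters:
name - The store name

public Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback)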
Description copied from interface: ProcessorContext
Schedules a periodic operation for processors. A processor may call this method during
initialization or processing to schedule a periodic callback - called a punctuation - to
Punctuator.punctuate(long).
The type parameter controls what notion of time is used for punctuation:
PunctuationType.STREAM_TIME - uses "stream time", which is advanced by the processing of messages
in accordance with the timestamp as extracted by the TimestampExtractor in use.
The first punctuation will be triggered by the first record that is processed.
NOTE: Only advanced if messages arrive
PunctuationType.WALL_CLOCK_TIME - uses system time (the wall-clock time),
which is advanced at the polling interval (StreamsConfig.POLL_MS_CONFIG)
independent of whether new messages arrive.
The first punctuation will be triggered after interval has elapsed.
NOTE: This is best effort only as its granularity is limited by how long an iteration of the
processing loop takes to complete
Skipping punctuations: punctuations will not be triggered more than once at any given timestamp,
so a "missed" punctuation is skipped. A punctuation can be missed:
with PunctuationType.STREAM_TIME, when stream time advances more than interval
with PunctuationType.WALL_CLOCK_TIME, on GC pause, too short interval, ...
Specified by: schedule in interface ProcessorContext
Parameters:
intervalMs - the time interval between punctuations in milliseconds
type - one of: PunctuationType.STREAM_TIME, PunctuationType.WALL_CLOCK_TIME
callback - a function consuming timestamps representing the current stream or system time

public java.util.List<MockProcessorContext.CapturedPunctuator> scheduledPunctuators()
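Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to schedule(...).

public <K,V> void forward(K key, V value)
Description copied from interface: ProcessorContext
Forwards a key/value pair to all downstream processors.
Specified by: forward in interface ProcessorContext
Parameters:
key - key
value - value

public <K,V> void forward(K key, V value, To to)
Description copied from interface: ProcessorContext
Forwards a key/value pair to the specified downstream processors.
Specified by: forward in interface ProcessorContext
Parameters:
key - key
value - value
to - the options to use when forwarding

public <K,V> void forward(K key, V value, int childIndex)
Description copied from interface: ProcessorContext
Forwards a key/value pair to one of the downstream processors designated by childIndex
Specified by: forward in interface ProcessorContext
Parameters:
key - key
value - value
childIndex - index in list of children of this node

public <K,V> void forward(K key, V value, java.lang.String childName)
Description copied from interface: ProcessorContext
Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
Specified by: forward in interface ProcessorContext
Parameters:
key - key
value - value
childName - name of downstream processor

public java.util.List<MockProcessorContext.CapturedForward> forwarded()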
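Get all the forwarded data this context has observed. The returned list will not be
affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
forward(...).

public java.util.List<MockProcessorContext.CapturedForward> forwarded(java.lang.String childName)
Get all the forwarded data this context has observed for a specific child by name.
The returned list will not be affected by subsequent interactions with the context.
The data in the list is in the same order as the calls to forward(...).
Parameters:
childName - The child name to retrieve forwards for

public void resetForwards()
Clear the captured forwarded data.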
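The interplay of forward(...), forwarded(), forwarded(childName), and resetForwards() can be pictured with a small plain-JDK model. `CapturingContext` and `Captured` below are hypothetical stand-ins, not the Kafka classes. The sketch rests on two assumptions that are labeled in the code: the returned list is a snapshot that later calls do not change, and a forward sent without a child name counts for every child.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ForwardCaptureDemo {

    /** Hypothetical captured forward: key, value, and target child (null means all children). */
    public static final class Captured {
        public final String key;
        public final String value;
        public final String childName;

        Captured(String key, String value, String childName) {
            this.key = key;
            this.value = value;
            this.childName = childName;
        }
    }

    /** Hypothetical capture-only context: it records forwards in call order. */
    public static final class CapturingContext {
        private final List<Captured> forwards = new ArrayList<>();

        public void forward(String key, String value) {
            forwards.add(new Captured(key, value, null));
        }

        public void forward(String key, String value, String childName) {
            forwards.add(new Captured(key, value, childName));
        }

        public List<Captured> forwarded() {
            // Assumption: callers get a snapshot copy, in call order.
            return new ArrayList<>(forwards);
        }

        public List<Captured> forwarded(String childName) {
            List<Captured> result = new ArrayList<>();
            for (Captured c : forwards) {
                // Assumption: forwards without a child name count for every child.
                if (c.childName == null || c.childName.equals(childName)) {
                    result.add(c);
                }
            }
            return result;
        }

        public void resetForwards() {
            forwards.clear();
        }
    }

    /** Returns: snapshot size, count for "sink", count for "other", size after reset. */
    public static List<Integer> run() {
        CapturingContext context = new CapturingContext();
        context.forward("a", "1");          // to all children
        context.forward("b", "2", "sink");  // to the child named "sink"
        List<Captured> snapshot = context.forwarded();
        context.forward("c", "3");          // recorded after the snapshot was taken

        int forSink = context.forwarded("sink").size();
        int forOther = context.forwarded("other").size();
        context.resetForwards();
        return Arrays.asList(snapshot.size(), forSink, forOther, context.forwarded().size());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Resetting between phases of a test keeps each assertion focused on the forwards produced by the step under test.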
public void commit()
Description copied from interface: ProcessorContext
Requests a commit
Specified by: commit in interface ProcessorContext

public boolean committed()
Whether ProcessorContext.commit() has been called in this context.
Returns:
true iff ProcessorContext.commit() has been called in this context since construction or reset.

public void resetCommit()
Reset the commit capture to false (whether or not it was previously true).

public org.apache.kafka.streams.processor.internals.RecordCollector recordCollector()
Specified by: recordCollector in interface org.apache.kafka.streams.processor.internals.RecordCollector.Supplier