public class MockProcessorContext extends Object implements ProcessorContext, org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
MockProcessorContext
is a mock of ProcessorContext
for users to test their Processor
,
Transformer
, and ValueTransformer
implementations.
The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral tests that serve as example usage.
Note that this class does not take any automated actions (such as firing scheduled punctuators).
It simply captures any data it witnesses.
If you require more automated tests, we recommend wrapping your Processor
in a minimal source-processor-sink
Topology
and using the TopologyTestDriver
.
Modifier and Type | Class and Description |
---|---|
static class |
MockProcessorContext.CapturedForward |
static class |
MockProcessorContext.CapturedPunctuator
MockProcessorContext.CapturedPunctuator holds captured punctuators, along with their scheduling information. |
Constructor and Description |
---|
MockProcessorContext()
|
MockProcessorContext(Properties config)
|
MockProcessorContext(Properties config,
TaskId taskId,
File stateDir)
Create a
MockProcessorContext with a specified taskId and null stateDir. |
Modifier and Type | Method and Description |
---|---|
Map<String,Object> |
appConfigs()
Return all the application config properties as key/value pairs.
|
Map<String,Object> |
appConfigsWithPrefix(String prefix)
Return all the application config properties with the given key prefix, as key/value pairs
stripping the prefix.
|
String |
applicationId()
Return the application id.
|
void |
commit()
Request a commit.
|
boolean |
committed()
Whether
ProcessorContext.commit() has been called in this context. |
long |
currentStreamTimeMs()
Return the current stream-time in milliseconds.
|
long |
currentSystemTimeMs()
Return the current system timestamp (also called wall-clock time) in milliseconds.
|
<K,V> void |
forward(K key,
V value)
Forward a key/value pair to all downstream processors.
|
<K,V> void |
forward(K key,
V value,
To to)
Forward a key/value pair to the specified downstream processors.
|
List<MockProcessorContext.CapturedForward> |
forwarded()
Get all the forwarded data this context has observed.
|
List<MockProcessorContext.CapturedForward> |
forwarded(String childName)
Get all the forwarded data this context has observed for a specific child by name.
|
<S extends StateStore> |
getStateStore(String name)
Get the state store given the store name.
|
Headers |
headers()
Returns the headers of the current input record; could be
null if it is not
available. |
Serde<?> |
keySerde()
Return the default key serde.
|
StreamsMetrics |
metrics()
Return Metrics instance.
|
long |
offset()
Return the offset of the current input record; could be
-1 if it is not
available. |
int |
partition()
Return the partition id of the current input record; could be
-1 if it is not
available. |
org.apache.kafka.streams.processor.internals.RecordCollector |
recordCollector() |
void |
register(StateStore store,
StateRestoreCallback stateRestoreCallbackIsIgnoredInMock)
Register and possibly restores the specified storage engine.
|
void |
resetCommit()
Reset the commit capture to
false (whether or not it was previously true ). |
void |
resetForwards()
Clear the captured forwarded data.
|
Cancellable |
schedule(Duration interval,
PunctuationType type,
Punctuator callback)
Schedule a periodic operation for processors.
|
List<MockProcessorContext.CapturedPunctuator> |
scheduledPunctuators()
Get the punctuators scheduled so far.
|
void |
setCurrentStreamTimeMs(long currentStreamTimeMs) |
void |
setCurrentSystemTimeMs(long currentSystemTimeMs) |
void |
setHeaders(Headers headers)
The context exposes this metadata for use in the processor.
|
void |
setOffset(long offset)
The context exposes this metadata for use in the processor.
|
void |
setPartition(int partition)
The context exposes this metadata for use in the processor.
|
void |
setRecordMetadata(String topic,
int partition,
long offset,
Headers headers,
long timestamp)
The context exposes these metadata for use in the processor.
|
void |
setRecordTimestamp(long recordTimestamp)
The context exposes this metadata for use in the processor.
|
void |
setTimestamp(long timestamp)
Deprecated.
Since 3.0.0; use
setRecordTimestamp(long) instead. |
void |
setTopic(String topic)
The context exposes this metadata for use in the processor.
|
File |
stateDir()
Return the state directory for the partition.
|
TaskId |
taskId()
Return the task id.
|
long |
timestamp()
Return the current timestamp.
|
String |
topic()
Return the topic name of the current input record; could be
null if it is not
available. |
Serde<?> |
valueSerde()
Return the default value serde.
|
public MockProcessorContext()
MockProcessorContext
with dummy config
and taskId
and null
stateDir
.
Most unit tests using this mock won't need to know the taskId,
and most unit tests should be able to get by with the
InMemoryKeyValueStore
, so the stateDir won't matter.public MockProcessorContext(Properties config)
MockProcessorContext
with dummy taskId
and null
stateDir
.
Most unit tests using this mock won't need to know the taskId,
and most unit tests should be able to get by with the
InMemoryKeyValueStore
, so the stateDir won't matter.config
- a Properties object, used to configure the context and the processor.public MockProcessorContext(Properties config, TaskId taskId, File stateDir)
MockProcessorContext
with a specified taskId and null stateDir.config
- a Properties
object, used to configure the context and the processor.taskId
- a TaskId
, which the context makes available via taskId()
.stateDir
- a File
, which the context makes available viw stateDir()
.public String applicationId()
ProcessorContext
applicationId
in interface ProcessorContext
public TaskId taskId()
ProcessorContext
taskId
in interface ProcessorContext
public Map<String,Object> appConfigs()
ProcessorContext
The config properties are defined in the StreamsConfig
object and associated to the ProcessorContext.
The type of the values is dependent on the type
of the property
(e.g. the value of DEFAULT_KEY_SERDE_CLASS_CONFIG
will be of type Class
, even if it was specified as a String to
StreamsConfig(Map)
).
appConfigs
in interface ProcessorContext
public Map<String,Object> appConfigsWithPrefix(String prefix)
ProcessorContext
The config properties are defined in the StreamsConfig
object and associated to the ProcessorContext.
appConfigsWithPrefix
in interface ProcessorContext
prefix
- the properties prefixpublic long currentSystemTimeMs()
ProcessorContext
Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
Thus, it may return a different value compared to System.currentTimeMillis()
.
currentSystemTimeMs
in interface ProcessorContext
public long currentStreamTimeMs()
ProcessorContext
Stream-time is the maximum observed record timestamp
so far
(including the currently processed record), i.e., it can be considered a high-watermark.
Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
Note: this method is not supported for global processors (cf. Topology.addGlobalStore(org.apache.kafka.streams.state.StoreBuilder<?>, java.lang.String, org.apache.kafka.common.serialization.Deserializer<K>, org.apache.kafka.common.serialization.Deserializer<V>, java.lang.String, java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier<K, V>)
(...)
and StreamsBuilder.addGlobalStore(org.apache.kafka.streams.state.StoreBuilder<?>, java.lang.String, org.apache.kafka.streams.kstream.Consumed<K, V>, org.apache.kafka.streams.processor.ProcessorSupplier<K, V>)
(...),
because there is no concept of stream-time for this case.
Calling this method in a global processor will result in an UnsupportedOperationException
.
currentStreamTimeMs
in interface ProcessorContext
public Serde<?> keySerde()
ProcessorContext
keySerde
in interface ProcessorContext
public Serde<?> valueSerde()
ProcessorContext
valueSerde
in interface ProcessorContext
public File stateDir()
ProcessorContext
stateDir
in interface ProcessorContext
public StreamsMetrics metrics()
ProcessorContext
metrics
in interface ProcessorContext
public void setRecordMetadata(String topic, int partition, long offset, Headers headers, long timestamp)
topic
- A topic namepartition
- A partition numberoffset
- A record offsettimestamp
- A record timestamppublic void setTopic(String topic)
topic
- A topic namepublic void setPartition(int partition)
partition
- A partition numberpublic void setOffset(long offset)
offset
- A record offsetpublic void setHeaders(Headers headers)
headers
- Record headers@Deprecated public void setTimestamp(long timestamp)
setRecordTimestamp(long)
instead.timestamp
- A record timestamppublic void setRecordTimestamp(long recordTimestamp)
recordTimestamp
- A record timestamppublic void setCurrentSystemTimeMs(long currentSystemTimeMs)
public void setCurrentStreamTimeMs(long currentStreamTimeMs)
public String topic()
ProcessorContext
null
if it is not
available.
For example, if this method is invoked within a punctuation callback
, or while processing a record that was forwarded by a punctuation
callback, the record won't have an associated topic.
Another example is
KTable.transformValues(ValueTransformerWithKeySupplier, String...)
(and siblings), that do not always guarantee to provide a valid topic name, as they might be
executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
topic
in interface ProcessorContext
public int partition()
ProcessorContext
-1
if it is not
available.
For example, if this method is invoked within a punctuation callback
, or while processing a record that was forwarded by a punctuation
callback, the record won't have an associated partition id.
Another example is
KTable.transformValues(ValueTransformerWithKeySupplier, String...)
(and siblings), that do not always guarantee to provide a valid partition id, as they might be
executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
partition
in interface ProcessorContext
public long offset()
ProcessorContext
-1
if it is not
available.
For example, if this method is invoked within a punctuation callback
, or while processing a record that was forwarded by a punctuation
callback, the record won't have an associated offset.
Another example is
KTable.transformValues(ValueTransformerWithKeySupplier, String...)
(and siblings), that do not always guarantee to provide a valid offset, as they might be
executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
offset
in interface ProcessorContext
public Headers headers()
null
if it is not
available.
Note, that headers should never be null
in the actual Kafka Streams runtime,
even if they could be empty. However, this mock does not guarantee non-null
headers.
Thus, you either need to add a null
check to your production code to use this mock
for testing or you always need to set headers manually via setHeaders(Headers)
to
avoid a NullPointerException
from your Processor
implementation.
headers
in interface ProcessorContext
public long timestamp()
ProcessorContext
If it is triggered while processing a record streamed from the source processor,
timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
ConsumerRecord
by TimestampExtractor
.
Note, that an upstream Processor
might have set a new timestamp by calling
forward(..., To.all().withTimestamp(...))
.
In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
to guarantee deterministic results.
If it is triggered while processing a record generated not from the source processor (for example, if this method is invoked from the punctuate call), timestamp is defined as the current task's stream time, which is defined as the largest timestamp of any record processed by the task.
timestamp
in interface ProcessorContext
public void register(StateStore store, StateRestoreCallback stateRestoreCallbackIsIgnoredInMock)
ProcessorContext
register
in interface ProcessorContext
store
- the storage enginestateRestoreCallbackIsIgnoredInMock
- the restoration callback logic for log-backed state stores upon restartpublic <S extends StateStore> S getStateStore(String name)
ProcessorContext
getStateStore
in interface ProcessorContext
S
- The type or interface of the store to returnname
- The store namepublic Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback) throws IllegalArgumentException
ProcessorContext
initialization
or
processing
to
schedule a periodic callback — called a punctuation — to Punctuator.punctuate(long)
.
The type parameter controls what notion of time is used for punctuation:
PunctuationType.STREAM_TIME
— uses "stream time", which is advanced by the processing of messages
in accordance with the timestamp as extracted by the TimestampExtractor
in use.
The first punctuation will be triggered by the first record that is processed.
NOTE: Only advanced if messages arrivePunctuationType.WALL_CLOCK_TIME
— uses system time (the wall-clock time),
which is advanced independent of whether new messages arrive.
The first punctuation will be triggered after interval has elapsed.
NOTE: This is best effort only as its granularity is limited by how long an iteration of the
processing loop takes to completePunctuationType.STREAM_TIME
, when stream time advances more than intervalPunctuationType.WALL_CLOCK_TIME
, on GC pause, too short interval, ...schedule
in interface ProcessorContext
interval
- the time interval between punctuations (supported minimum is 1 millisecond)type
- one of: PunctuationType.STREAM_TIME
, PunctuationType.WALL_CLOCK_TIME
callback
- a function consuming timestamps representing the current stream or system timeIllegalArgumentException
- if the interval is not representable in millisecondspublic List<MockProcessorContext.CapturedPunctuator> scheduledPunctuators()
schedule(...)
.public <K,V> void forward(K key, V value)
ProcessorContext
If this method is called with Punctuator.punctuate(long)
the record that
is sent downstream won't have any associated record metadata like topic, partition, or offset.
forward
in interface ProcessorContext
key
- keyvalue
- valuepublic <K,V> void forward(K key, V value, To to)
ProcessorContext
If this method is called with Punctuator.punctuate(long)
the record that
is sent downstream won't have any associated record metadata like topic, partition, or offset.
forward
in interface ProcessorContext
key
- keyvalue
- valueto
- the options to use when forwardingpublic List<MockProcessorContext.CapturedForward> forwarded()
forward(...)
.public List<MockProcessorContext.CapturedForward> forwarded(String childName)
forward(...)
.childName
- The child name to retrieve forwards forpublic void resetForwards()
public void commit()
ProcessorContext
commit
in interface ProcessorContext
public boolean committed()
ProcessorContext.commit()
has been called in this context.true
iff ProcessorContext.commit()
has been called in this context since construction or reset.public void resetCommit()
false
(whether or not it was previously true
).public org.apache.kafka.streams.processor.internals.RecordCollector recordCollector()
recordCollector
in interface org.apache.kafka.streams.processor.internals.RecordCollector.Supplier