public class MockProcessorContext extends Object implements ProcessorContext, org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
MockProcessorContext
is a mock of ProcessorContext
for users to test their Processor
,
Transformer
, and ValueTransformer
implementations.
The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral tests that serve as example usage.
Note that this class does not take any automated actions (such as firing scheduled punctuators).
It simply captures any data it witnesses.
If you require more automated tests, we recommend wrapping your Processor
in a minimal source-processor-sink
Topology
and using the TopologyTestDriver
.
Modifier and Type | Class and Description |
---|---|
static class |
MockProcessorContext.CapturedForward |
static class |
MockProcessorContext.CapturedPunctuator
MockProcessorContext.CapturedPunctuator holds captured punctuators, along with their scheduling information. |
Constructor and Description |
---|
MockProcessorContext()
|
MockProcessorContext(Properties config)
|
MockProcessorContext(Properties config,
TaskId taskId,
File stateDir)
Create a
MockProcessorContext with a specified taskId and null stateDir. |
Modifier and Type | Method and Description |
---|---|
Map<String,Object> |
appConfigs()
Returns all the application config properties as key/value pairs.
|
Map<String,Object> |
appConfigsWithPrefix(String prefix)
Returns all the application config properties with the given key prefix, as key/value pairs
stripping the prefix.
|
String |
applicationId()
Returns the application id.
|
void |
commit()
Requests a commit.
|
boolean |
committed()
Whether
ProcessorContext.commit() has been called in this context. |
<K,V> void |
forward(K key,
V value)
Forwards a key/value pair to all downstream processors.
|
<K,V> void |
forward(K key,
V value,
int childIndex)
Deprecated.
|
<K,V> void |
forward(K key,
V value,
String childName)
Deprecated.
|
<K,V> void |
forward(K key,
V value,
To to)
Forwards a key/value pair to the specified downstream processors.
|
List<MockProcessorContext.CapturedForward> |
forwarded()
Get all the forwarded data this context has observed.
|
List<MockProcessorContext.CapturedForward> |
forwarded(String childName)
Get all the forwarded data this context has observed for a specific child by name.
|
<S extends StateStore> |
getStateStore(String name)
Get the state store given the store name.
|
Headers |
headers()
Returns the headers of the current input record; could be null if it is not
available (for example, if this method is invoked from the punctuate call).
|
Serde<?> |
keySerde()
Returns the default key serde.
|
StreamsMetrics |
metrics()
Returns Metrics instance.
|
long |
offset()
Returns the offset of the current input record; could be -1 if it is not
available (for example, if this method is invoked from the punctuate call).
|
int |
partition()
Returns the partition id of the current input record; could be -1 if it is not
available (for example, if this method is invoked from the punctuate call).
|
org.apache.kafka.streams.processor.internals.RecordCollector |
recordCollector() |
void |
register(StateStore store,
StateRestoreCallback stateRestoreCallbackIsIgnoredInMock)
Registers and possibly restores the specified storage engine.
|
void |
resetCommit()
Reset the commit capture to
false (whether or not it was previously true ). |
void |
resetForwards()
Clear the captured forwarded data.
|
Cancellable |
schedule(Duration interval,
PunctuationType type,
Punctuator callback)
Schedules a periodic operation for processors.
|
Cancellable |
schedule(long intervalMs,
PunctuationType type,
Punctuator callback)
Deprecated.
|
List<MockProcessorContext.CapturedPunctuator> |
scheduledPunctuators()
Get the punctuators scheduled so far.
|
void |
setHeaders(Headers headers)
The context exposes this metadata for use in the processor.
|
void |
setOffset(long offset)
The context exposes this metadata for use in the processor.
|
void |
setPartition(int partition)
The context exposes this metadata for use in the processor.
|
void |
setRecordMetadata(String topic,
int partition,
long offset,
Headers headers,
long timestamp)
The context exposes these metadata for use in the processor.
|
void |
setTimestamp(long timestamp)
The context exposes this metadata for use in the processor.
|
void |
setTopic(String topic)
The context exposes this metadata for use in the processor.
|
File |
stateDir()
Returns the state directory for the partition.
|
TaskId |
taskId()
Returns the task id.
|
long |
timestamp()
Returns the current timestamp.
|
String |
topic()
Returns the topic name of the current input record; could be null if it is not
available (for example, if this method is invoked from the punctuate call).
|
Serde<?> |
valueSerde()
Returns the default value serde.
|
public MockProcessorContext()
MockProcessorContext
with dummy config
and taskId
and null
stateDir
.
Most unit tests using this mock won't need to know the taskId,
and most unit tests should be able to get by with the
InMemoryKeyValueStore
, so the stateDir won't matter.public MockProcessorContext(Properties config)
MockProcessorContext
with dummy taskId
and null
stateDir
.
Most unit tests using this mock won't need to know the taskId,
and most unit tests should be able to get by with the
InMemoryKeyValueStore
, so the stateDir won't matter.config
- a Properties object, used to configure the context and the processor.public MockProcessorContext(Properties config, TaskId taskId, File stateDir)
MockProcessorContext
with a specified taskId and null stateDir.config
- a Properties
object, used to configure the context and the processor.taskId
- a TaskId
, which the context makes available via taskId()
.stateDir
- a File
, which the context makes available viw stateDir()
.public String applicationId()
ProcessorContext
applicationId
in interface ProcessorContext
public TaskId taskId()
ProcessorContext
taskId
in interface ProcessorContext
public Map<String,Object> appConfigs()
ProcessorContext
The config properties are defined in the StreamsConfig
object and associated to the ProcessorContext.
The type of the values is dependent on the type
of the property
(e.g. the value of DEFAULT_KEY_SERDE_CLASS_CONFIG
will be of type Class
, even if it was specified as a String to
StreamsConfig(Map)
).
appConfigs
in interface ProcessorContext
public Map<String,Object> appConfigsWithPrefix(String prefix)
ProcessorContext
The config properties are defined in the StreamsConfig
object and associated to the ProcessorContext.
appConfigsWithPrefix
in interface ProcessorContext
prefix
- the properties prefixpublic Serde<?> keySerde()
ProcessorContext
keySerde
in interface ProcessorContext
public Serde<?> valueSerde()
ProcessorContext
valueSerde
in interface ProcessorContext
public File stateDir()
ProcessorContext
stateDir
in interface ProcessorContext
public StreamsMetrics metrics()
ProcessorContext
metrics
in interface ProcessorContext
public void setRecordMetadata(String topic, int partition, long offset, Headers headers, long timestamp)
topic
- A topic namepartition
- A partition numberoffset
- A record offsettimestamp
- A record timestamppublic void setTopic(String topic)
topic
- A topic namepublic void setPartition(int partition)
partition
- A partition numberpublic void setOffset(long offset)
offset
- A record offsetpublic void setHeaders(Headers headers)
headers
- Record headerspublic void setTimestamp(long timestamp)
timestamp
- A record timestamppublic String topic()
ProcessorContext
topic
in interface ProcessorContext
public int partition()
ProcessorContext
partition
in interface ProcessorContext
public long offset()
ProcessorContext
offset
in interface ProcessorContext
public Headers headers()
ProcessorContext
headers
in interface ProcessorContext
public long timestamp()
ProcessorContext
If it is triggered while processing a record streamed from the source processor,
timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
ConsumerRecord
by TimestampExtractor
.
If it is triggered while processing a record generated not from the source processor (for example, if this method is invoked from the punctuate call), timestamp is defined as the current task's stream time, which is defined as the largest timestamp of any record processed by the task.
timestamp
in interface ProcessorContext
public void register(StateStore store, StateRestoreCallback stateRestoreCallbackIsIgnoredInMock)
ProcessorContext
register
in interface ProcessorContext
store
- the storage enginestateRestoreCallbackIsIgnoredInMock
- the restoration callback logic for log-backed state stores upon restartpublic <S extends StateStore> S getStateStore(String name)
ProcessorContext
getStateStore
in interface ProcessorContext
S
- The type or interface of the store to returnname
- The store name@Deprecated public Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback)
ProcessorContext
initialization
or
processing
to
schedule a periodic callback — called a punctuation — to Punctuator.punctuate(long)
.
The type parameter controls what notion of time is used for punctuation:
PunctuationType.STREAM_TIME
— uses "stream time", which is advanced by the processing of messages
in accordance with the timestamp as extracted by the TimestampExtractor
in use.
The first punctuation will be triggered by the first record that is processed.
NOTE: Only advanced if messages arrivePunctuationType.WALL_CLOCK_TIME
— uses system time (the wall-clock time),
which is advanced independent of whether new messages arrive.
The first punctuation will be triggered after interval has elapsed.
NOTE: This is best effort only as its granularity is limited by how long an iteration of the
processing loop takes to completePunctuationType.STREAM_TIME
, when stream time advances more than intervalPunctuationType.WALL_CLOCK_TIME
, on GC pause, too short interval, ...schedule
in interface ProcessorContext
intervalMs
- the time interval between punctuations in millisecondstype
- one of: PunctuationType.STREAM_TIME
, PunctuationType.WALL_CLOCK_TIME
callback
- a function consuming timestamps representing the current stream or system timepublic Cancellable schedule(Duration interval, PunctuationType type, Punctuator callback) throws IllegalArgumentException
ProcessorContext
initialization
or
processing
to
schedule a periodic callback — called a punctuation — to Punctuator.punctuate(long)
.
The type parameter controls what notion of time is used for punctuation:
PunctuationType.STREAM_TIME
— uses "stream time", which is advanced by the processing of messages
in accordance with the timestamp as extracted by the TimestampExtractor
in use.
The first punctuation will be triggered by the first record that is processed.
NOTE: Only advanced if messages arrivePunctuationType.WALL_CLOCK_TIME
— uses system time (the wall-clock time),
which is advanced independent of whether new messages arrive.
The first punctuation will be triggered after interval has elapsed.
NOTE: This is best effort only as its granularity is limited by how long an iteration of the
processing loop takes to completePunctuationType.STREAM_TIME
, when stream time advances more than intervalPunctuationType.WALL_CLOCK_TIME
, on GC pause, too short interval, ...schedule
in interface ProcessorContext
interval
- the time interval between punctuations (supported minimum is 1 millisecond)type
- one of: PunctuationType.STREAM_TIME
, PunctuationType.WALL_CLOCK_TIME
callback
- a function consuming timestamps representing the current stream or system timeIllegalArgumentException
- if the interval is not representable in millisecondspublic List<MockProcessorContext.CapturedPunctuator> scheduledPunctuators()
schedule(...)
.public <K,V> void forward(K key, V value)
ProcessorContext
forward
in interface ProcessorContext
key
- keyvalue
- valuepublic <K,V> void forward(K key, V value, To to)
ProcessorContext
forward
in interface ProcessorContext
key
- keyvalue
- valueto
- the options to use when forwarding@Deprecated public <K,V> void forward(K key, V value, int childIndex)
ProcessorContext
forward
in interface ProcessorContext
key
- keyvalue
- valuechildIndex
- index in list of children of this node@Deprecated public <K,V> void forward(K key, V value, String childName)
ProcessorContext
forward
in interface ProcessorContext
key
- keyvalue
- valuechildName
- name of downstream processorpublic List<MockProcessorContext.CapturedForward> forwarded()
forward(...)
.public List<MockProcessorContext.CapturedForward> forwarded(String childName)
forward(...)
.childName
- The child name to retrieve forwards forpublic void resetForwards()
public void commit()
ProcessorContext
commit
in interface ProcessorContext
public boolean committed()
ProcessorContext.commit()
has been called in this context.true
iff ProcessorContext.commit()
has been called in this context since construction or reset.public void resetCommit()
false
(whether or not it was previously true
).public org.apache.kafka.streams.processor.internals.RecordCollector recordCollector()
recordCollector
in interface org.apache.kafka.streams.processor.internals.RecordCollector.Supplier