V - value type
VR - transformed value type
public interface ValueTransformer<V,VR>
ValueTransformer interface for stateful mapping of a value to a new value (with possible new type).
This is a stateful record-by-record operation, i.e., transform(Object) is invoked individually for each
record of a stream and can access and modify a state that is available beyond a single call of
transform(Object) (cf. ValueMapper for stateless value transformation).
Additionally, the interface can be called in regular intervals based on the processing progress
(cf. punctuate(long)).
If ValueTransformer is applied to a KeyValue pair record, the record's key is preserved.
Use ValueTransformerSupplier to provide new instances of ValueTransformer to the Kafka Streams runtime.
If a record's key and value should both be modified, Transformer can be used instead.
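The difference between a stateful transformer and a stateless mapper can be sketched as follows. To stay runnable without the Kafka Streams dependency, the sketch declares two hypothetical stand-in interfaces (`SketchValueTransformer`, `SketchValueTransformerSupplier`) that mirror only the method shapes needed here. Real code would implement `ValueTransformer` and `ValueTransformerSupplier` instead and would normally keep its state in an attached StateStore rather than in a field.

```java
// Hypothetical stand-in with the same method shapes as ValueTransformer,
// declared locally so the sketch compiles without the Kafka Streams jar.
interface SketchValueTransformer<V, VR> {
    VR transform(V value);
    void close();
}

// Hypothetical stand-in for ValueTransformerSupplier: a fresh instance per call.
interface SketchValueTransformerSupplier<V, VR> {
    SketchValueTransformer<V, VR> get();
}

// Stateful: the running total survives between transform() calls.
class RunningTotal implements SketchValueTransformer<Integer, Long> {
    // Assumption for brevity: a plain field instead of an attached StateStore.
    private long total = 0L;

    @Override
    public Long transform(Integer value) {
        total += value;
        return total; // the value type changes from Integer (V) to Long (VR)
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}

class RunningTotalDemo {
    public static void main(String[] args) {
        SketchValueTransformerSupplier<Integer, Long> supplier = RunningTotal::new;
        SketchValueTransformer<Integer, Long> first = supplier.get();
        System.out.println(first.transform(3)); // prints 3
        System.out.println(first.transform(4)); // prints 7
        // A second instance from the supplier starts with its own, empty state.
        System.out.println(supplier.get().transform(10)); // prints 10
    }
}
```

Because each instance carries its own state, the supplier has to return a new object on every call rather than a shared one.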
| Modifier and Type | Method and Description |
|---|---|
| void | close() - Close this processor and clean up any resources. |
| void | init(ProcessorContext context) - Initialize this transformer. |
| VR | punctuate(long timestamp) - Perform any periodic operations if this processor schedules itself with the context during initialization. |
| VR | transform(V value) - Transform the given value to a new value. |
void init(ProcessorContext context)
Initialize this transformer.
The provided context can be used to access topology and record meta data, to
schedule itself for periodical calls (cf. punctuate(long)), and
to access attached StateStores.
Note that ProcessorContext is updated in the background with the current record's meta data.
Thus, it only contains valid record meta data when accessed within transform(Object).
Note that using ProcessorContext.forward(Object, Object),
ProcessorContext.forward(Object, Object, int), or
ProcessorContext.forward(Object, Object, String) is not allowed within any method of
ValueTransformer and will result in an exception.
context - the context
VR transform(V value)
Transform the given value to a new value.
Any StateStore that is attached to this operator can be accessed and modified arbitrarily (cf.
ProcessorContext.getStateStore(String)).
Note that using ProcessorContext.forward(Object, Object),
ProcessorContext.forward(Object, Object, int), or
ProcessorContext.forward(Object, Object, String) is not allowed within transform and
will result in an exception.
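As an illustration of reading and updating an attached store from within transform, here is a minimal sketch. Everything prefixed with `Sketch`, the `InMemoryStore` class, and the store name `"seen-counts"` are assumptions made so the example runs on its own. They mirror only the calls used below and are not the Kafka Streams types themselves.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins that mirror only the calls this sketch uses.
interface SketchStateStore { }

interface SketchKeyValueStore<K, V> extends SketchStateStore {
    V get(K key);
    void put(K key, V value);
}

interface SketchProcessorContext {
    SketchStateStore getStateStore(String name);
}

interface SketchValueTransformer<V, VR> {
    void init(SketchProcessorContext context);
    VR transform(V value);
    VR punctuate(long timestamp);
    void close();
}

// In-memory store so the sketch can run; not how Kafka Streams backs its stores.
class InMemoryStore<K, V> implements SketchKeyValueStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    @Override public V get(K key) { return map.get(key); }
    @Override public void put(K key, V value) { map.put(key, value); }
}

// Replaces each value with the number of times that value has been seen so far.
class SeenCountTransformer implements SketchValueTransformer<String, Long> {
    private SketchKeyValueStore<String, Long> counts;

    @Override
    @SuppressWarnings("unchecked")
    public void init(SketchProcessorContext context) {
        // "seen-counts" is an assumed store name chosen for this example.
        counts = (SketchKeyValueStore<String, Long>) context.getStateStore("seen-counts");
    }

    @Override
    public Long transform(String value) {
        Long previous = counts.get(value);
        long updated = (previous == null) ? 1L : previous + 1L;
        counts.put(value, updated);
        return updated; // the result is returned, never forwarded via the context
    }

    @Override
    public Long punctuate(long timestamp) {
        return null; // punctuate must return null
    }

    @Override
    public void close() { }
}

class SeenCountDemo {
    public static void main(String[] args) {
        InMemoryStore<String, Long> store = new InMemoryStore<>();
        SeenCountTransformer transformer = new SeenCountTransformer();
        transformer.init(name -> store); // the lambda plays the role of the context
        System.out.println(transformer.transform("a")); // prints 1
        System.out.println(transformer.transform("b")); // prints 1
        System.out.println(transformer.transform("a")); // prints 2
    }
}
```

The store is looked up once in init and reused in every transform call, so state written for one record is visible when later records are processed.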
value - the value to be transformed
VR punctuate(long timestamp)
Perform any periodic operations if this processor schedules itself with
the context during initialization.
It is not possible to return any new output records within punctuate.
Using ProcessorContext.forward(Object, Object), ProcessorContext.forward(Object, Object, int),
or ProcessorContext.forward(Object, Object, String) will result in an
exception.
Furthermore, punctuate must return null.
Note that punctuate is called based on stream time (i.e., the timestamps returned by the used TimestampExtractor)
and not based on wall-clock time.
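The following toy driver illustrates what "stream time rather than wall-clock time" means in practice. It is an assumption-laden sketch, not Kafka Streams' actual scheduler: the `StreamTimeDriver` class, its scheduling rule, and the `BatchCounter` class are all hypothetical. The point it shows is that punctuate fires only when record timestamps advance past the next deadline, and that it returns null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transformer-like class: counts records between punctuate calls.
class BatchCounter {
    private long sinceLastPunctuate = 0L;
    final List<String> log = new ArrayList<>();

    public String transform(String value) {
        sinceLastPunctuate++;
        return value.toUpperCase();
    }

    public String punctuate(long timestamp) {
        // Periodic work only: record the batch size, then reset the counter.
        log.add(timestamp + ":" + sinceLastPunctuate);
        sinceLastPunctuate = 0L;
        return null; // no output records may be produced here
    }
}

// Toy driver (an assumption, not the real runtime): stream time is the largest
// record timestamp seen so far, and punctuate fires once per elapsed interval.
class StreamTimeDriver {
    static void run(BatchCounter transformer, long intervalMs, long[] recordTimestamps) {
        if (recordTimestamps.length == 0) {
            return; // no records means stream time never advances
        }
        long streamTime = recordTimestamps[0];
        long nextPunctuate = streamTime + intervalMs;
        for (long ts : recordTimestamps) {
            streamTime = Math.max(streamTime, ts);
            while (streamTime >= nextPunctuate) {
                Object result = transformer.punctuate(nextPunctuate);
                if (result != null) {
                    throw new IllegalStateException("punctuate must return null");
                }
                nextPunctuate += intervalMs;
            }
            transformer.transform("v" + ts);
        }
    }

    public static void main(String[] args) {
        BatchCounter counter = new BatchCounter();
        run(counter, 1000L, new long[] {0L, 400L, 1000L, 1100L, 3000L});
        System.out.println(counter.log); // prints [1000:2, 2000:2, 3000:0]
    }
}
```

In this run, the jump from timestamp 1100 to 3000 triggers two punctuate calls back to back, however much real time passed between those records. With no further input, nothing would fire at all.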
timestamp - the stream time when punctuate is being called
Returns: null; otherwise, an exception will be thrown
void close()
Close this processor and clean up any resources.
It is not possible to return any new output records within close().
Using ProcessorContext.forward(Object, Object), ProcessorContext.forward(Object, Object, int),
or ProcessorContext.forward(Object, Object, String) will result in an exception.