V
- value typeVR
- transformed value typepublic interface ValueTransformer<V,VR>
ValueTransformer
interface for stateful mapping of a value to a new value (with possible new type).
This is a stateful record-by-record operation, i.e, transform(Object)
is invoked individually for each
record of a stream and can access and modify a state that is available beyond a single call of
transform(Object)
(cf. ValueMapper
for stateless value transformation).
Additionally, this ValueTransformer
can schedule
a method to be called periodically
with the provided context.
If ValueTransformer
is applied to a KeyValue
pair record the record's key is preserved.
Use ValueTransformerSupplier
to provide new instances of ValueTransformer
to Kafka Stream's runtime.
If a record's key and value should be modified Transformer
can be used.
Modifier and Type | Method | Description |
---|---|---|
void |
close() |
Close this processor and clean up any resources.
|
void |
init(ProcessorContext context) |
Initialize this transformer.
|
VR |
punctuate(long timestamp) |
Deprecated.
Please use
Punctuator functional interface instead. |
VR |
transform(V value) |
Transform the given value to a new value.
|
void init(ProcessorContext context)
The provided context
can be used to access topology and record meta data, to
schedule
a method to be
called periodically
and to access attached StateStore
s.
Note that ProcessorContext
is updated in the background with the current record's meta data.
Thus, it only contains valid record meta data when accessed within transform(Object)
.
Note that using ProcessorContext.forward(Object, Object)
,
ProcessorContext.forward(Object, Object, int)
, or
ProcessorContext.forward(Object, Object, String)
is not allowed within any method of
ValueTransformer
and will result in an exception
.
context
- the contextjava.lang.IllegalStateException
- If store gets registered after initialization is already finishedStreamsException
- if the store's change log does not contain the partitionVR transform(V value)
StateStore
that is attached
to this operator can be accessed and modified arbitrarily (cf.
ProcessorContext.getStateStore(String)
).
Note, that using ProcessorContext.forward(Object, Object)
,
ProcessorContext.forward(Object, Object, int)
, and
ProcessorContext.forward(Object, Object, String)
is not allowed within transform
and
will result in an exception
.
value
- the value to be transformed@Deprecated VR punctuate(long timestamp)
Punctuator
functional interface instead.schedule itself
with
the context during initialization
.
It is not possible to return any new output records within punctuate
.
Using ProcessorContext.forward(Object, Object)
, ProcessorContext.forward(Object, Object, int)
,
or ProcessorContext.forward(Object, Object, String)
will result in an
exception
.
Furthermore, punctuate
must return null
.
Note, that punctuate
is called base on TimestampExtractor
)
and not based on wall-clock time.
timestamp
- the stream time when punctuate
is being callednull
—otherwise, an exception
will be thrownvoid close()
It is not possible to return any new output records within close()
.
Using ProcessorContext.forward(Object, Object)
, ProcessorContext.forward(Object, Object, int)
,
or ProcessorContext.forward(Object, Object, String)
will result in an exception
.