Interface ValueTransformerWithKey<K,V,VR>
- Type Parameters:
K
- key typeV
- value typeVR
- transformed value type
ValueTransformerWithKey
interface for stateful mapping of a value to a new value (with possible new type).
This is a stateful record-by-record operation, i.e, transform(Object, Object)
is invoked individually for each
record of a stream and can access and modify a state that is available beyond a single call of
transform(Object, Object)
(cf. ValueMapper
for stateless value transformation).
Additionally, this ValueTransformerWithKey
can
schedule
a method to be
called periodically
with the provided context.
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
If ValueTransformerWithKey
is applied to a KeyValue
pair record the record's key is preserved.
Use ValueTransformerWithKeySupplier
to provide new instances of ValueTransformerWithKey
to
Kafka Stream's runtime.
If a record's key and value should be modified Transformer
can be used.
-
Method Summary
-
Method Details
-
init
Initialize this transformer. This is called once per instance when the topology gets initialized.The provided
context
can be used to access topology and record meta data, toschedule
a method to becalled periodically
and to access attachedStateStore
s.Note that
ProcessorContext
is updated in the background with the current record's meta data. Thus, it only contains valid record meta data when accessed withintransform(Object, Object)
.Note that using
ProcessorContext.forward(Object, Object)
orProcessorContext.forward(Object, Object, To)
is not allowed within any method ofValueTransformerWithKey
and will result in anexception
.- Parameters:
context
- the context- Throws:
IllegalStateException
- If store gets registered after initialization is already finishedStreamsException
- if the store's change log does not contain the partition
-
transform
Transform the given [key and] value to a new value. Additionally, anyStateStore
that isattached
to this operator can be accessed and modified arbitrarily (cf.ProcessorContext.getStateStore(String)
).Note that using
ProcessorContext.forward(Object, Object)
orProcessorContext.forward(Object, Object, To)
is not allowed withintransform
and will result in anexception
.Note that if a
ValueTransformerWithKey
is used in aKTable.transformValues(ValueTransformerWithKeySupplier, String...)
(or any other overload ofKTable#transformValues(...)
) operation, then the providedProcessorContext
frominit(ProcessorContext)
does not guarantee that all context information will be available whentransform()
is executed, as it might be executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.- Parameters:
readOnlyKey
- the read-only keyvalue
- the value to be transformed- Returns:
- the new value
-
close
void close()Close this processor and clean up any resources.It is not possible to return any new output records within
close()
. UsingProcessorContext.forward(Object, Object)
orProcessorContext.forward(Object, Object, To)
, will result in anexception
.
-