Interface ConnectedStoreProvider
- All Known Subinterfaces:
FixedKeyProcessorSupplier<KIn,
,VIn, VOut> ProcessorSupplier<KIn,
,VIn, KOut, VOut> ProcessorSupplier<K,
,V> TransformerSupplier<K,
,V, R> ValueTransformerSupplier<V,
,VR> ValueTransformerWithKeySupplier<K,
V, VR>
public interface ConnectedStoreProvider
Provides a set of
StoreBuilder
s that will be automatically added to the topology and connected to the
associated processor.
Implementing this interface is recommended when the associated processor wants to encapsulate its usage of its state stores, rather than exposing them to the user building the topology.
In the event that separate but related processors may want to share the same store, different ConnectedStoreProvider
s
may provide the same instance of StoreBuilder
, as shown below.
class StateSharingProcessors {
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), Serdes.String(), Serdes.String());
class SupplierA implements ProcessorSupplier<String, Integer> {
Processor<String, Integer> get() {
return new Processor() {
private StateStore store;
void init(ProcessorContext context) {
this.store = context.getStateStore("myStore");
}
void process(String key, Integer value) {
// can access this.store
}
void close() {
// can access this.store
}
}
}
Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder);
}
}
class SupplierB implements ProcessorSupplier<String, String> {
Processor<String, String> get() {
return new Processor() {
private StateStore store;
void init(ProcessorContext context) {
this.store = context.getStateStore("myStore");
}
void process(String key, String value) {
// can access this.store
}
void close() {
// can access this.store
}
}
}
Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder);
}
}
}
- See Also:
-
Topology.addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
KStream.process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
KStream.process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
KStream.transform(TransformerSupplier, String...)
KStream.transform(TransformerSupplier, Named, String...)
KStream.transformValues(ValueTransformerSupplier, String...)
KStream.transformValues(ValueTransformerSupplier, Named, String...)
KStream.transformValues(ValueTransformerWithKeySupplier, String...)
KStream.transformValues(ValueTransformerWithKeySupplier, Named, String...)
KStream.flatTransform(TransformerSupplier, String...)
KStream.flatTransform(TransformerSupplier, Named, String...)
KStream.flatTransformValues(ValueTransformerSupplier, String...)
KStream.flatTransformValues(ValueTransformerSupplier, Named, String...)
KStream.flatTransformValues(ValueTransformerWithKeySupplier, String...)
KStream.flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
-
Method Summary
-
Method Details
-
stores
- Returns:
- the state stores to be connected and added, or null if no stores should be automatically connected and added.
-