Interface ConnectedStoreProvider
All Known Subinterfaces:
ProcessorSupplier<KIn,VIn,KOut,VOut>, ProcessorSupplier<K,V>, TransformerSupplier<K,V,R>, ValueTransformerSupplier<V,VR>, ValueTransformerWithKeySupplier<K,V,VR>
public interface ConnectedStoreProvider
Provides a set of StoreBuilders that will be automatically added to the topology and connected to the associated processor.

Implementing this interface is recommended when the associated processor wants to encapsulate its usage of its state stores, rather than exposing them to the user building the topology.

If separate but related processors need to share the same store, different ConnectedStoreProviders may provide the same instance of StoreBuilder, as shown below.

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    class StateSharingProcessors {
        // A single builder instance, returned by both suppliers below.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), Serdes.String(), Serdes.String());

        class SupplierA implements ProcessorSupplier<String, Integer> {
            @Override
            public Processor<String, Integer> get() {
                return new Processor<String, Integer>() {
                    private StateStore store;

                    @Override
                    public void init(ProcessorContext context) {
                        this.store = context.getStateStore("myStore");
                    }

                    @Override
                    public void process(String key, Integer value) {
                        // can access this.store
                    }

                    @Override
                    public void close() {
                        // can access this.store
                    }
                };
            }

            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(storeBuilder);
            }
        }

        class SupplierB implements ProcessorSupplier<String, String> {
            @Override
            public Processor<String, String> get() {
                return new Processor<String, String>() {
                    private StateStore store;

                    @Override
                    public void init(ProcessorContext context) {
                        this.store = context.getStateStore("myStore");
                    }

                    @Override
                    public void process(String key, String value) {
                        // can access this.store
                    }

                    @Override
                    public void close() {
                        // can access this.store
                    }
                };
            }

            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(storeBuilder);
            }
        }
    }
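The emphasis on providing the same instance of StoreBuilder can be illustrated with a small model that uses only the JDK. This is a sketch under stated assumptions, not Kafka's implementation: `FakeStoreBuilder` and `FakeRegistry` are hypothetical stand-ins, and the rule that a store name may be re-added only by the identical builder instance is an assumption of this model rather than something the text above states.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedStoreSketch {
    /** Hypothetical stand-in for StoreBuilder; only the store name is modeled. */
    public static final class FakeStoreBuilder {
        public final String name;

        public FakeStoreBuilder(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for a topology's bookkeeping of state stores. */
    public static final class FakeRegistry {
        private final Map<String, FakeStoreBuilder> byName = new HashMap<>();

        public void add(FakeStoreBuilder builder) {
            FakeStoreBuilder existing = byName.putIfAbsent(builder.name, builder);
            // Reference comparison on purpose: only the identical instance may be re-added.
            if (existing != null && existing != builder) {
                throw new IllegalStateException("a different builder already uses name " + builder.name);
            }
        }

        public int size() {
            return byName.size();
        }
    }

    public static void main(String[] args) {
        FakeStoreBuilder shared = new FakeStoreBuilder("myStore");
        FakeRegistry registry = new FakeRegistry();
        registry.add(shared); // as if contributed by SupplierA.stores()
        registry.add(shared); // as if contributed by SupplierB.stores()
        System.out.println(registry.size()); // prints 1

        try {
            // A second, distinct builder with the same name is refused in this model.
            registry.add(new FakeStoreBuilder("myStore"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, two suppliers that return one shared builder end up with a single registered store, whereas two suppliers that each construct their own builder for "myStore" would conflict.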
See Also:
Topology.addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
KStream.process(ProcessorSupplier, String...)
KStream.process(ProcessorSupplier, Named, String...)
KStream.transform(TransformerSupplier, String...)
KStream.transform(TransformerSupplier, Named, String...)
KStream.transformValues(ValueTransformerSupplier, String...)
KStream.transformValues(ValueTransformerSupplier, Named, String...)
KStream.transformValues(ValueTransformerWithKeySupplier, String...)
KStream.transformValues(ValueTransformerWithKeySupplier, Named, String...)
KStream.flatTransform(TransformerSupplier, String...)
KStream.flatTransform(TransformerSupplier, Named, String...)
KStream.flatTransformValues(ValueTransformerSupplier, String...)
KStream.flatTransformValues(ValueTransformerSupplier, Named, String...)
KStream.flatTransformValues(ValueTransformerWithKeySupplier, String...)
KStream.flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
Method Summary
Modifier and Type               Method      Description
default Set<StoreBuilder<?>>    stores()
Method Detail
stores
default Set<StoreBuilder<?>> stores()
Returns:
the state stores to be connected and added, or null if no stores should be automatically connected and added.
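Because stores() may return null rather than an empty set, any code that inspects a provider directly needs a null check. The sketch below shows one way to normalize the result. It uses only the JDK: `Provider` is a hypothetical stand-in with the same method shape (store names replace StoreBuilder instances), and its default body returning null is assumed here to mirror the real default, which the description above does not spell out.

```java
import java.util.Collections;
import java.util.Set;

public class NullableStoresSketch {
    /** Hypothetical stand-in with the same shape as ConnectedStoreProvider.stores(). */
    public interface Provider {
        default Set<String> stores() {
            return null; // assumed default: nothing to connect automatically
        }
    }

    /** Normalizes the nullable result so callers can iterate without a null check. */
    public static Set<String> storesOrEmpty(Provider provider) {
        Set<String> stores = provider.stores();
        return stores == null ? Collections.emptySet() : stores;
    }

    public static void main(String[] args) {
        Provider noStores = new Provider() { };
        Provider oneStore = new Provider() {
            @Override
            public Set<String> stores() {
                return Collections.singleton("myStore");
            }
        };

        System.out.println(storesOrEmpty(noStores).size()); // prints 0
        System.out.println(storesOrEmpty(oneStore));        // prints [myStore]
    }
}
```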