Interface ConnectedStoreProvider

All Known Subinterfaces:
ProcessorSupplier<KIn,​VIn,​KOut,​VOut>, ProcessorSupplier<K,​V>, TransformerSupplier<K,​V,​R>, ValueTransformerSupplier<V,​VR>, ValueTransformerWithKeySupplier<K,​V,​VR>

public interface ConnectedStoreProvider
Provides a set of StoreBuilders that will be automatically added to the topology and connected to the associated processor.

Implementing this interface is recommended when the associated processor wants to encapsulate its usage of its state stores, rather than exposing them to the user building the topology.

In the event that separate but related processors may want to share the same store, different ConnectedStoreProviders may provide the same instance of StoreBuilder, as shown below.


 class StateSharingProcessors {
     StoreBuilder<KeyValueStore<String, String>> storeBuilder =
         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), Serdes.String(), Serdes.String());

     class SupplierA implements ProcessorSupplier<String, Integer> {
         Processor<String, Integer> get() {
             return new Processor() {
                 private StateStore store;

                 void init(ProcessorContext context) {
                     this.store = context.getStateStore("myStore");
                 }

                 void process(String key, Integer value) {
                     // can access this.store
                 }

                 void close() {
                     // can access this.store
                 }
             }
         }

         Set<StoreBuilder<?>> stores() {
             return Collections.singleton(storeBuilder);
         }
     }

     class SupplierB implements ProcessorSupplier<String, String> {
         Processor<String, String> get() {
             return new Processor() {
                 private StateStore store;

                 void init(ProcessorContext context) {
                     this.store = context.getStateStore("myStore");
                 }

                 void process(String key, String value) {
                     // can access this.store
                 }

                 void close() {
                     // can access this.store
                 }
             }
         }

         Set<StoreBuilder<?>> stores() {
             return Collections.singleton(storeBuilder);
         }
     }
 }
 
See Also:
Topology.addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...), KStream.process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...), KStream.process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...), KStream.transform(TransformerSupplier, String...), KStream.transform(TransformerSupplier, Named, String...), KStream.transformValues(ValueTransformerSupplier, String...), KStream.transformValues(ValueTransformerSupplier, Named, String...), KStream.transformValues(ValueTransformerWithKeySupplier, String...), KStream.transformValues(ValueTransformerWithKeySupplier, Named, String...), KStream.flatTransform(TransformerSupplier, String...), KStream.flatTransform(TransformerSupplier, Named, String...), KStream.flatTransformValues(ValueTransformerSupplier, String...), KStream.flatTransformValues(ValueTransformerSupplier, Named, String...), KStream.flatTransformValues(ValueTransformerWithKeySupplier, String...), KStream.flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
  • Method Summary

    Modifier and Type Method Description
    default Set<StoreBuilder<?>> stores()  
  • Method Details

    • stores

      default Set<StoreBuilder<?>> stores()
      Returns:
      the state stores to be connected and added, or null if no stores should be automatically connected and added.