Class Stores


  • public final class Stores
    extends Object
    Factory for creating state stores in Kafka Streams.

    When using the high-level DSL, i.e., StreamsBuilder, users create StoreSuppliers that can be further customized via Materialized. For example, a topic read as a KTable can be materialized into an in-memory store with custom key/value serdes and caching disabled:

    
     StreamsBuilder builder = new StreamsBuilder();
     // Supplier for an in-memory key-value store with the given name.
     KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name");
     // Read the topic as a KTable backed by that store.
     KTable<Long,String> table = builder.table(
       "topicName",
       Materialized.<Long,String>as(storeSupplier)
                   .withKeySerde(Serdes.Long())
                   .withValueSerde(Serdes.String())
                   .withCachingDisabled());
     
    When using the Processor API, i.e., Topology, users create StoreBuilders that can be attached to Processors. For example, a windowed RocksDB store with a custom changelog topic configuration can be created as follows:
    
     Topology topology = new Topology();
     topology.addProcessor("processorName", ...);
    
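     // Topic-level overrides for the store's changelog topic (left empty here).
     Map<String,String> topicConfig = new HashMap<>();
     // Wrap the persistent window store supplier with serdes and enable changelogging.
     StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores
       .windowStoreBuilder(
         Stores.persistentWindowStore("queryable-store-name", ...),
         Serdes.Integer(),
         Serdes.Long())
       .withLoggingEnabled(topicConfig);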
    
     topology.addStateStore(storeBuilder, "processorName");
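
    The topicConfig map in the example above is left empty. As a minimal sketch of what such a map might hold, the following uses only java.util and java.time. The class name ChangelogTopicConfig, the method name build, and the chosen values are illustrative assumptions and not part of Kafka Streams; min.insync.replicas and retention.ms are standard Kafka topic-level configuration names.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Kafka Streams.
public class ChangelogTopicConfig {

    // Builds topic-level overrides as plain string key/value pairs.
    // The values are assumptions chosen for the example, not recommendations.
    public static Map<String, String> build() {
        Map<String, String> topicConfig = new HashMap<>();
        // Require two in-sync replicas for acknowledged writes.
        topicConfig.put("min.insync.replicas", "2");
        // Retention is expressed in milliseconds; seven days here.
        topicConfig.put("retention.ms", String.valueOf(Duration.ofDays(7).toMillis()));
        return topicConfig;
    }

    public static void main(String[] args) {
        // Print the assembled overrides.
        System.out.println(build());
    }
}
```

    A map built this way could be passed to withLoggingEnabled(topicConfig) in place of the empty map.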