Interface StateStore
All Known Subinterfaces: KeyValueStore<K,V>, SessionStore<K,AGG>, TimestampedKeyValueStore<K,V>, TimestampedWindowStore<K,V>, WindowStore<K,V>
public interface StateStore
If the store is implemented as a persistent store, it must use the store name as its directory name and write all data into this store directory. The store directory must be created within the state directory. The state directory can be obtained by calling stateDir() on the ProcessorContext provided via init(...).
Using nested store directories within the state directory isolates different state stores. If a state store wrote into the state directory directly, it might conflict with other state stores; data might then get corrupted and/or Streams might fail with an error. Furthermore, Kafka Streams relies on the store name being used as the store directory name to perform internal cleanup tasks.
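The directory rule above can be sketched with plain JDK file APIs. This is only an illustration, not Kafka Streams code: `StoreDirectories` and `storeDir` are hypothetical names, and the `stateDir` argument stands in for the directory a real store would obtain from its context.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of Kafka Streams): resolves and creates
// <stateDir>/<storeName> so that each store writes only below its own directory.
final class StoreDirectories {
    private StoreDirectories() {}

    static Path storeDir(Path stateDir, String storeName) {
        Path base = stateDir.normalize();
        Path dir = base.resolve(storeName).normalize();
        // Reject names that resolve to the state directory itself or escape it (e.g. "../x").
        if (!dir.startsWith(base) || dir.equals(base)) {
            throw new IllegalArgumentException("Invalid store name: " + storeName);
        }
        try {
            // Creates missing parent directories too; a no-op if the directory already exists.
            return Files.createDirectories(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A store named `counts` would then keep all of its files below `<stateDir>/counts` and never write into `<stateDir>` itself.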
This interface does not specify any query capabilities, which would be specific to the query engine. Instead, it specifies only the minimum functionality required to reload a storage engine from its changelog, as well as basic lifecycle management.
-
Method Summary
Modifier and Type   Method                                              Description
void                close()                                             Close the storage engine.
void                flush()                                             Flush any cached data.
void                init(ProcessorContext context, StateStore root)     Deprecated. Since 2.7.0.
default void        init(StateStoreContext context, StateStore root)    Initializes this state store.
boolean             isOpen()                                            Is this store open for reading and writing.
String              name()                                              The name of this store.
boolean             persistent()                                        Return if the storage is persistent or not.
-
Method Details
-
name
String name()
The name of this store.
Returns: the storage name
-
init
void init(ProcessorContext context, StateStore root)
Deprecated. Since 2.7.0. Callers should invoke init(StateStoreContext, StateStore) instead. Implementers may choose to implement this method for backward compatibility or to throw an informative exception instead.
Initializes this state store.
The implementation of this function must register the root store in the context via the ProcessorContext.register(StateStore, StateRestoreCallback) function, where the first StateStore parameter should always be the passed-in root object, and the second parameter should be the user's implementation of the StateRestoreCallback interface, which is used to restore the state store from the changelog.
Note that if the state store engine itself supports bulk writes, users can instead implement BatchingStateRestoreCallback, which extends StateRestoreCallback, to provide bulk-load restoration logic rather than restoring one record at a time.
This method is not called if init(StateStoreContext, StateStore) is implemented.
Throws:
IllegalStateException - if the store is registered after initialization has already finished
StreamsException - if the store's changelog does not contain the partition
-
init
default void init(StateStoreContext context, StateStore root)
Initializes this state store.
The implementation of this function must register the root store in the context via the StateStoreContext.register(StateStore, StateRestoreCallback) function, where the first StateStore parameter should always be the passed-in root object, and the second parameter should be the user's implementation of the StateRestoreCallback interface, which is used to restore the state store from the changelog.
Note that if the state store engine itself supports bulk writes, users can instead implement BatchingStateRestoreCallback, which extends StateRestoreCallback, to provide bulk-load restoration logic rather than restoring one record at a time.
Throws:
IllegalStateException - if the store is registered after initialization has already finished
StreamsException - if the store's changelog does not contain the partition
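The registration contract described above can be illustrated with a small self-contained sketch. The types below (`SketchStore`, `SketchContext`, `SketchRestoreCallback`, `RecordingContext`) are hypothetical, simplified stand-ins for StateStore, StateStoreContext, and StateRestoreCallback so that the example compiles without Kafka on the classpath; the real interfaces have more methods. Treating a null value as a delete is also an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; NOT the Kafka Streams interfaces themselves.
interface SketchRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface SketchContext {
    void register(SketchStore root, SketchRestoreCallback callback);
}

interface SketchStore {
    String name();
    void init(SketchContext context, SketchStore root);
}

final class InMemorySketchStore implements SketchStore {
    private final String name;
    private final Map<String, byte[]> data = new HashMap<>();

    InMemorySketchStore(String name) { this.name = name; }

    @Override
    public String name() { return name; }

    @Override
    public void init(SketchContext context, SketchStore root) {
        // Register the passed-in root (which may wrap this store), not `this`,
        // together with a callback that replays one changelog record at a time.
        context.register(root, (key, value) -> {
            String k = new String(key, StandardCharsets.UTF_8);
            if (value == null) {
                data.remove(k);   // assumption: a null value marks a deleted key
            } else {
                data.put(k, value);
            }
        });
    }

    byte[] get(String key) { return data.get(key); }
}

// Test double that records what the store registered.
final class RecordingContext implements SketchContext {
    SketchStore registeredRoot;
    SketchRestoreCallback callback;

    @Override
    public void register(SketchStore root, SketchRestoreCallback callback) {
        this.registeredRoot = root;
        this.callback = callback;
    }
}
```

Calling `store.init(ctx, store)` on a `RecordingContext` and then feeding records through `ctx.callback.restore(...)` rebuilds the in-memory map, which is the shape of work a restore callback is expected to do.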
-
flush
void flush()
Flush any cached data.
-
close
void close()
Close the storage engine. Note that this function needs to be idempotent, since it may be called several times on the same state store.
Users only need to implement this function and should NEVER need to call this API explicitly, as the library calls it automatically when necessary.
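One way to satisfy the idempotency requirement is to guard the release logic with an atomic flag, so that only the first close() call does any work. The class below is a minimal sketch under that assumption; `IdempotentCloseSketch`, `markOpen`, and `releaseCount` are hypothetical names used for illustration and do not come from Kafka Streams.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical store skeleton showing only the open/close lifecycle.
final class IdempotentCloseSketch {
    private final AtomicBoolean open = new AtomicBoolean(false);
    private int releaseCount = 0; // how many times resources were actually released

    // Stands in for the point at which initialization completes.
    void markOpen() { open.set(true); }

    boolean isOpen() { return open.get(); }

    void close() {
        // Only the call that flips the flag from true to false releases resources;
        // every later call is a no-op.
        if (open.compareAndSet(true, false)) {
            releaseCount++;
        }
    }

    int releaseCount() { return releaseCount; }
}
```

With this guard, calling close() twice releases resources once, and isOpen() reports false after the first call.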
-
persistent
boolean persistent()
Return if the storage is persistent or not.
Returns: true if the storage is persistent; false otherwise
-
isOpen
boolean isOpen()
Is this store open for reading and writing.
Returns: true if the store is open
-