Interface StateStore
- All Known Subinterfaces:
KeyValueStore<K,V>, SessionStore<K,AGG>, SessionStoreWithHeaders<K,AGG>, TimestampedKeyValueStore<K,V>, TimestampedKeyValueStoreWithHeaders<K,V>, TimestampedWindowStore<K,V>, TimestampedWindowStoreWithHeaders<K,V>, VersionedBytesStore, VersionedKeyValueStore<K,V>, WindowStore<K,V>
If the store is implemented as a persistent store, it must use the store name as the directory name and write all data into this store directory.
The store directory must be created inside the state directory.
The state directory can be obtained via #stateDir() using the StateStoreContext provided via init(...).
Using a nested store directory within the state directory isolates different state stores from each other. If a state store wrote into the state directory directly, it might conflict with other state stores; as a result, data might get corrupted and/or Streams might fail with an error. Furthermore, Kafka Streams relies on the store name being used as the store directory name to perform internal cleanup tasks.
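A minimal sketch of this directory layout, assuming only java.nio.file: it resolves `<stateDir>/<storeName>` and creates it if needed. The temporary directory stands in for whatever #stateDir() returns, and the class name `StoreDirectoryLayout` and store name `my-store` are hypothetical; a real store would use its own name().

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class StoreDirectoryLayout {
    // Resolve <stateDir>/<storeName> and create it (no-op if it already exists).
    static Path storeDirectory(Path stateDir, String storeName) throws IOException {
        return Files.createDirectories(stateDir.resolve(storeName));
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the directory returned by stateDir(); hypothetical for this sketch.
        Path stateDir = Files.createTempDirectory("state-dir");
        Path dir = storeDirectory(stateDir, "my-store");
        System.out.println(dir.getParent().equals(stateDir)); // true
        System.out.println(dir.getFileName());                // my-store
    }
}
```

Keeping each store in its own subdirectory means cleanup of one store never touches files that belong to another.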
This interface does not specify any query capabilities, which, of course, would be query engine specific. Instead, it just specifies the minimum functionality required to reload a storage engine from its changelog as well as basic lifecycle management.
-
Method Summary
Modifier and Type | Method | Description
void | close() | Close the storage engine.
default void | commit(Map<TopicPartition, Long> changelogOffsets) | Commit all written records to this StateStore.
default Long | committedOffset(TopicPartition partition) | Returns the most recently committed offset for the given TopicPartition.
default void | flush() | Deprecated.
default Position | getPosition() | Returns the position the state store is at with respect to the input topic/partitions.
void | init(StateStoreContext stateStoreContext, StateStore root) | Initializes this state store.
boolean | isOpen() | Is this store open for reading and writing?
default boolean | managesOffsets() | Deprecated. New implementations should always return true and manage their own offsets.
String | name() | The name of this store.
boolean | persistent() | Return if the storage is persistent or not.
default <R> QueryResult<R> | query(Query<R> query, PositionBound positionBound, QueryConfig config) | Execute a query.
-
Method Details
-
name
String name()
The name of this store.
- Returns:
  the storage name
-
init
void init(StateStoreContext stateStoreContext, StateStore root)
Initializes this state store.
The implementation of this function must register the root store in the stateStoreContext via the StateStoreContext.register(StateStore, StateRestoreCallback, CommitCallback) function, where the first StateStore parameter should always be the passed-in root object, and the second parameter should be an instance of the user's implementation of the StateRestoreCallback interface, used for restoring the state store from the changelog.
Note that if the state store engine itself supports bulk writes, users can implement another interface, BatchingStateRestoreCallback, which extends StateRestoreCallback, to implement bulk-load restoration logic instead of restoring one record at a time.
- Throws:
  IllegalStateException - if the store gets registered after initialization has already finished
  StreamsException - if the store's changelog does not contain the partition
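To illustrate the difference between per-record and bulk-load restoration, here is a self-contained sketch. The interfaces `RestoreCallback` and `BatchingRestoreCallback` and the class `InMemoryEngine` are hypothetical stand-ins that only mirror the roles of StateRestoreCallback and BatchingStateRestoreCallback; they are not the Kafka Streams types and do not reproduce their exact signatures.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins mirroring the roles of StateRestoreCallback and
// BatchingStateRestoreCallback; they are NOT the Kafka Streams interfaces.
interface RestoreCallback {
    void restore(String key, String value);
}

interface BatchingRestoreCallback extends RestoreCallback {
    void restoreAll(Collection<Map.Entry<String, String>> records);
}

// A toy storage engine that applies each restored batch as a single bulk write.
class InMemoryEngine implements BatchingRestoreCallback {
    final Map<String, String> data = new HashMap<>();
    int bulkWrites = 0; // number of bulk writes issued to the engine

    @Override
    public void restore(String key, String value) {
        // Single-record path: treat it as a batch of one.
        restoreAll(List.of(Map.entry(key, value)));
    }

    @Override
    public void restoreAll(Collection<Map.Entry<String, String>> records) {
        // Stage the batch in changelog order (a later record for the same key wins),
        // then hand it to the engine in one write.
        Map<String, String> batch = new HashMap<>();
        for (Map.Entry<String, String> entry : records) {
            batch.put(entry.getKey(), entry.getValue());
        }
        data.putAll(batch);
        bulkWrites++;
    }
}
```

The design point is that the engine sees one write per batch rather than one per changelog record, which is where a bulk-capable engine saves work.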
-
flush
default void flush()
Deprecated. Use ProcessorContext#commit() instead.
Flush any cached data.
-
commit
default void commit(Map<TopicPartition, Long> changelogOffsets)
Commit all written records to this StateStore.
This method CANNOT be called by users from processors. Doing so will throw an UnsupportedOperationException. Instead, users should call ProcessorContext#commit() to request a Task commit.
If managesOffsets() returns true, the given changelogOffsets are guaranteed to be persisted to disk along with the written records.
changelogOffsets will usually contain a single partition, in the case of a regular StateStore. However, it may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided partitions MUST be persisted to disk.
Implementations SHOULD ensure that changelogOffsets are committed to disk atomically with the records they represent, if possible.
Empty map: If changelogOffsets is empty, implementations that manage offsets MUST remove all previously committed offsets. After an empty commit, committedOffset(TopicPartition) should return null for all partitions. This is used during corruption recovery to clear stale offsets so that restoration can restart from the beginning.
Null values: If a value in changelogOffsets is null, implementations that manage offsets MUST remove the committed offset for that partition. After such a commit, committedOffset(TopicPartition) should return null for the affected partition.
- Parameters:
  changelogOffsets - The changelog offset(s) corresponding to the most recently written records. An empty map signals that all committed offsets should be cleared. A null value for a partition signals that its committed offset should be removed.
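The empty-map and null-value rules are easy to get wrong, so here is a minimal in-memory sketch of them, assuming a store whose managesOffsets() returns true. `OffsetTracker` is a hypothetical class, and the `TopicPartition` record is a local stand-in for org.apache.kafka.common.TopicPartition so the example compiles on its own; a real persistent store would also write the records and offsets to disk, ideally atomically.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition, used only so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}

// Hypothetical in-memory tracker following the commit(Map) contract described above.
class OffsetTracker {
    private final Map<TopicPartition, Long> committed = new HashMap<>();

    void commit(Map<TopicPartition, Long> changelogOffsets) {
        if (changelogOffsets.isEmpty()) {
            // Empty map: clear every previously committed offset.
            committed.clear();
            return;
        }
        for (Map.Entry<TopicPartition, Long> e : changelogOffsets.entrySet()) {
            if (e.getValue() == null) {
                committed.remove(e.getKey()); // null value: forget this partition's offset
            } else {
                committed.put(e.getKey(), e.getValue());
            }
        }
        // A persistent store would flush records and these offsets to disk together here.
    }

    Long committedOffset(TopicPartition partition) {
        return committed.get(partition); // null when nothing is committed for the partition
    }
}
```

Note that the null-value case needs a map type that permits null values (such as HashMap); Map.of rejects them.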
-
committedOffset
default Long committedOffset(TopicPartition partition)
Returns the most recently committed offset for the given TopicPartition.
If managesOffsets() and persistent() both return true, this method will return the offset that corresponds to the changelog record most recently written to this store, for the given partition.
- Parameters:
  partition - The partition to get the committed offset for.
- Returns:
  The last committed offset for the partition; or null if no offset has been committed for the partition, or if either persistent() or managesOffsets() returns false.
-
close
void close()
Close the storage engine. Note that this function needs to be idempotent, since it may be called several times on the same state store.
Users only need to implement this function but should NEVER need to call this API explicitly, as it will be called by the library automatically when necessary.
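One common way to make close() idempotent is to guard resource release with a flag that flips exactly once. The sketch below assumes a hypothetical `ClosableStore` class (not a Kafka Streams type) and also shows isOpen() reporting the same flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical store skeleton showing an idempotent close(); not a Kafka Streams class.
class ClosableStore {
    private final AtomicBoolean open = new AtomicBoolean(true);
    int resourcesReleased = 0; // how many times underlying resources were actually freed

    boolean isOpen() {
        return open.get();
    }

    void close() {
        // compareAndSet succeeds only for the first call, so later calls are no-ops.
        if (open.compareAndSet(true, false)) {
            resourcesReleased++; // e.g. close file handles or a native database instance here
        }
    }
}
```

An AtomicBoolean is used so the guard stays correct even if close() races with another caller; a plain boolean would suffice if access is known to be single-threaded.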
-
persistent
boolean persistent()
Return if the storage is persistent or not.
- Returns:
  true if the storage is persistent, false otherwise
-
managesOffsets
default boolean managesOffsets()
Deprecated. New implementations should always return true and manage their own offsets. In the future, this method will be removed and it will be assumed to always return true.
Determines if this StateStore manages its own offsets.
If this method returns true, then offsets provided to commit(Map) will be retrievable using committedOffset(TopicPartition).
If this method returns false, offsets provided to commit(Map) will be ignored, and committedOffset(TopicPartition) will be expected to always return null.
This method is provided to enable custom StateStores to opt in to managing their own offsets. This is required to ensure that custom StateStores provide the consistency guarantees that Kafka Streams expects when operating under an exactly-once processing.guarantee.
New implementations are required to implement this method and return true. Existing implementations should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated and will be removed in a future version.
- Returns:
  Whether this StateStore manages its own offsets.
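A sketch of how managesOffsets() and persistent() together gate what committedOffset(TopicPartition) reports. `GatedOffsetStore` is hypothetical and the `TopicPartition` record is a local stand-in for the Kafka class; the two constructor flags simply play the role of the two methods.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}

// Hypothetical store showing how managesOffsets() and persistent() gate committedOffset().
class GatedOffsetStore {
    private final boolean persistent;
    private final boolean managesOffsets;
    private final Map<TopicPartition, Long> offsets = new HashMap<>();

    GatedOffsetStore(boolean persistent, boolean managesOffsets) {
        this.persistent = persistent;
        this.managesOffsets = managesOffsets;
    }

    boolean persistent() { return persistent; }

    boolean managesOffsets() { return managesOffsets; }

    void commit(Map<TopicPartition, Long> changelogOffsets) {
        if (!managesOffsets) {
            return; // legacy behavior: offsets passed to commit are ignored
        }
        if (changelogOffsets.isEmpty()) {
            offsets.clear(); // empty map clears all committed offsets
            return;
        }
        changelogOffsets.forEach((tp, offset) -> {
            if (offset == null) offsets.remove(tp); else offsets.put(tp, offset);
        });
    }

    Long committedOffset(TopicPartition partition) {
        // Offsets are reported only if the store is persistent AND manages its own offsets.
        if (!persistent || !managesOffsets) {
            return null;
        }
        return offsets.get(partition);
    }
}
```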
-
isOpen
boolean isOpen()
Is this store open for reading and writing?
- Returns:
  true if the store is open
-
query
@Evolving
default <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig config)
Execute a query. Returns a QueryResult containing either result data or a failure.
If the store doesn't know how to handle the given query, the result shall be a failure with reason FailureReason.UNKNOWN_QUERY_TYPE. If the store couldn't satisfy the given position bound, the result shall be a failure with reason FailureReason.NOT_UP_TO_BOUND.
Note to store implementers: if your store does not support position tracking, you can correctly respond FailureReason.NOT_UP_TO_BOUND if the argument is anything but PositionBound.unbounded(). Be sure to explain in the failure message that bounded positions are not supported.
- Type Parameters:
  R - The result type
- Parameters:
  query - The query to execute
  positionBound - The position the store must be at or past
  config - Per-query configuration parameters, such as whether the store should collect detailed execution info for the query
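The "note to store implementers" can be sketched with small stand-in types. Everything below (`FailureReason`, `Query`, `KeyQuery`, `RangeQuery`, `PositionBound`, `QueryResult`, `SimpleKeyValueStore`) is hypothetical and only loosely modeled on the names above; these are not the Kafka Streams IQv2 classes, and the QueryConfig parameter is omitted for brevity. The store answers only key lookups, and because it tracks no positions it rejects any bound other than unbounded.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins loosely modeled on the names above; NOT the Kafka Streams classes.
enum FailureReason { UNKNOWN_QUERY_TYPE, NOT_UP_TO_BOUND }

interface Query<R> {}

record KeyQuery(String key) implements Query<String> {}

record RangeQuery(String from, String to) implements Query<List<String>> {}

record PositionBound(boolean isUnbounded) {
    static PositionBound unbounded() { return new PositionBound(true); }
    static PositionBound bounded() { return new PositionBound(false); } // stands for any concrete bound
}

record QueryResult<R>(R result, FailureReason failure, String message) {
    static <R> QueryResult<R> success(R result) { return new QueryResult<>(result, null, null); }
    static <R> QueryResult<R> failed(FailureReason reason, String message) {
        return new QueryResult<>(null, reason, message);
    }
    boolean isSuccess() { return failure == null; }
}

// A store without position tracking: it supports KeyQuery only, and only with an unbounded position.
class SimpleKeyValueStore {
    private final Map<String, String> data;

    SimpleKeyValueStore(Map<String, String> data) { this.data = data; }

    @SuppressWarnings("unchecked")
    <R> QueryResult<R> query(Query<R> query, PositionBound positionBound) {
        if (query instanceof KeyQuery keyQuery) {
            if (!positionBound.isUnbounded()) {
                return QueryResult.failed(FailureReason.NOT_UP_TO_BOUND,
                    "This store does not track positions, so bounded positions are not supported.");
            }
            // KeyQuery implements Query<String>, so R is String on this branch.
            return (QueryResult<R>) QueryResult.success(data.get(keyQuery.key()));
        }
        return QueryResult.failed(FailureReason.UNKNOWN_QUERY_TYPE,
            "Unsupported query type: " + query.getClass().getSimpleName());
    }
}
```

This sketch checks the query type first, so an unsupported query always reports UNKNOWN_QUERY_TYPE regardless of the bound; the text above does not mandate an order.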
-
getPosition
default Position getPosition()
Returns the position the state store is at with respect to the input topic/partitions.
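As a rough illustration of what "position" means here, the sketch below records, for each input topic and partition, the highest offset the store has applied. `PositionTracker` is a hypothetical class, not Kafka's Position type, and keeping the maximum offset is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical position tracker: for each input topic/partition,
// remember the highest offset whose record this store has applied.
class PositionTracker {
    private final Map<String, Map<Integer, Long>> position = new HashMap<>();

    // Call after applying a record that came from the given input topic, partition, and offset.
    void update(String topic, int partition, long offset) {
        position.computeIfAbsent(topic, t -> new HashMap<>())
                .merge(partition, offset, Long::max); // keep the larger offset
    }

    // Returns the tracked offset, or null if nothing was applied from that topic/partition.
    Long offsetFor(String topic, int partition) {
        Map<Integer, Long> partitions = position.get(topic);
        return partitions == null ? null : partitions.get(partition);
    }
}
```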