Interface VersionedKeyValueStore<K,V>

Type Parameters:
K - The key type
V - The value type
All Superinterfaces:
StateStore

public interface VersionedKeyValueStore<K,V> extends StateStore
A key-value store that stores multiple record versions per key, and supports timestamp-based retrieval operations to return the latest record (per key) as of a specified timestamp. Only one record is stored per key and timestamp, i.e., a second call to put(Object, Object, long) with the same key and timestamp will replace the first.
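To make "multiple versions per key, one record per (key, timestamp)" concrete, here is a minimal in-memory sketch. `ToyVersionedStore` and `ToyRecord` are hypothetical names, not Kafka classes. The sketch ignores history retention, deletes, serialization and persistence. It only models overwrite-on-same-timestamp and as-of lookup, using a `TreeMap` per key as an illustrative choice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical value-plus-timestamp pair; NOT Kafka's VersionedRecord.
record ToyRecord<V>(V value, long timestamp) {}

// Hypothetical in-memory model of versioned storage (no retention, no persistence).
class ToyVersionedStore<K, V> {
    // For each key, record versions ordered by timestamp.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    void put(K key, V value, long timestamp) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(value, "value (deletes are not modeled in this sketch)");
        // TreeMap.put replaces an existing entry with the same timestamp,
        // so a second put for the same (key, timestamp) overwrites the first.
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest version whose timestamp does not exceed asOfTimestamp, or null.
    ToyRecord<V> get(K key, long asOfTimestamp) {
        Objects.requireNonNull(key, "key");
        TreeMap<Long, V> byTime = versions.get(key);
        if (byTime == null) {
            return null;
        }
        Map.Entry<Long, V> e = byTime.floorEntry(asOfTimestamp);
        return e == null ? null : new ToyRecord<>(e.getValue(), e.getKey());
    }
}
```

For example, after `put("k", "v1", 10)`, `put("k", "v2", 20)` and `put("k", "v1-replaced", 10)`, a lookup as of 15 returns `v1-replaced` with timestamp 10, and a lookup as of 5 returns null.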

Each store instance has an associated, fixed-duration "history retention" which specifies how long old record versions should be kept. In particular, a versioned store guarantees to return accurate results for calls to get(Object, long) where the provided timestamp bound is within history retention of the current observed stream time. (Queries with a timestamp bound older than the specified history retention are considered invalid.)

The store's "history retention" also doubles as its "grace period," which determines how far back in time writes to the store will be accepted. A versioned store will not accept writes (inserts, updates, or deletions) if the timestamp associated with the write is older than the current observed stream time by more than the grace period.
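Because history retention and grace period are the same duration, a single cutoff (observed stream time minus history retention) governs both which queries are valid and which writes are accepted. The sketch below illustrates that arithmetic. `RetentionWindow` and its methods are hypothetical, and treating a timestamp exactly at the cutoff as still inside the window is an assumption of this sketch. With a stream time of 100 000 ms and a retention of 30 000 ms, the cutoff is 70 000 ms.

```java
// Hypothetical helper illustrating the shared cutoff; not a Kafka API.
final class RetentionWindow {
    private final long historyRetentionMs;

    RetentionWindow(long historyRetentionMs) {
        if (historyRetentionMs < 0) {
            throw new IllegalArgumentException("history retention must be >= 0");
        }
        this.historyRetentionMs = historyRetentionMs;
    }

    // Oldest timestamp still covered, given the current observed stream time.
    long cutoff(long observedStreamTime) {
        return observedStreamTime - historyRetentionMs;
    }

    // get(key, asOfTimestamp) is only guaranteed accurate at or after the cutoff
    // (boundary inclusivity is an assumption of this sketch).
    boolean isQueryWithinRetention(long asOfTimestamp, long observedStreamTime) {
        return asOfTimestamp >= cutoff(observedStreamTime);
    }

    // The same cutoff acts as the grace period for inserts, updates, and deletes.
    boolean acceptsWrite(long writeTimestamp, long observedStreamTime) {
        return writeTimestamp >= cutoff(observedStreamTime);
    }
}
```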

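  • Field Summary

    Fields
    Modifier and Type    Field                                 Description
    static final long    PUT_RETURN_CODE_VALID_TO_UNDEFINED    Return code of put(Object, Object, long) indicating that the record that was put is the latest version for its key, so its validTo timestamp is undefined.
    static final long    PUT_RETURN_CODE_NOT_PUT               Return code of put(Object, Object, long) indicating that the record was not put because the grace period was exceeded.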
  • Method Summary

    Modifier and Type     Method and Description
    VersionedRecord<V>    delete(K key, long timestamp)
                          Delete the value associated with this key from the store, at the specified timestamp (if there is such a value), and return the deleted value.
    VersionedRecord<V>    get(K key)
                          Get the current (i.e., latest by timestamp) record associated with this key.
    VersionedRecord<V>    get(K key, long asOfTimestamp)
                          Get the record associated with this key as of the specified timestamp (i.e., the existing record with the largest timestamp not exceeding the provided timestamp bound).
    long                  put(K key, V value, long timestamp)
                          Add a new record version associated with the specified key and timestamp.

    Methods inherited from interface org.apache.kafka.streams.processor.StateStore

    close, flush, getPosition, init, init, isOpen, name, persistent, query
  • Field Details

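    • PUT_RETURN_CODE_VALID_TO_UNDEFINED

      static final long PUT_RETURN_CODE_VALID_TO_UNDEFINED
      The value -1, returned by put(Object, Object, long) when the record that was put is the latest record version for its key, so its validTo timestamp is undefined.
    • PUT_RETURN_CODE_NOT_PUT

      static final long PUT_RETURN_CODE_NOT_PUT
      The value Long.MIN_VALUE, returned by put(Object, Object, long) when the record was not put because the grace period was exceeded.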
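Callers of put(...) typically branch on these two return codes before treating the result as a validTo timestamp. A minimal sketch follows. `PutResults` and `describe` are hypothetical names, and the constants are local copies using the values documented for put's return value (-1 and Long.MIN_VALUE) so the sketch compiles without Kafka on the classpath. In real code you would compare against VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT and VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED directly.

```java
// Hypothetical interpreter for the long returned by put(key, value, timestamp).
final class PutResults {
    // Local copies of the documented return codes (assumed values -1 and Long.MIN_VALUE).
    static final long PUT_RETURN_CODE_VALID_TO_UNDEFINED = -1L;
    static final long PUT_RETURN_CODE_NOT_PUT = Long.MIN_VALUE;

    private PutResults() {}

    static String describe(long putResult) {
        if (putResult == PUT_RETURN_CODE_NOT_PUT) {
            return "rejected: timestamp is older than the grace period";
        }
        if (putResult == PUT_RETURN_CODE_VALID_TO_UNDEFINED) {
            return "stored as the latest version (validTo undefined)";
        }
        // Any other value is the validTo timestamp of the newly put version.
        return "stored; valid until " + putResult;
    }
}
```

Checking NOT_PUT first matters: silently ignoring it would hide writes the store dropped for arriving too late.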
  • Method Details

    • put

      long put(K key, V value, long timestamp)
      Add a new record version associated with the specified key and timestamp.

      If the timestamp associated with the new record version is older than the store's grace period (i.e., history retention) relative to the current observed stream time, then the record will not be added.

      Parameters:
      key - The key
      value - The value; may be null, in which case the put is interpreted as a delete.
      timestamp - The timestamp for this record version
      Returns:
      The validTo timestamp of the newly put record. Two special values, -1 and Long.MIN_VALUE, carry specific meanings. -1 (PUT_RETURN_CODE_VALID_TO_UNDEFINED) indicates that the record that was put is the latest record version for its key, so its validTo timestamp is undefined. Long.MIN_VALUE (PUT_RETURN_CODE_NOT_PUT) indicates that the record was not put because the grace period was exceeded.
      Throws:
      NullPointerException - If null is used for key.
      InvalidStateStoreException - if the store is not initialized
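The return value of put can be understood as "the timestamp of the next newer version for the same key, if any". The following in-memory sketch shows one way to compute it, together with the grace-period check. `ToyPutStore` is hypothetical and not Kafka's implementation; approximating the observed stream time as the maximum timestamp passed to put is an assumption, and no old versions are ever pruned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical in-memory model of put(key, value, timestamp); not Kafka's implementation.
class ToyPutStore<K, V> {
    static final long VALID_TO_UNDEFINED = -1L;  // mirrors PUT_RETURN_CODE_VALID_TO_UNDEFINED
    static final long NOT_PUT = Long.MIN_VALUE;  // mirrors PUT_RETURN_CODE_NOT_PUT

    private final long historyRetentionMs;       // also serves as the grace period
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();
    // Assumption: observed stream time = max timestamp seen by put so far.
    private long observedStreamTime = Long.MIN_VALUE;

    ToyPutStore(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    long put(K key, V value, long timestamp) {
        Objects.requireNonNull(key, "key");
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - historyRetentionMs) {
            return NOT_PUT; // older than the grace period: nothing is written
        }
        TreeMap<Long, V> byTime = versions.computeIfAbsent(key, k -> new TreeMap<>());
        byTime.put(timestamp, value); // a null value is kept as a tombstone (delete)
        // validTo is the timestamp of the next newer version, if one exists.
        Long next = byTime.higherKey(timestamp);
        return next == null ? VALID_TO_UNDEFINED : next;
    }
}
```

With a retention of 100, puts at 1000 and 1050 both return -1; an out-of-order put at 1020 returns 1050; a put at 900 returns Long.MIN_VALUE because the cutoff is then 950.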
    • delete

      VersionedRecord<V> delete(K key, long timestamp)
      Delete the value associated with this key from the store, at the specified timestamp (if there is such a value), and return the deleted value.

      If the timestamp associated with this deletion is older than the store's grace period (i.e., history retention) relative to the current observed stream time, then the deletion will not be performed and null will be returned.

      As a consequence, do not delete a record version by first calling get(key) or get(key, timestamp) and passing the returned VersionedRecord.timestamp() to delete(key, timestamp): the returned timestamp may be older than the store's grace period (i.e., history retention), in which case the deletion will not take place. Instead, pass a timestamp inferred from your business logic that specifies when the delete actually happens, for example the timestamp of the input record currently being processed, or the current stream time.

      This operation is semantically equivalent to get(key, timestamp) followed by put(key, null, timestamp), with the caveat that if the deletion timestamp is older than the store's grace period (i.e., history retention), the return value is always null, regardless of what get(key, timestamp) would return.
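The delete semantics above, and the pitfall of reusing a stored record's timestamp, can be illustrated with a small in-memory model. `ToyDeleteStore` and `DeletedRecord` are hypothetical names; the sketch assumes the observed stream time is the maximum write timestamp seen so far and does not prune old versions. It implements delete literally as "look up the version as of the deletion timestamp, then write a tombstone there".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical value-plus-timestamp pair; NOT Kafka's VersionedRecord.
record DeletedRecord<V>(V value, long timestamp) {}

// Hypothetical model: delete(key, ts) = get(key, ts) + put(key, null, ts), with a grace check.
class ToyDeleteStore<K, V> {
    private final long historyRetentionMs;
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();
    private long observedStreamTime = Long.MIN_VALUE; // assumption: max write timestamp seen

    ToyDeleteStore(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    // Advances stream time, then reports whether the write is older than the grace period.
    private boolean advanceAndCheckExpired(long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        return timestamp < observedStreamTime - historyRetentionMs;
    }

    void put(K key, V value, long timestamp) {
        Objects.requireNonNull(key, "key");
        if (!advanceAndCheckExpired(timestamp)) {
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }
    }

    DeletedRecord<V> delete(K key, long timestamp) {
        Objects.requireNonNull(key, "key");
        if (advanceAndCheckExpired(timestamp)) {
            return null; // caveat: too old, so nothing is deleted and null is returned
        }
        TreeMap<Long, V> byTime = versions.computeIfAbsent(key, k -> new TreeMap<>());
        // "get(key, timestamp)": latest version at or before the deletion timestamp.
        Map.Entry<Long, V> prior = byTime.floorEntry(timestamp);
        // "put(key, null, timestamp)": write a tombstone at the deletion timestamp.
        byTime.put(timestamp, null);
        return (prior == null || prior.getValue() == null)
                ? null
                : new DeletedRecord<>(prior.getValue(), prior.getKey());
    }
}
```

With a retention of 100, after `put("k", "v1", 1000)` and `put("other", "x", 2000)`, calling `delete("k", 1000)` mirrors the anti-pattern of reusing the stored record's timestamp: it falls outside the grace period, returns null and deletes nothing. Calling `delete("k", 2000)` succeeds and returns `v1` with its original timestamp 1000, which is smaller than the deletion timestamp.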

      Parameters:
      key - The key
      timestamp - The timestamp for this delete
      Returns:
      The value and timestamp of the record associated with this key as of the deletion timestamp (inclusive), or null if no such record exists (including if the deletion timestamp is older than this store's history retention time, i.e., the store no longer contains data for the provided timestamp). Note that the timestamp r.timestamp() of the returned VersionedRecord r may be smaller than the provided deletion timestamp.
      Throws:
      NullPointerException - If null is used for key.
      InvalidStateStoreException - if the store is not initialized
    • get

      VersionedRecord<V> get(K key)
      Get the current (i.e., latest by timestamp) record associated with this key.
      Parameters:
      key - The key to fetch
      Returns:
      The value and timestamp of the current record associated with this key, or null if there is no current record for this key.
      Throws:
      NullPointerException - If null is used for key.
      InvalidStateStoreException - if the store is not initialized
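"Current" means latest by timestamp, not most recently written, and a tombstone as the latest version means there is no current record. A minimal sketch of that lookup follows. `ToyLatestLookup` and `LatestRecord` are hypothetical names, and retention is not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical value-plus-timestamp pair; NOT Kafka's VersionedRecord.
record LatestRecord<V>(V value, long timestamp) {}

// Hypothetical model of get(key); history retention is not modeled.
class ToyLatestLookup<K, V> {
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    // A null value is stored as a tombstone (delete).
    void put(K key, V value, long timestamp) {
        Objects.requireNonNull(key, "key");
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    LatestRecord<V> get(K key) {
        Objects.requireNonNull(key, "key");
        TreeMap<Long, V> byTime = versions.get(key);
        if (byTime == null || byTime.isEmpty()) {
            return null;
        }
        // Highest timestamp wins, regardless of insertion order.
        Map.Entry<Long, V> latest = byTime.lastEntry();
        // If the latest version is a tombstone, there is no current record.
        return latest.getValue() == null
                ? null
                : new LatestRecord<>(latest.getValue(), latest.getKey());
    }
}
```

Writing versions at 10, 30 and then 20 still leaves the version at 30 as current; a later tombstone at 40 makes get(key) return null.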
    • get

      VersionedRecord<V> get(K key, long asOfTimestamp)
      Get the record associated with this key as of the specified timestamp (i.e., the existing record with the largest timestamp not exceeding the provided timestamp bound).
      Parameters:
      key - The key to fetch
      asOfTimestamp - The timestamp bound. This bound is inclusive; if a record (for the specified key) exists with this timestamp, then this is the record that will be returned.
      Returns:
      The value and timestamp of the record associated with this key as of the provided timestamp, or null if no such record exists (including if the provided timestamp bound is older than this store's history retention time, i.e., the store no longer contains data for the provided timestamp). Note that the timestamp r.timestamp() of the returned VersionedRecord r may be smaller than the provided timestamp bound. Additionally, if the timestamp of the latest record version for the key does not exceed the provided timestamp bound, that record will be returned even if the timestamp bound is older than the store's history retention.
      Throws:
      NullPointerException - If null is used for key.
      InvalidStateStoreException - if the store is not initialized
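The as-of lookup, including the special case where the bound is older than history retention but the latest version still qualifies, can be sketched as follows. `ToyAsOfStore` and `AsOfRecord` are hypothetical names. The observed stream time is assumed to be the maximum put timestamp, and this model never prunes old versions; instead it refuses to answer from history once the bound falls outside retention, which mimics the documented results.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical value-plus-timestamp pair; NOT Kafka's VersionedRecord.
record AsOfRecord<V>(V value, long timestamp) {}

// Hypothetical model of get(key, asOfTimestamp) with history retention.
class ToyAsOfStore<K, V> {
    private final long historyRetentionMs;
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();
    private long observedStreamTime = Long.MIN_VALUE; // assumption: max put timestamp

    ToyAsOfStore(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    void put(K key, V value, long timestamp) {
        Objects.requireNonNull(key, "key");
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp >= observedStreamTime - historyRetentionMs) { // grace-period check
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }
    }

    AsOfRecord<V> get(K key, long asOfTimestamp) {
        Objects.requireNonNull(key, "key");
        TreeMap<Long, V> byTime = versions.get(key);
        if (byTime == null || byTime.isEmpty()) {
            return null;
        }
        Map.Entry<Long, V> match;
        if (asOfTimestamp < observedStreamTime - historyRetentionMs) {
            // Bound is older than history retention: only the latest version may still qualify.
            Map.Entry<Long, V> latest = byTime.lastEntry();
            match = latest.getKey() <= asOfTimestamp ? latest : null;
        } else {
            // Inclusive bound: the version with the largest timestamp <= asOfTimestamp.
            match = byTime.floorEntry(asOfTimestamp);
        }
        // A tombstone means the key had no value at that point in time.
        return (match == null || match.getValue() == null)
                ? null
                : new AsOfRecord<>(match.getValue(), match.getKey());
    }
}
```

With a retention of 100, after `put("k", "old", 1000)` and `put("other", "x", 1500)`, a query for `k` as of 1200 is outside retention yet still returns `old`, because it is the latest version of `k`. Once `put("k", "new", 1450)` is written, the same query returns null, while a query as of 1449 (inside retention) returns `old` with timestamp 1000.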