Class StreamJoined<K,V1,V2>

java.lang.Object
org.apache.kafka.streams.kstream.StreamJoined<K,V1,V2>
Type Parameters:
K - the key type
V1 - this value type
V2 - other value type

public class StreamJoined<K,V1,V2> extends Object
Class used to configure the name of the join processor, the repartition topic name, and the state stores or state store names in a stream-stream join.
  • Field Details

    • keySerde

      protected final Serde<K> keySerde
    • valueSerde

      protected final Serde<V1> valueSerde
    • otherValueSerde

      protected final Serde<V2> otherValueSerde
    • thisStoreSupplier

      protected final WindowBytesStoreSupplier thisStoreSupplier
    • otherStoreSupplier

      protected final WindowBytesStoreSupplier otherStoreSupplier
    • name

      protected final String name
    • storeName

      protected final String storeName
    • loggingEnabled

      protected final boolean loggingEnabled
    • topicConfig

      protected final Map<String,String> topicConfig
    • dslStoreSuppliers

      protected DslStoreSuppliers dslStoreSuppliers
  • Constructor Details

  • Method Details

    • with

      public static <K, V1, V2> StreamJoined<K,V1,V2> with(WindowBytesStoreSupplier storeSupplier, WindowBytesStoreSupplier otherStoreSupplier)
      Creates a StreamJoined instance with the provided store suppliers. The store suppliers must implement the WindowBytesStoreSupplier interface. The store suppliers must provide unique names or a StreamsException is thrown.
      Type Parameters:
      K - the key type
      V1 - this value type
      V2 - other value type
      Parameters:
      storeSupplier - this store supplier
      otherStoreSupplier - other store supplier
      Returns:
      StreamJoined instance
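
      A minimal sketch of building the two suppliers with Stores.inMemoryWindowStore and passing them to with(...). The store names, the five-minute time difference, and the one-minute grace period are hypothetical. The sketch assumes the join validates that each store retains duplicates, that its window size equals the JoinWindows size (before plus after), and that its retention equals window size plus grace, so treat those settings as an assumption to check against your Kafka Streams version.

```java
import java.time.Duration;

import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

public class StoreSupplierJoinExample {

    // Hypothetical join parameters, chosen for illustration only.
    static final Duration TIME_DIFFERENCE = Duration.ofMinutes(5);
    static final Duration GRACE = Duration.ofMinutes(1);

    static WindowBytesStoreSupplier joinStore(String name) {
        // Window size covers the "before" and "after" halves of the join window.
        Duration windowSize = TIME_DIFFERENCE.multipliedBy(2);
        // Retention adds the grace period on top of the window size.
        Duration retention = windowSize.plus(GRACE);
        // The final argument asks the store to retain duplicates.
        return Stores.inMemoryWindowStore(name, retention, windowSize, true);
    }

    static StreamJoined<String, Long, Double> build() {
        // The two suppliers are given distinct (hypothetical) names.
        return StreamJoined.with(
                joinStore("orders-join-store"),
                joinStore("payments-join-store"));
    }
}
```

      The resulting StreamJoined would then be passed as the last argument to KStream.join together with a matching JoinWindows.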
    • with

      public static <K, V1, V2> StreamJoined<K,V1,V2> with(DslStoreSuppliers storeSuppliers)
      Creates a StreamJoined instance with the given DslStoreSuppliers. The store plugin will be used to get all the state stores in this operation that do not otherwise have an explicitly configured DslStoreSuppliers.
      Type Parameters:
      K - the key type
      V1 - this value type
      V2 - other value type
      Parameters:
      storeSuppliers - the store plugin that will be used for state stores
      Returns:
      StreamJoined instance
    • as

      public static <K, V1, V2> StreamJoined<K,V1,V2> as(String storeName)
      Creates a StreamJoined instance using the provided name for the state stores and hence the changelog topics for the join stores. The store names will be ${applicationId}-<storeName>-this-join and ${applicationId}-<storeName>-other-join for an inner join, or ${applicationId}-<storeName>-outer-this-join and ${applicationId}-<storeName>-outer-other-join for an outer join. The changelog topics will have the -changelog suffix. Note that even though the join stores have a specified name, they remain unavailable for querying. If you are using StreamJoined to replace the deprecated KStream.join(KStream, ValueJoiner, JoinWindows) overloads that take a Joined parameter in order to set the name for the join processors, create the StreamJoined object first and then call withName(String).
      Type Parameters:
      K - the key type
      V1 - this value type
      V2 - other value type
      Parameters:
      storeName - the name to use for the store
      Returns:
      StreamJoined instance
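
      The "create first, then call withName" pattern described above might look like the following sketch. The names "order-payment" and "order-payment-join" and the type arguments are hypothetical. The explicit type witness on as(...) is there because a chained call gives the compiler no typed argument from which to infer K, V1, and V2.

```java
import org.apache.kafka.streams.kstream.StreamJoined;

public class NamedStreamJoinedExample {

    static StreamJoined<String, Long, Double> build() {
        return StreamJoined.<String, Long, Double>as("order-payment") // base store name (hypothetical)
                .withName("order-payment-join");                      // join processor name (hypothetical)
    }
}
```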
    • with

      public static <K, V1, V2> StreamJoined<K,V1,V2> with(Serde<K> keySerde, Serde<V1> valueSerde, Serde<V2> otherValueSerde)
      Creates a StreamJoined instance with the provided serdes to configure the stores for the join.
      Type Parameters:
      K - the key type
      V1 - this value type
      V2 - other value type
      Parameters:
      keySerde - the key serde
      valueSerde - this value serde
      otherValueSerde - other value serde
      Returns:
      StreamJoined instance
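
      As an illustration, the sketch below passes a serde-configured StreamJoined to KStream.join. The topic names, value types, and the five-minute window are hypothetical and stand in for whatever the application actually joins.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class SerdeJoinExample {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // "This" (left) stream: order quantity keyed by order id (hypothetical topic).
        KStream<String, Long> orders =
                builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()));
        // "Other" (right) stream: payment amount keyed by order id (hypothetical topic).
        KStream<String, Double> payments =
                builder.stream("payments", Consumed.with(Serdes.String(), Serdes.Double()));

        KStream<String, String> joined = orders.join(
                payments,
                (quantity, amount) -> quantity + " items, paid " + amount,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                // Key serde, this-value serde, other-value serde, in that order.
                StreamJoined.with(Serdes.String(), Serdes.Long(), Serdes.Double()));

        joined.to("orders-with-payments", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```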
    • withName

      public StreamJoined<K,V1,V2> withName(String name)
      Set the name to use for the join processor and the repartition topic(s) if required.
      Parameters:
      name - the name to use
      Returns:
      a new StreamJoined instance
    • withStoreName

      public StreamJoined<K,V1,V2> withStoreName(String storeName)
      Sets the base store name to use for both sides of the join. This name is used for the state stores and hence the changelog topics for the join stores. The store names will be ${applicationId}-<storeName>-this-join and ${applicationId}-<storeName>-other-join for an inner join, or ${applicationId}-<storeName>-outer-this-join and ${applicationId}-<storeName>-outer-other-join for an outer join. The changelog topics will have the -changelog suffix. Note that even though the join stores have a specified name, they remain unavailable for querying.
      Parameters:
      storeName - the storeName to use
      Returns:
      a new StreamJoined instance
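
      To make the naming pattern above concrete, here is a small plain-Java sketch that transcribes it. JoinStoreNames is a hypothetical helper written for this illustration; it is not part of Kafka Streams and is not taken from its source, so the names the library actually generates may differ in detail.

```java
public class JoinStoreNames {

    // Builds a store name following the pattern stated in the description:
    // ${applicationId}-<storeName>-[outer-]{this|other}-join
    static String storeName(String applicationId, String baseName, boolean outer, boolean thisSide) {
        String side = thisSide ? "this" : "other";
        String suffix = outer ? "-outer-" + side + "-join" : "-" + side + "-join";
        return applicationId + "-" + baseName + suffix;
    }

    // The description states that changelog topics carry the -changelog suffix.
    static String changelogTopic(String storeName) {
        return storeName + "-changelog";
    }

    public static void main(String[] args) {
        String store = storeName("my-app", "order-payment", false, true);
        System.out.println(store);
        System.out.println(changelogTopic(store));
    }
}
```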
    • withKeySerde

      public StreamJoined<K,V1,V2> withKeySerde(Serde<K> keySerde)
      Configures this StreamJoined with the provided Serde for the key.
      Parameters:
      keySerde - the serde to use for the key
      Returns:
      a new StreamJoined configured with the keySerde
    • withValueSerde

      public StreamJoined<K,V1,V2> withValueSerde(Serde<V1> valueSerde)
      Configures this StreamJoined with the provided Serde for this value.
      Parameters:
      valueSerde - the serde to use for this value (calling or left side of the join)
      Returns:
      a new StreamJoined configured with the valueSerde
    • withOtherValueSerde

      public StreamJoined<K,V1,V2> withOtherValueSerde(Serde<V2> otherValueSerde)
      Configures this StreamJoined with the provided Serde for the other value.
      Parameters:
      otherValueSerde - the serde to use for the other value (other or right side of the join)
      Returns:
      a new StreamJoined configured with the otherValueSerde
    • withDslStoreSuppliers

      public StreamJoined<K,V1,V2> withDslStoreSuppliers(DslStoreSuppliers dslStoreSuppliers)
      Configures this StreamJoined with the provided DslStoreSuppliers, which is used for any store whose supplier is not explicitly provided.
      Parameters:
      dslStoreSuppliers - the default store suppliers to use for this StreamJoined
      Returns:
      a new StreamJoined configured with dslStoreSuppliers
    • withThisStoreSupplier

      public StreamJoined<K,V1,V2> withThisStoreSupplier(WindowBytesStoreSupplier thisStoreSupplier)
      Configures this StreamJoined with the provided WindowBytesStoreSupplier for this store. Note that this method only provides the store supplier for the left side of the join. To also provide a store supplier for the right (i.e., other) side, use the withOtherStoreSupplier(WindowBytesStoreSupplier) method.
      Parameters:
      thisStoreSupplier - the store supplier to use for this store (calling or left side of the join)
      Returns:
      a new StreamJoined configured with thisStoreSupplier
    • withOtherStoreSupplier

      public StreamJoined<K,V1,V2> withOtherStoreSupplier(WindowBytesStoreSupplier otherStoreSupplier)
      Configures this StreamJoined with the provided WindowBytesStoreSupplier for the other store. Note that this method only provides the store supplier for the right side of the join. To also provide a store supplier for the left side, use the withThisStoreSupplier(WindowBytesStoreSupplier) method.
      Parameters:
      otherStoreSupplier - the store supplier to use for the other store (other or right side of the join)
      Returns:
      a new StreamJoined configured with otherStoreSupplier
    • withLoggingEnabled

      public StreamJoined<K,V1,V2> withLoggingEnabled(Map<String,String> config)
      Configures logging for both state stores. The changelog will be created with the provided configs.

      Note: Any unrecognized configs will be ignored.

      Parameters:
      config - configs applied to the changelog topic
      Returns:
      a new StreamJoined configured with logging enabled
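
      A short sketch of supplying changelog topic configs. The two keys shown are standard Kafka topic-level configs, but the values and the type arguments are hypothetical and should be chosen for the actual deployment.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.StreamJoined;

public class ChangelogConfigExample {

    // Topic-level settings for the changelog topics (illustrative values).
    static Map<String, String> changelogConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("min.insync.replicas", "2");
        config.put("segment.bytes", "52428800");
        return config;
    }

    static StreamJoined<String, Long, Double> build() {
        return StreamJoined.with(Serdes.String(), Serdes.Long(), Serdes.Double())
                .withLoggingEnabled(changelogConfig());
    }
}
```

      Calling withLoggingDisabled() in place of withLoggingEnabled(...) would turn change logging off for both stores.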
    • withLoggingDisabled

      public StreamJoined<K,V1,V2> withLoggingDisabled()
      Disable change logging for both state stores.
      Returns:
      a new StreamJoined configured with logging disabled
    • toString

      public String toString()
      Overrides:
      toString in class Object