Package org.apache.kafka.streams.kstream
Class StreamJoined<K,V1,V2>
java.lang.Object
org.apache.kafka.streams.kstream.StreamJoined<K,V1,V2>
- Type Parameters:
K - the key type
V1 - this value type
V2 - other value type
public class StreamJoined<K,V1,V2> extends Object
Class used to configure the name of the join processor, the repartition topic name,
state stores or state store names in Stream-Stream join.
-
Field Summary
Fields (modifier and type, then field name):
protected Serde<K> keySerde
protected boolean loggingEnabled
protected String name
protected WindowBytesStoreSupplier otherStoreSupplier
protected Serde<V2> otherValueSerde
protected String storeName
protected WindowBytesStoreSupplier thisStoreSupplier
protected Map<String,String> topicConfig
protected Serde<V1> valueSerde
-
Constructor Summary
Constructors (modifier, then constructor):
protected StreamJoined(StreamJoined<K,V1,V2> streamJoined)
-
Method Summary
Methods (modifier and type, method, then description):
static <K, V1, V2> StreamJoined<K,V1,V2> as(String storeName)
Creates a StreamJoined instance using the provided name for the state stores and hence the changelog topics for the join stores.
String toString()
static <K, V1, V2> StreamJoined<K,V1,V2> with(Serde<K> keySerde, Serde<V1> valueSerde, Serde<V2> otherValueSerde)
Creates a StreamJoined instance with the provided serdes to configure the stores for the join.
static <K, V1, V2> StreamJoined<K,V1,V2> with(WindowBytesStoreSupplier storeSupplier, WindowBytesStoreSupplier otherStoreSupplier)
Creates a StreamJoined instance with the provided store suppliers.
StreamJoined<K,V1,V2> withKeySerde(Serde<K> keySerde)
Configure with the provided Serde for the key.
StreamJoined<K,V1,V2> withLoggingDisabled()
Disable change logging for both state stores.
StreamJoined<K,V1,V2> withLoggingEnabled(Map<String,String> config)
Configures logging for both state stores.
StreamJoined<K,V1,V2> withName(String name)
Set the name to use for the join processor and the repartition topic(s) if required.
StreamJoined<K,V1,V2> withOtherStoreSupplier(WindowBytesStoreSupplier otherStoreSupplier)
Configure with the provided WindowBytesStoreSupplier for the other store supplier.
StreamJoined<K,V1,V2> withOtherValueSerde(Serde<V2> otherValueSerde)
Configure with the provided Serde for the other value.
StreamJoined<K,V1,V2> withStoreName(String storeName)
Sets the base store name to use for both sides of the join.
StreamJoined<K,V1,V2> withThisStoreSupplier(WindowBytesStoreSupplier thisStoreSupplier)
Configure with the provided WindowBytesStoreSupplier for this store supplier.
StreamJoined<K,V1,V2> withValueSerde(Serde<V1> valueSerde)
Configure with the provided Serde for this value.
-
Field Details
-
keySerde
-
valueSerde
-
otherValueSerde
-
thisStoreSupplier
-
otherStoreSupplier
-
name
-
storeName
-
loggingEnabled
protected final boolean loggingEnabled
-
topicConfig
-
-
Constructor Details
-
StreamJoined
-
-
Method Details
-
with
public static <K, V1, V2> StreamJoined<K,V1,V2> with(WindowBytesStoreSupplier storeSupplier, WindowBytesStoreSupplier otherStoreSupplier)
Creates a StreamJoined instance with the provided store suppliers. The store suppliers must implement the WindowBytesStoreSupplier interface. The store suppliers must provide unique names or a StreamsException is thrown.
- Type Parameters:
K - the key type
V1 - this value type
V2 - other value type
- Parameters:
storeSupplier - this store supplier
otherStoreSupplier - other store supplier
- Returns:
StreamJoined instance
-
as
Creates a StreamJoined instance using the provided name for the state stores and hence the changelog topics for the join stores. The name for the stores will be ${applicationId}-<storeName>-this-join and ${applicationId}-<storeName>-other-join, or ${applicationId}-<storeName>-outer-this-join and ${applicationId}-<storeName>-outer-other-join, depending on whether the join is an inner join or an outer join. The changelog topics will have the -changelog suffix. Note that even though the join stores will have a specified name, the stores will remain unavailable for querying.
Please note that if you are using StreamJoined to replace the deprecated KStream.join(org.apache.kafka.streams.kstream.KStream<K, VO>, org.apache.kafka.streams.kstream.ValueJoiner<? super V, ? super VO, ? extends VR>, org.apache.kafka.streams.kstream.JoinWindows) functions with Joined parameters in order to set the name for the join processors, you need to create the StreamJoined object first and then call withName(java.lang.String).
- Type Parameters:
K - The key type
V1 - This value type
V2 - Other value type
- Parameters:
storeName - The name to use for the store
- Returns:
StreamJoined instance
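
The naming scheme described above can be sketched in plain Java. This is only an illustration of the string patterns quoted in this section, not Kafka Streams code: the class JoinStoreNames, its methods, and the sample application id and store name are all hypothetical.

```java
// Hypothetical helper; it only reproduces the naming patterns quoted in the text above.
final class JoinStoreNames {
    private JoinStoreNames() {}

    // Builds "<applicationId>-<storeName>-[outer-]<this|other>-join".
    static String storeName(String applicationId, String storeName,
                            boolean outerJoin, boolean thisSide) {
        String side = thisSide ? "this" : "other";
        String prefix = outerJoin ? "-outer-" : "-";
        return applicationId + "-" + storeName + prefix + side + "-join";
    }

    // The changelog topic carries the "-changelog" suffix.
    static String changelogTopic(String fullStoreName) {
        return fullStoreName + "-changelog";
    }

    public static void main(String[] args) {
        // "orders-app" and "order-payment" are made-up sample values.
        String inner = storeName("orders-app", "order-payment", false, true);
        System.out.println(inner);                 // orders-app-order-payment-this-join
        System.out.println(changelogTopic(inner)); // orders-app-order-payment-this-join-changelog
        System.out.println(storeName("orders-app", "order-payment", true, false));
        // orders-app-order-payment-outer-other-join
    }
}
```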
-
with
public static <K, V1, V2> StreamJoined<K,V1,V2> with(Serde<K> keySerde, Serde<V1> valueSerde, Serde<V2> otherValueSerde)
Creates a StreamJoined instance with the provided serdes to configure the stores for the join.
- Type Parameters:
K - The key type
V1 - This value type
V2 - Other value type
- Parameters:
keySerde - The key serde
valueSerde - This value serde
otherValueSerde - Other value serde
- Returns:
StreamJoined instance
-
withName
Set the name to use for the join processor and the repartition topic(s) if required.
- Parameters:
name - the name to use
- Returns:
a new StreamJoined instance
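
Because each with... method is documented as returning a new instance, calls can be chained without changing the object they start from. The sketch below illustrates that copy-on-write pattern with a hypothetical stand-in class, JoinSettings. It is not the StreamJoined implementation, and its fields and accessors are assumptions made for the example.

```java
// Hypothetical stand-in that mimics the "returns a new instance" contract described above.
final class JoinSettings {
    private final String name;
    private final String storeName;

    private JoinSettings(String name, String storeName) {
        this.name = name;
        this.storeName = storeName;
    }

    // Static factory, analogous in spirit to a named-store factory method.
    static JoinSettings as(String storeName) {
        return new JoinSettings(null, storeName);
    }

    // Each "with" method copies the current state and changes one field.
    JoinSettings withName(String name) {
        return new JoinSettings(name, this.storeName);
    }

    JoinSettings withStoreName(String storeName) {
        return new JoinSettings(this.name, storeName);
    }

    String name() { return name; }
    String storeName() { return storeName; }

    public static void main(String[] args) {
        JoinSettings base = JoinSettings.as("order-payment");
        JoinSettings named = base.withName("order-payment-join");
        System.out.println(base.name());       // null: the original is unchanged
        System.out.println(named.name());      // order-payment-join
        System.out.println(named.storeName()); // order-payment
    }
}
```

One consequence of this style is that the result of a with... call has to be used or assigned; calling it and discarding the return value has no effect.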
-
withStoreName
Sets the base store name to use for both sides of the join. This name is used for the state stores and hence for the changelog topics of the join stores. The name for the stores will be ${applicationId}-<storeName>-this-join and ${applicationId}-<storeName>-other-join, or ${applicationId}-<storeName>-outer-this-join and ${applicationId}-<storeName>-outer-other-join, depending on whether the join is an inner join or an outer join. The changelog topics will have the -changelog suffix. Note that even though the join stores will have a specified name, the stores will remain unavailable for querying.
- Parameters:
storeName - the storeName to use
- Returns:
a new StreamJoined instance
-
withKeySerde
Configure with the provided Serde for the key.
- Parameters:
keySerde - the serde to use for the key
- Returns:
a new StreamJoined configured with the keySerde
-
withValueSerde
Configure with the provided Serde for this value.
- Parameters:
valueSerde - the serde to use for this value (calling or left side of the join)
- Returns:
a new StreamJoined configured with the valueSerde
-
withOtherValueSerde
Configure with the provided Serde for the other value.
- Parameters:
otherValueSerde - the serde to use for the other value (other or right side of the join)
- Returns:
a new StreamJoined configured with the otherValueSerde
-
withThisStoreSupplier
Configure with the provided WindowBytesStoreSupplier for this store supplier. Please note this method only provides the store supplier for the left side of the join. If you wish to also provide a store supplier for the right (i.e., other) side, you must use the withOtherStoreSupplier(WindowBytesStoreSupplier) method.
- Parameters:
thisStoreSupplier - the store supplier to use for this store (calling or left side of the join)
- Returns:
a new StreamJoined configured with thisStoreSupplier
-
withOtherStoreSupplier
Configure with the provided WindowBytesStoreSupplier for the other store supplier. Please note this method only provides the store supplier for the right side of the join. If you wish to also provide a store supplier for the left side, you must use the withThisStoreSupplier(WindowBytesStoreSupplier) method.
- Parameters:
otherStoreSupplier - the store supplier to use for the other store (other or right side of the join)
- Returns:
a new StreamJoined configured with otherStoreSupplier
-
withLoggingEnabled
Configures logging for both state stores. The changelog will be created with the provided configs.
Note: Any unrecognized configs will be ignored.
- Parameters:
config - configs applied to the changelog topic
- Returns:
a new StreamJoined configured with logging enabled
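
As a rough sketch of what the config argument might look like, the snippet below builds a Map<String,String> using only the JDK. The class name ChangelogConfigExample and the chosen values are assumptions for illustration; retention.ms and min.insync.replicas are standard Kafka topic-level config names, but which settings suit a given changelog depends on the application.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical example class; it only prepares the map, it does not call Kafka Streams.
final class ChangelogConfigExample {
    private ChangelogConfigExample() {}

    static Map<String, String> changelogConfig() {
        Map<String, String> config = new HashMap<>();
        // Keep changelog records for one day (value is in milliseconds). Sample value only.
        config.put("retention.ms", String.valueOf(24L * 60 * 60 * 1000));
        // Require two in-sync replicas for writes. Sample value only.
        config.put("min.insync.replicas", "2");
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = changelogConfig();
        // In an application, this map would be the argument to withLoggingEnabled(config).
        System.out.println(config.get("retention.ms"));        // 86400000
        System.out.println(config.get("min.insync.replicas")); // 2
    }
}
```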
-
withLoggingDisabled
Disable change logging for both state stores.
- Returns:
a new StreamJoined configured with logging disabled
-
toString
-