@InterfaceStability.Unstable public final class JoinWindows extends Windows<Window>
A JoinWindows
instance defines a maximum time difference for a join over two streams
on the same key.
In SQL-style you would express this join as
SELECT * FROM stream1, stream2
WHERE
stream1.key = stream2.key
AND
stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
There are three different window configuration supported:
Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller than lower-interval bound.
JoinWindows
are sliding windows, thus, they are aligned to the actual record timestamps.
This implies, that each input record defines its own window with start and end time being relative to the record's
timestamp.
For time semantics, see TimestampExtractor
.
TimeWindows
,
UnlimitedWindows
,
SessionWindows
,
KStream.join(KStream, ValueJoiner, JoinWindows)
,
KStream.join(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
,
KStream.leftJoin(KStream, ValueJoiner, JoinWindows)
,
KStream.leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
,
KStream.outerJoin(KStream, ValueJoiner, JoinWindows)
,
KStream.outerJoin(KStream, ValueJoiner, JoinWindows)
,
TimestampExtractor
Modifier and Type | Field and Description |
---|---|
long |
afterMs
Maximum time difference for tuples that are after the join tuple.
|
long |
beforeMs
Maximum time difference for tuples that are before the join tuple.
|
Modifier and Type | Method and Description |
---|---|
JoinWindows |
after(long timeDifferenceMs)
Changes the end window boundary to
timeDifferenceMs but keep the start window boundary as is. |
JoinWindows |
before(long timeDifferenceMs)
Changes the start window boundary to
timeDifferenceMs but keep the end window boundary as is. |
boolean |
equals(Object o) |
int |
hashCode() |
long |
maintainMs()
Return the window maintain duration (retention time) in milliseconds.
|
static JoinWindows |
of(long timeDifferenceMs)
Specifies that records of the same key are joinable if their timestamps are within
timeDifferenceMs ,
i.e., the timestamp of a record from the secondary stream is max timeDifferenceMs earlier or later than
the timestamp of the record from the primary stream. |
long |
size()
Return the size of the specified windows in milliseconds.
|
JoinWindows |
until(long durationMs)
Set the window maintain duration (retention time) in milliseconds.
|
Map<Long,Window> |
windowsFor(long timestamp)
Not supported by
JoinWindows . |
public final long beforeMs
public final long afterMs
public static JoinWindows of(long timeDifferenceMs) throws IllegalArgumentException
timeDifferenceMs
,
i.e., the timestamp of a record from the secondary stream is max timeDifferenceMs
earlier or later than
the timestamp of the record from the primary stream.timeDifferenceMs
- join window interval in millisecondsIllegalArgumentException
- if timeDifferenceMs
is negativepublic JoinWindows before(long timeDifferenceMs) throws IllegalArgumentException
timeDifferenceMs
but keep the end window boundary as is.
Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
timeDifferenceMs
earlier than the timestamp of the record from the primary stream.
timeDifferenceMs
can be negative but it's absolute value must not be larger than current window "after"
value (which would result in a negative window size).timeDifferenceMs
- relative window start time in millisecondsIllegalArgumentException
- if the resulting window size is negativepublic JoinWindows after(long timeDifferenceMs) throws IllegalArgumentException
timeDifferenceMs
but keep the start window boundary as is.
Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
timeDifferenceMs
later than the timestamp of the record from the primary stream.
timeDifferenceMs
can be negative but it's absolute value must not be larger than current window "before"
value (which would result in a negative window size).timeDifferenceMs
- relative window end time in millisecondsIllegalArgumentException
- if the resulting window size is negativepublic Map<Long,Window> windowsFor(long timestamp)
JoinWindows
.
Throws UnsupportedOperationException
.windowsFor
in class Windows<Window>
timestamp
- the timestamp window should get created forwindowStartTimestamp -> Window
entriesUnsupportedOperationException
- at every invocationpublic long size()
Windows
public JoinWindows until(long durationMs) throws IllegalArgumentException
Windows
until
in class Windows<Window>
durationMs
- the window retention time in millisecondsIllegalArgumentException
- if durationMs
is smaller than the window sizepublic long maintainMs()
For TimeWindows
the maintain duration is at least as small as the window size.
maintainMs
in class Windows<Window>