Class JoinWindows
A JoinWindows instance defines a maximum time difference for a join over two streams on the same key.
In SQL, you would express this join as:
    SELECT * FROM stream1, stream2
    WHERE
        stream1.key = stream2.key
        AND
        stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
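To make the SQL predicate concrete, the following is a minimal plain-Java sketch (no Kafka dependency) that evaluates it as a naive nested-loop join over two in-memory lists of (key, timestamp) records. The names SqlStyleJoin, Rec and join are hypothetical, and Kafka Streams does not execute joins this way; the sketch only shows which record pairs the predicate accepts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the SQL-style predicate; this is NOT how Kafka Streams executes joins.
class SqlStyleJoin {
    // A record with a join key and an event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    // Naive nested-loop join: emits "ts1/ts2" for every pair that satisfies
    // stream1.key = stream2.key AND stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
    static List<String> join(List<Rec> stream1, List<Rec> stream2, long before, long after) {
        List<String> out = new ArrayList<>();
        for (Rec r1 : stream1) {
            for (Rec r2 : stream2) {
                boolean sameKey = r1.key.equals(r2.key);
                boolean inWindow = r1.ts - before <= r2.ts && r2.ts <= r1.ts + after;
                if (sameKey && inWindow) {
                    out.add(r1.ts + "/" + r2.ts);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> stream1 = List.of(new Rec("a", 100));
        List<Rec> stream2 = List.of(new Rec("a", 90), new Rec("a", 115), new Rec("b", 100));
        // before = after = 10: only ("a", 90) matches; 115 is too late and "b" has another key.
        System.out.println(join(stream1, stream2, 10, 10)); // prints [100/90]
    }
}
```

Widening after to 20 would also admit the record at 115, since 115 <= 100 + 20.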
Three different window configurations are supported:
- before = after = time-difference
- before = 0 and after = time-difference
- before = time-difference and after = 0
The before and after values must not result in an "inverse" window, i.e., the upper interval bound cannot be smaller than the lower interval bound.
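The three configurations and the inverse-window rule reduce to simple arithmetic on the interval [ts - before, ts + after]. The sketch below is a hypothetical helper (WindowConfigs is not a Kafka Streams class) that computes the interval for each configuration and checks the inverse condition, which simplifies to before + after < 0.

```java
// Hypothetical helper for the three configurations and the "inverse" check; not a Kafka Streams API.
class WindowConfigs {
    // Join interval for a primary record at ts: [ts - before, ts + after].
    static long[] interval(long ts, long before, long after) {
        return new long[] {ts - before, ts + after};
    }

    // Inverse window: upper bound < lower bound, i.e. ts + after < ts - before,
    // which simplifies to before + after < 0 (independent of ts).
    static boolean isInverse(long before, long after) {
        return before + after < 0;
    }

    public static void main(String[] args) {
        long d = 5_000;   // time difference in ms
        long ts = 60_000; // primary record timestamp
        long[][] configs = {{d, d}, {0, d}, {d, 0}}; // {before, after}
        for (long[] c : configs) {
            long[] iv = interval(ts, c[0], c[1]);
            System.out.println("[" + iv[0] + ", " + iv[1] + "] inverse=" + isInverse(c[0], c[1]));
        }
        // [55000, 65000] inverse=false
        // [60000, 65000] inverse=false
        // [55000, 60000] inverse=false
    }
}
```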
JoinWindows are sliding windows; thus, they are aligned to the actual record timestamps. This implies that each input record defines its own window, with start and end times relative to the record's timestamp.
For time semantics, see TimestampExtractor.
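A small sketch of what "aligned to the actual record timestamps" means in practice: each record's join window is computed from its own timestamp, so two records a few milliseconds apart get different, overlapping windows. For contrast, the sketch also computes the start of a fixed-size window aligned to the epoch (the alignment used by tumbling windows); that part exists only for comparison and assumes non-negative timestamps. All names are hypothetical.

```java
// Hypothetical illustration: each record gets its own join window, unlike epoch-aligned windows.
class RecordAlignedWindows {
    // Join window of one record: both bounds are relative to that record's own timestamp.
    static long[] joinWindow(long recordTs, long beforeMs, long afterMs) {
        return new long[] {recordTs - beforeMs, recordTs + afterMs};
    }

    // For contrast only: start of a fixed-size window aligned to the epoch (timestamp 0).
    // Assumes non-negative timestamps. JoinWindows does NOT compute windows like this.
    static long epochAlignedStart(long recordTs, long sizeMs) {
        return recordTs - (recordTs % sizeMs);
    }

    public static void main(String[] args) {
        for (long ts : new long[] {1_003, 1_007}) {
            long[] w = joinWindow(ts, 5, 5);
            System.out.println(ts + ": join window [" + w[0] + ", " + w[1] + "], epoch-aligned start "
                    + epochAlignedStart(ts, 10));
        }
        // 1003: join window [998, 1008], epoch-aligned start 1000
        // 1007: join window [1002, 1012], epoch-aligned start 1000
    }
}
```

Both records share one epoch-aligned window but have distinct join windows, which is the sense in which JoinWindows "slide" with the data.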
See Also:
- TimeWindows
- UnlimitedWindows
- SessionWindows
- KStream.join(KStream, ValueJoiner, JoinWindows)
- KStream.join(KStream, ValueJoiner, JoinWindows, StreamJoined)
- KStream.leftJoin(KStream, ValueJoiner, JoinWindows)
- KStream.leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
- KStream.outerJoin(KStream, ValueJoiner, JoinWindows)
- KStream.outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
- TimestampExtractor
Field Summary
- final long afterMs: Maximum time difference for tuples that are after the join tuple.
- final long beforeMs: Maximum time difference for tuples that are before the join tuple.
Method Summary
- JoinWindows after(Duration timeDifference): Changes the end window boundary to timeDifference but keeps the start window boundary as is.
- JoinWindows before(Duration timeDifference): Changes the start window boundary to timeDifference but keeps the end window boundary as is.
- boolean equals(Object o)
- JoinWindows grace(Duration afterWindowEnd): Deprecated since 3.0.
- long gracePeriodMs(): Return the window grace period (the time to admit out-of-order events after the end of the window). Delay is defined as (stream_time - record_timestamp).
- int hashCode()
- static JoinWindows of(Duration timeDifference): Deprecated since 3.0.
- static JoinWindows ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd): Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
- static JoinWindows ofTimeDifferenceWithNoGrace(Duration timeDifference): Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
- long size(): Return the size of the specified windows in milliseconds.
- String toString()
- Map<Long, Window> windowsFor(long timestamp): Not supported by JoinWindows.
Field Details
beforeMs
public final long beforeMs
Maximum time difference for tuples that are before the join tuple.
afterMs
public final long afterMs
Maximum time difference for tuples that are after the join tuple.
Method Details
ofTimeDifferenceAndGrace
public static JoinWindows ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd)
Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
Using this method explicitly sets the grace period to the duration specified by afterWindowEnd, which means that only out-of-order records arriving more than the grace period after the window end will be dropped. The window close, after which any incoming records are considered late and will be rejected, is defined as windowEnd + afterWindowEnd.
- Parameters:
  timeDifference - join window interval
  afterWindowEnd - the grace period to admit out-of-order events to a window
- Returns:
  a new JoinWindows object with the specified window definition and grace period
- Throws:
  IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds, or if afterWindowEnd is negative or can't be represented as long milliseconds
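As a hedged illustration of the window-close arithmetic, the sketch below models the documented rule windowClose = windowEnd + afterWindowEnd in plain Java. It assumes the window end for a record is recordTs + after (following the SQL predicate at the top of this page) and that a record counts as late once stream time is strictly past the window close; GraceModel and its methods are hypothetical and are not Kafka Streams internals. In an actual topology, the corresponding windows would be created with JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(10), Duration.ofSeconds(5)).

```java
import java.time.Duration;

// Simplified, hypothetical model of the documented rule "window close = windowEnd + afterWindowEnd".
// Not Kafka Streams internals; treating windowEnd as recordTs + after is an assumption.
class GraceModel {
    static long windowClose(long recordTs, Duration after, Duration afterWindowEnd) {
        long windowEnd = recordTs + after.toMillis();
        return windowEnd + afterWindowEnd.toMillis();
    }

    // Assumption: "late" means stream time has moved strictly past the window close.
    static boolean isLate(long recordTs, long streamTime, Duration after, Duration afterWindowEnd) {
        return streamTime > windowClose(recordTs, after, afterWindowEnd);
    }

    public static void main(String[] args) {
        Duration timeDifference = Duration.ofSeconds(10); // before = after = 10 s
        Duration grace = Duration.ofSeconds(5);
        long recordTs = 100_000;
        System.out.println(windowClose(recordTs, timeDifference, grace));     // 115000
        System.out.println(isLate(recordTs, 114_000, timeDifference, grace)); // false
        System.out.println(isLate(recordTs, 116_000, timeDifference, grace)); // true
    }
}
```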
ofTimeDifferenceWithNoGrace
public static JoinWindows ofTimeDifferenceWithNoGrace(Duration timeDifference)
Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order records arriving after the window ends are considered late and will be dropped.
- Parameters:
  timeDifference - join window interval
- Returns:
  a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped.
- Throws:
  IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds
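With ofTimeDifferenceWithNoGrace the grace term drops out, so the window closes exactly at its end. The hypothetical sketch below models that, together with the two documented IllegalArgumentException cases: a negative duration, or a duration that overflows long milliseconds (which java.time.Duration.toMillis() reports as an ArithmeticException). NoGraceModel is an illustrative name, and treating the window end as recordTs + timeDifference is an assumption.

```java
import java.time.Duration;

// Hypothetical sketch: zero grace plus the documented argument checks. Not the Kafka Streams implementation.
class NoGraceModel {
    // Mirrors the documented IllegalArgumentException cases: negative, or not representable as long millis.
    static long validateMillis(Duration d, String name) {
        if (d.isNegative()) {
            throw new IllegalArgumentException(name + " must not be negative");
        }
        try {
            return d.toMillis(); // Duration.toMillis throws ArithmeticException on long overflow
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds", e);
        }
    }

    // With no grace, the window closes exactly at its end (assumed to be recordTs + timeDifference).
    static long windowClose(long recordTs, Duration timeDifference) {
        long graceMs = 0L;
        return recordTs + validateMillis(timeDifference, "timeDifference") + graceMs;
    }

    // Any record whose window end has already passed in stream time is dropped.
    static boolean isDropped(long recordTs, long streamTime, Duration timeDifference) {
        return streamTime > windowClose(recordTs, timeDifference);
    }

    public static void main(String[] args) {
        Duration td = Duration.ofSeconds(10);
        System.out.println(windowClose(100_000, td));        // 110000
        System.out.println(isDropped(100_000, 110_001, td)); // true: 1 ms after the window end
    }
}
```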
of
public static JoinWindows of(Duration timeDifference)
Deprecated since 3.0. Use ofTimeDifferenceWithNoGrace(Duration) instead.
Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
- Parameters:
  timeDifference - join window interval
- Returns:
  a new JoinWindows object with the window definition and a grace period (defaulting to 24 hours minus timeDifference)
- Throws:
  IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds
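The documented default grace period of the deprecated of(Duration) is plain arithmetic. The sketch below computes it as 24 hours minus timeDifference, as stated above; clamping the result at zero for a timeDifference above 24 hours is an assumption of this sketch, and DeprecatedDefaultGrace is a hypothetical name.

```java
import java.time.Duration;

// Arithmetic sketch of the documented default for the deprecated of(Duration):
// grace = 24 hours minus timeDifference. The clamp at zero is an assumption,
// so that a timeDifference above 24 hours does not yield a negative grace period.
class DeprecatedDefaultGrace {
    static final long TWENTY_FOUR_HOURS_MS = Duration.ofHours(24).toMillis();

    static long defaultGraceMs(Duration timeDifference) {
        return Math.max(TWENTY_FOUR_HOURS_MS - timeDifference.toMillis(), 0L);
    }

    public static void main(String[] args) {
        // 24 h = 86,400,000 ms; minus 10 min (600,000 ms) = 85,800,000 ms.
        System.out.println(defaultGraceMs(Duration.ofMinutes(10))); // 85800000
    }
}
```

Assuming the documented default is accurate for your version, this is the grace period that silently disappears when switching to ofTimeDifferenceWithNoGrace, so it is worth checking whether the join relied on it.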
before
public JoinWindows before(Duration timeDifference)
Changes the start window boundary to timeDifference but keeps the end window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference earlier than the timestamp of the record from the primary stream. timeDifference can be negative, but its absolute value must not be larger than the current window "after" value (which would result in a negative window size).
- Parameters:
  timeDifference - relative window start time
- Throws:
  IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
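The effect of before() on the join predicate can be sketched with a small immutable model. BeforeModel is hypothetical, takes plain long milliseconds instead of Duration for brevity, and returns a new instance from before(); it treats the window size as beforeMs + afterMs, which is what the negative-size rule above implies.

```java
// Hypothetical immutable model of before(); not the Kafka Streams class.
final class BeforeModel {
    final long beforeMs;
    final long afterMs;

    BeforeModel(long beforeMs, long afterMs) {
        // Window size is beforeMs + afterMs; a negative size means an inverse window.
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("window size must not be negative");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    // Replaces the start boundary and keeps the end boundary; returns a new instance.
    BeforeModel before(long timeDifferenceMs) {
        return new BeforeModel(timeDifferenceMs, afterMs);
    }

    // Secondary record is joinable if primaryTs - beforeMs <= secondaryTs <= primaryTs + afterMs.
    boolean joinable(long primaryTs, long secondaryTs) {
        return primaryTs - beforeMs <= secondaryTs && secondaryTs <= primaryTs + afterMs;
    }

    public static void main(String[] args) {
        BeforeModel symmetric = new BeforeModel(10, 10);
        BeforeModel shifted = symmetric.before(-4); // window [ts + 4, ts + 10], size 6
        System.out.println(shifted.joinable(100, 103)); // false: earlier than ts + 4
        System.out.println(shifted.joinable(100, 106)); // true
        // symmetric.before(-11) would throw IllegalArgumentException, because |-11| > after (10).
    }
}
```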
after
public JoinWindows after(Duration timeDifference)
Changes the end window boundary to timeDifference but keeps the start window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference later than the timestamp of the record from the primary stream. timeDifference can be negative, but its absolute value must not be larger than the current window "before" value (which would result in a negative window size).
- Parameters:
  timeDifference - relative window end time
- Throws:
  IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
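after() is the mirror image. The hypothetical AfterModel below (long milliseconds instead of Duration, window size taken as beforeMs + afterMs per the rule above) shows that a negative after value moves the end boundary to before the primary record's timestamp, so only strictly earlier secondary records can join.

```java
// Hypothetical immutable model of after(); not the Kafka Streams class.
final class AfterModel {
    final long beforeMs;
    final long afterMs;

    AfterModel(long beforeMs, long afterMs) {
        // Window size is beforeMs + afterMs; reject negative sizes (inverse windows).
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("window size must not be negative");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    // Replaces the end boundary and keeps the start boundary; returns a new instance.
    AfterModel after(long timeDifferenceMs) {
        return new AfterModel(beforeMs, timeDifferenceMs);
    }

    // Interval of secondary timestamps that join with a primary record at primaryTs.
    long[] interval(long primaryTs) {
        return new long[] {primaryTs - beforeMs, primaryTs + afterMs};
    }

    public static void main(String[] args) {
        AfterModel shifted = new AfterModel(10, 10).after(-5);
        long[] iv = shifted.interval(100);
        System.out.println("[" + iv[0] + ", " + iv[1] + "]"); // [90, 95]
        // new AfterModel(10, 10).after(-11) would throw: |-11| > before (10).
    }
}
```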
windowsFor
public Map<Long, Window> windowsFor(long timestamp)
Not supported by JoinWindows. Throws UnsupportedOperationException.
- Specified by:
  windowsFor in class Windows<Window>
- Parameters:
  timestamp - the timestamp for which windows should be created
- Returns:
  a map of windowStartTimestamp -> Window entries
- Throws:
  UnsupportedOperationException - at every invocation
size
public long size()
Description copied from class: Windows
Return the size of the specified windows in milliseconds.
grace
public JoinWindows grace(Duration afterWindowEnd)
Deprecated since 3.0. Use ofTimeDifferenceAndGrace(Duration, Duration) instead.
Reject out-of-order events that are delayed more than afterWindowEnd after the end of their window. Delay is defined as (stream_time - record_timestamp).
- Parameters:
  afterWindowEnd - the grace period to admit out-of-order events to a window
- Returns:
  this updated builder
- Throws:
  IllegalArgumentException - if afterWindowEnd is negative or can't be represented as long milliseconds
  IllegalStateException - if grace(Duration) is called after ofTimeDifferenceAndGrace(Duration, Duration) or ofTimeDifferenceWithNoGrace(Duration)
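The IllegalStateException rule can be pictured as a flag recording whether the factory method already decided the grace period. The sketch below is a hypothetical model, not the Kafka Streams class: it returns a new object from grace() rather than modeling whether the real builder is mutable, and it computes the of(Duration) default as 24 hours minus timeDifference, clamped at zero as an assumption.

```java
import java.time.Duration;

// Hypothetical model of the documented grace() contract; not the Kafka Streams class.
final class GraceBuilderModel {
    final long timeDifferenceMs;
    final long graceMs;
    // True when a factory that already takes a grace decision created this object.
    private final boolean graceFixed;

    private GraceBuilderModel(long timeDifferenceMs, long graceMs, boolean graceFixed) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
        this.graceFixed = graceFixed;
    }

    // Models the deprecated of(Duration): grace defaults to 24 h minus timeDifference (clamp is an assumption).
    static GraceBuilderModel of(Duration timeDifference) {
        long diff = timeDifference.toMillis();
        return new GraceBuilderModel(diff, Math.max(Duration.ofHours(24).toMillis() - diff, 0L), false);
    }

    // Models ofTimeDifferenceWithNoGrace: grace is fixed at zero.
    static GraceBuilderModel ofTimeDifferenceWithNoGrace(Duration timeDifference) {
        return new GraceBuilderModel(timeDifference.toMillis(), 0L, true);
    }

    // Returns a copy with the new grace period, enforcing both documented exceptions.
    GraceBuilderModel grace(Duration afterWindowEnd) {
        if (graceFixed) {
            throw new IllegalStateException(
                    "grace() must not be called after ofTimeDifferenceAndGrace or ofTimeDifferenceWithNoGrace");
        }
        if (afterWindowEnd.isNegative()) {
            throw new IllegalArgumentException("afterWindowEnd must not be negative");
        }
        return new GraceBuilderModel(timeDifferenceMs, afterWindowEnd.toMillis(), false);
    }

    public static void main(String[] args) {
        System.out.println(of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(5)).graceMs); // 5000
        try {
            ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)).grace(Duration.ofSeconds(5));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```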
gracePeriodMs
public long gracePeriodMs()
Description copied from class: Windows
Return the window grace period (the time to admit out-of-order events after the end of the window). Delay is defined as (stream_time - record_timestamp).
- Specified by:
  gracePeriodMs in class Windows<Window>
equals
public boolean equals(Object o)
hashCode
public int hashCode()
toString
public String toString()