Class JoinWindows
A JoinWindows instance defines a maximum time difference for a join over two streams on the same key.
In SQL-style you would express this join as
SELECT * FROM stream1, stream2
WHERE
stream1.key = stream2.key
AND
stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
There are three different window configuration supported:
- before = after = time-difference
- before = 0 and after = time-difference
- before = time-difference and after = 0
Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller than lower-interval bound.
JoinWindows are sliding windows, thus, they are aligned to the actual record timestamps.
This implies, that each input record defines its own window with start and end time being relative to the record's
timestamp.
For time semantics, see TimestampExtractor.
- See Also:
-
Field Summary
Fields -
Method Summary
Modifier and TypeMethodDescriptionChanges the end window boundary totimeDifferencebut keep the start window boundary as is.Changes the start window boundary totimeDifferencebut keep the end window boundary as is.booleanDeprecated.Since 3.0.longReturn the window grace period (the time to admit out-of-order events after the end of the window.) Delay is defined as (stream_time - record_timestamp).inthashCode()static JoinWindowsDeprecated.Since 3.0.static JoinWindowsofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd) Specifies that records of the same key are joinable if their timestamps are withintimeDifference, i.e., the timestamp of a record from the secondary stream is maxtimeDifferencebefore or after the timestamp of the record from the primary stream.static JoinWindowsofTimeDifferenceWithNoGrace(Duration timeDifference) Specifies that records of the same key are joinable if their timestamps are withintimeDifference, i.e., the timestamp of a record from the secondary stream is maxtimeDifferencebefore or after the timestamp of the record from the primary stream.longsize()Return the size of the specified windows in milliseconds.toString()windowsFor(long timestamp) Not supported byJoinWindows.
-
Field Details
-
beforeMs
public final long beforeMsMaximum time difference for tuples that are before the join tuple. -
afterMs
public final long afterMsMaximum time difference for tuples that are after the join tuple.
-
-
Method Details
-
ofTimeDifferenceAndGrace
public static JoinWindows ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd) Specifies that records of the same key are joinable if their timestamps are withintimeDifference, i.e., the timestamp of a record from the secondary stream is maxtimeDifferencebefore or after the timestamp of the record from the primary stream.Using this method explicitly sets the grace period to the duration specified by
afterWindowEnd, which means that only out-of-order records arriving more than the grace period after the window end will be dropped. The window close, after which any incoming records are considered late and will be rejected, is defined aswindowEnd + afterWindowEnd- Parameters:
timeDifference- join window intervalafterWindowEnd- The grace period to admit out-of-order events to a window.- Returns:
- A new JoinWindows object with the specified window definition and grace period
- Throws:
IllegalArgumentException- iftimeDifferenceis negative or can't be represented aslong millisecondsifafterWindowEndis negative or can't be represented aslong milliseconds
-
ofTimeDifferenceWithNoGrace
Specifies that records of the same key are joinable if their timestamps are withintimeDifference, i.e., the timestamp of a record from the secondary stream is maxtimeDifferencebefore or after the timestamp of the record from the primary stream.CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order records arriving after the window ends are considered late and will be dropped.
- Parameters:
timeDifference- join window interval- Returns:
- a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped
- Throws:
IllegalArgumentException- iftimeDifferenceis negative or can't be represented aslong milliseconds
-
of
Deprecated.Since 3.0. UseofTimeDifferenceWithNoGrace(Duration)} instead.Specifies that records of the same key are joinable if their timestamps are withintimeDifference, i.e., the timestamp of a record from the secondary stream is maxtimeDifferencebefore or after the timestamp of the record from the primary stream.- Parameters:
timeDifference- join window interval- Returns:
- a new JoinWindows object with the window definition with and grace period (default to 24 hours minus
timeDifference) - Throws:
IllegalArgumentException- iftimeDifferenceis negative or can't be represented aslong milliseconds
-
before
Changes the start window boundary totimeDifferencebut keep the end window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at mosttimeDifferenceearlier than the timestamp of the record from the primary stream.timeDifferencecan be negative but its absolute value must not be larger than current window "after" value (which would result in a negative window size).- Parameters:
timeDifference- relative window start time- Throws:
IllegalArgumentException- if the resulting window size is negative ortimeDifferencecan't be represented aslong milliseconds
-
after
Changes the end window boundary totimeDifferencebut keep the start window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at mosttimeDifferencelater than the timestamp of the record from the primary stream.timeDifferencecan be negative but its absolute value must not be larger than current window "before" value (which would result in a negative window size).- Parameters:
timeDifference- relative window end time- Throws:
IllegalArgumentException- if the resulting window size is negative ortimeDifferencecan't be represented aslong milliseconds
-
windowsFor
Not supported byJoinWindows. ThrowsUnsupportedOperationException.- Specified by:
windowsForin classWindows<Window>- Parameters:
timestamp- the timestamp window should get created for- Returns:
- a map of
windowStartTimestamp -> Windowentries - Throws:
UnsupportedOperationException- at every invocation
-
size
public long size()Description copied from class:WindowsReturn the size of the specified windows in milliseconds. -
grace
Deprecated.Since 3.0. UseofTimeDifferenceAndGrace(Duration, Duration)instead.Reject out-of-order events that are delayed more thanafterWindowEndafter the end of its window.Delay is defined as (stream_time - record_timestamp).
- Parameters:
afterWindowEnd- The grace period to admit out-of-order events to a window.- Returns:
- this updated builder
- Throws:
IllegalArgumentException- if theafterWindowEndis negative or can't be represented aslong millisecondsIllegalStateException- ifgrace(Duration)is called afterofTimeDifferenceAndGrace(Duration, Duration)orofTimeDifferenceWithNoGrace(Duration)
-
gracePeriodMs
public long gracePeriodMs()Description copied from class:WindowsReturn the window grace period (the time to admit out-of-order events after the end of the window.) Delay is defined as (stream_time - record_timestamp).- Specified by:
gracePeriodMsin classWindows<Window>
-
equals
-
hashCode
public int hashCode() -
toString
-