public class JoinWindows extends Windows<Window>
A JoinWindows instance defines a maximum time difference for a join over two streams on the same key.
In SQL style, you would express this join as:
SELECT * FROM stream1, stream2
WHERE stream1.key = stream2.key
AND stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
Three window configurations are supported:
- before = after = time-difference
- before < after
- before > after
Both values (before and after) must not result in an "inverse" window, i.e., the upper interval bound cannot be smaller than the lower interval bound.
JoinWindows are sliding windows; thus, they are aligned to the actual record timestamps. This implies that each input record defines its own window, with start and end time relative to the record's timestamp.
For time semantics, see TimestampExtractor.
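The join condition above can be sketched in plain Java. This is a minimal illustrative model, not code from Kafka Streams: the class `JoinPredicateSketch` and the method `joinable` are hypothetical names, and timestamps are assumed to be epoch milliseconds.

```java
import java.time.Duration;

// Illustrative model only; not part of Kafka Streams.
final class JoinPredicateSketch {

    // True if a secondary-stream record with timestamp otherTs falls inside the
    // window [primaryTs - before, primaryTs + after] (both bounds inclusive),
    // mirroring the SQL-style condition in the class description.
    static boolean joinable(long primaryTs, long otherTs, Duration before, Duration after) {
        long windowStart = primaryTs - before.toMillis();
        long windowEnd = primaryTs + after.toMillis();
        return windowStart <= otherTs && otherTs <= windowEnd;
    }

    public static void main(String[] args) {
        Duration before = Duration.ofSeconds(5);
        Duration after = Duration.ofSeconds(2);
        // A primary record at t = 10_000 ms defines the window [5_000, 12_000].
        System.out.println(joinable(10_000L, 5_000L, before, after));  // true
        System.out.println(joinable(10_000L, 12_001L, before, after)); // false
    }
}
```

Because the window is computed from each record's own timestamp, no fixed window grid is involved, which is what "aligned to the actual record timestamps" means here.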
See Also:
TimeWindows,
UnlimitedWindows,
SessionWindows,
KStream.join(KStream, ValueJoiner, JoinWindows),
KStream.join(KStream, ValueJoiner, JoinWindows, StreamJoined),
KStream.leftJoin(KStream, ValueJoiner, JoinWindows),
KStream.leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined),
KStream.outerJoin(KStream, ValueJoiner, JoinWindows),
KStream.outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined),
TimestampExtractor
Modifier and Type | Field and Description |
---|---|
long | afterMs: Maximum time difference for tuples that are after the join tuple. |
long | beforeMs: Maximum time difference for tuples that are before the join tuple. |
protected boolean | enableSpuriousResultFix: Enables the spurious-result fix for left/outer stream-stream joins: left/outer results are not emitted eagerly, but only after the grace period has passed. |
Fields inherited from class Windows: DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD, NO_GRACE_PERIOD
Modifier | Constructor and Description |
---|---|
protected | JoinWindows(JoinWindows joinWindows) |
Modifier and Type | Method and Description |
---|---|
JoinWindows | after(Duration timeDifference): Changes the end window boundary to timeDifference but keeps the start window boundary as is. |
JoinWindows | before(Duration timeDifference): Changes the start window boundary to timeDifference but keeps the end window boundary as is. |
boolean | equals(Object o) |
JoinWindows | grace(Duration afterWindowEnd): Deprecated since 3.0. Use ofTimeDifferenceAndGrace(Duration, Duration) instead. |
long | gracePeriodMs(): Returns the window grace period (the time to admit out-of-order events after the end of the window). Delay is defined as (stream_time - record_timestamp). |
int | hashCode() |
static JoinWindows | of(Duration timeDifference): Deprecated since 3.0. Use ofTimeDifferenceWithNoGrace(Duration) instead. |
static JoinWindows | ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd): Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream. |
static JoinWindows | ofTimeDifferenceWithNoGrace(Duration timeDifference): Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream. |
long | size(): Returns the size of the specified windows in milliseconds. |
String | toString() |
Map<Long,Window> | windowsFor(long timestamp): Not supported by JoinWindows. |
public final long beforeMs
public final long afterMs
protected final boolean enableSpuriousResultFix
protected JoinWindows(JoinWindows joinWindows)
public static JoinWindows ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd)
Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
Using this method explicitly sets the grace period to the duration specified by afterWindowEnd, which means that only out-of-order records arriving more than the grace period after the window end will be dropped. The window close, after which any incoming records are considered late and will be rejected, is defined as windowEnd + afterWindowEnd.
Parameters:
timeDifference - join window interval
afterWindowEnd - the grace period to admit out-of-order events to a window
Throws:
IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds, or if afterWindowEnd is negative or can't be represented as long milliseconds
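The window-close rule (windowEnd + afterWindowEnd) can be illustrated with a small model. This is a sketch under stated assumptions, not the library's implementation: `GracePeriodSketch`, `windowCloseMs`, and `isLate` are hypothetical names, and the strict "greater than" comparison against stream time is an assumption of this sketch.

```java
import java.time.Duration;

// Illustrative model only; not part of Kafka Streams.
final class GracePeriodSketch {

    // Window close as described above: windowEnd + afterWindowEnd.
    static long windowCloseMs(long windowEndMs, Duration afterWindowEnd) {
        return windowEndMs + afterWindowEnd.toMillis();
    }

    // Assumption of this sketch: a record is treated as late once the observed
    // stream time has advanced strictly past the window close.
    static boolean isLate(long windowEndMs, Duration afterWindowEnd, long streamTimeMs) {
        return streamTimeMs > windowCloseMs(windowEndMs, afterWindowEnd);
    }

    public static void main(String[] args) {
        Duration grace = Duration.ofSeconds(3);
        long windowEnd = 12_000L;
        System.out.println(windowCloseMs(windowEnd, grace));   // 15000
        System.out.println(isLate(windowEnd, grace, 15_000L)); // false
        System.out.println(isLate(windowEnd, grace, 15_001L)); // true
    }
}
```

With a grace period of zero, the window close coincides with the window end, so any out-of-order record for that window would be treated as late in this model.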
public static JoinWindows ofTimeDifferenceWithNoGrace(Duration timeDifference)
Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order records arriving after the window ends are considered late and will be dropped.
Parameters:
timeDifference - join window interval
Throws:
IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds
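The two failure conditions in the Throws clause (negative, or not representable as long milliseconds) can be sketched with the standard `java.time.Duration` API. This is a hypothetical helper for illustration, not the validation code Kafka Streams uses: `DurationValidationSketch` and `toValidMillis` are made-up names.

```java
import java.time.Duration;

// Illustrative model only; not part of Kafka Streams.
final class DurationValidationSketch {

    // Converts a Duration to milliseconds, rejecting values that are negative
    // or that overflow a long when expressed in milliseconds.
    static long toValidMillis(Duration duration, String name) {
        long millis;
        try {
            // Duration.toMillis() throws ArithmeticException on numeric overflow.
            millis = duration.toMillis();
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                name + " can't be represented as long milliseconds", e);
        }
        if (millis < 0) {
            throw new IllegalArgumentException(name + " must not be negative");
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println(toValidMillis(Duration.ofSeconds(3), "timeDifference")); // 3000
        try {
            toValidMillis(Duration.ofSeconds(Long.MAX_VALUE), "timeDifference");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```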
@Deprecated public static JoinWindows of(Duration timeDifference) throws IllegalArgumentException
Deprecated. since 3.0. Use ofTimeDifferenceWithNoGrace(Duration) instead
Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
Parameters:
timeDifference - join window interval
Throws:
IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds
public JoinWindows before(Duration timeDifference) throws IllegalArgumentException
Changes the start window boundary to timeDifference but keeps the end window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference earlier than the timestamp of the record from the primary stream. timeDifference can be negative, but its absolute value must not be larger than the current window "after" value (which would result in a negative window size).
Parameters:
timeDifference - relative window start time
Throws:
IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
public JoinWindows after(Duration timeDifference) throws IllegalArgumentException
Changes the end window boundary to timeDifference but keeps the start window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference later than the timestamp of the record from the primary stream. timeDifference can be negative, but its absolute value must not be larger than the current window "before" value (which would result in a negative window size).
Parameters:
timeDifference - relative window end time
Throws:
IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
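The interplay of before, after, and the "negative window size" rule can be modeled with a small immutable class. This is a sketch under stated assumptions rather than the library's code: `AsymmetricWindowSketch` and its members are hypothetical, and the sketch assumes the window size is beforeMs + afterMs, which is consistent with the wording above.

```java
import java.time.Duration;

// Illustrative model only; not part of Kafka Streams.
final class AsymmetricWindowSketch {
    final long beforeMs;
    final long afterMs;

    private AsymmetricWindowSketch(long beforeMs, long afterMs) {
        // Assumption: window size is beforeMs + afterMs and must not be negative.
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException(
                "window size (beforeMs + afterMs) must not be negative");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    // Symmetric starting point: before = after = timeDifference.
    static AsymmetricWindowSketch ofTimeDifference(Duration timeDifference) {
        long ms = timeDifference.toMillis();
        if (ms < 0) {
            throw new IllegalArgumentException("timeDifference must not be negative");
        }
        return new AsymmetricWindowSketch(ms, ms);
    }

    // Replaces the start boundary; the end boundary is kept as is.
    AsymmetricWindowSketch before(Duration timeDifference) {
        return new AsymmetricWindowSketch(timeDifference.toMillis(), afterMs);
    }

    // Replaces the end boundary; the start boundary is kept as is.
    AsymmetricWindowSketch after(Duration timeDifference) {
        return new AsymmetricWindowSketch(beforeMs, timeDifference.toMillis());
    }

    long size() {
        return beforeMs + afterMs;
    }

    public static void main(String[] args) {
        // A negative "before" shifts the window start past the primary timestamp:
        // the window becomes [ts + 2s, ts + 10s].
        AsymmetricWindowSketch w =
            ofTimeDifference(Duration.ofSeconds(10)).before(Duration.ofSeconds(-2));
        System.out.println(w.size()); // 8000
    }
}
```

In this model, calling `before(Duration.ofSeconds(-11))` on a window whose "after" value is 10 seconds fails, because the resulting size would be negative.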
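public Map<Long,Window> windowsFor(long timestamp)
Not supported by JoinWindows. Throws UnsupportedOperationException.
Specified by:
windowsFor in class Windows<Window>
Parameters:
timestamp - the timestamp window should get created for
Returns:
a map of windowStartTimestamp -> Window entries
Throws:
UnsupportedOperationException - at every invocation
public long size()
Description copied from class: Windows
Return the size of the specified windows in milliseconds.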
@Deprecated public JoinWindows grace(Duration afterWindowEnd) throws IllegalArgumentException
Deprecated. since 3.0. Use ofTimeDifferenceAndGrace(Duration, Duration) instead
Rejects out-of-order events that are delayed more than afterWindowEnd after the end of their window. Delay is defined as (stream_time - record_timestamp).
Parameters:
afterWindowEnd - the grace period to admit out-of-order events to a window
Throws:
IllegalArgumentException - if afterWindowEnd is negative or can't be represented as long milliseconds
IllegalStateException - if grace(Duration) is called after ofTimeDifferenceAndGrace(Duration, Duration) or ofTimeDifferenceWithNoGrace(Duration)
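public long gracePeriodMs()
Description copied from class: Windows
Return the window grace period (the time to admit out-of-order events after the end of the window). Delay is defined as (stream_time - record_timestamp).
Specified by:
gracePeriodMs in class Windows<Window>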