@InterfaceStability.Unstable public final class SessionWindows extends Object
Sessions represent a period of activity separated by a defined gap of inactivity. Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If the event falls outside of the session gap then a new session will be created.
For example, if we have a session gap of 5 and the following data arrives:
+--------------------------------------+ | key | value | time | +-----------+-------------+------------+ | A | 1 | 10 | +-----------+-------------+------------+ | A | 2 | 12 | +-----------+-------------+------------+ | A | 3 | 20 | +-----------+-------------+------------+We'd have 2 sessions for key A. One starting from time 10 and ending at time 12 and another starting and ending at time 20. The length of the session is driven by the timestamps of the data within the session. Thus, session windows are no fixed-size windows (c.f.
TimeWindows
and JoinWindows
).
If we then received another record:
+--------------------------------------+ | key | value | time | +-----------+-------------+------------+ | A | 4 | 16 | +-----------+-------------+------------+The previous 2 sessions would be merged into a single session with start time 10 and end time 20. The aggregate value for this session would be the result of aggregating all 4 values.
For time semantics, see TimestampExtractor
.
TimeWindows
,
UnlimitedWindows
,
JoinWindows
,
KGroupedStream.count(SessionWindows, String)
,
KGroupedStream.count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
,
KGroupedStream.reduce(Reducer, SessionWindows, String)
,
KGroupedStream.reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
,
KGroupedStream.aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
,
KGroupedStream.aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
,
TimestampExtractor
Modifier and Type | Method and Description |
---|---|
long |
inactivityGap()
Return the specified gap for the session windows in milliseconds.
|
long |
maintainMs()
Return the window maintain duration (retention time) in milliseconds.
|
SessionWindows |
until(long durationMs)
Set the window maintain duration (retention time) in milliseconds.
|
static SessionWindows |
with(long inactivityGapMs)
Create a new window specification with the specified inactivity gap in milliseconds.
|
public static SessionWindows with(long inactivityGapMs)
inactivityGapMs
- the gap of inactivity between sessions in millisecondsIllegalArgumentException
- if inactivityGapMs
is zero or negativepublic SessionWindows until(long durationMs) throws IllegalArgumentException
IllegalArgumentException
- if durationMs
is smaller than window gappublic long inactivityGap()
public long maintainMs()
For SessionWindows
the maintain duration is at least as small as the window gap.