Class SessionWindows

java.lang.Object
org.apache.kafka.streams.kstream.SessionWindows

public final class SessionWindows
extends Object
A session-based window specification used for aggregating events into sessions.

A session represents a period of activity, separated from other sessions by a defined gap of inactivity. Any processed event that falls within the inactivity gap of an existing session is merged into that session. If the event falls outside the inactivity gap of every existing session, a new session is created.

For example, if we have a session gap of 5 and the following data arrives:

 +--------------------------------------+
 |    key    |    value    |    time    |
 +-----------+-------------+------------+
 |    A      |     1       |     10     |
 +-----------+-------------+------------+
 |    A      |     2       |     12     |
 +-----------+-------------+------------+
 |    A      |     3       |     20     |
 +-----------+-------------+------------+
 
We'd have 2 sessions for key A: one starting at time 10 and ending at time 12, and another starting and ending at time 20. The length of a session is driven by the timestamps of the data within the session. Thus, session windows are not fixed-size windows (cf. TimeWindows and JoinWindows).

If we then received another record:

 +--------------------------------------+
 |    key    |    value    |    time    |
 +-----------+-------------+------------+
 |    A      |     4       |     16     |
 +-----------+-------------+------------+
 
The previous 2 sessions would be merged into a single session with start time 10 and end time 20. The aggregate value for this session would be the result of aggregating all 4 values.
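
The merge behaviour described above can be traced with a small stand-alone sketch. It does not use the Kafka Streams API; the class SessionMergeSketch, its Session holder, and the add method are hypothetical names introduced only for illustration. It assumes that a record within exactly the gap distance of a session still merges (inclusive bounds) and uses a sum as the aggregate.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {

    /** Hypothetical holder for one session of a single key. */
    static final class Session {
        final long start;
        final long end;
        final long sum;

        Session(long start, long end, long sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "] sum=" + sum;
        }
    }

    /**
     * Adds one record to the sessions of a single key. Every existing session
     * that lies within `gap` of the record's timestamp is merged with it;
     * all other sessions are kept unchanged.
     */
    static List<Session> add(List<Session> sessions, long gap, long time, long value) {
        long start = time;
        long end = time;
        long sum = value;
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            // Assumption: bounds are inclusive, so a distance of exactly `gap` merges.
            boolean withinGap = time >= s.start - gap && time <= s.end + gap;
            if (withinGap) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                sum += s.sum;
            } else {
                result.add(s);
            }
        }
        result.add(new Session(start, end, sum));
        return result;
    }

    public static void main(String[] args) {
        long gap = 5;
        long[][] records = {{10, 1}, {12, 2}, {20, 3}, {16, 4}}; // {time, value}
        List<Session> sessions = new ArrayList<>();
        for (long[] r : records) {
            sessions = add(sessions, gap, r[0], r[1]);
            System.out.println("after time " + r[0] + ": " + sessions);
        }
    }
}
```

Fed the four records from the tables above, the sketch holds two sessions (10 to 12 and 20 to 20) after the third record and a single session from 10 to 20 with sum 10 after the fourth.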

For time semantics, see TimestampExtractor.

See Also:
TimeWindows, UnlimitedWindows, JoinWindows, KGroupedStream.windowedBy(SessionWindows), TimestampExtractor
  • Method Details

    • ofInactivityGapWithNoGrace

      public static SessionWindows ofInactivityGapWithNoGrace(Duration inactivityGap)
      Creates a new window specification with the specified inactivity gap.

      Note that new events may change the boundaries of session windows, so aggressive close times can lead to surprising results in which an out-of-order event is rejected and then a subsequent event moves the window boundary forward.

      CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order records arriving after the window ends are considered late and will be dropped.

      Parameters:
      inactivityGap - the gap of inactivity between sessions
      Returns:
      a window definition with the specified inactivity gap and no grace period. Note that this means out-of-order records arriving after the window end will be dropped.
      Throws:
      IllegalArgumentException - if inactivityGap is zero or negative or can't be represented as long milliseconds
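
The conditions listed under Throws can be read as the following check. This is a minimal sketch of the documented contract, not the library's actual code; the class InactivityGapCheckSketch and the method validateGapMs are hypothetical names. It relies on java.time.Duration.toMillis() throwing ArithmeticException when the value overflows a long.

```java
import java.time.Duration;

public class InactivityGapCheckSketch {

    /**
     * Converts the gap to milliseconds and rejects values that the
     * documented contract does not allow.
     */
    static long validateGapMs(Duration inactivityGap) {
        long gapMs;
        try {
            // toMillis() throws ArithmeticException if the result overflows a long.
            gapMs = inactivityGap.toMillis();
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                "inactivityGap can't be represented as long milliseconds", e);
        }
        if (gapMs <= 0) {
            throw new IllegalArgumentException("inactivityGap must be larger than zero");
        }
        return gapMs;
    }

    public static void main(String[] args) {
        System.out.println(validateGapMs(Duration.ofMinutes(5)));
        try {
            validateGapMs(Duration.ZERO);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```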
    • ofInactivityGapAndGrace

      public static SessionWindows ofInactivityGapAndGrace(Duration inactivityGap, Duration afterWindowEnd)
      Creates a new window specification with the specified inactivity gap.

      Note that new events may change the boundaries of session windows, so aggressive close times can lead to surprising results in which an out-of-order event is rejected and then a subsequent event moves the window boundary forward.

      Using this method explicitly sets the grace period to the duration specified by afterWindowEnd, which means that only out-of-order records arriving more than the grace period after the window end will be dropped. The window close, after which any incoming records are considered late and will be rejected, is defined as windowEnd + afterWindowEnd.

      Parameters:
      inactivityGap - the gap of inactivity between sessions
      afterWindowEnd - The grace period to admit out-of-order events to a window.
      Returns:
      A SessionWindows object with the specified inactivity gap and grace period
      Throws:
      IllegalArgumentException - if inactivityGap is zero or negative or can't be represented as long milliseconds, or if afterWindowEnd is negative or can't be represented as long milliseconds
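
The window close arithmetic (windowEnd + afterWindowEnd) can be illustrated with plain numbers. This is a sketch under stated assumptions, not the library's code: WindowCloseSketch, windowCloseMs, and isLate are hypothetical names, and observedTimeMs stands for whatever notion of time progress the application has reached. The strict comparison is also an assumption, chosen to match the "more than the grace period" wording.

```java
import java.time.Duration;

public class WindowCloseSketch {

    /** Window close as defined in the text: windowEnd + afterWindowEnd. */
    static long windowCloseMs(long windowEndMs, Duration afterWindowEnd) {
        return windowEndMs + afterWindowEnd.toMillis();
    }

    /**
     * Assumption: a session counts as closed once the observed time is
     * strictly greater than its window close.
     */
    static boolean isLate(long windowEndMs, Duration afterWindowEnd, long observedTimeMs) {
        return observedTimeMs > windowCloseMs(windowEndMs, afterWindowEnd);
    }

    public static void main(String[] args) {
        long windowEndMs = 12_000L;
        Duration grace = Duration.ofSeconds(3);

        System.out.println(windowCloseMs(windowEndMs, grace));         // 15000
        System.out.println(isLate(windowEndMs, grace, 14_000L));       // false
        System.out.println(isLate(windowEndMs, grace, 16_000L));       // true
        // A zero grace period closes the window right at its end.
        System.out.println(isLate(windowEndMs, Duration.ZERO, 12_001L)); // true
    }
}
```

A longer grace period admits more out-of-order records at the cost of keeping sessions open, and their results unfinalized, for longer.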
    • with

      @Deprecated public static SessionWindows with(Duration inactivityGap)
      Deprecated.
      Create a new window specification with the specified inactivity gap.
      Parameters:
      inactivityGap - the gap of inactivity between sessions
      Returns:
      a new window specification without an explicitly specified grace period (defaults to 24 hours minus inactivityGap)
      Throws:
      IllegalArgumentException - if inactivityGap is zero or negative or can't be represented as long milliseconds
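
The default grace period mentioned under Returns works out as follows. This is a sketch of the arithmetic only; DefaultGraceSketch and defaultGraceMs are hypothetical names, and clamping the result at zero for gaps longer than 24 hours is an assumption that the text above does not state.

```java
import java.time.Duration;

public class DefaultGraceSketch {

    /** 24 hours minus the inactivity gap, in milliseconds. */
    static long defaultGraceMs(Duration inactivityGap) {
        long dayMs = Duration.ofHours(24).toMillis();
        // Assumption: never return a negative grace period.
        return Math.max(dayMs - inactivityGap.toMillis(), 0L);
    }

    public static void main(String[] args) {
        // A 1-hour gap leaves 23 hours of grace: 82800000 ms.
        System.out.println(defaultGraceMs(Duration.ofHours(1)));
        // A 30-hour gap exceeds 24 hours, so the sketch clamps to 0.
        System.out.println(defaultGraceMs(Duration.ofHours(30)));
    }
}
```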
    • grace

      @Deprecated public SessionWindows grace(Duration afterWindowEnd) throws IllegalArgumentException
      Deprecated.
      Reject out-of-order events that arrive more than afterWindowEnd after the end of their window.

      Note that new events may change the boundaries of session windows, so aggressive close times can lead to surprising results in which an out-of-order event is rejected and then a subsequent event moves the window boundary forward.

      Parameters:
      afterWindowEnd - The grace period to admit out-of-order events to a window.
      Returns:
      this updated builder
      Throws:
      IllegalArgumentException - if the afterWindowEnd is negative or can't be represented as long milliseconds
    • gracePeriodMs

      public long gracePeriodMs()
    • inactivityGap

      public long inactivityGap()
      Return the specified gap for the session windows in milliseconds.
      Returns:
      the inactivity gap of the specified windows
    • equals

      public boolean equals(Object o)
      Overrides:
      equals in class Object
    • hashCode

      public int hashCode()
      Overrides:
      hashCode in class Object
    • toString

      public String toString()
      Overrides:
      toString in class Object