Class JoinWindows

java.lang.Object
org.apache.kafka.streams.kstream.Windows<Window>
org.apache.kafka.streams.kstream.JoinWindows

public class JoinWindows extends Windows<Window>
The window specifications used for joins.

A JoinWindows instance defines a maximum time difference for a join over two streams on the same key. In SQL-style you would express this join as


     SELECT * FROM stream1, stream2
     WHERE
       stream1.key = stream2.key
       AND
       stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
 
There are three different window configuration supported:
  • before = after = time-difference
  • before = 0 and after = time-difference
  • before = time-difference and after = 0
A join is symmetric in the sense, that a join specification on the first stream returns the same result record as a join specification on the second stream with flipped before and after values.

Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller than lower-interval bound.

JoinWindows are sliding windows, thus, they are aligned to the actual record timestamps. This implies, that each input record defines its own window with start and end time being relative to the record's timestamp.

For time semantics, see TimestampExtractor.

See Also:
  • Field Summary

    Fields
    Modifier and Type
    Field
    Description
    final long
    Maximum time difference for tuples that are after the join tuple.
    final long
    Maximum time difference for tuples that are before the join tuple.
  • Method Summary

    Modifier and Type
    Method
    Description
    after(Duration timeDifference)
    Changes the end window boundary to timeDifference but keep the start window boundary as is.
    before(Duration timeDifference)
    Changes the start window boundary to timeDifference but keep the end window boundary as is.
    boolean
     
    grace(Duration afterWindowEnd)
    Deprecated.
    since 3.0.
    long
    Return the window grace period (the time to admit out-of-order events after the end of the window.) Delay is defined as (stream_time - record_timestamp).
    int
     
    of(Duration timeDifference)
    Deprecated.
    since 3.0.
    ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd)
    Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is max timeDifference before or after the timestamp of the record from the primary stream.
    Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is max timeDifference before or after the timestamp of the record from the primary stream.
    long
    Return the size of the specified windows in milliseconds.
     
    windowsFor(long timestamp)
    Not supported by JoinWindows.

    Methods inherited from class java.lang.Object

    getClass, notify, notifyAll, wait, wait, wait
  • Field Details

    • beforeMs

      public final long beforeMs
      Maximum time difference for tuples that are before the join tuple.
    • afterMs

      public final long afterMs
      Maximum time difference for tuples that are after the join tuple.
  • Method Details

    • ofTimeDifferenceAndGrace

      public static JoinWindows ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd)
      Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is max timeDifference before or after the timestamp of the record from the primary stream.

      Using this method explicitly sets the grace period to the duration specified by afterWindowEnd, which means that only out-of-order records arriving more than the grace period after the window end will be dropped. The window close, after which any incoming records are considered late and will be rejected, is defined as windowEnd + afterWindowEnd

      Parameters:
      timeDifference - join window interval
      afterWindowEnd - The grace period to admit out-of-order events to a window.
      Returns:
      A new JoinWindows object with the specified window definition and grace period
      Throws:
      IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds if afterWindowEnd is negative or can't be represented as long milliseconds
    • ofTimeDifferenceWithNoGrace

      public static JoinWindows ofTimeDifferenceWithNoGrace(Duration timeDifference)
      Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is max timeDifference before or after the timestamp of the record from the primary stream.

      CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order records arriving after the window ends are considered late and will be dropped.

      Parameters:
      timeDifference - join window interval
      Returns:
      a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped
      Throws:
      IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds
    • of

      @Deprecated public static JoinWindows of(Duration timeDifference) throws IllegalArgumentException
      Deprecated.
      Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is max timeDifference before or after the timestamp of the record from the primary stream.
      Parameters:
      timeDifference - join window interval
      Returns:
      a new JoinWindows object with the window definition with and grace period (default to 24 hours minus timeDifference)
      Throws:
      IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds
    • before

      public JoinWindows before(Duration timeDifference) throws IllegalArgumentException
      Changes the start window boundary to timeDifference but keep the end window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference earlier than the timestamp of the record from the primary stream. timeDifference can be negative but its absolute value must not be larger than current window "after" value (which would result in a negative window size).
      Parameters:
      timeDifference - relative window start time
      Throws:
      IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
    • after

      public JoinWindows after(Duration timeDifference) throws IllegalArgumentException
      Changes the end window boundary to timeDifference but keep the start window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference later than the timestamp of the record from the primary stream. timeDifference can be negative but its absolute value must not be larger than current window "before" value (which would result in a negative window size).
      Parameters:
      timeDifference - relative window end time
      Throws:
      IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
    • windowsFor

      public Map<Long,Window> windowsFor(long timestamp)
      Not supported by JoinWindows. Throws UnsupportedOperationException.
      Specified by:
      windowsFor in class Windows<Window>
      Parameters:
      timestamp - the timestamp window should get created for
      Returns:
      a map of windowStartTimestamp -> Window entries
      Throws:
      UnsupportedOperationException - at every invocation
    • size

      public long size()
      Description copied from class: Windows
      Return the size of the specified windows in milliseconds.
      Specified by:
      size in class Windows<Window>
      Returns:
      the size of the specified windows
    • grace

      @Deprecated public JoinWindows grace(Duration afterWindowEnd) throws IllegalArgumentException
      Deprecated.
      Reject out-of-order events that are delayed more than afterWindowEnd after the end of its window.

      Delay is defined as (stream_time - record_timestamp).

      Parameters:
      afterWindowEnd - The grace period to admit out-of-order events to a window.
      Returns:
      this updated builder
      Throws:
      IllegalArgumentException - if the afterWindowEnd is negative or can't be represented as long milliseconds
      IllegalStateException - if grace(Duration) is called after ofTimeDifferenceAndGrace(Duration, Duration) or ofTimeDifferenceWithNoGrace(Duration)
    • gracePeriodMs

      public long gracePeriodMs()
      Description copied from class: Windows
      Return the window grace period (the time to admit out-of-order events after the end of the window.) Delay is defined as (stream_time - record_timestamp).
      Specified by:
      gracePeriodMs in class Windows<Window>
    • equals

      public boolean equals(Object o)
      Overrides:
      equals in class Object
    • hashCode

      public int hashCode()
      Overrides:
      hashCode in class Object
    • toString

      public String toString()
      Overrides:
      toString in class Object