Class JoinWindows

java.lang.Object
org.apache.kafka.streams.kstream.Windows<Window>
org.apache.kafka.streams.kstream.JoinWindows

public class JoinWindows
extends Windows<Window>
The window specifications used for joins.

A JoinWindows instance defines a maximum time difference for a join over two streams on the same key. In SQL style, you would express this join as:


     SELECT * FROM stream1, stream2
     WHERE
       stream1.key = stream2.key
       AND
       stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
 
Three window configurations are supported:
  • before = after = time-difference
  • before = 0 and after = time-difference
  • before = time-difference and after = 0
A join is symmetric in the sense that a join specification on the first stream returns the same result record as a join specification on the second stream with the before and after values flipped.
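The condition and its symmetry can be checked with a small stand-alone model. This is only a sketch of the predicate from the SQL-style query above; the class `JoinConditionSketch` and the method `joinable` are hypothetical names and are not part of Kafka Streams.

```java
import java.time.Duration;

/** Hypothetical stand-alone model of the join condition; not a Kafka Streams class. */
final class JoinConditionSketch {

    /** True if ts1 - before <= ts2 and ts2 <= ts1 + after (timestamps in ms). */
    static boolean joinable(long ts1, long ts2, Duration before, Duration after) {
        return ts1 - before.toMillis() <= ts2 && ts2 <= ts1 + after.toMillis();
    }

    public static void main(String[] args) {
        Duration before = Duration.ofSeconds(5);
        Duration after = Duration.ZERO;

        // The stream2 record is 3 s earlier than the stream1 record,
        // so it falls inside [ts1 - 5 s, ts1].
        System.out.println(joinable(10_000L, 7_000L, before, after)); // true

        // Symmetry: evaluate from stream2's side with before and after flipped.
        System.out.println(joinable(7_000L, 10_000L, after, before)); // true
    }
}
```

Swapping the two timestamps while also swapping before and after always yields the same answer, which is the symmetry described above.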

The before and after values must not result in an "inverse" window, i.e., the upper interval bound cannot be smaller than the lower interval bound.

JoinWindows are sliding windows; thus, they are aligned to the actual record timestamps. This implies that each input record defines its own window, with start and end time being relative to the record's timestamp.
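To illustrate the per-record alignment, the following sketch computes the window that a single record would define. The class `RecordWindowSketch` and the method `windowFor` are hypothetical helpers written for this illustration; they do not exist in Kafka Streams.

```java
/** Hypothetical helper showing that each record anchors its own window. */
final class RecordWindowSketch {

    /** Returns {start, end} in ms, both relative to the record's own timestamp. */
    static long[] windowFor(long recordTs, long beforeMs, long afterMs) {
        return new long[] {recordTs - beforeMs, recordTs + afterMs};
    }

    public static void main(String[] args) {
        // Two records with different timestamps get different windows,
        // even though before and after are the same for both.
        long[] first = windowFor(1_000L, 200L, 300L);
        long[] second = windowFor(1_500L, 200L, 300L);
        System.out.println(first[0] + ".." + first[1]);   // 800..1300
        System.out.println(second[0] + ".." + second[1]); // 1300..1800
    }
}
```

Unlike windows with fixed epoch-aligned boundaries, the bounds here move with every record timestamp.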

For time semantics, see TimestampExtractor.

See Also:
TimeWindows, UnlimitedWindows, SessionWindows, KStream.join(KStream, ValueJoiner, JoinWindows), KStream.join(KStream, ValueJoiner, JoinWindows, StreamJoined), KStream.leftJoin(KStream, ValueJoiner, JoinWindows), KStream.leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined), KStream.outerJoin(KStream, ValueJoiner, JoinWindows), KStream.outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined), TimestampExtractor
  • Field Summary

    Fields
    Modifier and Type Field Description
    long afterMs
    Maximum time difference for tuples that are after the join tuple.
    long beforeMs
    Maximum time difference for tuples that are before the join tuple.
    protected boolean enableSpuriousResultFix  

    Fields inherited from class org.apache.kafka.streams.kstream.Windows

    DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD, NO_GRACE_PERIOD
  • Constructor Summary

    Constructors
    Modifier Constructor Description
    protected JoinWindows​(JoinWindows joinWindows)  
  • Method Summary

    Modifier and Type Method Description
    JoinWindows after​(Duration timeDifference)
    Changes the end window boundary to timeDifference but keeps the start window boundary as is.
    JoinWindows before​(Duration timeDifference)
    Changes the start window boundary to timeDifference but keeps the end window boundary as is.
    boolean equals​(Object o)  
    JoinWindows grace​(Duration afterWindowEnd)
    Reject out-of-order events that are delayed more than afterWindowEnd after the end of their window.
    long gracePeriodMs()
    Return the window grace period (the time to admit out-of-order events after the end of the window). Delay is defined as (stream_time - record_timestamp).
    int hashCode()  
    static JoinWindows of​(Duration timeDifference)
    Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
    long size()
    Return the size of the specified windows in milliseconds.
    String toString()  
    Map<Long,​Window> windowsFor​(long timestamp)
    Not supported by JoinWindows.

    Methods inherited from class java.lang.Object

    clone, finalize, getClass, notify, notifyAll, wait, wait, wait
  • Field Details

    • beforeMs

      public final long beforeMs
      Maximum time difference for tuples that are before the join tuple.
    • afterMs

      public final long afterMs
      Maximum time difference for tuples that are after the join tuple.
    • enableSpuriousResultFix

      protected final boolean enableSpuriousResultFix
  • Constructor Details

    • JoinWindows

      protected JoinWindows​(JoinWindows joinWindows)
  • Method Details

    • of

      public static JoinWindows of​(Duration timeDifference) throws IllegalArgumentException
      Specifies that records of the same key are joinable if their timestamps are within timeDifference, i.e., the timestamp of a record from the secondary stream is at most timeDifference before or after the timestamp of the record from the primary stream.
      Parameters:
      timeDifference - join window interval
      Returns:
      a new JoinWindows object with the given window definition and a default grace period (24 hours minus timeDifference)
      Throws:
      IllegalArgumentException - if timeDifference is negative or can't be represented as long milliseconds
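The two failure cases listed under Throws can be modelled with plain `java.time.Duration`. This is a sketch under the assumption that the conversion to milliseconds is what overflows; `OfValidationSketch` and `validateMillis` are hypothetical names, not the library's internal validation code.

```java
import java.time.Duration;

/** Hypothetical model of the argument checks described for of(Duration). */
final class OfValidationSketch {

    /** Converts to milliseconds, rejecting negative or unrepresentable values. */
    static long validateMillis(Duration timeDifference) {
        try {
            // Duration.toMillis() throws ArithmeticException on long overflow.
            long ms = timeDifference.toMillis();
            if (ms < 0) {
                throw new IllegalArgumentException("timeDifference must not be negative");
            }
            return ms;
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                "timeDifference can't be represented as long milliseconds", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(validateMillis(Duration.ofMinutes(5))); // 300000
    }
}
```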
    • before

      public JoinWindows before​(Duration timeDifference) throws IllegalArgumentException
      Changes the start window boundary to timeDifference but keeps the end window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference earlier than the timestamp of the record from the primary stream. timeDifference can be negative, but its absolute value must not be larger than the current window "after" value (which would result in a negative window size).
      Parameters:
      timeDifference - relative window start time
      Throws:
      IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
    • after

      public JoinWindows after​(Duration timeDifference) throws IllegalArgumentException
      Changes the end window boundary to timeDifference but keeps the start window boundary as is. Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most timeDifference later than the timestamp of the record from the primary stream. timeDifference can be negative, but its absolute value must not be larger than the current window "before" value (which would result in a negative window size).
      Parameters:
      timeDifference - relative window end time
      Throws:
      IllegalArgumentException - if the resulting window size is negative or timeDifference can't be represented as long milliseconds
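The effect of a negative timeDifference in before and after can be sketched with a small immutable model. The class `BoundsSketch` is hypothetical, and the check `beforeMs + afterMs < 0` is an assumption derived from the "negative window size" wording above, not a copy of the library's code.

```java
import java.time.Duration;

/** Hypothetical model of the before/after bounds; not the Kafka Streams class. */
final class BoundsSketch {
    final long beforeMs;
    final long afterMs;

    BoundsSketch(long beforeMs, long afterMs) {
        // Assumed check: an "inverse" window would have a negative size.
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("window size must not be negative");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    /** Replaces the start boundary and keeps the end boundary. */
    BoundsSketch before(Duration timeDifference) {
        return new BoundsSketch(timeDifference.toMillis(), afterMs);
    }

    /** Replaces the end boundary and keeps the start boundary. */
    BoundsSketch after(Duration timeDifference) {
        return new BoundsSketch(beforeMs, timeDifference.toMillis());
    }

    public static void main(String[] args) {
        BoundsSketch window = new BoundsSketch(5_000L, 5_000L);

        // A negative "before" shifts the window start past the record:
        // the window becomes [ts + 2 s, ts + 5 s].
        BoundsSketch shifted = window.before(Duration.ofSeconds(-2));
        System.out.println(shifted.beforeMs + " " + shifted.afterMs); // -2000 5000

        // before(-6 s) with after = 5 s would put the start after the end.
        try {
            window.before(Duration.ofSeconds(-6));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```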
    • windowsFor

      public Map<Long,​Window> windowsFor​(long timestamp)
      Not supported by JoinWindows. Throws UnsupportedOperationException.
      Specified by:
      windowsFor in class Windows<Window>
      Parameters:
      timestamp - the timestamp window should get created for
      Returns:
      a map of windowStartTimestamp -> Window entries
      Throws:
      UnsupportedOperationException - at every invocation
    • size

      public long size()
      Description copied from class: Windows
      Return the size of the specified windows in milliseconds.
      Specified by:
      size in class Windows<Window>
      Returns:
      the size of the specified windows
    • grace

      public JoinWindows grace​(Duration afterWindowEnd) throws IllegalArgumentException
      Reject out-of-order events that are delayed more than afterWindowEnd after the end of their window.

      Delay is defined as (stream_time - record_timestamp).

      Parameters:
      afterWindowEnd - The grace period to admit out-of-order events to a window.
      Returns:
      this updated builder
      Throws:
      IllegalArgumentException - if the afterWindowEnd is negative or can't be represented as long milliseconds
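One possible reading of the grace rule is sketched below, assuming that a record's window ends at its timestamp plus the "after" value and that stream time is the highest timestamp observed so far. `GraceSketch` and `rejected` are hypothetical names; the actual join processor may apply the check differently.

```java
/** Hypothetical model of the grace-period check; not Kafka Streams code. */
final class GraceSketch {

    /**
     * True if stream time has advanced more than graceMs beyond the end of
     * the window anchored at recordTs (assumed window end: recordTs + afterMs).
     */
    static boolean rejected(long streamTime, long recordTs, long afterMs, long graceMs) {
        long windowEnd = recordTs + afterMs;
        return streamTime > windowEnd + graceMs;
    }

    public static void main(String[] args) {
        // Window end is 1500; with 200 ms of grace, records are admitted
        // while stream time is at most 1700.
        System.out.println(rejected(1_700L, 1_000L, 500L, 200L)); // false
        System.out.println(rejected(1_701L, 1_000L, 500L, 200L)); // true
    }
}
```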
    • gracePeriodMs

      public long gracePeriodMs()
      Description copied from class: Windows
      Return the window grace period (the time to admit out-of-order events after the end of the window). Delay is defined as (stream_time - record_timestamp).
      Specified by:
      gracePeriodMs in class Windows<Window>
    • equals

      public boolean equals​(Object o)
      Overrides:
      equals in class Object
    • hashCode

      public int hashCode()
      Overrides:
      hashCode in class Object
    • toString

      public String toString()
      Overrides:
      toString in class Object