Interface BranchedKStream<K,V>

Type Parameters:
K - Type of keys
V - Type of values

public interface BranchedKStream<K,V>
Branches the records in the original stream based on the predicates supplied for the branch definitions.

Branches are defined with branch(Predicate, Branched) or defaultBranch(Branched) methods. Each record is evaluated against the predicate supplied via Branched parameters, and is routed to the first branch for which its respective predicate evaluates to true. If a record does not match any predicates, it will be routed to the default branch, or dropped if no default branch is created.

Each branch (which is a KStream instance) then can be processed either by a Function or a Consumer provided via a Branched parameter. If certain conditions are met, it also can be accessed from the Map returned by an optional defaultBranch(Branched) or noDefaultBranch() method call (see usage examples).

The branching happens on a first-match basis: A record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need to route a record to multiple streams, you can apply multiple KStream.filter(Predicate) operators to the same KStream instance, one for each predicate, instead of branching.

The process of routing the records to different branches is a stateless record-by-record operation.

Rules of forming the resulting map

The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or noDefaultBranch() are defined by the following rules:
  • If Named parameter was provided for KStream.split(Named), its value is used as a prefix for each key. By default, no prefix is used
  • If a branch name is provided in branch(Predicate, Branched) via the Branched parameter, its value is appended to the prefix to form the Map key
  • If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1"
  • If a name is not provided for the defaultBranch(), then the key defaults to prefix + "0"
The values of the respective Map<Stream, KStream<K, V>> entries are formed as following:
  • If no chain function or consumer is provided in branch(Predicate, Branched) via the Branched parameter, then the branch itself is added to the Map
  • If chain function is provided and it returns a non-null value for a given branch, then the value is the result returned by this function
  • If a chain function returns null for a given branch, then no entry is added to the map
  • If a consumer is provided for a given branch, then no entry is added to the map
For example:
 
 Map<String, KStream<..., ...>> result =
   source.split(Named.as("foo-"))
     .branch(predicate1, Branched.as("bar"))                    // "foo-bar"
     .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))  // no entry: a Consumer is provided
     .branch(predicate3, Branched.withFunction(ks->null))       // no entry: chain function returns null
     .branch(predicate4, Branched.withFunction(ks->ks))         // "foo-4": chain function returns non-null value
     .branch(predicate5)                                        // "foo-5": name defaults to the branch position
     .defaultBranch()                                           // "foo-0": "0" is the default name for the default branch
 

Usage examples

Direct Branch Consuming

In many cases we do not need to have a single scope for all the branches, each branch being processed completely independently of others. Then we can use 'consuming' lambdas or method references in Branched parameter:
 
 source.split()
     .branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
     .branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
     .defaultBranch(Branched.withConsumer(ks->ks.to("C")));
 

Collecting branches in a single scope

In other cases we want to combine branches again after splitting. The map returned by defaultBranch() or noDefaultBranch() methods provides access to all the branches in the same scope:
 
 Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
     .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v->"NULL"), "null")
     .defaultBranch(Branched.as("non-null"));

 KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
 

Dynamic branching

There is also a case when we might need to create branches dynamically, e.g. one per enum value:
 
 BranchedKStream branched = stream.split();
 for (RecordType recordType : RecordType.values())
     branched.branch((k, v) -> v.getRecType() == recordType,
         Branched.withConsumer(recordType::processRecords));
 
See Also:
  • Method Details

    • branch

      BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate)
      Define a branch for records that match the predicate.
      Parameters:
      predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
      Returns:
      this to facilitate method chaining
    • branch

      BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched)
      Define a branch for records that match the predicate.
      Parameters:
      predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
      branched - A Branched parameter, that allows to define a branch name, an in-place branch consumer or branch mapper (see code examples for BranchedKStream)
      Returns:
      this to facilitate method chaining
    • defaultBranch

      Map<String,KStream<K,V>> defaultBranch()
      Finalize the construction of branches and defines the default branch for the messages not intercepted by other branches. Calling defaultBranch or noDefaultBranch() is optional.
      Returns:
      Map of named branches. For rules of forming the resulting map, see BranchedKStream description.
    • defaultBranch

      Map<String,KStream<K,V>> defaultBranch(Branched<K,V> branched)
      Finalize the construction of branches and defines the default branch for the messages not intercepted by other branches. Calling defaultBranch or noDefaultBranch() is optional.
      Parameters:
      branched - A Branched parameter, that allows to define a branch name, an in-place branch consumer or branch mapper (see code examples for BranchedKStream)
      Returns:
      Map of named branches. For rules of forming the resulting map, see BranchedKStream description.
    • noDefaultBranch

      Map<String,KStream<K,V>> noDefaultBranch()
      Finalize the construction of branches without forming a default branch. Calling #noDefaultBranch() or defaultBranch() is optional.
      Returns:
      Map of named branches. For rules of forming the resulting map, see BranchedKStream description.