Interface BranchedKStream<K,V>
- Type Parameters:
K - Type of keys
V - Type of values
Branches are defined with the branch(Predicate, Branched) or
defaultBranch(Branched) methods. Each record is evaluated against the predicates
passed to branch(), in the order the branches were defined, and is routed to the first branch whose predicate
evaluates to true. If a record does not match any predicate, it is routed to the default branch,
or dropped if no default branch was created.
Each branch (which is a KStream instance) can then be processed by either
a Function or a Consumer provided via a Branched
parameter. Under the conditions described in the map-forming rules in this description, a branch can also be accessed from the Map returned by an optional
defaultBranch(Branched) or noDefaultBranch() method call
(see usage examples).
The branching happens on a first-match basis: A record in the original stream is assigned to the corresponding result
stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need
to route a record to multiple streams, you can apply multiple KStream.filter(Predicate) operators
to the same KStream instance, one for each predicate, instead of branching.
The process of routing the records to different branches is a stateless record-by-record operation.
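The first-match rule can be illustrated with a small plain-Java model. This is only a sketch of the routing semantics described above, not the Kafka Streams implementation: the class FirstMatchRoutingSketch, its route method, and the branch names are hypothetical, and records are modeled as plain integers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of first-match routing; this is NOT the Kafka Streams API.
final class FirstMatchRoutingSketch {

    // Routes each record to the first branch whose predicate matches.
    // Unmatched records go to "default" when hasDefault is true; otherwise they are dropped.
    static Map<String, List<Integer>> route(List<Integer> records,
                                            Map<String, Predicate<Integer>> branches,
                                            boolean hasDefault) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        branches.keySet().forEach(name -> out.put(name, new ArrayList<>()));
        if (hasDefault) {
            out.put("default", new ArrayList<>());
        }
        for (Integer record : records) {
            String target = hasDefault ? "default" : null;
            for (Map.Entry<String, Predicate<Integer>> branch : branches.entrySet()) {
                if (branch.getValue().test(record)) {
                    target = branch.getKey();
                    break; // first match wins; later predicates are not evaluated
                }
            }
            if (target != null) {
                out.get(target).add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Branch order matters, so an insertion-ordered map is used.
        Map<String, Predicate<Integer>> branches = new LinkedHashMap<>();
        branches.put("even", n -> n % 2 == 0);
        branches.put("small", n -> n < 10);

        // 4 matches both predicates but lands only in "even".
        System.out.println(route(List.of(4, 7, 12, 15), branches, true));
        // prints {even=[4, 12], small=[7], default=[15]}

        // Without a default branch, 15 is dropped.
        System.out.println(route(List.of(4, 7, 12, 15), branches, false));
        // prints {even=[4, 12], small=[7]}
    }
}
```

Each record is handled on its own and no state is carried between records, which mirrors the stateless, record-by-record nature of the operation.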
Rules of forming the resulting map
The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or
noDefaultBranch() are defined by the following rules:
- If a Named parameter was provided for KStream.split(Named), its value is used as a prefix for each key. By default, no prefix is used.
- If a branch name is provided in branch(Predicate, Branched) via the Branched parameter, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1".
- If a name is not provided for the defaultBranch(), then the key defaults to prefix + "0".
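The key rules above can be sketched as a small plain-Java function. This is a model of the naming rules only, not library code: BranchKeySketch and its keys method are hypothetical names, an unnamed branch is represented by null, and the sketch assumes that a default branch is defined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the key naming rules; this is NOT the Kafka Streams API.
final class BranchKeySketch {

    // branchNames lists the branch names in definition order (null = no name given).
    // defaultName is the name of the default branch (null = no name given).
    static List<String> keys(String prefix, List<String> branchNames, String defaultName) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < branchNames.size(); i++) {
            String name = branchNames.get(i);
            // Unnamed branches fall back to their 1-based position.
            keys.add(prefix + (name != null ? name : String.valueOf(i + 1)));
        }
        // An unnamed default branch falls back to "0".
        keys.add(prefix + (defaultName != null ? defaultName : "0"));
        return keys;
    }

    public static void main(String[] args) {
        // Arrays.asList is used because List.of does not accept null elements.
        System.out.println(keys("foo-", Arrays.asList("bar", null, "baz"), null));
        // prints [foo-bar, foo-2, foo-baz, foo-0]
    }
}
```

Note that the position counts every branch, named or not, so the unnamed second branch gets the key "foo-2" even though the first branch has an explicit name.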
The values of the respective Map<String, KStream<K, V>> entries are formed as follows:
- If no chain function or consumer is provided in branch(Predicate, Branched) via the Branched parameter, then the branch itself is added to the Map.
- If a chain function is provided and it returns a non-null value for a given branch, then the value is the result returned by this function.
- If a chain function returns null for a given branch, then no entry is added to the map.
- If a consumer is provided for a given branch, then no entry is added to the map.
Map<String, KStream<..., ...>> result =
  source.split(Named.as("foo-"))
    .branch(predicate1, Branched.as("bar"))                    // "foo-bar"
    .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))) // no entry: a Consumer is provided
    .branch(predicate3, Branched.withFunction(ks->null))       // no entry: chain function returns null
    .branch(predicate4, Branched.withFunction(ks->ks))         // "foo-4": chain function returns non-null value
    .branch(predicate5)                                        // "foo-5": name defaults to the branch position
    .defaultBranch();                                          // "foo-0": "0" is the default name for the default branch
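To make the combined key and value rules concrete, the following plain-Java sketch models how the map for the example above would be formed. It is an illustration under stated assumptions, not the library's implementation: BranchMapSketch, Spec, and buildMap are hypothetical names, a stream is modeled as a String label, and Spec stands in for the Branched parameter.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java model of the map-forming rules; this is NOT the Kafka Streams API.
final class BranchMapSketch {

    // Hypothetical stand-in for a Branched parameter.
    static final class Spec {
        final String name;                 // null when no name was given
        final boolean hasConsumer;         // true when a consumer handles the branch
        final UnaryOperator<String> chain; // null when no chain function was given

        Spec(String name, boolean hasConsumer, UnaryOperator<String> chain) {
            this.name = name;
            this.hasConsumer = hasConsumer;
            this.chain = chain;
        }
    }

    // defaultBranch may be null, which models noDefaultBranch().
    static Map<String, String> buildMap(String prefix, List<Spec> branches, Spec defaultBranch) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < branches.size(); i++) {
            addEntry(result, prefix, branches.get(i), String.valueOf(i + 1));
        }
        if (defaultBranch != null) {
            addEntry(result, prefix, defaultBranch, "0");
        }
        return result;
    }

    private static void addEntry(Map<String, String> result, String prefix,
                                 Spec spec, String fallbackName) {
        if (spec.hasConsumer) {
            return; // a consumer is provided: no entry
        }
        String key = prefix + (spec.name != null ? spec.name : fallbackName);
        String branchStream = "stream-" + fallbackName; // label standing in for the branch itself
        String value = spec.chain == null ? branchStream : spec.chain.apply(branchStream);
        if (value != null) {
            result.put(key, value); // a chain function returning null adds no entry
        }
    }

    public static void main(String[] args) {
        List<Spec> branches = List.of(
            new Spec("bar", false, null),    // named branch
            new Spec(null, true, null),      // consumer provided
            new Spec(null, false, s -> null),// chain function returns null
            new Spec(null, false, s -> s),   // chain function returns non-null
            new Spec(null, false, null));    // unnamed branch
        Map<String, String> result = buildMap("foo-", branches, new Spec(null, false, null));
        System.out.println(result.keySet());
        // prints [foo-bar, foo-4, foo-5, foo-0]
    }
}
```

The resulting keys match the comments in the example: branches 2 and 3 contribute no entry, while the positions of branches 4 and 5 are still counted from the start of the chain.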
Usage examples
Direct Branch Consuming
In many cases we do not need a single scope for all the branches, because each branch is processed completely independently of the others. In that case we can use 'consuming' lambdas or method references in the Branched parameter:
source.split()
.branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
.branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
.defaultBranch(Branched.withConsumer(ks->ks.to("C")));
Collecting branches in a single scope
In other cases we want to combine branches again after splitting. The map returned by the defaultBranch() or noDefaultBranch() methods provides
access to all the branches in the same scope:
Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
.branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v -> "NULL"), "null"))
.defaultBranch(Branched.as("non-null"));
KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
Dynamic branching
There is also a case when we might need to create branches dynamically, e.g. one per enum value:
BranchedKStream branched = stream.split();
for (RecordType recordType : RecordType.values())
branched.branch((k, v) -> v.getRecType() == recordType,
Branched.withConsumer(recordType::processRecords));
Method Summary
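Modifier and Type | Method | Description
BranchedKStream<K, V> | branch(Predicate<? super K, ? super V> predicate) | Define a branch for records that match the predicate.
BranchedKStream<K, V> | branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched) | Define a branch for records that match the predicate.
Map<String, KStream<K, V>> | defaultBranch() | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String, KStream<K, V>> | defaultBranch(Branched<K, V> branched) | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String, KStream<K, V>> | noDefaultBranch() | Finalize the construction of branches without forming a default branch.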
-
Method Details
-
branch
Define a branch for records that match the predicate.
- Parameters:
predicate - A Predicate instance against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
- Returns:
this to facilitate method chaining
-
branch
Define a branch for records that match the predicate.
- Parameters:
predicate - A Predicate instance against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see code examples for BranchedKStream)
- Returns:
this to facilitate method chaining
-
defaultBranch
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.
- Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
-
defaultBranch
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.
- Parameters:
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see code examples for BranchedKStream)
- Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
-
noDefaultBranch
Finalize the construction of branches without forming a default branch. Calling noDefaultBranch() or defaultBranch() is optional.
- Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
-