Interface BranchedKStream<K,V>
- Type Parameters:
K - Type of keys
V - Type of values
public interface BranchedKStream<K,V>
Branches are defined with the branch(Predicate, Branched) or defaultBranch(Branched) methods. Each record is evaluated against the predicate supplied to each branch(...) call and is routed to the first branch whose predicate evaluates to true. If a record does not match any predicate, it is routed to the default branch, or dropped if no default branch is created.
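To make the routing rule concrete, here is a minimal plain-Java model of first-match routing with an optional default branch. It is an illustration of the documented behavior under stated assumptions, not the Kafka Streams implementation: `FirstMatchRouter` is a hypothetical class, and `java.util.function.BiPredicate` stands in for Kafka's `Predicate<K, V>`, which likewise tests a key and a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of first-match routing, NOT Kafka's implementation.
// BiPredicate<K, V> stands in for org.apache.kafka.streams.kstream.Predicate<K, V>.
class FirstMatchRouter<K, V> {
    private final List<String> names = new ArrayList<>();
    private final List<BiPredicate<? super K, ? super V>> predicates = new ArrayList<>();
    private final String defaultName; // null models "no default branch"

    FirstMatchRouter(String defaultName) {
        this.defaultName = defaultName;
    }

    // Registers a branch; branches are evaluated in registration order.
    FirstMatchRouter<K, V> branch(String name, BiPredicate<? super K, ? super V> predicate) {
        names.add(name);
        predicates.add(predicate);
        return this;
    }

    // Returns the name of the branch the record is routed to, or null if it is dropped.
    String route(K key, V value) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return names.get(i); // first match wins; remaining predicates are skipped
            }
        }
        return defaultName; // the default branch, or null (dropped) when none exists
    }
}
```

With the predicates `v < 10` and `v % 2 == 0`, the value 4 is routed to the first branch even though it also satisfies the second; a value matching neither goes to the default branch, or is dropped when the router was built without one.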
Each branch (which is a KStream instance) can then be processed either by a Function or by a Consumer provided via a Branched parameter. Depending on how the branch is defined (see "Rules of forming the resulting map"), it can also be accessed from the Map returned by an optional defaultBranch(Branched) or noDefaultBranch() method call (see usage examples).
The branching happens on a first-match basis: a record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need to route a record to multiple streams, you can apply multiple KStream.filter(Predicate) operators to the same KStream instance, one for each predicate, instead of branching.
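The difference between branching and applying one filter per predicate can be seen on a single record. The sketch below is a hypothetical plain-Java illustration, not Kafka code: `firstMatch` models branching (only the first matching predicate receives the record), while `allMatches` models independent filters (every matching predicate receives it).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration (not Kafka code) of first-match branching
// versus one independent filter per predicate.
class BranchVsFilter {
    // Branching: index of the first matching predicate only, or -1 if none match.
    static <T> int firstMatch(T record, List<Predicate<T>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                return i;
            }
        }
        return -1;
    }

    // Independent filters: indices of every predicate that matches.
    static <T> List<Integer> allMatches(T record, List<Predicate<T>> predicates) {
        List<Integer> matches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                matches.add(i);
            }
        }
        return matches;
    }
}
```

For the predicates `v > 0` and `v % 2 == 0`, the record 4 lands only in stream 0 when branching, but in both streams 0 and 1 when filtering.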
The process of routing the records to different branches is a stateless record-by-record operation.
Rules of forming the resulting map
The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or noDefaultBranch() are defined by the following rules:
- If a Named parameter was provided for KStream.split(Named), its value is used as a prefix for each key. By default, no prefix is used.
- If a branch name is provided in branch(Predicate, Branched) via the Branched parameter, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1".
- If a name is not provided for the defaultBranch(), then the key defaults to prefix + "0".
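The key rules above reduce to a single string concatenation. The following is a minimal sketch of those rules; `BranchKeyNames.key` is a hypothetical helper, not part of the Kafka Streams API, and it treats the default branch as position 0.

```java
// Hypothetical model of the documented key-naming rules; not a Kafka Streams API.
class BranchKeyNames {
    /**
     * @param prefix   value passed to split(Named), or null when none was given
     * @param name     branch name from Branched, or null when none was given
     * @param position 1-based branch position, or 0 for the default branch
     */
    static String key(String prefix, String name, int position) {
        String p = (prefix == null) ? "" : prefix; // no prefix by default
        // An explicit name wins; otherwise fall back to the position as a decimal number.
        return p + (name != null ? name : Integer.toString(position));
    }
}
```

For example, `key("foo-", "bar", 1)` gives "foo-bar", `key("foo-", null, 5)` gives "foo-5", and an unnamed default branch under the same prefix gives "foo-0".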
The values of the Map<String, KStream<K, V>> entries are formed as follows:
- If no chain function or consumer is provided in branch(Predicate, Branched) via the Branched parameter, then the branch itself is added to the Map.
- If a chain function is provided and it returns a non-null value for a given branch, then the value is the result returned by this function.
- If a chain function returns null for a given branch, then no entry is added to the map.
- If a consumer is provided for a given branch, then no entry is added to the map.
// Assumes source is a KStream<String, String> and predicate1..predicate5 are defined elsewhere
Map<String, KStream<String, String>> result =
    source.split(Named.as("foo-"))
        .branch(predicate1, Branched.as("bar"))                       // "foo-bar"
        .branch(predicate2, Branched.withConsumer(ks -> ks.to("A")))  // no entry: a Consumer is provided
        .branch(predicate3, Branched.withFunction(ks -> null))        // no entry: chain function returns null
        .branch(predicate4, Branched.withFunction(ks -> ks))          // "foo-4": chain function returns non-null value
        .branch(predicate5)                                           // "foo-5": name defaults to the branch position
        .defaultBranch();                                             // "foo-0": "0" is the default name for the default branch
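The value rules can be replayed against the example above with a small model. This is a hypothetical sketch, not Kafka's implementation: the type parameter `S` stands in for `KStream<K, V>`, keys are passed in precomputed, and `LinkedHashMap` is used only to make the model's output order predictable.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of how result-map values are formed; S stands in for KStream<K, V>.
class ResultMapModel<S> {
    private final Map<String, S> result = new LinkedHashMap<>();

    // No chain function or consumer: the branch itself is added.
    void plain(String key, S branch) {
        result.put(key, branch);
    }

    // Chain function: its result is added unless it is null.
    void withFunction(String key, S branch, Function<? super S, ? extends S> chain) {
        S mapped = chain.apply(branch);
        if (mapped != null) {
            result.put(key, mapped);
        }
    }

    // Consumer: the branch is handed to the consumer and no entry is added,
    // so no key is needed.
    void withConsumer(S branch, Consumer<? super S> consumer) {
        consumer.accept(branch);
    }

    Map<String, S> result() {
        return result;
    }
}
```

Replaying the five branches and the default branch of the example in this model leaves exactly the keys "foo-bar", "foo-4", "foo-5", and "foo-0".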
Usage examples
Direct Branch Consuming
In many cases we do not need a single scope for all the branches, because each branch is processed completely independently of the others. Then we can use 'consuming' lambdas or method references in the Branched parameter:
source.split()
.branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
.branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
.defaultBranch(Branched.withConsumer(ks->ks.to("C")));
Collecting branches in a single scope
In other cases we want to combine branches again after splitting. The map returned by the defaultBranch() or noDefaultBranch() methods provides access to all the branches in the same scope:
Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
    .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v -> "NULL"), "null"))
    .defaultBranch(Branched.as("non-null"));
KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
Dynamic branching
There is also a case when we might need to create branches dynamically, e.g., one per enum value:
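// Assumes stream is a KStream<String, MyValue>, where MyValue is a hypothetical value type with getRecType()
BranchedKStream<String, MyValue> branched = stream.split();
for (RecordType recordType : RecordType.values()) {
    branched.branch((k, v) -> v.getRecType() == recordType,
        Branched.withConsumer(recordType::processRecords));
}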
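The pattern relies on the fact that the enhanced-for variable is effectively final in each iteration, so every lambda captures its own enum constant. The sketch below models that in plain Java without Kafka; `RecordType`, `Event`, and `DynamicBranches` are hypothetical names chosen for illustration, and the inner loop applies the same first-match rule as `split()`.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical types for illustration only.
enum RecordType { ORDER, PAYMENT, REFUND }

class Event {
    final RecordType type;
    final String payload;

    Event(RecordType type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

class DynamicBranches {
    // Builds one predicate per enum constant in a loop, then routes each event first-match.
    static Map<RecordType, List<Event>> split(List<Event> events) {
        Map<RecordType, Predicate<Event>> predicates = new EnumMap<>(RecordType.class);
        for (RecordType t : RecordType.values()) {
            // 't' is effectively final per iteration, so each lambda captures its own constant
            predicates.put(t, e -> e.type == t);
        }
        Map<RecordType, List<Event>> out = new EnumMap<>(RecordType.class);
        for (Event e : events) {
            for (Map.Entry<RecordType, Predicate<Event>> p : predicates.entrySet()) {
                if (p.getValue().test(e)) {
                    out.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(e);
                    break; // first match wins
                }
            }
        }
        return out;
    }
}
```

A branch that receives no events simply has no entry in the returned map here; in Kafka Streams the branch would still exist and just see no records.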
- See Also:
KStream
Method Summary
Modifier and Type | Method | Description
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate) | Define a branch for records that match the predicate.
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched) | Define a branch for records that match the predicate.
Map<String,KStream<K,V>> | defaultBranch() | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String,KStream<K,V>> | defaultBranch(Branched<K,V> branched) | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String,KStream<K,V>> | noDefaultBranch() | Finalize the construction of branches without forming a default branch.
Method Details
branch
BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate)
Define a branch for records that match the predicate.
- Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
- Returns:
this to facilitate method chaining
branch
BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched)
Define a branch for records that match the predicate.
- Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see code examples for BranchedKStream)
- Returns:
this to facilitate method chaining
defaultBranch
Map<String,KStream<K,V>> defaultBranch()
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch or noDefaultBranch() is optional.
- Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
defaultBranch
Map<String,KStream<K,V>> defaultBranch(Branched<K,V> branched)
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch or noDefaultBranch() is optional.
- Parameters:
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see code examples for BranchedKStream)
- Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
noDefaultBranch
Map<String,KStream<K,V>> noDefaultBranch()
Finalize the construction of branches without forming a default branch. Calling noDefaultBranch() or defaultBranch() is optional.
- Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.