K - Type of keys
V - Type of values
public interface BranchedKStream<K,V>
Branches are defined with the branch(Predicate, Branched) or
defaultBranch(Branched) methods. Each record is evaluated against the predicates
passed to branch(Predicate, Branched), and is routed to the first branch whose predicate
evaluates to true. If a record does not match any predicate, it is routed to the default branch,
or dropped if no default branch is created.
Each branch (which is a KStream instance) can then be processed either by
a Function or a Consumer provided via a Branched
parameter. If certain conditions are met, it can also be accessed from the Map returned by an optional
defaultBranch(Branched) or noDefaultBranch() method call
(see usage examples).
The branching happens on a first-match basis: A record in the original stream is assigned to the corresponding result
stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need
to route a record to multiple streams, you can apply multiple KStream.filter(Predicate) operators
to the same KStream instance, one for each predicate, instead of branching.
The process of routing the records to different branches is a stateless record-by-record operation.
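The first-match rule, and its contrast with applying one filter per predicate, can be modeled in plain Java without any Kafka dependency. This is only an illustrative sketch: `FirstMatchRouting`, `route`, `filterEach`, and the `"default"` output name are hypothetical names chosen for this example, and lists stand in for streams.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model of the routing semantics. This is not Kafka Streams code.
final class FirstMatchRouting {

    // Routes each record to the first branch whose predicate matches.
    // Unmatched records go to the "default" list if withDefault is true;
    // otherwise they are dropped. Assumes no branch is itself named "default".
    static <T> Map<String, List<T>> route(List<T> records,
                                          Map<String, Predicate<T>> branches,
                                          boolean withDefault) {
        Map<String, List<T>> out = new LinkedHashMap<>();
        for (String name : branches.keySet()) {
            out.put(name, new ArrayList<>());
        }
        if (withDefault) {
            out.put("default", new ArrayList<>());
        }
        for (T record : records) {
            boolean matched = false;
            for (Map.Entry<String, Predicate<T>> branch : branches.entrySet()) {
                if (branch.getValue().test(record)) {
                    out.get(branch.getKey()).add(record);
                    matched = true;
                    break; // first match wins; later predicates are not evaluated
                }
            }
            if (!matched && withDefault) {
                out.get("default").add(record);
            }
        }
        return out;
    }

    // Contrast: one independent filter per predicate, so a record can
    // appear in several outputs.
    static <T> Map<String, List<T>> filterEach(List<T> records,
                                               Map<String, Predicate<T>> filters) {
        Map<String, List<T>> out = new LinkedHashMap<>();
        filters.forEach((name, p) ->
            out.put(name, records.stream().filter(p).collect(Collectors.toList())));
        return out;
    }

    public static void main(String[] args) {
        // Insertion order of the map is the order in which branches are defined.
        Map<String, Predicate<Integer>> branches = new LinkedHashMap<>();
        branches.put("even", n -> n % 2 == 0);
        branches.put("small", n -> n < 10);
        List<Integer> records = List.of(4, 7, 12, 15);

        System.out.println(route(records, branches, true));  // {even=[4, 12], small=[7], default=[15]}
        System.out.println(route(records, branches, false)); // {even=[4, 12], small=[7]}
        System.out.println(filterEach(records, branches));   // {even=[4, 12], small=[4, 7]}
    }
}
```

The record `4` satisfies both predicates. With first-match routing it lands only in `even`, while the filter-per-predicate approach places it in both outputs.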
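The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or
noDefaultBranch() are defined by the following rules:
- If a Named parameter was provided for KStream.split(Named), its value is used as
  a prefix for each key. By default, no prefix is used.
- If a branch name is provided in branch(Predicate, Branched) via the
  Branched parameter, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, then the key defaults to prefix + position of the branch
  as a decimal number, starting from "1".
- If a name is not provided for the defaultBranch(), then the key defaults
  to prefix + "0".
The values of the respective Map<String, KStream<K, V>> entries are formed as follows:
- If no chain function or consumer is provided in branch(Predicate, Branched) via
  the Branched parameter, then the branch itself is added to the Map.
- If a chain function is provided and it returns a non-null value for a given branch, then that value is added to the Map.
- If a chain function returns null for a given branch, then no entry is added to the map.
- If a consumer is provided for a given branch, then no entry is added to the map.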
For example:
    Map<String, KStream<..., ...>> result =
      source.split(Named.as("foo-"))
        .branch(predicate1, Branched.as("bar"))                        // "foo-bar"
        .branch(predicate2, Branched.withConsumer(ks -> ks.to("A")))   // no entry: a Consumer is provided
        .branch(predicate3, Branched.withFunction(ks -> null))         // no entry: chain function returns null
        .branch(predicate4, Branched.withFunction(ks -> ks))           // "foo-4": chain function returns non-null value
        .branch(predicate5)                                            // "foo-5": name defaults to the branch position
        .defaultBranch();                                              // "foo-0": "0" is the default name for the default branch
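The map-forming rules can also be checked with a small plain-Java model that reproduces the keys from the "foo-" example. This is a sketch under stated assumptions, not the library's implementation: `BranchMapKeys`, `Spec`, and `Kind` are hypothetical names introduced only for illustration, and each branch is reduced to its optional name plus how it is handled.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the map key rules. This is not Kafka Streams code.
final class BranchMapKeys {

    // How a branch is handled, which decides whether it gets a map entry.
    enum Kind { PLAIN, CONSUMER, FUNCTION_RETURNS_NULL, FUNCTION_RETURNS_STREAM }

    static final class Spec {
        final String name; // null when no branch name was given
        final Kind kind;

        Spec(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    // Returns the map keys in definition order. Pass null as defaultBranch to
    // model noDefaultBranch().
    static List<String> keys(String prefix, List<Spec> branches, Spec defaultBranch) {
        List<String> keys = new ArrayList<>();
        int position = 1; // positions start at 1 and count every branch
        for (Spec branch : branches) {
            addKey(keys, prefix, branch, Integer.toString(position));
            position++;
        }
        if (defaultBranch != null) {
            addKey(keys, prefix, defaultBranch, "0"); // "0" names an unnamed default branch
        }
        return keys;
    }

    private static void addKey(List<String> keys, String prefix, Spec spec, String fallbackName) {
        // A consumer, or a chain function returning null, produces no entry.
        boolean hasEntry = spec.kind == Kind.PLAIN || spec.kind == Kind.FUNCTION_RETURNS_STREAM;
        if (hasEntry) {
            keys.add(prefix + (spec.name != null ? spec.name : fallbackName));
        }
    }

    public static void main(String[] args) {
        List<Spec> branches = List.of(
            new Spec("bar", Kind.PLAIN),
            new Spec(null, Kind.CONSUMER),
            new Spec(null, Kind.FUNCTION_RETURNS_NULL),
            new Spec(null, Kind.FUNCTION_RETURNS_STREAM),
            new Spec(null, Kind.PLAIN));
        Spec defaultBranch = new Spec(null, Kind.PLAIN);

        System.out.println(keys("foo-", branches, defaultBranch)); // [foo-bar, foo-4, foo-5, foo-0]
    }
}
```

Note that the position counter advances even for branches that produce no entry, which is why the fourth branch is keyed "foo-4" rather than "foo-2".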
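When each branch is processed independently of the others, a single scope for all branches is not needed. In that case, pass a consuming lambda or method reference via the Branched parameter:
    source.split()
        .branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
        .branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
        .defaultBranch(Branched.withConsumer(ks -> ks.to("C")));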
The map returned by the defaultBranch() or noDefaultBranch() methods provides
access to all the branches in the same scope:
    Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
        .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v -> "NULL"), "null"))
        .defaultBranch(Branched.as("non-null"));
    KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
Branches can also be created dynamically, for example one per enum value:
    BranchedKStream branched = stream.split();
    for (RecordType recordType : RecordType.values()) {
        branched.branch((k, v) -> v.getRecType() == recordType,
            Branched.withConsumer(recordType::processRecords));
    }
See Also: KStream

| Modifier and Type | Method and Description |
|---|---|
| BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate) - Define a branch for records that match the predicate. |
| BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched) - Define a branch for records that match the predicate. |
| Map<String,KStream<K,V>> | defaultBranch() - Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. |
| Map<String,KStream<K,V>> | defaultBranch(Branched<K,V> branched) - Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. |
| Map<String,KStream<K,V>> | noDefaultBranch() - Finalize the construction of branches without forming a default branch. |
BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate)
Define a branch for records that match the predicate.
Parameters:
predicate - A Predicate instance, against which each record will be evaluated.
If this predicate returns true for a given record, the record will be
routed to the current branch and will not be evaluated against the predicates
for the remaining branches.
Returns:
this to facilitate method chaining

BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched)
Define a branch for records that match the predicate.
Parameters:
predicate - A Predicate instance, against which each record will be evaluated.
If this predicate returns true for a given record, the record will be
routed to the current branch and will not be evaluated against the predicates
for the remaining branches.
branched - A Branched parameter that allows defining a branch name, an in-place
branch consumer, or a branch mapper (see code examples
for BranchedKStream)
Returns:
this to facilitate method chaining

Map<String,KStream<K,V>> defaultBranch()
Finalize the construction of branches and define the default branch for the messages not intercepted
by other branches. Calling defaultBranch or noDefaultBranch() is optional.
Returns:
Map of named branches. For rules of forming the resulting map, see BranchedKStream
description.

Map<String,KStream<K,V>> defaultBranch(Branched<K,V> branched)
Finalize the construction of branches and define the default branch for the messages not intercepted
by other branches. Calling defaultBranch or noDefaultBranch() is optional.
Parameters:
branched - A Branched parameter that allows defining a branch name, an in-place
branch consumer, or a branch mapper (see code examples
for BranchedKStream)
Returns:
Map of named branches. For rules of forming the resulting map, see BranchedKStream
description.

Map<String,KStream<K,V>> noDefaultBranch()
Finalize the construction of branches without forming a default branch. Calling noDefaultBranch()
or defaultBranch() is optional.
Returns:
Map of named branches. For rules of forming the resulting map, see BranchedKStream
description.