Interface BranchedKStream<K,V>
Type Parameters:
K - Type of keys
V - Type of values
public interface BranchedKStream<K,V>

Branches the records in the original stream based on the predicates supplied for the branch definitions.

Branches are defined with the branch(Predicate, Branched) or defaultBranch(Branched) methods. Each record is evaluated against the predicate supplied via Branched parameters, and is routed to the first branch for which its respective predicate evaluates to true. If a record does not match any predicate, it will be routed to the default branch, or dropped if no default branch is created.

Each branch (which is a KStream instance) can then be processed either by a Function or a Consumer provided via a Branched parameter. If certain conditions are met, it can also be accessed from the Map returned by an optional defaultBranch(Branched) or noDefaultBranch() method call (see usage examples).

The branching happens on a first-match basis: a record in the original stream is assigned to the corresponding result stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need to route a record to multiple streams, you can apply multiple KStream.filter(Predicate) operators to the same KStream instance, one for each predicate, instead of branching.

The process of routing the records to different branches is a stateless record-by-record operation.
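The difference between first-match branching and the filter-based alternative can be sketched in plain Java. This is an illustrative model only, not Kafka Streams code: the class FirstMatchRouting, its methods route and matchAll, and the sample predicates are hypothetical names chosen for this sketch, and the return value 0 is simply used here to stand for "no predicate matched".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative model only; none of these names belong to the Kafka Streams API.
final class FirstMatchRouting {

    // Hypothetical sample predicates: "greater than 10" and "even".
    static final List<Predicate<Integer>> PREDICATES =
            List.of(n -> n > 10, n -> n % 2 == 0);

    // First-match semantics: returns the 1-based position of the first
    // predicate that accepts the record, or 0 when none does.
    static <T> int route(T record, List<Predicate<T>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                return i + 1; // later predicates are never evaluated
            }
        }
        return 0;
    }

    // Filter-style fan-out: returns the 1-based positions of all
    // predicates that accept the record.
    static <T> List<Integer> matchAll(T record, List<Predicate<T>> predicates) {
        List<Integer> matches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                matches.add(i + 1);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(route(12, PREDICATES));    // 1
        System.out.println(matchAll(12, PREDICATES)); // [1, 2]
        System.out.println(route(3, PREDICATES));     // 0
    }
}
```

The record 12 satisfies both predicates, yet route assigns it to the first one only, which corresponds to branching. matchAll reports both matches, which corresponds to applying one filter per predicate.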
Rules of forming the resulting map
The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or noDefaultBranch() are defined by the following rules:
- If a Named parameter was provided for KStream.split(Named), its value is used as a prefix for each key. By default, no prefix is used.
- If a branch name is provided in branch(Predicate, Branched) via the Branched parameter, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1".
- If a name is not provided for the defaultBranch(), then the key defaults to prefix + "0".
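The key rules above can be expressed as a small plain-Java function. This is a minimal sketch under stated assumptions, not the library's implementation: the class BranchKeyNaming and the method keyFor are hypothetical names, an absent prefix is modeled as an empty string, an unnamed branch is modeled as null, and position 0 stands for the default branch.

```java
// Illustrative model of the key rules; not part of the Kafka Streams API.
final class BranchKeyNaming {

    // prefix:     the value given to split(Named), or "" when no prefix was set
    // branchName: the name given via Branched, or null when the branch is unnamed
    // position:   1-based position of the branch; 0 denotes the default branch
    static String keyFor(String prefix, String branchName, int position) {
        String suffix = (branchName != null) ? branchName : Integer.toString(position);
        return prefix + suffix;
    }

    public static void main(String[] args) {
        System.out.println(keyFor("foo-", "bar", 1)); // foo-bar
        System.out.println(keyFor("foo-", null, 5));  // foo-5
        System.out.println(keyFor("foo-", null, 0));  // foo-0
        System.out.println(keyFor("", null, 2));      // 2
    }
}
```

In this model the position counts every branch in definition order, whether or not earlier branches were named.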
The values of the Map<String, KStream<K, V>> entries are formed as follows:
- If no chain function or consumer is provided in branch(Predicate, Branched) via the Branched parameter, then the branch itself is added to the Map.
- If a chain function is provided and it returns a non-null value for a given branch, then the value is the result returned by this function.
- If a chain function returns null for a given branch, then no entry is added to the map.
- If a consumer is provided for a given branch, then no entry is added to the map.
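The value rules can likewise be modeled in plain Java, with Optional.empty() standing for "no entry is added to the map". This is a hedged sketch, not Kafka Streams code: BranchMapValue and valueFor are hypothetical names, the branch is represented by an arbitrary type S, and the sketch assumes that at most one of the chain function and the consumer is supplied.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative model of the value rules; not part of the Kafka Streams API.
final class BranchMapValue {

    // Returns the value that would be stored in the map for one branch,
    // or Optional.empty() when the rules say that no entry is added.
    // Assumption of this sketch: at most one of chain/consumer is non-null.
    static <S> Optional<S> valueFor(S branch, Function<S, S> chain, Consumer<S> consumer) {
        if (consumer != null) {
            consumer.accept(branch);  // the branch is consumed in place
            return Optional.empty();  // a consumer never produces an entry
        }
        if (chain != null) {
            // A null result from the chain function produces no entry.
            return Optional.ofNullable(chain.apply(branch));
        }
        return Optional.of(branch);   // no chain function or consumer: the branch itself
    }

    public static void main(String[] args) {
        System.out.println(valueFor("branch", null, null));               // Optional[branch]
        System.out.println(valueFor("branch", s -> s + "-mapped", null)); // Optional[branch-mapped]
        System.out.println(valueFor("branch", s -> null, null));          // Optional.empty
        System.out.println(valueFor("branch", null, s -> { }));           // Optional.empty
    }
}
```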
The following call chain illustrates these rules; the comment on each line gives the resulting map key:

    Map<String, KStream<..., ...>> result =
        source.split(Named.as("foo-"))
            .branch(predicate1, Branched.as("bar"))                       // "foo-bar"
            .branch(predicate2, Branched.withConsumer(ks -> ks.to("A")))  // no entry: a Consumer is provided
            .branch(predicate3, Branched.withFunction(ks -> null))        // no entry: chain function returns null
            .branch(predicate4, Branched.withFunction(ks -> ks))          // "foo-4": chain function returns non-null value
            .branch(predicate5)                                           // "foo-5": name defaults to the branch position
            .defaultBranch();                                             // "foo-0": "0" is the default name for the default branch

Usage examples
Direct Branch Consuming
In many cases the branches do not need to share a single scope, because each branch is processed completely independently of the others. In that case we can use 'consuming' lambdas or method references in the Branched parameter:

    source.split()
        .branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
        .branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
        .defaultBranch(Branched.withConsumer(ks -> ks.to("C")));

Collecting branches in a single scope
In other cases we want to combine branches again after splitting. The map returned by the defaultBranch() or noDefaultBranch() methods provides access to all the branches in the same scope:

    Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
        .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v -> "NULL"), "null"))
        .defaultBranch(Branched.as("non-null"));

    KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));

Dynamic branching
There is also a case when we might need to create branches dynamically, e.g. one per enum value:

    BranchedKStream branched = stream.split();
    for (RecordType recordType : RecordType.values())
        branched.branch((k, v) -> v.getRecType() == recordType,
            Branched.withConsumer(recordType::processRecords));

See Also:
KStream
Method Summary
Modifier and Type | Method | Description
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate) | Define a branch for records that match the predicate.
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched) | Define a branch for records that match the predicate.
Map<String,KStream<K,V>> | defaultBranch() | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String,KStream<K,V>> | defaultBranch(Branched<K,V> branched) | Finalize the construction of branches and define the default branch for the messages not intercepted by other branches.
Map<String,KStream<K,V>> | noDefaultBranch() | Finalize the construction of branches without forming a default branch.
Method Detail
branch
BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate)
Define a branch for records that match the predicate.
Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
Returns:
this to facilitate method chaining
branch
BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched)
Define a branch for records that match the predicate.
Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see the code examples for BranchedKStream)
Returns:
this to facilitate method chaining
defaultBranch
Map<String,KStream<K,V>> defaultBranch()
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.
Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
defaultBranch
Map<String,KStream<K,V>> defaultBranch(Branched<K,V> branched)
Finalize the construction of branches and define the default branch for the messages not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.
Parameters:
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see the code examples for BranchedKStream)
Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.
noDefaultBranch
Map<String,KStream<K,V>> noDefaultBranch()
Finalize the construction of branches without forming a default branch. Calling noDefaultBranch() or defaultBranch() is optional.
Returns:
Map of named branches. For rules of forming the resulting map, see the BranchedKStream description.