Operators
- map
DataStream → DataStream
- flatMap
DataStream → DataStream
- filter
DataStream → DataStream
- keyBy
DataStream → KeyedStream
Partitions the stream by key.
- reduce
KeyedStream/WindowedStream/AllWindowedStream → DataStream
Used after keyBy, or after window/windowAll.
- window
KeyedStream → WindowedStream
Used after keyBy.
- windowAll
DataStream → AllWindowedStream
Used on a stream that has not been keyed (not after keyBy); this operator always runs with parallelism 1.
- apply
WindowedStream/AllWindowedStream → DataStream
- union
DataStream* → DataStream
Merges two or more streams of the same type.
- join
DataStream,DataStream → DataStream
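Pairs up elements of the two streams whose keys are equal and that fall into the same window; elements without a matching partner produce no output.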
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});
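The window join semantics can be made concrete with a minimal plain-Java model of what the 3-second tumbling join above emits. This is not Flink code and not how Flink implements it. It assumes that each element carries a key and an event timestamp in milliseconds, that windows have zero offset, and that all elements arrive before their window fires, so watermarks and late data are ignored. The `WindowJoinModel` class and `Event` record are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Flink code) of what a tumbling event-time window join emits.
class WindowJoinModel {

    // Hypothetical event type: a key, an event timestamp in ms, and a payload.
    record Event(String key, long ts, String value) {}

    // Start of the tumbling window [start, start + size) containing ts, zero offset.
    // Math.floorMod keeps this correct for negative timestamps too.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Inner join: every (left, right) pair with equal keys whose timestamps fall into
    // the same window is emitted once; unmatched elements produce no output.
    // Output order is just the loop order; Flink's emission order is not modeled.
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key().equals(r.key());
                boolean sameWindow = windowStart(l.ts(), size) == windowStart(r.ts(), size);
                if (sameKey && sameWindow) {
                    out.add(l.value() + "," + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("a", 1000, "L1"), new Event("b", 1500, "L2"));
        List<Event> right = List.of(new Event("a", 2999, "R1"), new Event("a", 3000, "R2"));
        // Window size 3000 ms, matching Time.seconds(3)
        System.out.println(join(left, right, 3000)); // [L1,R1]
    }
}
```

Only `L1` and `R1` pair up. `R2` has the same key, but its timestamp 3000 falls into the next window `[3000, 6000)`. `L2` has no partner with key `b`, so it produces no output.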
- Interval Join
KeyedStream,KeyedStream → DataStream
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional
    .lowerBoundExclusive() // optional
    .process(new ProcessJoinFunction() {...});
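The interval condition in the comment above can be checked with a small plain-Java model. This is a sketch, not Flink's implementation. It follows the rule that the right element's timestamp must lie within `[leftTs + lower, leftTs + upper]`, with both ends inclusive unless the corresponding exclusive option is set. The `IntervalJoinModel` class and `Event` record are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Flink code) of the condition an interval join applies.
class IntervalJoinModel {

    // Hypothetical event type: a key and an event timestamp in milliseconds.
    record Event(String key, long ts) {}

    // True if rightTs lies within [leftTs + lower, leftTs + upper]. Each end becomes
    // strict when the matching flag is set (the role of lowerBoundExclusive() and
    // upperBoundExclusive()); both ends are inclusive by default.
    static boolean inInterval(long leftTs, long rightTs, long lower, long upper,
                              boolean lowerExclusive, boolean upperExclusive) {
        long lo = leftTs + lower;
        long hi = leftTs + upper;
        boolean aboveLower = lowerExclusive ? rightTs > lo : rightTs >= lo;
        boolean belowUpper = upperExclusive ? rightTs < hi : rightTs <= hi;
        return aboveLower && belowUpper;
    }

    // Emits "leftTs->rightTs" for every pair with equal keys that satisfies the interval.
    static List<String> join(List<Event> left, List<Event> right, long lower, long upper,
                             boolean lowerExclusive, boolean upperExclusive) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key().equals(r.key())
                        && inInterval(l.ts(), r.ts(), lower, upper, lowerExclusive, upperExclusive)) {
                    out.add(l.ts() + "->" + r.ts());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("a", 10));
        List<Event> right = List.of(new Event("a", 11), new Event("a", 12), new Event("b", 10));
        // between(-2, 2) with both bounds exclusive: 8 < rightTs < 12
        System.out.println(join(left, right, -2, 2, true, true));   // [10->11]
        // the same bounds, both inclusive: 8 <= rightTs <= 12
        System.out.println(join(left, right, -2, 2, false, false)); // [10->11, 10->12]
    }
}
```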
- CoGroup
DataStream,DataStream → DataStream
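Groups the elements of both streams by key within each window. Elements with equal keys are passed to the function together, and elements without a match are still passed on, with the other side's group empty. The emphasis is on grouping.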
dataStream.coGroup(otherStream)
    .where(<key selector>).equalTo(<key selector>) // e.g. field 0 of the first stream, field 1 of the second
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
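For contrast with the join, the following plain-Java model groups both sides by key and by 3-second tumbling window. This matches how a `CoGroupFunction` receives one pair of iterables per key and window. It is not Flink code, and the `CoGroupModel` class and `Event` record are hypothetical. Like a join sketch would, it assumes zero window offset and ignores watermarks and late data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model (NOT Flink code) of what a tumbling-window coGroup hands to the function.
class CoGroupModel {

    // Hypothetical event type: a key, an event timestamp in ms, and a payload.
    record Event(String key, long ts, String value) {}

    // Start of the tumbling window containing ts (zero offset).
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Identifies one coGroup call: the key plus the window the element falls into.
    static String groupId(Event e, long size) {
        return e.key() + "@" + windowStart(e.ts(), size);
    }

    // Collects both sides per (key, window). A group present on only one side is still
    // emitted with the other side empty, like the two Iterables a CoGroupFunction gets.
    static List<String> coGroup(List<Event> left, List<Event> right, long size) {
        Map<String, List<String>> lefts = new TreeMap<>();
        Map<String, List<String>> rights = new TreeMap<>();
        for (Event e : left) {
            lefts.computeIfAbsent(groupId(e, size), k -> new ArrayList<>()).add(e.value());
        }
        for (Event e : right) {
            rights.computeIfAbsent(groupId(e, size), k -> new ArrayList<>()).add(e.value());
        }
        // Union of group ids from both sides, sorted for deterministic output
        TreeSet<String> ids = new TreeSet<>(lefts.keySet());
        ids.addAll(rights.keySet());
        List<String> out = new ArrayList<>();
        for (String id : ids) {
            out.add(id + " " + lefts.getOrDefault(id, List.of())
                    + " " + rights.getOrDefault(id, List.of()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("a", 1000, "L1"), new Event("a", 2000, "L2"),
                new Event("b", 1500, "L3"));
        List<Event> right = List.of(new Event("a", 500, "R1"), new Event("c", 100, "R2"));
        coGroup(left, right, 3000).forEach(System.out::println);
        // a@0 [L1, L2] [R1]
        // b@0 [L3] []
        // c@0 [] [R2]
    }
}
```

Unlike the inner join, keys `b` and `c` still reach the function. Each arrives with one side empty.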
- Connect
DataStream,DataStream → ConnectedStream
"Connects" two data streams while preserving their types (the two types may differ). Connecting allows the two streams to share state.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
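Connecting is useful because one operator instance receives elements from both inputs, so any state it keeps is visible to both sides. The sketch below models this in plain Java; it is not Flink code. It uses a hypothetical control protocol in which the `String` input carries `"mute"`/`"unmute"` commands and the `Integer` input is forwarded only while not muted. The method names mirror `flatMap1`/`flatMap2` of `CoFlatMapFunction` in name only. In a real Flink job the flag would usually live in managed state (for example `ValueState` on keyed connected streams) rather than a plain field, so that it survives failures.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Flink code): a single operator instance handles both inputs
// of a connected stream, so its state is shared between them.
class ConnectedStateModel {
    private boolean muted = false;                // state shared by both inputs
    private final List<String> out = new ArrayList<>();

    // First input (Integer): forwarded only while not muted.
    void flatMap1(Integer value) {
        if (!muted) {
            out.add(value.toString());
        }
    }

    // Second input (String): hypothetical control messages that update the shared state.
    void flatMap2(String command) {
        if (command.equals("mute")) {
            muted = true;
        } else if (command.equals("unmute")) {
            muted = false;
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        ConnectedStateModel op = new ConnectedStateModel();
        op.flatMap1(1);
        op.flatMap2("mute");
        op.flatMap1(2);      // dropped: the other input muted this operator
        op.flatMap2("unmute");
        op.flatMap1(3);
        System.out.println(op.output()); // [1, 3]
    }
}
```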
- CoMap, CoFlatMap
ConnectedStream → DataStream
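Operators specific to ConnectedStream: map1/flatMap1 process elements of the first input, and map2/flatMap2 process elements of the second.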
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    // called for each element of the first (Integer) input
    @Override
    public Boolean map1(Integer value) {
        return true;
    }
    // called for each element of the second (String) input
    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    // first input: emit the number as a string
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }
    // second input: emit each space-separated word
    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
- Iterate
DataStream → IterativeStream → ConnectedStream
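Splits a stream into two parts: one part is fed back into the iteration and loops continuously, while the other is emitted downstream normally.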
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
// elements still > 0 are fed back to the head of the iteration
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
// elements <= 0 leave the loop and continue downstream
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
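The feedback loop can be traced with a plain-Java model in which a queue stands in for the feedback edge. This is not Flink code. The body function is a hypothetical "subtract 1" standing in for `/* do something */`. Results greater than 0 go back into the loop like `feedback`, and the rest are emitted like `output`. The model processes fed-back elements in FIFO order, but that ordering is a modeling choice, not something the source or Flink promises. A body that never produces a value <= 0 would loop forever, both in this model and in a real iteration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Plain-Java model (NOT Flink code) of an iterate / closeWith feedback loop.
class IterateModel {

    // Runs each element through `body`; results > 0 are fed back (like closeWith(feedback)),
    // results <= 0 leave the loop (like `output`). Returns emitted values in exit order.
    static List<Long> run(List<Long> inputs, LongUnaryOperator body) {
        Deque<Long> queue = new ArrayDeque<>(inputs);  // head of the iteration
        List<Long> output = new ArrayList<>();
        while (!queue.isEmpty()) {
            long v = body.applyAsLong(queue.poll());
            if (v > 0) {
                queue.add(v);     // feedback edge back to the iteration head
            } else {
                output.add(v);    // leaves the loop and continues downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical body: subtract 1, so positive values loop until they reach 0.
        System.out.println(run(List.of(3L, -5L), v -> v - 1)); // [-6, 0]
    }
}
```

Here `-5` exits after one pass as `-6`, while `3` passes through the body three times (3 → 2 → 1 → 0) before it is emitted.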
Please credit the source when reposting. Article link: https://www.uj5u.com/qita/423734.html
