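import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByMaxDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> ds = env.fromElements(
                Tuple2.of("tom", 12), Tuple2.of("jack", 20), Tuple2.of("rose", 18),
                Tuple2.of("tom", 20), Tuple2.of("jack", 32), Tuple2.of("rose", 11),
                Tuple2.of("frank", 12), Tuple2.of("putin", 67), Tuple2.of("frank", 22),
                Tuple2.of("tom", 18), Tuple2.of("putin", 99), Tuple2.of("jack", 28),
                Tuple2.of("rose", 19), Tuple2.of("putin", 33), Tuple2.of("rose", 88),
                Tuple2.of("jack", 47), Tuple2.of("tom", 66));
        // Partition the stream by name (field f0)
        KeyedStream<Tuple2<String, Integer>, String> ks = ds.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        // Raw String hash codes of two keys (Flink does not use these directly as partition numbers)
        System.out.println(Tuple2.of("rose", 18).f0.hashCode());
        System.out.println(Tuple2.of("frank", 12).f0.hashCode());
        //ks.print();
        // Rolling max of field 1 (the integer), maintained separately for each key
        DataStream<Tuple2<String, Integer>> st = ks.max(1);
        st.print();
        // Nothing runs until execute() is called
        env.execute("keyBy max demo");
    }
}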
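I've just started learning Flink and ran into something that puzzles me.

After partitioning with keyBy, why do rose and frank end up in the same partition? And why does calling max/min after keyBy not return the maximum of each partition?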
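keyBy only guarantees that all records with the same key reach the same parallel subtask; it does not give each key a subtask of its own. With five keys and a local environment whose default parallelism equals the number of CPU cores, two keys sharing a subtask (the number before ">" in print() output, counted from 1) is expected. The raw String.hashCode() values printed in the code are also not partition numbers: Flink hashes the key's hashCode again with a murmur hash, assigns the key to one of maxParallelism key groups, and maps contiguous ranges of key groups to subtasks. The stand-alone sketch below is assumed to mirror that logic (Flink's KeyGroupRangeAssignment and MathUtils.murmurHash); the class name KeyGroupSketch, the parallelism of 4, and the max parallelism of 128 (Flink's default lower bound when none is configured) are illustrative assumptions, so compare it with the source of your Flink version before relying on it.

```java
// Stand-alone sketch (no Flink dependency) of how Flink is assumed to route a key to a subtask
final class KeyGroupSketch {

    // Assumed to match Flink's MathUtils.murmurHash(int): scramble, mix, finalize, make non-negative
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche step (Flink's MathUtils.bitMix)
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Key -> key group in [0, maxParallelism)
    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Key group -> subtask index; contiguous ranges of key groups share one subtask
    static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default when no max parallelism is configured
        int parallelism = 4;      // hypothetical; a local environment defaults to the CPU core count
        for (String name : new String[] {"tom", "jack", "rose", "frank", "putin"}) {
            System.out.println(name + " -> key group " + keyGroup(name, maxParallelism)
                    + ", subtask " + subtaskIndex(name, maxParallelism, parallelism));
        }
    }
}
```

With these assumed settings each subtask owns 128 / 4 = 32 consecutive key groups, so any two keys whose groups fall in the same block of 32 share a subtask. That is one way rose and frank can land together even though their raw hash codes differ.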
Reply from a helpful uj5u.com user:
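Why doesn't calling max/min after keyBy return the maximum of each partition? Because max is a rolling aggregation kept per key: if the input has 3 records, max also outputs 3 records, and only the last one is the final maximum. For example, with max("score") and input scores 99, 98, 100, 97 (all with the same key), the output is 99, 99, 100, 100! Got it? Let me explain: for the first record, 99, there is no stored max yet, so the max is 99; the second, 98, is smaller than 99, so the stored max 99 is output; the third, 100, is larger than 99, so 100 is output; the fourth, 97, is smaller than 100, so 100 is output again. In other words, it is an incremental computation that emits an updated result for every incoming record.
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/68373.html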
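The reply's point is that max on a KeyedStream keeps one stored value per key (not per partition) and emits an updated record for every input. The plain-Java simulation below illustrates that behavior without Flink; RollingMaxSketch, rollingMax and rec are hypothetical names, and a HashMap stands in for Flink's per-key state.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a keyed rolling max (not Flink code)
final class RollingMaxSketch {

    // Hypothetical helper to build a (key, value) record
    static Map.Entry<String, Integer> rec(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    // Emits one output per input: (key, largest value seen so far for that key)
    static List<Map.Entry<String, Integer>> rollingMax(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new HashMap<>(); // stands in for per-key state
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> r : input) {
            int current = state.merge(r.getKey(), r.getValue(), Integer::max);
            out.add(rec(r.getKey(), current));
        }
        return out;
    }

    public static void main(String[] args) {
        // The reply's example: scores 99, 98, 100, 97 for a single key
        List<Map.Entry<String, Integer>> scores =
                Arrays.asList(rec("s", 99), rec("s", 98), rec("s", 100), rec("s", 97));
        // Prints s=99, s=99, s=100, s=100 on separate lines
        rollingMax(scores).forEach(System.out::println);
    }
}
```

Applied to the question's 17 input records, the same logic produces 17 output records, and the last record for each key carries its overall maximum (tom 66, jack 47, rose 88, frank 22, putin 99). In the Flink job, output for different keys may be interleaved across subtasks in print(), but each key's running values should appear in input order.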
Tags: High-performance computing
