??在使用Java撰寫apache-flink程式的時候相信很多新手都遇到下面這樣的例外;
org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(DemoApp.java:29)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:45
函式回傳型別由于型別洗掉,無法自動確定型別;可使用returns方法或使用函式實作ResultTypeQueryable介面;
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
??Collector泛型型別引數丟失,在使用Java泛型時lambda方法無法提供足夠的資訊來進行自動進行型別提取,一個簡單的解決方案是使用匿名內部類代替來實作FlatMapFunction介面,不然只能使用型別資訊顯式指定型別;
??拋出的上面這兩個例外描述的內容其實很明確了,簡單來說就是:在實作FlatMapFunction時使用lambda運算式導致了Collector變數的泛型型別引數丟(由于型別洗掉),簡單的解決方案是把lambda運算式換成匿名內部類或者顯式指定型別(使用returns方法或實作ResultTypeQueryable介面);
下面簡單談談Java的型別擦除與flink的顯式指定型別;??
Java型別擦除
??Java的泛型被很多人詬病稱為“偽泛型”,也是因為型別擦除這個原因,泛型在Java中就是屬于語法糖;
??在Java中JVM虛擬機層面并不存在泛型的概念,Java在編譯階段把泛型的型別引數給擦除掉了,在運行階段并沒有泛型的概念;
public class Data<T> {
private T obj;
public T getObj() {
return obj;
}
public void setObj(T obj) {
this.obj = obj;
}
}
??如上類,在經過Java編譯成為class檔案后其中的型別引數T將被擦除,欄位obj變成了Object型別,兩個get、set方法中的T也都換成了Object型別;

泛型實作主要有兩種:
??Code sharing:一個原始類的泛型型別只有一份目標代碼,
??Code specialization:對每個泛型型別都生成不同的代碼,
??Java屬于第一種,C#與C++屬于第二種,兩種實作各有春秋吧,這里不討論;
??為了保證Java的多型特性編譯器在進行型別擦除時還可能會生成橋接方法用于保證型別擦除所導致子類與父類方法實作不一致問題;
Flink中的泛型與lambda
stream.flatMap(new FlatMapFunction<Integer, String>() {
@Override
public void flatMap(Integer value, Collector<String> out) throws Exception {
System.out.println(value);
}
});
??在Flink中使用各種算子的時候可能會有類似上面面這種用法,上面這種方式使用并沒有什么問題,這里的FlatMapFunction就是一個泛型介面,使用了匿名內部類實作了該介面并傳遞給了flatMap算子;
stream.flatMap((FlatMapFunction<Integer, String>) (value, out) -> {
System.out.println(value);
})
??也有的人直接使用lambda運算式實作FlatMapFunction介面傳遞給flatMap算子,但這時候很多新手估計會發現程式運行的時候報錯了,拋出了本文最開始的那兩個例外;
??為什么使用匿名內部類就沒問題,而使用lambda運算式就不行報錯了,其實體外資訊已經描述很清楚了,這里簡單看看為什么匿名內部類可以,lambda運算式不可以,使用returns方法或實作ResultTypeQueryable介面也可以;
??上面介紹了在Java中會對泛型資訊進行型別引數擦除,但在這里為啥使用匿名內部類實作FlatMapFunction時卻還是可以獲取得到泛型引數?
??其實Java中編譯時的泛型型別擦除并不是把所以泛型相關的資訊全部擦干干凈凈,Javac編譯時擦除的只是結構化之外(程式執行流)的資訊這部分資訊存盤在位元組碼的Code屬性中,類、欄位、方法的泛型型別引數元資料都會被保留下來,這些存盤在Signature屬性中;可通過反射得到相關的泛型引數資訊;
s.flatMap(new FlatMapFunction<String, Integer>() {
@Override
public void flatMap(String value, List<Integer> out) {
System.out.println("stu");
}
});

??而lambda運算式實作FlatMapFunction卻獲取不到泛型引數,是的,
??匿名內部類會編譯成相關的類位元組碼存盤在class檔案中,而lambda運算式卻也只是Java的語法糖并不會存在相關的類位元組碼,只會在lambda運算式運行時呼叫invokedynamic指令執行邏輯,lambda運算式丟失了更多的型別資訊,也就導致了使用lambda運算式獲取不到泛型型別引數;
s.flatMap((FlatMapFunction<String, Integer>) (value, out) ->
System.out.println("stu"));

Flink中使用lambda后的寫法
??其實上面例外資訊已經說得非常清楚了,呼叫returns方法或實作ResultTypeQueryable介面,這里就簡單說這兩種用法;
returns方法
??呼叫該方法的用法也比較簡單,就是回傳的Collector需要哪個泛型型別引數你就呼叫returns方法注冊哪種型別,呼叫returns方法一定是要在某個算子之后緊接著第一個呼叫,簡單理解就是未某個算子注冊回傳型別;
stream.flatMap((FlatMapFunction<Integer, String>) (value, out) -> {
System.out.println(value);
})
.returns(String.class)
ResultTypeQueryable介面
??實作此介面就可以告訴系統此算子的回傳值型別,實作了此介面的優先級最高,不會再通過反射去獲取回傳值型別,還可以根據型別引數的不同使用不同的回傳值型別;實作此介面可定制化程度很高、靈活,Flink kafka相關的連接器中就是用了這種模式,
public class FlatFun implements ResultTypeQueryable<String>, FlatMapFunction<Integer, String> {
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(String.class);
}
@Override
public void flatMap(Integer value, Collector<String> out) {
out.collect(String.valueOf(value));
System.out.println("flatFun");
}
}
stream.flatMap(new FlatFun())
.print();
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/55067.html
標籤:Java
