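I'm trying to run a Flink (1.12.1) batch job with the following steps:
- A custom SourceFunction that connects to MongoDB
- Some flatMap and map operations to transform the data
- A sink that writes the results into another MongoDB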
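I'm trying to run it in a StreamExecutionEnvironment with RuntimeExecutionMode.BATCH, but the application throws an exception because my source is detected as unbounded, and I can't find a way to make it bounded (it should finish once it has read all the documents in the Mongo collection).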
Exception:
```
Exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
    at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33)
```
Some code:
Execution environment:
```java
public static StreamExecutionEnvironment getBatch() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.addSource(new MongoSource()).print();
    return env;
}
```
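Mongo source:
```java
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.bson.Document;

public class MongoSource extends RichSourceFunction<Document> {
    private static final long serialVersionUID = 8321722349907219802L;
    private transient MongoClient mongoClient;
    private transient MongoCollection<Document> mc;
    // Cleared by cancel(), which Flink calls from another thread
    private volatile boolean running = true;
    @Override
    public void open(Configuration con) {
        mongoClient = new MongoClient(
                new MongoClientURI("mongodb://localhost:27017/database"));
        mc = mongoClient.getDatabase("database").getCollection("collection");
    }
    @Override
    public void run(SourceContext<Document> ctx) throws Exception {
        // Emit every document in the collection, then return
        try (MongoCursor<Document> itr = mc.find().iterator()) {
            while (running && itr.hasNext()) {
                ctx.collect(itr.next());
            }
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
    @Override
    public void close() {
        // Release the connection when the task shuts down
        if (mongoClient != null) {
            mongoClient.close();
        }
    }
}
```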
Thanks!
Answer:
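Sources used with RuntimeExecutionMode.BATCH must implement Source rather than SourceFunction. A SourceFunction registered with env.addSource() is treated as unbounded regardless of whether its run() method eventually returns, which is why you get this exception. Likewise, sinks should implement Sink rather than SinkFunction.
For an introduction to these new interfaces, see "Integrating Flink into your ecosystem - How to build a Flink connector from scratch". They are described in FLIP-27: Refactor Source Interface and FLIP-143: Unified Sink API.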
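To make the idea behind FLIP-27 a little more concrete: a bounded Source knows a finite set of splits (for Mongo these might be `_id` ranges of the collection, though that split strategy is only an assumption here), returns `Boundedness.BOUNDED` from `getBoundedness()`, and its readers eventually report end of input. The sketch below is a toy, Flink-free model of that flow, not Flink code. `Split`, `Enumerator`, `Reader`, `Status` and `readAll` are hypothetical names that only loosely mirror Flink's `SplitEnumerator`, `SourceReader` and `InputStatus`. It deliberately omits the split and checkpoint serializers, checkpointing and asynchronous split requests that a real `Source` implementation needs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/**
 * Toy model of the FLIP-27 pattern: a finite set of splits, an enumerator
 * that hands them out, and a reader that reports end of input.
 * All names are hypothetical illustrations, not Flink classes.
 */
public class BoundedSourceModel {

    /** Loosely mirrors two of the values of Flink's InputStatus. */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /** A split: one finite slice of the collection (e.g. an _id range). */
    static final class Split {
        final List<String> docs;
        Split(List<String> docs) { this.docs = docs; }
    }

    /** Knows the complete list of splits up front, which is what makes the source bounded. */
    static final class Enumerator {
        private final Deque<Split> pending;
        Enumerator(List<Split> splits) { this.pending = new ArrayDeque<>(splits); }

        /** Returns the next split, or null once every split has been handed out. */
        Split requestSplit() { return pending.poll(); }
    }

    /** Emits one record per poll and returns END_OF_INPUT once the splits run out. */
    static final class Reader {
        private final Enumerator enumerator;
        private Iterator<String> current = Collections.emptyIterator();
        Reader(Enumerator enumerator) { this.enumerator = enumerator; }

        Status pollNext(List<String> output) {
            while (!current.hasNext()) {
                // Synchronous in this model; in Flink, split requests are asynchronous
                Split split = enumerator.requestSplit();
                if (split == null) {
                    return Status.END_OF_INPUT;
                }
                current = split.docs.iterator();
            }
            output.add(current.next());
            return Status.MORE_AVAILABLE;
        }
    }

    /** Drives the reader until it signals END_OF_INPUT and returns everything it emitted. */
    public static List<String> readAll(List<List<String>> splitContents) {
        List<Split> splits = new ArrayList<>();
        for (List<String> docs : splitContents) {
            splits.add(new Split(docs));
        }
        Reader reader = new Reader(new Enumerator(splits));
        List<String> output = new ArrayList<>();
        while (reader.pollNext(output) != Status.END_OF_INPUT) {
            // MORE_AVAILABLE means "poll again"
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("doc1", "doc2"),
                Collections.<String>emptyList(),
                Arrays.asList("doc3"));
        System.out.println(readAll(splits)); // prints [doc1, doc2, doc3]
    }
}
```

The job can end in BATCH mode because the set of splits is finite and known to the enumerator. A SourceFunction's run() loop gives Flink no such guarantee ahead of time, even if it happens to return.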
