關于旁路輸出的官方檔案:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/side_output/
除了由 DataStream 操作產生的主要流之外,我們還可以產生任意數量的旁路輸出結果流,結果流中的資料型別不必與主要流中的資料型別相匹配,并且不同旁路輸出的型別也可以不同,
使用旁路輸出時,首先需要定義用于標識旁路輸出流的OutputTag類物件,
構造方法的第一個引數表示一個區分旁路輸出流的id標識,第二個引數表示要處理的資料型別,
OutputTag<String> outputTag = new OutputTag<String>("side-output", Types.STRING);
定義旁路輸出標簽后,通過主輸出流的process方法,把資料發送到旁路輸出流中,
SingleOutputStreamOperator<String> process = input
.process(new ProcessFunction<String, Object>() {
@Override
public void processElement(
String value,
Context ctx,
Collector<Object> collector) throws Exception {
// 發送資料到主要的輸出
collector.collect(value);
// 發送資料到旁路輸出
ctx.output(outputTag, "sideout-" + value);
}
});
我們可以在 DataStream 運算結果上使用 getSideOutput(OutputTag) 方法獲取旁路輸出流,這將產生一個與旁路輸出流結果型別一致的 DataStream,
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
以股票為例,在發送股票資料時,我們假設股價小于50就是低價股,否則就是高級股,在發送股票資料時,我們希望把高價股和低價股分別寫出到不同的檔案中保存起來,
首先,創建一個股票類Stock:
public class Stock {
//股票名稱
private String name;
//股票價格
private Integer price;
//構造方法、getter、setter、toString方法在此省略
}
其次,撰寫Flink消費者程式:
public class FlinkKafkaConsumer{
public static void main(String[] args) throws Exception {
//1.創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.組態檔
Properties props = new Properties();
props.put("bootstrap.servers","Kafka集群地址");
//3.構造消費者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("stock", new SimpleStringSchema(), props);
//4.配置消費者
DataStreamSource stream = env.addSource(consumer);
//5.data sick
stream.addSick(new SinkFunction<String>(){
@Override
public void invoke(String value, Context context){
Syestem.out.println("當前已處理的資料:" + JsonUtils.deserialize(value, Stock.class));
}
});
//6.執行程式
env.execute("消費者程式");
}
}
接著,撰寫生產者程式:
?public class FlinkProducer {
public static void main(String[] args) throws Exception {
//結果輸出路徑
String outputPath1 = "C:\\Users\\xxx\\Desktop\\result1";
String outputPath2 = "C:\\Users\\xxx\\Desktop\\result2";
//創建資料源
ArrayList<String> stocks = new ArrayList<>();
for (int i = 0; i < 100 ; i++) {
stocks.add(JsonUtils.serialize(new Stock("stock-" + i, (int)(Math.random() * 100))));
}
//創建消費者環境
StreamExecutionEnvironment producerEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//組態檔
Properties props = new Properties();
props.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
//構建生產者
FlinkKafkaProducer<String> producer_all = new FlinkKafkaProducer<>("STOCK", new SimpleStringSchema(), props);
//配置資料源
DataStreamSource<String> stream_all = producerEnvironment.fromCollection(stocks);
//創建旁路輸出
final OutputTag<String> outputTag_lowPrice = new OutputTag<>("STOCK_LOW_PRICE", Types.STRING);
final OutputTag<String> outputTag_highPrice = new OutputTag<>("STOCK_HIGH_PRICE", Types.STRING);
//配置旁路輸出
SingleOutputStreamOperator<Object> process = stream_all.process(new ProcessFunction<String, Object>() {
@Override
public void processElement(String s, ProcessFunction<String, Object>.Context context, Collector<Object> collector) {
Stock stock = JsonUtils.deserialize(s, Stock.class);
collector.collect(s);
if (stock.getPrice() < 50) {
context.output(outputTag_lowPrice, s);
} else {
context.output(outputTag_highPrice, s);
}
}
});
//獲取低價股票和高價股票的旁路輸出
DataStream<String> stream_lowPrice = process.getSideOutput(outputTag_lowPrice);
DataStream<String> stream_highPrice = process.getSideOutput(outputTag_highPrice);
//配置生產者
stream_all.addSink(producer_all);
//處理低價股票
stream_lowPrice.writeAsText(outputPath1, FileSystem.WriteMode.OVERWRITE);
//處理高價股票
stream_highPrice.writeAsText(outputPath2, FileSystem.WriteMode.OVERWRITE);
//執行flink程式
producerEnvironment.execute("生產者流處理");
}
}
依次啟動消費者程式、生產者程式,觀察消費者程式控制臺中的輸出:

此時,桌面生成了兩個檔案夾,result1記錄了小于50的股票,result2相反:

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/432172.html
標籤:其他
上一篇:Spark環境搭建(保姆級教程)
