BroadcastState
目錄
BroadcastState
BroadcastState介紹
需求-實作配置動態更新
編碼步驟
1.env
2.source
3.transformation
4.sink
5.execute
代碼實作
BroadcastState
BroadcastState介紹
在開發過程中,如果遇到需要下發/廣播配置、規則等低吞吐事件流到下游所有 task 時,就可以使用 Broadcast State。Broadcast State 是 Flink 1.5 引入的新特性。
下游的 task 接收這些配置、規則并保存為 BroadcastState,再將這些配置應用到另一個資料流的計算中。
- 場景舉例
- 動態更新計算規則: 如事件流需要根據最新的規則進行計算,則可將規則作為廣播狀態廣播到下游Task中,
- 實時增加額外欄位: 如事件流需要實時增加用戶的基礎資訊,則可將用戶的基礎資訊作為廣播狀態廣播到下游Task中,
- API介紹
首先創建一個Keyed 或Non-Keyed 的DataStream,
然後再創建一個BroadcastedStream,
最後通過DataStream來連接(呼叫connect 方法)到Broadcasted Stream 上,
這樣就實作了將BroadcastState廣播到Data Stream 下游的每個Task中。
1.如果DataStream是Keyed Stream ,則連接到Broadcasted Stream 后, 添加處理ProcessFunction 時需要使用KeyedBroadcastProcessFunction 來實作, 下面是KeyedBroadcastProcessFunction 的API,代碼如下所示:
// Process function used when a keyed stream is connected to a broadcast stream.
// Type parameters:
//   KS  - type of the key the stream was partitioned by when keyBy was called
//   IN1 - record type of the non-broadcast data stream
//   IN2 - record type of the broadcast stream
//   OUT - record type emitted by processElement() and processBroadcastElement()
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
// Handles one record of the non-broadcast stream; broadcast state is read-only on this side.
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
// Handles one record of the broadcast stream; broadcast state may only be modified on this side.
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
上面泛型中的各個引數的含義,說明如下:
- KS:表示Flink 程式從最上游的Source Operator 開始構建Stream,當呼叫keyBy 時所依賴的Key 的型別;
- IN1:表示非Broadcast 的Data Stream 中的資料記錄的型別;
- IN2:表示Broadcast Stream 中的資料記錄的型別;
- OUT:表示經過KeyedBroadcastProcessFunction 的processElement()和processBroadcastElement()方法處理後輸出結果資料記錄的型別。
2.如果Data Stream 是Non-Keyed Stream,則連接到Broadcasted Stream 后,添加處理ProcessFunction 時需要使用BroadcastProcessFunction 來實作, 下面是BroadcastProcessFunction 的API,代碼如下所示:
// Process function used when a non-keyed stream is connected to a broadcast stream.
// Type parameters:
//   IN1 - record type of the non-broadcast data stream
//   IN2 - record type of the broadcast stream
//   OUT - record type emitted by processElement() and processBroadcastElement()
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
// Handles one record of the non-broadcast stream; broadcast state is read-only on this side.
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
// Handles one record of the broadcast stream; broadcast state may only be modified on this side.
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
上面泛型中的各個引數的含義,與前面KeyedBroadcastProcessFunction 的泛型型別中的後3 個含義相同,只是沒有呼叫keyBy 操作對原始Stream 進行分區操作,就不需要KS 泛型引數。具體如何使用上面的BroadcastProcessFunction,接下來我們會通過實際編程,以使用BroadcastProcessFunction 為例進行詳細說明。
- 注意事項
1) Broadcast State 是Map 型別,即K-V 型別,
2) Broadcast State 只有在廣播的一側, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processBroadcastElement 方法中可以修改,在非廣播的一側, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processElement 方法中只讀,
3) Broadcast State 中元素的順序,在各Task 中可能不同,基于順序的處理,需要注意,
4) Broadcast State 在Checkpoint 時,每個Task 都會Checkpoint 廣播狀態,
5) Broadcast State 在運行時保存在記憶體中,目前還不能保存在RocksDB State Backend 中,
需求-實作配置動態更新

實時過濾出配置中的用戶,并在事件流中補全這批用戶的基礎資訊,
事件流:表示用戶在某個時刻瀏覽或點擊了某個商品,格式如下,
{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
{"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}
配置資料: 表示用戶的詳細資訊,在Mysql中,如下,
-- Recreate the user_info table that holds the per-user configuration data
-- (user id, user name, user age) read by the configuration stream.
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
-- userID is the primary key; userName and userAge are nullable.
`userID` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`userName` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`userAge` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`userID`) USING BTREE
) ENGINE = MyISAM CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of user_info
-- ----------------------------
-- Four sample users: user_1 .. user_4.
INSERT INTO `user_info` VALUES ('user_1', '張三', 10);
INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
INSERT INTO `user_info` VALUES ('user_4', '趙六', 40);
-- Turn foreign key checks on.
-- NOTE(review): presumably pairs with a SET FOREIGN_KEY_CHECKS = 0 from the dump header, which is not shown here.
SET FOREIGN_KEY_CHECKS = 1;
輸出結果:
(user_3,2019-08-17 12:19:47,browse,1,王五,30)
(user_2,2019-08-17 12:19:48,click,1,李四,20)
編碼步驟
1.env
2.source
- -1.構建實時資料事件流-自定義隨機
<userID, eventTime, eventType, productID>
- -2.構建配置流-從MySQL
<用戶id,<姓名,年齡>>
3.transformation
- -1.定義狀態描述器
MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
new MapStateDescriptor<>("config",Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
- -2.廣播配置流
BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
- -3.將事件流和廣播流進行連接
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =eventDS.connect(broadcastDS);
- -4.處理連接后的流-根據配置流補全事件流中的用戶的資訊
4.sink
5.execute
代碼實作
package cn.itcast.action;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
 * Author itcast
 * Desc
 * Requirement:
 * Use Flink BroadcastState to join an event stream with a configuration stream
 * (the configuration stream is broadcast as state), so that configuration
 * changes are picked up dynamically while the job is running.
 */
public class BroadcastStateConfigUpdate {

    /**
     * Builds and runs the job: a random event stream is connected to a broadcast
     * configuration stream read from MySQL, every event is enriched with the
     * user's name and age, and the enriched records are printed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. source
        // -1. Event stream of randomly generated records, produced continuously:
        //     <userID, eventTime, eventType, productID>
        DataStreamSource<Tuple4<String, String, String, Integer>> eventDS = env.addSource(new MySource());
        // -2. Configuration stream, re-queried from MySQL periodically:
        //     <userID, <userName, userAge>>
        DataStreamSource<Map<String, Tuple2<String, Integer>>> configDS = env.addSource(new MySQLSource());

        // 3. transformation
        // -1. State descriptor used to broadcast the configuration stream as state.
        //     The whole configuration map is stored as one entry under the null key.
        MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
                new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        // -2. Broadcast the configuration stream using the descriptor.
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
        // -3. Connect the event stream with the broadcast stream.
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =
                eventDS.connect(broadcastDS);
        // -4. Process the connected stream: enrich each event with the user's information.
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = connectDS
                // BroadcastProcessFunction<IN1, IN2, OUT>
                .process(new BroadcastProcessFunction<
                        // event stream: <userID, eventTime, eventType, productID>
                        Tuple4<String, String, String, Integer>,
                        // broadcast stream: <userID, <userName, userAge>>
                        Map<String, Tuple2<String, Integer>>,
                        // output: <userID, eventTime, eventType, productID, userName, userAge>
                        Tuple6<String, String, String, Integer, String, Integer>>() {

                    /** Handles one event: looks the user up in the broadcast state and emits the enriched record. */
                    @Override
                    public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        // User id carried by the event.
                        String userId = value.f0;
                        // Fetch the (read-only) broadcast state through the descriptor.
                        ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                        if (broadcastState == null) {
                            return;
                        }
                        // The configuration map <userID, <userName, userAge>> lives under the null key.
                        Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
                        if (map == null) {
                            // No configuration has been broadcast yet.
                            return;
                        }
                        Tuple2<String, Integer> tuple2 = map.get(userId);
                        if (tuple2 == null) {
                            // The user is not part of the current configuration: drop the event
                            // instead of failing with a NullPointerException on the missing entry.
                            return;
                        }
                        String userName = tuple2.f0;
                        Integer userAge = tuple2.f1;
                        out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, userName, userAge));
                    }

                    /** Handles one configuration snapshot: replaces the previous broadcast state with it. */
                    @Override
                    public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        // value is the latest map emitted by MySQLSource.
                        BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                        // Drop the previous snapshot, then store the new one.
                        broadcastState.clear();
                        broadcastState.put(null, value);
                    }
                });

        // 4. sink
        result.print();

        // 5. execute
        env.execute();
    }

    /**
     * Emits a random event every 500 ms:
     * &lt;userID, eventTime, eventType, productID&gt;
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        // Written by cancel() and read by the run() loop, which execute on
        // different threads, hence volatile.
        private volatile boolean isRunning = true;

        /** Produces events until {@link #cancel()} is called. */
        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                // user_1 .. user_4
                int id = random.nextInt(4) + 1;
                String user_id = "user_" + id;
                String eventTime = df.format(new Date());
                // type_0 .. type_2
                String eventType = "type_" + random.nextInt(3);
                // 0 .. 3
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
                Thread.sleep(500);
            }
        }

        /** Asks the run() loop to stop. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * Emits the full content of the user_info table every 5 seconds as
     * &lt;userID, &lt;userName, userAge&gt;&gt;
     */
    public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
        // Written by cancel() and read by the run() loop, which execute on
        // different threads, hence volatile.
        private volatile boolean flag = true;
        // JDBC handles are created in open() and must not be part of the serialized function.
        private transient Connection conn = null;
        private transient PreparedStatement ps = null;

        /** Opens the JDBC connection and prepares the query that is reused by run(). */
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select `userID`, `userName`, `userAge` from `user_info`";
            ps = conn.prepareStatement(sql);
        }

        /** Re-reads the table and emits one snapshot map per iteration until cancelled. */
        @Override
        public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
            while (flag) {
                Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                // Close every result set as soon as it has been consumed.
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        String userID = rs.getString("userID");
                        String userName = rs.getString("userName");
                        int userAge = rs.getInt("userAge");
                        map.put(userID, Tuple2.of(userName, userAge));
                    }
                }
                ctx.collect(map);
                // Refresh the user configuration every 5 seconds.
                Thread.sleep(5000);
            }
        }

        /** Asks the run() loop to stop. */
        @Override
        public void cancel() {
            flag = false;
        }

        /** Releases the JDBC resources: statement first, then connection, even if the first close fails. */
        @Override
        public void close() throws Exception {
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/282896.html
標籤:其他
