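Requirements:
Event stream (in Kafka): userID, eventTime, eventType, productID
Broadcast stream (in MySQL): userID, userName, userAge
1. Use the user data in the broadcast stream to enrich each event-stream record, producing: userID, eventTime, eventType, productID, userName, userAge
2. When data in the broadcast stream is modified, newly merged results must reflect the change in real time (the event stream has to pick up changes to the broadcast data).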
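The code later in this post uses three POJOs, Operation, UserInfo and UserOperation, that the original post does not show. Below is a minimal sketch, assuming public fields (the code reads operation.userID and userInfo.userName directly) and the constructor argument orders the code uses. The toString() format is copied from the console output in the test section. Each class also has a public no-argument constructor, which Flink's POJO serializer requires.

```java
// Sketch of the three POJOs; field names come from the requirements above.
// Assumption: in a Flink project each class would be public and live in its own file.

/** One event from Kafka: userID, eventTime, eventType, productID. */
class Operation {
    public String userID;
    public String eventTime;
    public String eventType;
    public int productID;

    public Operation() {}

    public Operation(String userID, String eventTime, String eventType, int productID) {
        this.userID = userID;
        this.eventTime = eventTime;
        this.eventType = eventType;
        this.productID = productID;
    }
}

/** One row of the MySQL user_info table (the userID is the map key, not a field). */
class UserInfo {
    public String userName;
    public int userAge;

    public UserInfo() {}

    public UserInfo(String userName, int userAge) {
        this.userName = userName;
        this.userAge = userAge;
    }
}

/** The enriched result: the event fields plus userName and userAge. */
class UserOperation {
    public String userID;
    public String eventTime;
    public String eventType;
    public int productID;
    public String userName;
    public int userAge;

    public UserOperation() {}

    public UserOperation(String userID, String eventTime, String eventType, int productID,
                         String userName, int userAge) {
        this.userID = userID;
        this.eventTime = eventTime;
        this.eventType = eventType;
        this.productID = productID;
        this.userName = userName;
        this.userAge = userAge;
    }

    // Same layout as the console output shown in the test section
    @Override
    public String toString() {
        return "UserOperation{userID='" + userID + "', eventTime='" + eventTime
                + "', eventType='" + eventType + "', productID=" + productID
                + ", userName='" + userName + "', userAge=" + userAge + "}";
    }
}
```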
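Implementation approaches:
1. Flink consumes the Kafka data, and a MapFunction queries MySQL directly to enrich each record. Performance is poor, because every incoming record triggers a fresh MySQL query.
2. Load the broadcast data into Redis, and have the MapFunction look each record up in Redis. Performance is acceptable, but every incoming record still costs a Redis lookup.
3. Implement a two-stream join in Flink. However, whenever the MySQL data is updated, the MySQL-side stream has to be refreshed promptly, which is inefficient.
4. Connect the two streams and use broadcast state: the broadcast stream periodically re-reads the latest data from MySQL and puts it into broadcast state, and the event stream reads the user data from that state. This is efficient, because each event is enriched from local operator state instead of by an external query.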
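The performance argument above comes down to how often the external store is queried. The plain-Java sketch below (no Flink; every class and method name is hypothetical) counts those queries: approaches 1 and 2 hit the store once per event, while approach 4 pulls a snapshot into a local map and serves every event from it. ExternalStore is an assumed stand-in for MySQL or Redis.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MySQL/Redis that counts how often it is queried.
class ExternalStore {
    final Map<String, Integer> ages = new HashMap<>();
    int queries = 0;

    // One point lookup = one round trip in a real deployment
    Integer lookupAge(String userID) {
        queries++;
        return ages.get(userID);
    }

    // One query that returns the whole table
    Map<String, Integer> snapshot() {
        queries++;
        return new HashMap<>(ages);
    }
}

class LookupComparison {
    // Approaches 1 and 2: query the external store for every event; returns how many were enriched
    static int perEventLookups(ExternalStore store, List<String> eventUserIDs) {
        int enriched = 0;
        for (String id : eventUserIDs) {
            if (store.lookupAge(id) != null) {
                enriched++;
            }
        }
        return enriched;
    }

    // Approach 4, simplified: take one snapshot, then enrich every event from local state
    static int localStateLookups(ExternalStore store, List<String> eventUserIDs) {
        Map<String, Integer> localState = store.snapshot();
        int enriched = 0;
        for (String id : eventUserIDs) {
            if (localState.get(id) != null) {
                enriched++;
            }
        }
        return enriched;
    }
}
```

In the Flink job the snapshot is re-taken on every poll of the MySQL source, so the number of external queries grows with elapsed time rather than with event volume.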
Core code
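// Imports the snippet relies on (Flink 1.x DataStream API with the legacy FlinkKafkaConsumer, fastjson, JDBC)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Operation, UserInfo and UserOperation are the author's POJOs (not shown in the original post).
// The statements below go inside main(String[] args) throws Exception.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Event stream: JSON strings from Kafka topic "zcx1", parsed into Operation objects
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zcx4:9092,zcx5:9092,zcx6:9092");
FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>("zcx1", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaDataStreamSource = env.addSource(kafka);
SingleOutputStreamOperator<Operation> kafkaDs = kafkaDataStreamSource.process(new ProcessFunction<String, Operation>() {
    @Override
    public void processElement(String s, ProcessFunction<String, Operation>.Context context, Collector<Operation> collector) throws Exception {
        JSONObject jsonObject = JSON.parseObject(s);
        String userID = jsonObject.getString("userID");
        String eventTime = jsonObject.getString("eventTime");
        String eventType = jsonObject.getString("eventType");
        int productID = jsonObject.getIntValue("productID");
        collector.collect(new Operation(userID, eventTime, eventType, productID));
    }
});

// Broadcast stream: poll the MySQL table every 10 s and emit the whole table as one Map<userID, UserInfo>
DataStreamSource<Map<String, UserInfo>> mysqlDs = env.addSource(new RichSourceFunction<Map<String, UserInfo>>() {
    private volatile boolean isRunning = true;   // volatile: cancel() is called from another thread
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");   // Connector/J 8.x uses com.mysql.cj.jdbc.Driver
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
        preparedStatement = connection.prepareStatement("select * from user_info");
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void run(SourceContext<Map<String, UserInfo>> sourceContext) throws Exception {
        while (isRunning) {
            // Collect all rows of this poll into one map, closing the ResultSet afterwards
            Map<String, UserInfo> users = new HashMap<>();
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                while (resultSet.next()) {
                    users.put(resultSet.getString("userID"),
                            new UserInfo(resultSet.getString("userName"), resultSet.getInt("userAge")));
                }
            }
            sourceContext.collect(users);
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
});

// Note: broadcast state is declared with a MapStateDescriptor (map-typed), so the MySQL source emits Map records.
// A single descriptor is shared by broadcast() and the process function so both refer to the same state.
final MapStateDescriptor<String, UserInfo> broadcastStateDescriptor =
        new MapStateDescriptor<>("broadcastState", String.class, UserInfo.class);
BroadcastStream<Map<String, UserInfo>> broadcastStream = mysqlDs.broadcast(broadcastStateDescriptor);

// Connect the two streams
BroadcastConnectedStream<Operation, Map<String, UserInfo>> connect = kafkaDs.connect(broadcastStream);
connect.process(new BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>() {
    // Event side: read the user from broadcast state and emit the enriched record
    @Override
    public void processElement(Operation operation, BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>.ReadOnlyContext readOnlyContext, Collector<UserOperation> collector) throws Exception {
        ReadOnlyBroadcastState<String, UserInfo> broadcastState = readOnlyContext.getBroadcastState(broadcastStateDescriptor);
        UserInfo userInfo = broadcastState.get(operation.userID);
        if (userInfo != null) {   // events for users not (yet) in broadcast state are dropped
            collector.collect(new UserOperation(operation.userID, operation.eventTime, operation.eventType,
                    operation.productID, userInfo.userName, userInfo.userAge));
        }
    }

    // Broadcast side: copy every entry of the incoming map into broadcast state
    @Override
    public void processBroadcastElement(Map<String, UserInfo> stringUserInfoMap, BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>.Context context, Collector<UserOperation> collector) throws Exception {
        BroadcastState<String, UserInfo> broadcastState = context.getBroadcastState(broadcastStateDescriptor);
        for (Map.Entry<String, UserInfo> entry : stringUserInfoMap.entrySet()) {
            broadcastState.put(entry.getKey(), entry.getValue());
        }
    }
}).print();

env.execute();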
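One gap in the broadcast handler: it only ever calls put(), so a user deleted from user_info keeps its last value in broadcast state indefinitely. If every broadcast element carries the complete table (one map per poll), the handler could replace the state instead of merging into it; Flink's BroadcastState interface offers putAll() and, being a State, clear(), so the body would become broadcastState.clear(); broadcastState.putAll(stringUserInfoMap);. The sketch below models the merge-versus-replace difference with a plain HashMap standing in for broadcast state; the class name is hypothetical and values are reduced to ages.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a HashMap stands in for Flink's BroadcastState<String, UserInfo>.
class BroadcastStateUpdate {
    // Current behaviour: upsert the incoming entries; keys absent from the snapshot survive.
    static void merge(Map<String, Integer> state, Map<String, Integer> snapshot) {
        for (Map.Entry<String, Integer> e : snapshot.entrySet()) {
            state.put(e.getKey(), e.getValue());
        }
    }

    // Full-snapshot alternative: drop everything, then load the new table contents.
    // Only correct when each broadcast element contains the whole user_info table.
    static void replace(Map<String, Integer> state, Map<String, Integer> snapshot) {
        state.clear();
        state.putAll(snapshot);
    }
}
```

Because processElement and processBroadcastElement are never invoked concurrently within one operator instance, events never observe the state between clear() and putAll().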
Testing
1. After starting the Flink job, write the following record to Kafka:
{"userID": "user_3", "eventTime": "2022-02-01 12:19:47", "eventType": "browse", "productID": 1}
The console shows:
UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=30}
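The two streams were merged successfully.
2. In MySQL, change user_3's userAge to 100. After the source's next poll (it re-reads the table every 10 seconds), write the same record to Kafka again. The console now shows: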
UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=100}
In the merged output, the fields that come from the broadcast stream now carry the latest values.
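The two test runs can be replayed without Kafka or MySQL. In the plain-Java sketch below, a map plays the role of the broadcast state: the first event is enriched with userAge=30, and after a new row sets it to 100 the same event comes out with the new value. The class and method names are hypothetical; the output format is the one printed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical replay of the two test runs; a HashMap plays the broadcast state (userID -> user row).
class EnrichmentReplay {
    private static final class User {
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    private final Map<String, User> state = new HashMap<>();

    // Broadcast side: upsert one MySQL row, as processBroadcastElement does per map entry
    void onUserRow(String userID, String userName, int userAge) {
        state.put(userID, new User(userName, userAge));
    }

    // Event side: enrich one event; returns null for unknown users (the job drops such events)
    String onEvent(String userID, String eventTime, String eventType, int productID) {
        User user = state.get(userID);
        if (user == null) {
            return null;
        }
        return "UserOperation{userID='" + userID + "', eventTime='" + eventTime
                + "', eventType='" + eventType + "', productID=" + productID
                + ", userName='" + user.name + "', userAge=" + user.age + "}";
    }
}
```

In the running job, the second onUserRow call corresponds to the MySQL source's first poll after the row is updated.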
When reposting, please credit the source. Article link: https://www.uj5u.com/qita/423610.html
