第1章 簡介
本篇文章采用Flink DataStream API完成一次端到端的完成流計算案例,將資料從Kafka抽取,寫入Elasticsearch中,并且用kibana動態的展示出來,(客戶端=>Web API服務=>Kafka=>Flink=>Elasticsearch=>Kibana),

第2章 案例設計
先定一個簡單的需求(就根據當下的疫情情況來做吧):統計各地區新冠疫情風險等級,
我們假定每個地區確診病例(0-10]例為低風險地區,(10-50]例為中風險地區,大于50例為高風險地區,
概要設計:
- 模擬一個資料源,發送各地區疫情確診病例;
- 通過Flink進行視窗計算,每10秒鐘一個視窗,滑動視窗5秒,統計出視窗內出現的確診病例;
- 將統計出的實時結果寫入Elasticsearch中,并通過Kibana可視化的展示出一個排行榜,
第3章 docker-compose構建環境
3.1 創建docker-compose.yml檔案
這里不詳細展示了,詳見后續上傳的github,
3.2 啟動容器
docker-compose啟動容器,包含(zookeeper,flink,kafka,elasticsearch,kibana)
docker-compose up -d
停止
docker-compose down
第4章 創建Restful API介面專案
4.1建立API介面專案
這里不詳細展示了,詳見后續上傳的github,
4.2打包專案
mvn clean package -DskipTests
4.3創建docker鏡像
api專案我們單獨創建一個容器
docker build -t lotemall-webapi-es .
4.4 運行容器
docker run --link kafka:kafka --net flink-kafka2es_default -e TZ=Asia/Shanghai -d -p 8090:8080 lotemall-webapi-es
flink-kafka2es_default 通過docker network ls查詢,如下圖:

第5章 創建Flink作業
5.1 撰寫Flink作業代碼
Kafka2ESByEnd2End 主函式類:
/**
* 疫情低、中、高風險地區 (假定疫情時刻在變化)
* 1-10 低風險 0
* 11-50 中風險 1
* >=51 高風險 2
* 資料源:kafka {"city_code":"SZ","count":"6","timestamp":"1612847156743"}
* 資料匯:ES {"city_code":"SZ","level":"0"}
*/
public class Kafka2ESByEnd2End {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 并行度設定
env.setParallelism(4);
// 設定Checkpoint 每個60*1000ms一次cp
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
// 10分鐘內 重啟三次 每次間隔10秒 超過則job失敗
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.of(10,TimeUnit.MINUTES), org.apache.flink.api.common.time.Time.of(10,TimeUnit.SECONDS)));
//設定statebackend 暫用Memory
env.setStateBackend(new MemoryStateBackend(true));
// 設定EventTime為事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// source
Properties properties = new Properties();
// 集群配置多個kafka地址properties.setProperty("bootstrap.servers", "kafka120:9092,kafka121:9092");
properties.setProperty("bootstrap.servers", "kafka:9092");
properties.setProperty("group.id", "grouplevel");
DataStream<Covid19Event> dataStream = env
.addSource(new FlinkKafkaConsumer011<Covid19Event>("covid19count-log", new Covid19DesSchema(), properties));
// sink
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("elasticsearch", 9200, "http"));
// 集群add多個地址
//httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<Tuple3<String,Integer,Long>> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new Covid19ESSink()
);
// 算子計算
dataStream.assignTimestampsAndWatermarks(new Covid19Watermark())
.map(new Covid19MapFunc())
.keyBy(0)//可以訪問keyedstate
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))//10秒鐘的視窗,滑動間隔是5秒 滑動視窗可能觸發兩次計算
.aggregate(new Covid19AggFunc())
.addSink(esSinkBuilder.build());
env.execute("Covid19StaticLevel");
}
}
注意:SlidingEventTimeWindows視窗會觸發多次,因為每條資料可能處于多個視窗中,會被觸發計算多次,
**Covid19Watermark **自定義watermark類:
public class Covid19Watermark implements WatermarkStrategy<Covid19Event>{
@Override
public WatermarkGenerator<Covid19Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new Covid19WatermarkGenerator();
}
class Covid19WatermarkGenerator implements WatermarkGenerator<Covid19Event> {
private final long delayTime = 3000;// 毫秒
private long currentMaxTimestamp ;
@Override
public void onEvent(Covid19Event covid19Event, long eventTimestamp, WatermarkOutput watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, covid19Event.getTimestamp());
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis() - delayTime));
}
}
}
Covid19MapFunc Map方法類:
public class Covid19MapFunc implements MapFunction<Covid19Event, Tuple3<String, Integer, Long>> {
/**
* Covid19Event -> Tuple3
* @param event
* @return
* @throws Exception
*/
@Override
public Tuple3<String, Integer, Long> map(Covid19Event event) throws Exception {
String cityCode = event.getCityCode();
Integer count = event.getCount();
Long timestamp = event.getTimestamp();
return new Tuple3<>(cityCode, count, timestamp);
}
}
Covid19AggFunc Window方法類:
public class Covid19AggFunc implements AggregateFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>> {
// 城市code -> 確診數量
// private MapState<String,Integer> mapState;
/**
* 初始化列累加器 .創建一個新的累加器,啟動一個新的聚合,負責迭代狀態的初始化
*
* @return
*/
@Override
public Tuple3<String, Integer, Long> createAccumulator() {
return new Tuple3<>("", 0, 0L);
}
/**
* 累加器的累加方法 來一條資料執行一次 對于資料的每條資料,和迭代資料的聚合的具體實作
*
* @param tpInput
* @param tpAcc
* @return 回傳新的累加器
*/
@Override
public Tuple3<String, Integer, Long> add(Tuple3<String, Integer, Long> tpInput, Tuple3<String, Integer, Long> tpAcc) {
if (tpAcc.f0.equals(tpInput.f0)) {
return new Tuple3<>(tpInput.f0, tpInput.f1 + tpAcc.f1, tpInput.f2);
} else {
return tpInput;
}
}
/**
* 回傳值 在視窗內滿足2個,計算結束的時候執行一次,從累加器獲取聚合的結果
*
* @param tpAcc
* @return
*/
@Override
public Tuple3<String, Integer, Long> getResult(Tuple3<String, Integer, Long> tpAcc) {
String city_code = tpAcc.f0;
Integer nowCount = tpAcc.f1;
Integer level;
if (nowCount.compareTo(50) > 0) {
//高風險
level = 2;
} else if (nowCount.compareTo(10) > 0 && nowCount.compareTo(50) <= 0) {
//中風險
level = 1;
} else {
//低風險
level = 0;
}
return new Tuple3<>(tpAcc.f0, level, tpAcc.f2);
}
/**
* 累加器合并 merge方法僅SessionWindow會呼叫
*
* @param stringIntegerTuple2
* @param acc1
* @return
*/
@Override
public Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> stringIntegerTuple2, Tuple3<String, Integer, Long> acc1) {
return null;
//return new Tuple2<>(stringIntegerTuple2.f0, stringIntegerTuple2.f1 + acc1.f1);
}
}
Covid19ESSink Elasticsearch Sink方法類:
public class Covid19ESSink implements ElasticsearchSinkFunction<Tuple3<String,Integer,Long>>, Serializable {
@Override
public void process(Tuple3<String, Integer, Long> element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
requestIndexer.add(updateIndexRequest(element));
}
/* insert
public IndexRequest createIndexRequest(Tuple3<String,Integer,Long> element) {
Map<String, Object> json = new HashMap<>();
//json.put("data", String.format("{\"city_code\":\"%s\",\"level\":%d,\"timestamp\":%s}",element.f0,element.f1,element.f2));
json.put("city_code", element.f0);
json.put("level", element.f1);
json.put("timestamp", element.f2);
return Requests.indexRequest()
.index("covid19-index")
.type("covid19-type")
.id(element.f0)
.source(json);
}
*/
// upsert
public UpdateRequest updateIndexRequest(Tuple3<String,Integer,Long> element) {
Map<String, Object> map = new HashMap<>();
//json.put("data", String.format("{\"city_code\":\"%s\",\"level\":%d,\"timestamp\":%s}",element.f0,element.f1,element.f2));
map.put("city_code", element.f0);
map.put("level", element.f1);
map.put("timestamp", element.f2);
UpdateRequest updateRequest=new UpdateRequest();
updateRequest.docAsUpsert(true).retryOnConflict();
return updateRequest
.index("covid19-index")
.type("covid19-type")
.id(element.f0)
.doc(map);
}
}
5.2 打包Flink任務
mvn clean package -DskipTests
5.3 提交Flink作業
通過Flink Dashboard提交job,

第6章 創建Kibana圖表
咱們簡單的創建一個水平的柱狀圖,用來顯示地區風險排行榜,

圖表效果見后面的驗證,
到這里左右準備作業就完成了,下面我們檢查一下運行的容器
第7章 檢查運行中的容器
docker ps

可以看到所有我們需要的容器都已經運行起來:api專案,flink-jobmanager,flink-taskmanager,kafka,elasticsearch,kibana,zookeeper,
下面我們開始驗證,
第8章 發起請求
通過介面測驗工具,模擬應用產生資料,這里筆者直接發送json資料,包含城市code,新確診的數量和時間戳,

第9章 查看Flink作業運行情況



第10章 查看Kibana實時圖表

最終分析結果通過一個簡單的圖表展示(圖表有些粗糙,見諒),資料以高風險=>中風險=>低風險地區降序排列,實時的展示出各地區風險等級,
到這里,我們就通過Flink DataStream API完成了一次完整的端到端的流計算案例,
最后,祝愿大家都健健康康,今年春節順利返鄉,與親人團聚!希望疫情早日消散,2021牛氣沖天!!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/258389.html
標籤:其他
