一、背景
Flink在處理流式任務的時候有很大的優勢,其中windows等運算子可以很方便的完成聚合任務,但是Flink是一套獨立的服務,業務流程中如果想使用需要將資料發到kafka,用Flink處理完再發到kafka,然后再做業務處理,流程很繁瑣,
比如在業務代碼中想要實作類似Flink的window按時間批量聚合功能,如果純手動寫代碼比較繁瑣,使用Flink又太重,這種場景下使用回應式編程RxJava、Reactor等的window、buffer運算子可以很方便的實作,
回應式編程框架也早已有了背壓以及豐富的運算子支持,能不能用回應式編程框架處理類似Flink的操作呢,答案是肯定的,
本文使用Reactor來實作Flink的window功能來舉例,其他運算子理論上相同,文中涉及的代碼:github
二、實作程序
Flink對流式處理做的很好的封裝,使用Flink的時候幾乎不用關心執行緒池、積壓、資料丟失等問題,但是使用Reactor實作類似的功能就必須對Reactor運行原理比較了解,并且經過不同場景下測驗,否則很容易出問題,
下面列舉出實作程序中的核心點:
1、創建Flux和發送資料分離
入門Reactor的時候給的示例都是創建Flux的時候同時就把資料賦值了,比如:Flux.just、Flux.range等,從3.4.0版本后先創建Flux,再發送資料可使用Sinks完成,有兩個比較容易混淆的方法:
- Sinks.many().multicast() 如果沒有訂閱者,那么接收的訊息直接丟棄
- Sinks.many().unicast() 如果沒有訂閱者,那么保存接收的訊息直到第一個訂閱者訂閱
- Sinks.many().replay() 不管有多少訂閱者,都保存所有訊息
在此示例場景中,選擇的是Sinks.many().unicast()
官方檔案:https://projectreactor.io/docs/core/release/reference/#processors
2、背壓支持
上面方法的物件背壓策略支持兩種:BackpressureBuffer、BackpressureError,在此場景肯定是選擇BackpressureBuffer,需要指定快取佇列,初始化方法如下:Queues.
資料提交有兩個方法:
- emitNext 指定提交失敗策略同步提交
- tryEmitNext 異步提交,回傳提交成功、失敗狀態
在此場景我們不希望丟資料,可自定義失敗策略,提交失敗無限重試,當然也可以呼叫異步方法自己重試,
Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();
在此之后就就可以呼叫Sinks.asFlux開心的使用各種運算子了,
3、視窗函式
Reactor支持兩類視窗聚合函式:
- window類:回傳Mono(Flux
) - buffer類:回傳List
在此場景中,使用buffer即可滿足需求,bufferTimeout(int maxSize, Duration maxTime)支持最大個數,最大等待時間操作,Flink中的keys操作可以用groupBy、collectMap來實作,
4、消費者處理
Reactor經過buffer后是一個一個的發送資料,如果使用publishOn或subscribeOn處理的話,只等待下游的subscribe處理完成才會重新request新的資料,buffer運算子才會重新發送資料,如果此時subscribe消費者耗時較長,資料流會在buffer流程阻塞,顯然并不是我們想要的,
理想的操作是消費者在一個執行緒池里操作,可多執行緒并行處理,如果執行緒池滿,再阻塞buffer運算子,解決方案是自定義一個執行緒池,并且當然執行緒池如果任務滿submit支持阻塞,可以用自定義RejectedExecutionHandler來實作:
RejectedExecutionHandler executionHandler = (r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Producer thread interrupted", e);
}
};
new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
executionHandler);
三、總結
1、總結一下整體的執行流程
- 提交任務:提交資料支持同步異步兩種方式,支持多執行緒提交,正常情況下回應很快,同步的方法如果佇列滿則阻塞,
- 豐富的運算子處理流式資料,
- buffer運算子產生的資料多執行緒處理:同步提交到單獨的消費者執行緒池,執行緒池任務滿則阻塞,
- 消費者執行緒池:支持阻塞提交,保證不丟訊息,同時佇列長度設定成0,因為前面已經有佇列了,
- 背壓:消費者執行緒池阻塞后,會背壓到buffer運算子,并背壓到緩沖佇列,快取佇列滿背壓到資料提交者,
2、和Flink的對比
實作的Flink的功能:
- 不輸Flink的豐富運算子
- 支持背壓,不丟資料
優勢:輕量級,可直接在業務代碼中使用
劣勢:
- 內部執行流程復雜,容易踩坑,不如Flink傻瓜化
- 沒有watermark功能,也就意味著只支持無序資料處理
- 沒有savepoint功能,雖然我們用背壓解決了部分問題,但是宕機后開始會丟失快取佇列和消費者執行緒池里的資料,補救措施是添加Java Hook功能
- 只支持單機,意味著你的快取佇列不能設定無限大,要考慮執行緒池的大小,且沒有flink globalWindow等功能
- 需考慮對上游資料源的影響,Flink的上游一般是mq,資料量大時可自動堆積,如果本文的方案上游是http、rpc呼叫,產生的阻塞影響就不能忽略,補償方案是每次提交資料都使用異步方法,如果失敗則提交到mq中緩沖并消費該mq無限重試,
四、附錄
本文原始碼地址:https://github.com/sofn/reactor-window-like-flink
Reactor官方檔案:https://projectreactor.io/docs/core/release/reference/
Flink檔案:https://ci.apache.org/projects/flink/flink-docs-stable/
Reactive運算子:http://reactivex.io/documentation/operators.html
本文作者:木小豐,美團Java高級工程師,關注架構、軟體工程、全堆疊等,不定期分享軟體研發程序中的實踐、思考,歡迎關注公共號:Java研發
本文鏈接:https://lesofn.com/archives/shi-yong-reactor-wan-cheng-lei-shi-de-flink-de-cao-zuo

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/264372.html
標籤:Java
上一篇:spring事務
下一篇:Calendar類(日歷)
