我有一個storm topology,spout S0發送tuple到bolt B1,B1對接收的tuple進行實際的計算處理。bolt處理的速度跟不上spout發送資料的速度,解決方法之一當然是增加bolt,但當前的主要需求不是為了讓bolt跟上spout的發送速度,而是能夠自定義從spout接收來的tuple的處理順序。
方法之一,我在中間增加了一個bolt B0,用作緩沖區:
[spout S0] --tuples--> [buffer bolt B0] --tuples--> [processing bolt B1] -->...
緩沖bolt B0存在的唯一理由,是為了能夠在接收到spout發來的tuple后,用我們自定義的規則,插入到一個ArrayList中,在后續的bolt B1處理完畢后,從ArrayList表頭emit發送一個tuple給后續的bolt B1處理,從而達到自定義處理順序的目的。(或者也可以收到tuple后加入隊尾,但emit時根據自定的策略選擇一個tuple去發送)。
這個思路的問題在于:bolt只能夠重寫execute函式,它是在收到一個tuple后被執行的,此時我的確可以用自定義的策略去插入到arraylist,但bolt沒法使用類似于spout的nextTuple函式——我希望的是bolt一邊接收、一邊發送,而不是每次接收時去選擇發送或不發送。
另外一個思路,"Understanding the Internal Message Buffers of Storm"這篇文章提供了很多有用的資訊,如果我能夠修改storm中worker receive thread的buffer部分的代碼,也許也能夠實作自定義buffer收/發tuple的策略。但是我找不到storm中負責worker接收執行緒里buffer的類是哪個。
請教各位storm大神如何實作一個能夠同時接收和發送tuple、并能夠自定義收發tuple策略的bolt,或者:負責apache storm中worker receive thread里buffer的類是哪個,亦或者有其他的解決途徑?
uj5u.com熱心網友回復:
還是沒人回復,我先自己頂一下。uj5u.com熱心網友回復:
還是沒人回復,我先自己頂一下。我現在嘗試第二種解決途徑,即通過修改storm的原始碼,重新編譯storm,來實作該功能。現在已經找到worker receive thread的buffer可能與類:DisruptorQueue相關,而且storm內部使用了叫做lLMAX的庫。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/92952.html
標籤:Spark
上一篇:hive運行job的時候報錯
