有什么方式能讓java后臺程式直接呼叫spark,并且直接收到spark計算后的回傳值?
目前我這邊能做的只是用java傳給kafka,kafka再給spark,spark計算完后再回傳給kafka,java程式再接收
我這樣做比較繞
有沒有什么方法能直接呼叫并得到回傳值的?
uj5u.com熱心網友回復:
其實你目前這種方案是比較好的。如果你是web同步與spark互動,會阻塞servlet執行緒,吞吐量有限。除非你的集群非常碉堡,能在很短時間完成回應。另外結果建議是寫到Redis上,前臺查詢后臺,會得到一個請求id,這個請求id就是redis的key。然后拿著這個id輪詢結果獲取介面。我們這邊做Spark即席查詢,都是SparkContext和Web Service分離,通過Zookeeper實作HA和負載均衡。Web Service接收SQL查詢請求,對SQL進行優化后,查詢ZK獲取較空閑的Driver,然后將SQL發送給Driver執行,結果是寫在HDFS上,然后把請求id作為key,HDFS路徑作為value,寫到Redis上。
uj5u.com熱心網友回復:
就看你怎么看待spark ,如果把spark當作服務,那可以使用spark的 rest的客戶端,提交job。如果你要把spark當作依賴,代碼中集成。不管是rest 還是集成,你都可以撰寫你的drvier,回傳你需要的值。uj5u.com熱心網友回復:
大俠能不能詳細講講,謝謝。有案例可供參考嗎?
uj5u.com熱心網友回復:
大概是:
1、撰寫一個Spark Driver,我們稱之為SparkJobServer,部署模式是Yarn client,打開DynamicAllocation特性,即會根據實際需要動態申請Executor。然后啟動的時候,在ZK的某個路徑下,創建一個PERSISTENT_SEQUENTIAL型別節點(參見:http://blog.csdn.net/heyutao007/article/details/38741207),得到自己的一個ID,然后在新建的這個節點下,設定自身的ip,Socket埠,以及負載值(當前有多少個SQL在跑)等資訊。啟動一個Socket Server,監聽上面說的埠,接收SQL請求,優化SQL,然后通過SQLContext.sql執行,負載值+1,執行完成后-1,結果放在請求報文定義的hdfs目錄上,并在redis設定對應的key value。另外有個細節就是利用ShutdownHook,在Driver意外退出的時候,洗掉ZK上自身資訊節點,實作自動下線。這個SparkJobServer會在不同機器啟動多個,以實作HA和負載均衡。
2、Web后端接收前端傳來的SQL,通過UUID生成一個請求Key,在ZK指定路徑下,找到目前可用且負載值最小的SparkJobServer,向它發送指定格式的TCP/IP報文。然后把key回傳給前端。
3、前端拿著這個key去輪詢另一個rest介面,這個rest介面直接去查詢redis,如果拿到值,就是SparkJobServer跑出來放在HDFS上的結果檔案路徑,直接讀取這個檔案,回傳給前端。如果拿不到值,前端定時輪詢直到取到值。
4、另外會有一個后臺服務去監控SparkJobServer的狀況。可以手工啟動或下線SparkJobServer。
我們這個方案是經過資料日增量10T,日即席查詢數1w+考驗的架構。可以根據自身需求修改。
uj5u.com熱心網友回復:
大神,1、那個SparkJobServer不是一個離線的程式嗎,如何保持多個SparkJobServer是啟動狀態?
2、如何在zk中保持每個SparkJobServer對應的元資料值(自身的ip,Socket埠,以及負載值)?
uj5u.com熱心網友回復:
大神,
1、那個SparkJobServer不是一個離線的程式嗎,如何保持多個SparkJobServer是啟動狀態?
2、如何在zk中保持每個SparkJobServer對應的元資料值(自身的ip,Socket埠,以及負載值)?
1、Driver不死,SparkJobServer就一直存在。SparkContext是執行緒安全的。每個Driver都會開啟若干個執行緒去處理前端傳過來的SQL查詢請求。
2、用zk 的API啊!ZK就是為服務注冊發現而生的
uj5u.com熱心網友回復:
你好,用java傳給kafka,kafka再給spark,spark計算完后再回傳給kafka,java程式再接收,這種型別的例子能不能分享給我,讓我學習下啊。謝謝啊。uj5u.com熱心網友回復:
樓主你好,用java傳給kafka,kafka再給spark,spark計算完后再回傳給kafka,java程式再接收,這種型別的例子能不能分享給我,讓我學習下啊。謝謝啊。uj5u.com熱心網友回復:
Spark Thrift Server,可以通過JDBC連接uj5u.com熱心網友回復:
其實你目前這種方案是比較好的。如果你是web同步與spark互動,會阻塞servlet執行緒,吞吐量有限。除非你的集群非常碉堡,能在很短時間完成回應。另外結果建議是寫到Redis上,前臺查詢后臺,會得到一個請求id,這個請求id就是redis的key。然后拿著這個id輪詢結果獲取介面。
我們這邊做Spark即席查詢,都是SparkContext和Web Service分離,通過Zookeeper實作HA和負載均衡。Web Service接收SQL查詢請求,對SQL進行優化后,查詢ZK獲取較空閑的Driver,然后將SQL發送給Driver執行,結果是寫在HDFS上,然后把請求id作為key,HDFS路徑作為value,寫到Redis上。
大俠能不能詳細講講,謝謝。有案例可供參考嗎?
大概是:
1、撰寫一個Spark Driver,我們稱之為SparkJobServer,部署模式是Yarn client,打開DynamicAllocation特性,即會根據實際需要動態申請Executor。然后啟動的時候,在ZK的某個路徑下,創建一個PERSISTENT_SEQUENTIAL型別節點(參見:http://blog.csdn.net/heyutao007/article/details/38741207),得到自己的一個ID,然后在新建的這個節點下,設定自身的ip,Socket埠,以及負載值(當前有多少個SQL在跑)等資訊。啟動一個Socket Server,監聽上面說的埠,接收SQL請求,優化SQL,然后通過SQLContext.sql執行,負載值+1,執行完成后-1,結果放在請求報文定義的hdfs目錄上,并在redis設定對應的key value。另外有個細節就是利用ShutdownHook,在Driver意外退出的時候,洗掉ZK上自身資訊節點,實作自動下線。這個SparkJobServer會在不同機器啟動多個,以實作HA和負載均衡。
2、Web后端接收前端傳來的SQL,通過UUID生成一個請求Key,在ZK指定路徑下,找到目前可用且負載值最小的SparkJobServer,向它發送指定格式的TCP/IP報文。然后把key回傳給前端。
3、前端拿著這個key去輪詢另一個rest介面,這個rest介面直接去查詢redis,如果拿到值,就是SparkJobServer跑出來放在HDFS上的結果檔案路徑,直接讀取這個檔案,回傳給前端。如果拿不到值,前端定時輪詢直到取到值。
4、另外會有一個后臺服務去監控SparkJobServer的狀況。可以手工啟動或下線SparkJobServer。
我們這個方案是經過資料日增量10T,日即席查詢數1w+考驗的架構。可以根據自身需求修改。
按這個思路,sparkjobserver完全可以基于dubbo模式開發。不過現在對于spark呼叫有oozie等框架可以使用,但是oozie太慢
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/20586.html
標籤:Spark
下一篇:k8s加入節點沒反應的問題
