專案中需要使用spark來實作之前通過純java代碼寫的一個ETL功能,這個功能主要是從kafka讀取資料,然后進行ETL,最后保存到elasticsearch。其中保存到elasticsearch的時候,有一些額外的操作,比如要查看由日期拼接的索引是否存在,如果不存在那么要使用指定的模板創建一個索引,最后再往elasticsearch插入資料。
在網上查了一下spark集成elasticsearch的一些文章,基本上是要使用elasticsearch-hadoop這個組件來操作es,這個組件貌似只能對指定索引進行查詢和插入,使用模板創建索引,查詢索引是否存在這一類的操作好像沒有。
所以我的疑問就是在spark中能否使用原生的elasticsearch java client來操作elasticsearch(比如Java High Level REST Client,TransportClient)?如果不能用是什么原因?如果可以用有什么需要注意的地方?
同樣的,還有其他一些第三方的呼叫,比如資料庫的連接訪問、restful api的訪問、是否都可以脫離spark,使用原生的java api來呼叫?
uj5u.com熱心網友回復:
可以,你foreachPartition類的算子,然后在算子內,使用javaClient遍歷資料進行查詢。建議是用mapPartition實作過濾,然后再用elasticsearch-hadoop提供的方法進行寫入。性能會好一點uj5u.com熱心網友回復:
我用的是structured Streaming,foreachPartition貌似不能用,mappartition可以用,structured Streaming 的writeStream的foreach也是按partition來處理的,效果和foreachPartition應該一樣吧?
uj5u.com熱心網友回復:
不能用foreachPartition,那你就要通過單例工廠去創建JavaClient,并做后續的操作了。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/45827.html
標籤:Spark
