我正在嘗試在 Elasticsearch 集群中保存(和索引)170GB 檔案(約 9.15 億行和 25 列)。我在 5 個節點的彈性搜索集群上表現糟糕。該任務大約需要 5 小時。Spark 集群有 150 個核心 10x(15 個 CPU,64 個 RAM)。
這是我目前的作業流程:
- 從 S3 的多個 parquet 檔案構建 Spark Dataframe。
- 然后使用Spark中的“ org.elasticsearch.spark.sql ”源將此資料幀保存到 ElasticSearch 索引。(我嘗試了許多分片和復制配置組合而沒有獲得性能)
這是集群節點特征
- 每個節點 5 個節點(16 個 CPU、64 個 RAM、700GB 磁盤)。
- HEAP_SIZE 大約是可用 RAM 的 50%,即每個節點上 32GB。在 /etc/elasticsearch/jvm.options 中配置
這是將資料幀寫入 ElasticSearch 的代碼(用 scala 撰寫)
writeDFToEs(whole_df, "main-index")
writeDFToEs函式:
def writeDFToEs(df: DataFrame, index: String) = {
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes", "192.168.1.xxx")
.option("es.http.timeout", 600000)
.option("es.http.max_content_length", "2000mb")
.option("es.port", 9200)
.mode("overwrite")
.save(s"$index")
}
你能幫我找出我做得不好的地方以及如何解決嗎?
提前致謝。
uj5u.com熱心網友回復:
回答我自己的問題。
正如@warkolm 所建議的那樣,我專注于_bulk.
我正在使用es-hadoop連接器,所以我不得不調整es.batch.size.entries引數。
在運行了一堆測驗(測驗各種值)之后,我終于在 ES 索引模板中es.batch.size.entries設定為10000和以下值得到了更好的結果(盡管仍然不是最優的)。
{
"index": {
"number_of_shards": "10",
"number_of_replicas": "0",
"refresh_interval": "60s"
}
}
最后,我的df.write樣子是這樣的:
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes", es_nodes)
.option("es.port", es_port)
.option("es.http.timeout", 600000)
.option("es.batch.size.entries", 10000)
.option("es.http.max_content_length", "2000mb")
.mode("overwrite")
.save(s"$writeTo")
現在該程序需要約 3 小時(2 小時 55 分鐘)而不是 5 小時。
我仍在改進配置和代碼。如果我有更好的表現,我會更新。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/530387.html
標籤:斯卡拉阿帕奇火花弹性搜索
上一篇:如何防止Nest聚合的前綴
