我正在使用 pyspark 和 elasticsearch(Python 函式庫),在更新 ES 中的一個文件(document)時,遇到了以下錯誤。
2021-09-08 06:31:49 ERROR JobScheduler:91 - Error running job streaming job 1631082700000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
posttoES(row)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
es.update(index="anonprofile", id = jsonid, body=query)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
"POST", path, params=params, headers=headers, body=body
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
raise e
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
self._raise_error(response.status, raw_data)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]')
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 172, in <module>
ssc.awaitTermination()
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
posttoES(row)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
es.update(index="anonprofile", id = jsonid, body=query)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
"POST", path, params=params, headers=headers, body=body
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
raise e
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
self._raise_error(response.status, raw_data)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]')
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
程式碼片段如下:
row = json.load(row)
count = row["count"]
row.pop("Count")
jsonid = hashlib.sha224(json.dumps(row).encode('ascii', 'ignore')).hexdigest()
if es.exists(index="anonprofile", id=jsonid):
    q = {
        "script": {
            "source": "ctx._source.Count ={}".format(count),
            "lang": "painless"
        }
    }
    es.update(index="anonprofile", id = jsonid, body=q)
else:
    row["count"] = count
    es.index(index="anonprofile", id=jsonid, body={'doc' : row})
第一個文件會進入 else 區塊,並如預期運作;但之後進入 if 區塊時,在呼叫 update 時會回傳錯誤。
我在網路上查過資料,也試著修改查詢的寫法,但似乎都沒有效果。
P.S. 為了學習,我試著在不整合 pyspark 的情況下執行同樣的任務,這樣似乎可以正常運作。相關程式碼在這裡
uj5u.com熱心網友回復:
問題在於你建立索引時把資料放進了 doc 欄位;由於 row 變數是一個由多個值組成的 dict,doc 會被轉換成文件中的一個屬性,所以你是在嘗試更新 _source.count,而不是 _source.doc.count
body 參數與 doc 欄位只對 update 有用,例如用於文件不存在時的 upsert,或是 script。
因此,舉例來說:
row["count"] = count
body = {
    "script": {
        "source": "ctx._source.Count ={}".format(count),
        "lang": "painless"
    },
    "upsert": row
}
es.update(index="anonprofile", id=jsonid, body=body)
用這段程式碼取代你原本的 if exists... 判斷。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/315416.html
標籤:
