我有一個從csv讀取的代碼(kafka_producer.py)>>創建熊貓資料框>>將熊貓資料框轉換為火花資料框>>在火花資料框上呼叫foreach方法以將訊息發布到kafka。正在df.foreachPartition(self.send_to_kafka)投擲。
代碼如下:PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
def get_kafka_producer(topic):
kafkaBrokers='kafka.broker:9093'
caRootLocation='/path/to/CARoot.pem'
certLocation='/path/to/certificate.pem'
keyLocation='/path/to/key.pem'
password='abc123'
producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
security_protocol='SSL',
ssl_check_hostname=False,
ssl_cafile=caRootLocation,
ssl_certfile=certLocation,
ssl_keyfile=keyLocation,
ssl_password=password)
return producer
class SparkKafkaWriter:
topic = None
producer = None
def __init__(self,topic):
self.topic = topic
def send_to_kafka(self,rows):
print("Writing Data")
for row in rows:
json_str = json.dumps(row)
self.producer.send(self.topic, key=None, value=bytes(json_str,'utf-8'))
self.producer.flush()
def post_spark_to_kafka(self,df):
producer = get_kafka_producer()
self.producer = producer
df.foreachPartition(self.send_to_kafka)
print("Dataframe Posted")
def run_kafka_producer(path,sep,topic):
df = pd.read_csv(path,sep=sep)
if isinstance(df, pd.DataFrame):
print("Converting Pandas DF to Spark DF")
spark = get_spark_session("session_name")
df = spark.createDataFrame(df)
writer = SparkKafkaWriter(topic)
writer.post_spark_to_kafka(df)
if __name__ == __main__:
path = "/path/to/data.csv"
sep = "|"
topic = "TEST_TOPIC"
run_kafka_producer(path,sep,topic)
錯誤是:
File "/path/to/kafka_producer.py", line 45, in post_spark_to_kafka
df.foreachPartition(self.send_to_kafka)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/sql/dataframe.py", line 596, in foreachPartition
self.rdd.foreachPartition(f)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 806, in foreachPartition
self.mapPartitions(func).count() # Force evaluation
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2532, in _jrdd
self._jrdd_deserializer, profiler)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2434, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/opt/cloudera/parcels/IMMUTA/python/pyspark/serializers.py", line 600, in dumps
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
uj5u.com熱心網友回復:
我認為你不明白這里發生了什么。
您正在驅動程式中創建一個 kafaka 連接,然后嘗試通過您的網路將驅動程式的實時連接發送到執行程式以完成作業。(您在 foreachPartitions 中的函式在執行程式上執行。)
這就是 spark 告訴你的“不能腌制 _thread.RLock 物件”。(它無法序列化您與 kafka 的實時連接以將其發送給執行程式。)
您需要get_kafka_producer()從代碼塊內部呼叫 foreachPartition 這將從執行程式內部初始化與資料庫的連接。(以及您需要做的任何其他簿記。)
僅供參考:我要指出的最糟糕的部分是此代碼將在您的本地計算機上運行。這是因為它既是執行者又是驅動者。此外,這將或多或少同時為每個執行者打開與 kafka 的連接。(5 個執行者 = 5 個打開的連接)。它將為每個磁區打開一個連接(默認為 200),因此您要確保在完成后關閉它們。
def send_to_kafka(self,rows):
print("Writing Data")
producer = get_kafka_producer()
self.producer = producer
#do topic configuration
for row in rows:
json_str = json.dumps(row)
self.producer.send(self.topic, key=None, value=bytes(json_str,'utf-8'))
self.producer.flush()
#Do something to close connection
def post_spark_to_kafka(self,df):
df.foreachPartition(self.send_to_kafka)
print("Dataframe Posted")
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/482053.html
上一篇:更新收藏以更改排名
