我有一個pyspark如下所示的資料框
df = spark.createDataFrame(
[
('2021-10-01','A',25),
('2021-10-02','B',24),
('2021-10-03','C',20),
('2021-10-04','D',21),
('2021-10-05','E',20),
('2021-10-06','F',22),
('2021-10-07','G',23),
('2021-10-08','H',24)],("RUN_DATE", "NAME", "VALUE"))
現在使用這個資料框我想更新一個表 MySql
# query to run should be similar to this
update_query = "UPDATE DB.TABLE SET DATE = '2021-10-01', VALUE = 25 WHERE NAME = 'A'"
# mysql_conn is a function which I use to connect to `MySql` from `pyspark` and run queries
# Invoking the function
mysql_conn(host, user_name, password, update_query)
現在,當我通過傳遞引數呼叫 mysql_conn 函式時,查詢成功運行并且記錄在MySql表中得到更新。
現在我想為資料框中的所有記錄運行更新陳述句。
對于每一個NAME有挑RUN_DATE和VALUE和更換update_query,并觸發mysql_conn。
我認為我們需要一個for loop但不確定如何進行。
uj5u.com熱心網友回復:
與其使用 for 回圈遍歷資料幀,不如使用foreachPartition. 此外,由于您正在撰寫自定義查詢而不是為每個查詢執行一個查詢,因此執行批處理操作以減少往返、延遲和并發連接會更有效。例如
def update_db(rows):
temp_table_query=""
for row in rows:
if len(temp_table_query) > 0:
temp_table_query = temp_table_query " UNION ALL "
temp_table_query = temp_table_query " SELECT '%s' as RUNDATE, '%s' as NAME, %d as VALUE " % (row.RUN_DATE,row.NAME,row.VALUE)
update_query="""
UPDATE DBTABLE
INNER JOIN (
%s
) new_records ON DBTABLE.NAME = new_records.NAME
SET
DBTABLE.DATE = new_records.RUNDATE,
DBTABLE.VALUE = new_records.VALUE
""" % (temp_table_query)
mysql_conn(host, user_name, password, update_query)
df.foreachPartition(update_db)
查看有關 UPDATE 查詢如何作業的演示
讓我知道這是否適合您。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/311449.html
標籤:mysql 阿帕奇火花 火花 apache-spark-sql
