I have a CSV parser script in Python that processes a large CSV file. It has about 1 million rows, so the program takes a while to run.
import csv
import sys

# Define the parser before the loop that calls it.
def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        ParserFunction(row)
Is there a way to multithread this loop to reduce the execution time?
Thanks
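Reply from a uj5u.com user:
You can hand each row to its own thread, so the main thread does not have to wait for the previous row to finish before moving on to the next one: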
import csv
import sys
import threading

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        # Start one thread per row; args must be a tuple.
        threading.Thread(target=ParserFunction, args=(row,)).start()
However, the exact way to do this depends on the logic you want to apply to each row and on whether that logic depends on other rows.
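If the rows really are independent, one possible shape is a bounded thread pool rather than one thread per row. The following is only a minimal sketch under that assumption: `parse_row` and `parse_all` are hypothetical names, the per-row logic is a placeholder, and an in-memory string stands in for csvfeed.csv.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor


def parse_row(row):
    """Hypothetical per-row logic: here it only counts the fields."""
    return len(row)


def parse_all(csvfile, max_workers=8):
    """Parse every row of an open CSV file using a bounded thread pool."""
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, regardless of which
        # thread finishes first.
        return list(pool.map(parse_row, reader))


# In-memory sample standing in for csvfeed.csv (assumption for the demo).
sample = io.StringIO("a;b;c\nd;e\n|f;g|;h\n")
results = parse_all(sample)
```

Note that `map()` submits every row up front, so for a file with a million rows it may be worth feeding the pool in chunks to keep memory use down.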
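Reply from a uj5u.com user:
Thanks @Bemwa Malak, that opened my mind. Yes, the file is large, around 400 MB, with more than 1 million rows. I am using Python 3, so I had to modify your idea slightly.
Is this the right idea for limiting the number of threads?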
# Threading
def runThreads():
    global data
    global loopParameter
    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            # Wait for the current batch once 500 threads are running
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        # Wait for the last, partial batch
        for thread in threads:
            thread.join()
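Batching threads in groups of 500 does cap the thread count, but each batch waits for its slowest thread and a new thread is created for every row. A common alternative is a fixed set of worker threads reading from a bounded queue. This is a rough sketch, not a drop-in replacement: `parse_row`, `worker`, `run_workers`, the worker count, and the queue size are all assumptions chosen for illustration.

```python
import queue
import threading

NUM_WORKERS = 4          # assumed pool size; tune for the workload
_STOP = object()         # sentinel telling a worker to exit


def parse_row(row):
    """Hypothetical per-row logic: keep rows whose first field is non-empty."""
    return bool(row and row[0])


def worker(tasks, matched, lock):
    """Take rows off the queue until the stop sentinel arrives."""
    while True:
        row = tasks.get()
        if row is _STOP:
            break
        if parse_row(row):
            with lock:
                matched.append(row)


def run_workers(rows, num_workers=NUM_WORKERS):
    """Process rows with a fixed number of threads and a bounded queue."""
    # put() blocks when the queue is full, which caps memory use.
    tasks = queue.Queue(maxsize=1000)
    matched, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(tasks, matched, lock))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for row in rows:
        tasks.put(row)
    for _ in threads:
        tasks.put(_STOP)   # one sentinel per worker
    for t in threads:
        t.join()
    return matched
```

Passing a `csv.reader` as `rows` would stream the file through the queue; the order of the returned rows is not guaranteed to match the input order.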
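Below is the complete code for my logic. Testing on a smaller file, I got a run time of about 60 seconds without threads and about 7 seconds with threads, so it is much faster now.
I also tested it on the original file and got a run time of 45 minutes instead of 4 hours, so it does seem to work.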
from sqlalchemy import create_engine
import csv
import sys
import threading
import pandas as pd
import time

# Create the database engine and a variable to hold the data
engine = create_engine('mssql+pyodbc://SECRET')
data = []
counter = 0
loopParameter = 0
# Starter
def main():
    global data
    global totalRows
    runThreads()
    totalRows = len(data)
    # Save data variable to a new CSV file
    with open('output.csv', 'w', newline='', encoding='utf-8') as f2:
        writer = csv.writer(f2, delimiter=';')
        writer.writerows(data)
    # Log
    with open('log2', 'a') as f3:
        # Print to the log file directly; reassigning sys.stdout would
        # leave it pointing at a closed file after this block.
        print("Version: 1", file=f3)
        print("Matched: ", len(data), file=f3)
# My Parser
def ParserFunction(row):
    global data
    global counter
    # Sum the match counts from the four tables
    query = ("SELECT (SELECT Count(*) FROM myTab WHERE myColumn='" + row[5] + "') + "
             "(SELECT Count(*) FROM myTab2 WHERE myColumn='" + row[5] + "') + "
             "(SELECT Count(*) FROM myTab3 WHERE myColumn='" + row[5] + "') + "
             "(SELECT COUNT(*) FROM myTab4 WHERE myColumn='" + row[1] + "')")
    with engine.connect() as con:
        rs = con.execute(query)
        # We have a match
        if rs.fetchone()[0] > 0:
            # Add the row to the data variable
            data.append(row)
# Threading
def runThreads():
    global data
    global loopParameter
    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            # Wait for the current batch once 500 threads are running
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        # Wait for the last, partial batch
        for thread in threads:
            thread.join()

main()
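One thing worth noting about ParserFunction: it splices CSV values straight into the SQL text, so a field containing a single quote would break the statement or change its meaning. Bound parameters avoid that. The sketch below only illustrates the idea with Python's built-in sqlite3 module as a stand-in for the real SQL Server connection; `count_matches` is a hypothetical helper, and it queries just one of the four tables.

```python
import sqlite3


def count_matches(con, value):
    """Count rows in myTab whose myColumn equals value, using a bound parameter."""
    cur = con.execute("SELECT COUNT(*) FROM myTab WHERE myColumn = ?", (value,))
    return cur.fetchone()[0]


# In-memory SQLite database standing in for the real server (assumption).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE myTab (myColumn TEXT)")
con.executemany("INSERT INTO myTab VALUES (?)", [("abc",), ("o'brien",)])

hits = count_matches(con, "o'brien")          # value containing a quote
misses = count_matches(con, "x' OR '1'='1")   # treated as plain data, not SQL
```

With SQLAlchemy, the same idea would presumably be expressed with the `text()` construct and named bind parameters passed to `execute()`.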
