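I have an application that reads thousands of files from different directories, does some processing on them, and then sends the data to a database. The problem is that finishing all the files in one directory takes about an hour, and I have 19 directories (possibly more in the future). Right now it works through them one directory after another; I'd like to run everything in parallel to speed things up.
Here is my code: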
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser

config = configparser.ConfigParser()
config.read(r'C:\Desktop\Energy\file_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']

mydb = mysql.connector.connect(
    host=config['DB']['host'],
    user=config['DB']['user'],
    passwd=config['DB']['passwd'],
    database=config['DB']['database'],
)
cursor = mydb.cursor()
select_antenna = "SELECT * FROM `antenna`"
cursor.execute(select_antenna)
mp_mysql = [i[0] for i in cursor.fetchall()]
mp_server = os.listdir(source)

# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")

# 7 columns, so 7 placeholders.
q1 = ("INSERT INTO microbeats "
      "(`antenna`, `datetime`, `system`, `item`, `event`, `status`, `accident`) "
      "VALUES (%s, %s, %s, %s, %s, %s, %s)")

for mp in mp_mysql:
    if mp in mp_server:
        subdir_paths = os.path.join(source, mp)
        all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
        full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- skip empty files
        if full_file_paths != []:
            newest_file_paths = max(full_file_paths, key=os.path.getctime)
            # ctime of the file that is actually imported.
            cr_time = time.strftime("%Y-%m-%d %H:%M:%S",
                                    time.localtime(os.path.getctime(newest_file_paths)))
            for file in all_file_paths:
                if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                    with open(file, 'rt') as f:
                        reader = csv.reader(f, delimiter=' ')
                        line_data0 = list()
                        col = next(reader)  # header row (skipped)
                        for line in reader:
                            if line != []:  # <----- skip empty lines
                                line_data0.append([mp, cr_time] + line)
                    for line in line_data0:
                        cursor.execute(q1, line)

mydb.commit()  # mysql.connector does not autocommit by default
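The file-selection rule in the loop above (ignore empty files, take the newest one by ctime, and skip it if it is less than 120 seconds old, presumably because it may still be being written) can be pulled into a small helper that is easier to test and to call from each worker. This is a sketch, not the asker's code: the name `newest_ready_file` and the `now` parameter (added only so the age check can be tested) are hypothetical, and it additionally ignores subdirectories via `os.path.isfile`. Note that `os.path.getctime` returns the creation time on Windows but the last metadata-change time on Unix.

```python
import os
import time
from typing import Optional


def newest_ready_file(directory: str, min_age: float = 120.0,
                      now: Optional[float] = None) -> Optional[str]:
    """Return the newest non-empty file in `directory` if it is at least
    `min_age` seconds old by ctime; otherwise return None.

    Hypothetical helper mirroring the question's selection logic: empty
    files are ignored, and a newest file that is too recent is skipped
    (there is no fallback to an older file).
    """
    if now is None:
        now = time.time()
    paths = [os.path.join(directory, name) for name in os.listdir(directory)]
    # Only regular, non-empty files are candidates.
    candidates = [p for p in paths if os.path.isfile(p) and os.path.getsize(p) > 0]
    if not candidates:
        return None
    newest = max(candidates, key=os.path.getctime)
    # Skip the file if its ctime falls within the last `min_age` seconds.
    if os.path.getctime(newest) < now - min_age:
        return newest
    return None
```

Inside the per-directory loop this could replace the two list comprehensions and the `for file in all_file_paths` scan, e.g. `newest = newest_ready_file(os.path.join(source, mp))` followed by `if newest is not None:`.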
Answer:
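I'm using multiprocessing, with each process having its own database connection. I've made minimal changes to your code to try to process the directories in parallel. However, I'm not sure whether variables such as subdir_paths are named correctly, since the "s" at the end of the name implies that it holds more than one path name.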
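The reason someone suggested that this question is better suited to Code Review is that you presumably already have a working program and are only looking for a performance improvement (which, admittedly, is true of a large share of the questions tagged multiprocessing on SO). Questions of this type should be posted at https://codereview.stackexchange.com/.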
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count

config = configparser.ConfigParser()
config.read(r'C:\Desktop\Energy\file_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']

def get_connection():
    mydb = mysql.connector.connect(
        host=config['DB']['host'],
        user=config['DB']['user'],
        passwd=config['DB']['passwd'],
        database=config['DB']['database'],
    )
    return mydb

def get_mp_list():
    select_antenna = "SELECT * FROM `antenna`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antenna)
    mp_mysql = [i[0] for i in cursor.fetchall()]
    mp_server = os.listdir(source)
    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()
    mp_list = [mp for mp in mp_mysql if mp in mp_server]
    return mp_list

def process_mp(mp):
    subdir_paths = os.path.join(source, mp)
    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- skip empty files
    if full_file_paths != []:
        newest_file_paths = max(full_file_paths, key=os.path.getctime)
        # ctime of the file that is actually imported.
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S",
                                time.localtime(os.path.getctime(newest_file_paths)))
        mydb = get_connection()  # each process opens its own connection
        cursor = mydb.cursor()
        did_insert = False
        q1 = ("INSERT INTO microbeats "
              "(`antenna`, `datetime`, `system`, `item`, `event`, `status`, `accident`) "
              "VALUES (%s, %s, %s, %s, %s, %s, %s)")
        for file in all_file_paths:
            if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
                with open(file, 'rt') as f:
                    reader = csv.reader(f, delimiter=' ')
                    line_data0 = list()
                    col = next(reader)  # header row (skipped)
                    for line in reader:
                        if line != []:  # <----- skip empty lines
                            line_data0.append([mp, cr_time] + line)
                if line_data0:
                    # One batched call instead of one execute() per row.
                    cursor.executemany(q1, line_data0)
                    did_insert = True
        if did_insert:
            mydb.commit()
        mydb.close()

def main():
    mp_list = get_mp_list()
    if not mp_list:
        return  # Pool(0) would raise ValueError
    with Pool(min(cpu_count(), len(mp_list))) as pool:
        results = pool.imap_unordered(process_mp, mp_list)
        while True:
            try:
                result = next(results)
            except StopIteration:
                break
            except Exception as e:
                # An exception raised in a worker is re-raised here; report it and continue.
                print(e)

if __name__ == '__main__':
    main()
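Switching from one `cursor.execute` per row to a single `cursor.executemany` per file is likely to help on its own, since MySQL Connector/Python batches `INSERT` statements passed to `executemany` into multi-row inserts. If a single file holds a very large number of rows, the rows can also be sent in fixed-size chunks so no single statement grows too large. The sketch below is hypothetical: `insert_in_batches` and the default batch size of 1000 are assumptions, and it is shown with `sqlite3` (which uses `?` placeholders rather than `%s`) so it runs without a MySQL server. It relies only on the DB-API `cursor()`, `executemany()` and `commit()` methods, which `mysql.connector` connections also provide.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, sql, rows, batch_size=1000):
    """Insert `rows` with executemany() in chunks of `batch_size`, then
    commit once. Returns the number of rows inserted.

    Hypothetical helper: `conn` is any DB-API connection (sqlite3 here,
    mysql.connector in the real program, where placeholders are %s).
    """
    cursor = conn.cursor()
    it = iter(rows)
    total = 0
    while True:
        batch = list(islice(it, batch_size))  # next chunk of at most batch_size rows
        if not batch:
            break
        cursor.executemany(sql, batch)
        total += len(batch)
    conn.commit()  # one commit per call, like the answer's commit per directory
    return total


def demo():
    """Insert 2,500 synthetic rows into an in-memory table and count them."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE microbeats (antenna TEXT, datetime TEXT, system TEXT, "
        "item TEXT, event TEXT, status TEXT, accident TEXT)"
    )
    rows = [("mp1", "2021-01-01 00:00:00", "sys", "item", "event", "ok", str(n))
            for n in range(2500)]
    inserted = insert_in_batches(
        conn, "INSERT INTO microbeats VALUES (?, ?, ?, ?, ?, ?, ?)", rows)
    count = conn.execute("SELECT COUNT(*) FROM microbeats").fetchone()[0]
    conn.close()
    return inserted, count
```

In `process_mp`, `insert_in_batches(mydb, q1, line_data0)` could take the place of the `executemany` call, with the commit then happening inside the helper.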
