大資料量文本檔案高效決議方案代碼實作
測驗環境
Python 3.6.2
Win 10 記憶體 8G,CPU I5 1.6 GHz
背景描述
這個作品來源于一個日志決議工具的開發,這個開發程序中遇到的一個痛點,就是日志檔案多,日志資料量大,決議耗時長,在這種情況下,尋思一種高效決議資料決議方案,
解決方案描述
1、采用多執行緒讀取檔案
2、采用按塊讀取檔案替代按行讀取檔案
由于日志檔案都是文本檔案,需要讀取其中每一行進行決議,所以一開始會很自然想到采用按行讀取,后面發現合理配置下,按塊讀取,會比按行讀取更高效,
按塊讀取來的問題就是,可能導致完整的資料行分散在不同資料塊中,那怎么解決這個問題呢?解答如下:
將資料塊按換行符\n切分得到日志行串列,串列第一個元素可能是一個完整的日志行,也可能是上一個資料塊末尾日志行的組成部分,串列最后一個元素可能是不完整的日志行(即下一個資料塊開頭日志行的組成部分),也可能是空字串(日志塊中的日志行資料全部是完整的),根據這個規律,得出以下公式,通過該公式,可以得到一個新的資料塊,對該資料塊二次切分,可以得到資料完整的日志行
上一個日志塊首部日志行 +\n + 尾部日志行 + 下一個資料塊首部日志行 + \n + 尾部日志行 + ...
3、將資料決議操作拆分為可并行決議部分和不可并行決議部分
資料決議往往涉及一些不可并行的操作,比如資料求和,最值統計等,如果不進行拆分,并行決議時勢必需要添加互斥鎖,避免資料覆寫,這樣就會大大降低執行的效率,特別是不可并行操作占比較大的情況下,
對資料決議操作進行拆分后,可并行決議操作部分不用加鎖,考慮到Python GIL的問題,不可并行決議部分替換為單行程決議,
4、采用多行程決議替代多執行緒決議
采用多行程決議替代多執行緒決議,可以避開Python GIL全域解釋鎖帶來的執行效率問題,從而提高決議效率,
5、采用佇列實作“協同”效果
引入佇列機制,實作一邊讀取日志,一邊進行資料決議:
- 日志讀取執行緒將日志塊存盤到佇列,決議行程從佇列獲取已讀取日志塊,執行可并行決議操作
- 并行決議操作行程將決議后的結果存盤到另一個佇列,另一個決議行程從佇列獲取資料,執行不可并行決議操作,
代碼實作
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from datetime import datetime
from joblib import Parallel, delayed, parallel_backend
from collections import deque
from multiprocessing import cpu_count
import threading
class LogParser(object):
def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()):
self.log_unparsed_queue = deque() # 用于存盤未決議日志
self.log_line_parsed_queue = deque() # 用于存盤已決議日志行
self.is_all_files_read = False # 標識是否已讀取所有日志檔案
self.process_num_for_log_parsing = process_num_for_log_parsing # 并發決議日志檔案行程數
self.chunk_size = chunk_size # 每次讀取日志的日志塊大小
self.files_read_list = [] # 存放已讀取日志檔案
self.log_parsing_finished = False # 標識是否完成日志決議
def read_in_chunks(self, filePath, chunk_size=1024*1024):
"""
惰性函式(生成器),用于逐塊讀取檔案,
默認區塊大小:1M
"""
with open(filePath, 'r', encoding='utf-8') as f:
while True:
chunk_data = f.read(chunk_size)
if not chunk_data:
break
yield chunk_data
def read_log_file(self, logfile_path):
'''
讀取日志檔案
這里假設日志檔案都是文本檔案,按塊讀取后,可按換行符進行二次切分,以便獲取行日志
'''
temp_list = [] # 二次切分后,頭,尾行日志可能是不完整的,所以需要將日志塊頭尾行日志相連接,進行拼接
for chunk in self.read_in_chunks(logfile_path, self.chunk_size):
log_chunk = chunk.split('\n')
temp_list.extend([log_chunk[0], '\n'])
temp_list.append(log_chunk[-1])
self.log_unparsed_queue.append(log_chunk[1:-1])
self.log_unparsed_queue.append(''.join(temp_list).split('\n'))
self.files_read_list.remove(logfile_path)
def start_processes_for_log_parsing(self):
'''啟動日志決議行程'''
with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing):
Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing))
self.log_parsing_finished = True
def parse_logs(self):
'''決議日志'''
method_url_re_pattern = re.compile('(HEAD|POST|GET)\s+([^\s]+?)\s+',re.DOTALL)
url_time_taken_extractor = re.compile('HTTP/1\.1.+\|(.+)\|\d+\|', re.DOTALL)
while self.log_unparsed_queue or self.files_read_list:
if not self.log_unparsed_queue:
continue
log_line_list = self.log_unparsed_queue.popleft()
for log_line in log_line_list:
#### do something with log_line
if not log_line.strip():
continue
res = method_url_re_pattern.findall(log_line)
if not res:
print('日志未匹配到請求URL,已忽略:\n%s' % log_line)
continue
method = res[0][0]
url = res[0][1].split('?')[0] # 去掉了 ?及后面的url引數
# 提取耗時
res = url_time_taken_extractor.findall(log_line)
if res:
time_taken = float(res[0])
else:
print('未從日志提取到請求耗時,已忽略日志:\n%s' % log_line)
continue
# 存盤決議后的日志資訊
self.log_line_parsed_queue.append({'method': method,
'url': url,
'time_taken': time_taken,
})
def collect_statistics(self):
'''收集統計資料'''
def _collect_statistics():
while self.log_line_parsed_queue or not self.log_parsing_finished:
if not self.log_line_parsed_queue:
continue
log_info = self.log_line_parsed_queue.popleft()
# do something with log_info
with parallel_backend("multiprocessing", n_jobs=1):
Parallel()(delayed(_collect_statistics)() for i in range(1))
def run(self, file_path_list):
# 多執行緒讀取日志檔案
for file_path in file_path_list:
thread = threading.Thread(target=self.read_log_file,
name="read_log_file",
args=(file_path,))
thread.start()
self.files_read_list.append(file_path)
# 啟動日志決議行程
thread = threading.Thread(target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing")
thread.start()
# 啟動日志統計資料收集行程
thread = threading.Thread(target=self.collect_statistics, name="collect_statistics")
thread.start()
start = datetime.now()
while threading.active_count() > 1:
print('程式正在努力決議日志...')
time.sleep(0.5)
end = datetime.now()
print('決議完成', 'start', start, 'end', end, '耗時', end - start)
if __name__ == "__main__":
log_parser = LogParser()
log_parser.run(['access.log', 'access2.log'])
注意:
需要合理的配置單次讀取檔案資料塊的大小,不能過大,或者過小,否則都可能會導致資料讀取速度變慢,筆者實踐環境下,發現10M~15M每次是一個比較高效的配置,
作者:授客
微信/QQ:1033553122
全國軟體測驗QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞后如有任何疑問,請聯系我!!!
微信打賞
支付寶打賞 全國軟體測驗交流QQ群
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/540238.html
標籤:其他
