1.為什么用行程池
1.在需要頻繁的創建洗掉較多行程的情況下,導致計算機資源消耗過多
2.行程池則是創建指定行程數量等待執行事件,避免了不必要的創建和銷毀程序
2.行程池的使用步驟
1.創建行程池,在池內放入適量的行程,將事件加入行程池的等待佇列
2.使用行程池中的行程不斷處理事件,所有事件處理后回收關閉行程池
3.語法概述
from multiprocessing import Pool pool = Pool(processes=4) # 創建指定行程數量行程池并回傳行程池物件 # 異步方式將事件放入行程池執行,回傳一個物件該物件 # callback: 回呼函式,每當行程池中的行程處理完任務了,回傳的結果交給回呼函式,由回呼函式進一步處理,回呼函式只有異步時才有 ret = pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None) 引數: func要執行的事件函式,可以通過回傳值物件的 ret.get() 方法得到func函式的回傳值 args: 位置引數元組,要給函式傳遞的引數 kwargs: 鍵值對引數字典,要給函式傳遞的引數 callback: 行程的任務函式回傳值被當做回呼函式的形參接收到,以此進行進一步的處理操作,回呼函式是主行程呼叫的 error_callback: 錯誤時主行程執行的回呼函式 pool.close() # 關閉行程池,使其無法加入新的事件 pool.join() # 阻塞等待行程池退出(當所有事情處理完畢后) pool.apply() # 用法和 pool.apply_async() 一樣,但是沒有回傳值,異步方式將事件放入行程池順序執行,一個事件結束再執行另一個事件 func_list = pool.map(func, iter) # 類似于內建函式map,將第二個引數的迭代數傳遞給第一個引數的函式執行,同時兼容了使用行程池執行
map函式等同于for + apply_async示例
from multiprocessing import Pool import time def fun(num): time.sleep(1) return num * num test = [1, 2, 3, 4, 5, 6] pool = Pool(3) r = pool.map(fun, test) # 回傳fun函式的回傳值串列 # 上面折行代碼等同于以下注釋部分的代碼 # r = [] # for i in test: # res = pool.apply_async(fun, (i,)) # r.append(res.get()) print(r) pool.close() pool.join()
map回傳值示例
from multiprocessing import Pool def func(num): num += 1 print(num) return num if __name__ == '__main__': p = Pool(5) res = p.map(func, [i for i in range(100)]) p.close() p.join() print('主行程中map的回傳值', res)
4.查看行程池中行程的行程號
from multiprocessing import Pool import os import time import random import sys def worker(msg): t_start = time.time() print("%s開始執行,行程號為%d" % (msg, os.getpid())) # random.random()隨機生成0~1之間的浮點數 time.sleep(random.random() * 2) t_stop = time.time() print(msg, "執行完畢,耗時%0.2f" % (t_stop - t_start)) return "函式{}-{}-{}".format(sys._getframe().f_code.co_name, str(msg), "over") ret = list() po = Pool(3) # 定義一個行程池,最大行程數3 for i in range(0, 10): # Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,)) # 每次回圈將會用空閑出來的子行程去呼叫目標 r = po.apply_async(worker, (i,)) ret.append(r) print("----start----") po.close() # 關閉行程池,關閉后po不再接收新的請求 po.join() # 等待po中所有子行程執行完成,必須放在close陳述句之后 for r in ret: print(r.get()) # 可以通過回傳值物件的 r.get() 方法得到worker函式的回傳值 print("-----end-----") """執行結果 ----start---- 0開始執行,行程號為23515 1開始執行,行程號為23516 2開始執行,行程號為23517 1 執行完畢,耗時0.07 3開始執行,行程號為23516 3 執行完畢,耗時0.08 4開始執行,行程號為23516 4 執行完畢,耗時0.66 5開始執行,行程號為23516 2 執行完畢,耗時1.25 6開始執行,行程號為23517 0 執行完畢,耗時1.37 7開始執行,行程號為23515 5 執行完畢,耗時0.83 8開始執行,行程號為23516 8 執行完畢,耗時0.33 9開始執行,行程號為23516 7 執行完畢,耗時0.72 9 執行完畢,耗時0.34 6 執行完畢,耗時1.71 函式worker-0-over 函式worker-1-over 函式worker-2-over 函式worker-3-over 函式worker-4-over 函式worker-5-over 函式worker-6-over 函式worker-7-over 函式worker-8-over 函式worker-9-over -----end----- """
5.行程池實作檔案拷貝
import multiprocessing import os import time import random def copy_file(queue, file_name, source_folder_name, dest_folder_name): """copy檔案到指定的路徑""" f_read = open(source_folder_name + "/" + file_name, "rb") f_write = open(dest_folder_name + "/" + file_name, "wb") while True: time.sleep(random.random()) content = f_read.read(1024) if content: f_write.write(content) else: break f_read.close() f_write.close() # 發送已經拷貝完畢的檔案名字 queue.put(file_name) def main(): # 獲取要復制的檔案夾 source_folder_name = input("請輸入要復制檔案夾名字:") # 整理目標檔案夾 dest_folder_name = source_folder_name + "[副本]" # 創建目標檔案夾 try: os.mkdir(dest_folder_name) except: pass # 如果檔案夾已經存在,那么創建會失敗 # 獲取這個檔案夾中所有的普通檔案名 file_names = os.listdir(source_folder_name) # 創建Queue queue = multiprocessing.Manager().Queue() # 創建行程池 pool = multiprocessing.Pool(3) for file_name in file_names: # 向行程池中添加任務 pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name)) # 主行程顯示進度 pool.close() all_file_num = len(file_names) while True: file_name = queue.get() if file_name in file_names: file_names.remove(file_name) copy_rate = (all_file_num - len(file_names)) * 100 / all_file_num print("\r%.2f...(%s)" % (copy_rate, file_name) + " " * 50, end="") if copy_rate >= 100: break print() if __name__ == "__main__": main()
6.行程池實作圖片之家古裝美女圖片爬蟲
import os from multiprocessing import Pool import requests from bs4 import BeautifulSoup def get_url(url): res = requests.get(url) li = list() if res.status_code == 200: # 回傳網頁源代碼 soup = BeautifulSoup(res.text, "html.parser") # print(soup) # 回傳含有圖片url的div標簽 re = soup.find("div", class_="list_con_box").find_all("li") # print(re) # 資料清洗 for i in re: img_s = i.find("img") # 回傳含有圖片url的img標簽 if img_s: src_s = img_s.get("src") # 回傳圖片url # print(src_s) # https://img.tupianzj.com/uploads/allimg/200828/30-200RQ110300-L.jpg li.append(src_s) return li def get_img(url): s = url.split("/")[-1] # print(s) # 30-200RQ110300-L.jpg r = requests.get(url) if r.status_code == 200: with open("./古裝美女/" + s, "wb") as f: f.write(r.content) def main(): headers = { "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:75.0) Gecko/20100101 Firefox/75.0" } # 圖片之家古裝美女url url = "https://www.tupianzj.com/meinv/guzhuang/" # 獲取圖片url串列 img_url = get_url(url) # 提取圖片名稱并保存到本地 if not os.path.isdir("./古裝美女"): os.mkdir("./古裝美女") # 用行程池實作多任務下載 pool = Pool(7) pool.map(get_img, img_url) # 效果等同于以下注釋的兩行代碼 # for i in img_url: # pool.apply_async(get_img, (i,)) pool.close() pool.join() if __name__ == "__main__": main()
7.行程池中父行程呼叫回呼函式
from multiprocessing import Pool import requests import os def func(url): res = requests.get(url) print('子行程的pid:%s,父行程的pid:%s'%(os.getpid(),os.getppid())) # print(res.text) if res.status_code == 200: return url,res.text def cal_back(sta): url, text = sta print('回呼函式的pid', os.getpid()) with open('a.txt', 'a', encoding='utf-8') as f: f.write(url + text) # print('回呼函式中!', url) if __name__ == '__main__': p = Pool(5) l = ['https://www.baidu.com', 'http://www.jd.com', 'http://www.taobao.com', 'http://www.mi.com', 'http://www.cnblogs.com', 'https://www.bilibili.com', ] print('主行程的pid', os.getpid()) for i in l: p.apply_async(func, args=(i,), callback=cal_back) # 異步執行任務func,每有一個行程執行完任務后,在func中return一個結果,結果會自動的被callback指定的函式,當成形式引數來接收到 p.close() p.join()
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/33674.html
標籤:Python
