python爬蟲之多執行緒、多行程
使用多行程、多執行緒撰寫爬蟲的代碼能有效的提高爬蟲爬取目標網站的效率,
多人學習python,不知道從何學起,
很多人學習python,掌握了基本語法過后,不知道在哪里尋找案例上手,
很多已經做案例的人,卻不知道如何去學習更加高深的知識,
那么針對這三類人,我給大家提供一個好的學習平臺,免費領取視頻教程,電子書籍,以及課程的源代碼!??¤
QQ群:1057034340
一、什么是行程和執行緒
參考 廖雪峰的官方網站 關于行程和執行緒的講解:
行程:對于作業系統來說,一個任務就是一個行程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器行程,打開一個記事本就啟動了一個記事本行程,打開兩個記事本就啟動了兩個記事本行程,打開一個Word就啟動了一個Word行程,
執行緒:有些行程還不止同時干一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情,在一個行程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把行程內的這些“子任務”稱為執行緒(Thread),
每個行程至少要做一件事,所以,一個行程至少有一個執行緒,
二、多行程
實作多行程的四種方式
os.fork()
python 的 os 模塊封裝了常見的系統呼叫,其中,多行程的呼叫就是 fork() 函式,具體示例代碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
"""
fork()
1.只有在Unix系統中有效,Windows系統中無效
2.fork函式呼叫一次,回傳兩次:在父行程中回傳值為子行程id,在子行程中回傳值為0
"""
import os
pid = os.fork()
if pid == 0:
print("執行子行程,子行程pid={pid},父行程ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
print("執行父行程,子行程pid={pid},父行程ppid={ppid}".format(pid=pid, ppid=os.getpid()))
# 執行父行程,子行程pid=611,父行程ppid=610
# 執行子行程,子行程pid=611,父行程ppid=610
Process 類
通過 Multiprocessing 模塊中的 Process 類,創建Process物件,
Process類的構造方法:
init(self, group=None, targent=None, name=None, args=(), kwargs={})
| 引數 | 說明 |
|---|---|
| group | 行程所屬組,基本不用, |
| targent | 表示呼叫物件,一般為函式, |
| args | 表示呼叫物件引數元祖, |
| name | 行程別名, |
| kwargs | 表示呼叫物件的字典, |
具體示例代碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process
def run_process(name):
print(name)
if __name__ == "__main__":
p = Process(target=run_process, args=("test",))
p.start()
p.join()
print("子行程結束")
# test
# 子行程結束
繼承 Process 類
通過繼承Process類,重寫 run 方法,使用 .start() 方法,會自動呼叫 run 方法,具體示例代碼如下:
from multiprocessing import Process
class NewProcess(Process):
def __init__(self, n):
super(NewProcess, self).__init__()
self.n = n
def run(self):
print(self.n)
if __name__ == "__main__":
test = "test"
p = NewProcess(test)
p.start()
p.join()
print("子行程結束")
# test
# 子行程結束
行程池 Pool 類
Pool 類可以提供指定數量( 一般為CPU的核數 )的行程供用戶呼叫,當有新的請求提交的 Pool 中時,如果池中還沒有滿,就會創建一個新的行程來執行這些請求, 如果池滿,請求就會告知先等待,直到池中有行程結束,才會創建新的行程來執行這些請求,
注意:行程池中的行程是不能共享佇列和資料的,而 Process 生成的子行程可以共享佇列,
Pool 類中的常用方法:
| 函式 | 函式原型 | 說明 |
|---|---|---|
| apply() | apply(func[, args=()[, kwds={}]]) | 該函式用于傳遞不定引數,主行程會被阻塞直到函式執行結束(不建議使用,并且3.x以后不再出現), |
| apply_async() | apply_async(func[, args()[, kwds{}[, callback=None]]]) | 與apply用法一樣,但它是非阻塞且支持結果回傳進行回呼, |
| map() | map(func, utterable[, chunksize=None]) | Pool類中的map方法,與內置的map函式用法行為基本一致,它會使行程阻塞直到回傳結果,第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒后,程式才會運行子行程, |
| close() | 關閉行程池(Pool),使其不能再添加新的Process, | |
| terminate() | 結束作業行程,不再處理未處理的任務, | |
| join() | 主行程阻塞等待子行程的退出,join方法必須在close或terminate之后使用, |
具體代碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import time
from multiprocessing import Pool
def run(num):
time.sleep(1)
return num * num
if __name__ == "__main__":
testList = [1, 2, 3, 4, 5, 6, 7]
print('單行程執行') # 順序執行
t1 = time.time()
for i in testList:
run(i)
t2 = time.time()
print('順序執行的時間為:', int(t2 - t1))
print('多行程 map 執行') # 并行執行
p = Pool(4) # 創建擁有4個行程數量的行程池
result = p.map(run, testList)
p.close() # 關閉行程池,不再接受新的任務
p.join() # 主行程阻塞等待子行程的退出
t3 = time.time()
print('執行的時間為:', int(t3 - t2))
print(result)
# 單行程執行
# 順序執行的時間為: 7
# 多行程 map 執行
# 執行的時間為: 2
# [1, 4, 9, 16, 25, 36, 49]
行程通信
Queue()
佇列:先進先出,按照順序
通信原理:在記憶體中建立佇列資料結構模型,多個行程都可以通過佇列存入內容,取出內容的順序和存入內容的順序保存一致,
| 方法 | 功能 | 引數 |
|---|---|---|
| q = Queue(maxsize = 0) | 創建佇列訊息,并回傳佇列物件, | 表示最多存盤多少訊息,默認表示根據記憶體分配存盤, |
| q.put(data, [block, timeout]) | 向佇列存盤訊息, | Data:要存入的資料,block:默認佇列滿時會堵塞,設定False則非堵塞,timeout:超時時間, |
| data = https://www.cnblogs.com/4186c/p/q.get([block, timeout]) | 獲取佇列訊息, | block:默認佇列空時會堵塞,設定False則非堵塞,timeout:超時時間, |
| q.full() | 判斷佇列是否為滿, | |
| q.empty() | 判斷佇列是否為空, | |
| q.size() | 判斷佇列中的訊息數量, | |
| q.close() | 關閉佇列, |
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Queue
def foo(data):
s = data.get() # 管子的另一端放在子行程這里,子行程接收到了資料
if s not in "":
print('子行程已收到資料...')
print(s) # 子行程列印出了資料內容...
if __name__ == '__main__': # 要加這行...
q = Queue() # 創建行程通信的Queue,你可以理解為我拿了個管子來...
p = Process(target=foo, args=(q,)) # 創建子行程
print('主行程準備發送資料...')
q.put("資料接收成功") # 將管子的一端放在主行程這里,主行程往管子里丟入資料↑
p.start() # 啟子子行程
p.join()
# 主行程準備發送資料...
# 子行程已收到資料...
# 資料接收成功
Pipe()
通信原理:在記憶體中開辟管道空間,生成管道操作物件,多個行程使用“同一個”管道物件進行操作即可實作通信,
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello']) # 向管道中寫入內容
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 從管道讀取資訊
p.join()
# prints "[42, None, 'hello']"
manager()
行程的 manager 方法可以共享資料,比如共享串列,元祖,字典,鎖,字符,
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
def f(m_list):
m_list.append("f")
if __name__ == '__main__':
manager = multiprocessing.Manager()
m_list = manager.list([1, 2, 3])
p = multiprocessing.Process(target=f, args=(m_list, ))
p.start()
p.join()
print(m_list)
# [1, 2, 3, 'f']
三、多執行緒
執行緒在程式中是獨立的、并非的執行流,與分隔的行程相比執行緒之間的隔離程度要小,它們共享記憶體,檔案句柄和其它行程應有的狀態, 多執行緒之間共享全域變數 ,
創建多執行緒多兩種方式
threading模塊Thread類
具體代碼如下:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time
def run(n):
print("task", n)
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
if __name__ == '__main__':
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()
t1.join()
t2.join()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s
自定義執行緒
繼承threading.Thread類自定義執行緒類,其本質是重構Thread類中的run方法,
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, n):
super(MyThread, self).__init__()
self.n = n
def run(self):
print("task", self.n)
time.sleep(1)
print('1s')
time.sleep(1)
print('0s')
time.sleep(1)
if __name__ == '__main__':
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s
守護執行緒
setDaemon(True)把所有的子執行緒都變成了主執行緒的守護執行緒,因此當主行程結束后,子執行緒也會隨之結束,所以當主執行緒結束后,整個程式就退出了,
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time
def run(n):
print("task", n)
time.sleep(1) # 此時子執行緒停1s
print('2')
time.sleep(1)
print('1')
if __name__ == '__main__':
t = threading.Thread(target=run, args=("t1",))
t.setDaemon(True) # 把子行程設定為守護執行緒,必須在start()之前設定
t.start()
print("end")
# task t1
# end
想要守護執行緒執行結束后,主行程再結束,可以使用 join 方法,讓主執行緒等待子執行緒執行完畢,
Lock
多執行緒和多行程最大的不同在于,多行程中,同一個變數,各自都有一份拷貝存與每個行程中,互不影響,而多執行緒中,所有變數都由所有執行緒共享,所以,任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享資料最大的危險在于多個執行緒同時改一個變數,把內容給改亂了,
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
value = https://www.cnblogs.com/4186c/p/0
lock = threading.Lock()
def change_it(n):
# 先存后取,結果應該為0:
global value
value = https://www.cnblogs.com/4186c/p/value + n
value = value - n
# 未加鎖(值不確定)
def run_thread(n):
for i in range(2000000):
change_it(n)
# 加鎖
# def run_thread(n):
# for i in range(2000000):
# lock.acquire()
# try:
# change_it(n)
# finally:
# lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(value)
# 29
由于鎖只有一個,無論多少執行緒,同一時刻最多只有一個執行緒持有該鎖,所以不會造成修改的沖突,當多個執行緒同時執行 lock.acquire() 時,只有一個執行緒能成功獲取鎖,然后繼續執行代碼,其它執行緒就繼續等待直到獲得鎖為止,
獲得鎖的執行緒用完一定要釋放鎖,否則那些等待鎖的執行緒將會永遠的等待下去,成為死執行緒,所以用 try...finally 來確保鎖一定會被釋放,
鎖的好處就是確保某段關鍵代碼只能由一個執行緒從頭到尾完整的執行,壞處當然也很多,首先是阻止了多執行緒并發執行,包含鎖的某段代碼實際上只能以單執行緒模式執行,效率大大的下降了,其次,由于可以存在多個鎖,不同的執行緒持有不同的鎖,并試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止,
信號量(BoundedSemaphore類)
Lock同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒去更改資料,
import threading
import time
def run(n, semaphore):
semaphore.acquire() #加鎖
time.sleep(1)
print("run the thread:%s\n" % n)
semaphore.release() #釋放
if __name__ == '__main__':
num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允許5個執行緒同時運行
for i in range(22):
t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('-----all threads done-----')
GIL鎖
在非 python 環境中,單核情況下,同時只能有一個任務執行,多核可以同時支持多個執行緒同時執行,但是在 python 中,無論有多少核,同只能執行一個執行緒,究其原因,這就是GIL的存在導致的,
GIL全稱Global Interpreter Lock(全域解釋器鎖),來源是python設計之初的考慮,為了資料安全所做的決定,某個執行緒想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,并且在一個python行程中,GIL只有一個,拿不到通行證的執行緒,就不允許進入CPU執行,GIL只有在cpython中才有,因為cpython呼叫的是c語言的原生執行緒,所以他不能直接操作cpu,只能利用GIL保證同一時間只能有一個執行緒拿到資料,而在pypy和jpython中是沒有GIL的,
python針對不同型別的代碼執行效率也是不同的,
1、cpu密集型代碼(各種回圈處理、計數等),在這種情況下,由于計算機作業多,ticks計數很快就會達到閾值,然后觸發GIL的釋放與再競爭(多個執行緒來回切換是需要消耗資源的),所以python下的多執行緒對cpu密集型代并不友好,
2、IO密集型代碼(檔案處理,網路爬蟲等涉及檔案讀寫的操作),多執行緒能夠有效提升效率(單執行緒下有IO操作會進行IO等待,造成不必要的浪費,而開啟多執行緒能在執行緒A等待時,自動切換到執行緒B,可以不浪費CPU的資源,從而能提升程式執行效率),所以python的多執行緒對IO密集型代碼比較友好,
使用建議
python下想要充分利用多核CPU,就使用多行程,因為每個行程都有各子獨立的GIL,互不干擾,這樣就可以真正意義上的并行執行,在python中,多行程的執行效率優于多執行緒(僅僅針對多核CPU而言),
四、爬取豆瓣電影TOP250
采取三種方式,爬取前250名電影,
(1)所爬取的網頁鏈接: https://movie.douban.com/top250?start=0&filter=
(2)通過分析網頁,發現第一頁的url start=0,第二頁的url start=25,第三頁的url start=50,
(3)主要爬取電影名跟評分,用來進行比對,所以資料方面就不過多的提取和保存,只簡單的列印出來,
多行程爬取
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
from multiprocessing import Process, Queue
import time
from lxml import etree
import requests
class DouBanSpider(Process):
def __init__(self, q, url_list, lock):
# 重寫寫父類的__init__方法
super(DouBanSpider, self).__init__()
self.url_list = url_list
self.q = q
self.lock = lock
self.headers = {
'Host': 'movie.douban.com',
'Referer': 'https://movie.douban.com/top250?start=225&filter=',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
}
def run(self):
self.parse_page()
def send_request(self, url):
'''
用來發送請求的方法
:return: 回傳網頁原始碼
'''
# 請求出錯時,重復請求3次,
i = 0
while i <= 3:
try:
print(u"[INFO]請求url:" + url)
return requests.get(url=url, headers=self.headers).content
except Exception as e:
print(u'[INFO] %s%s' % (e, url))
i += 1
def parse_page(self):
'''
決議網站原始碼,并采用xpath提取 電影名稱和平分放到佇列中
:return:
'''
time.sleep(0.1)
while 1:
try:
url = self.url_list.pop()
except IndexError as e:
break
self.lock.acquire()
response = self.send_request(url)
html = etree.HTML(response)
# 獲取到一頁的電影資料
node_list = html.xpath("//div[@class='info']")
for move in node_list:
# 電影名稱
title = move.xpath('.//a/span/text()')[0]
# 評分
score = move.xpath('.//div[@]//span[@]/text()')[0]
# 將每一部電影的名稱跟評分加入到佇列
self.q.put(score + "\t" + title)
self.lock.release()
class AllUrlSpider(Process):
def __init__(self, url_lis):
super(AllUrlSpider, self).__init__()
self.url_list = url_lis
def run(self):
base_url = 'https://movie.douban.com/top250?start='
# 構造所有url
for num in range(225, -1, -25):
self.url_list.append(base_url + str(num))
print("獲得URL:{}".format(base_url + str(num)))
def main():
# 創建一個佇列用來保存行程獲取到的資料
q = Queue()
lock = multiprocessing.Lock()
manager = multiprocessing.Manager()
url_list = manager.list()
a = AllUrlSpider(url_list)
p = DouBanSpider(q, url_list, lock)
b = DouBanSpider(q, url_list, lock)
c = DouBanSpider(q, url_list, lock)
a.start()
p.start()
b.start()
c.start()
a.join()
p.join()
b.join()
c.join()
while not q.empty():
print(q.get())
if __name__ == "__main__":
start = time.time()
main()
print('[info]耗時:%s' % (time.time() - start))
多行程爬取耗時7.15秒,部分結果如下圖所示:

多執行緒爬取
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from queue import Queue
from threading import Thread
import threading
import time
from lxml import etree
import requests
url_list = []
lock = threading.Lock()
class DouBanSpider(Thread):
def __init__(self, q) :
# 重寫寫父類的__init__方法
super(DouBanSpider, self).__init__()
self.q = q
self.headers = {
'Host': 'movie.douban.com',
'Referer': 'https://movie.douban.com/top250?start=225&filter=',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
}
def run(self):
self.parse_page()
def send_request(self, url):
'''
用來發送請求的方法
:return: 回傳網頁原始碼
'''
# 請求出錯時,重復請求3次,
i = 0
while i <= 3:
try:
print
u"[INFO]請求url:" + url
html = requests.get(url=url, headers=self.headers).content
except Exception as e:
print
u'[INFO] %s%s' % (e, url)
i += 1
else:
return html
def parse_page(self):
'''
決議網站原始碼,并采用xpath提取 電影名稱和平分放到佇列中
:return:
'''
while 1:
try:
url = url_list.pop()
except IndexError as e:
break
lock.acquire()
response = self.send_request(url)
html = etree.HTML(response)
# 獲取到一頁的電影資料
node_list = html.xpath("//div[@class='info']")
for move in node_list:
# 電影名稱
title = move.xpath('.//a/span/text()')[0]
# 評分
score = move.xpath('.//div[@]//span[@]/text()')[0]
# 將每一部電影的名稱跟評分加入到佇列
self.q.put(score + "\t" + title)
lock.release()
class AllUrlSpider(Thread):
def run(self):
base_url = 'https://movie.douban.com/top250?start='
# 構造所有url
for num in range(225, -1, -25):
url_list.append(base_url + str(num))
print("獲得URL:{}".format(base_url + str(num)))
def main():
# 創建一個佇列用來保存行程獲取到的資料
q = Queue()
a = AllUrlSpider()
a.start()
# 保存執行緒
Thread_list = []
# 創建并啟動執行緒
for i in range(5):
p = DouBanSpider(q)
p.start()
Thread_list.append(p)
a.join()
# 讓主執行緒等待子執行緒執行完成
for i in Thread_list:
i.join()
while not q.empty():
print(q.get())
if __name__ == "__main__":
start = time.time()
main()
print('[info]耗時:%s' % (time.time() - start))
多執行緒爬取耗時5秒,部分結果如下圖所示:

耗時跟網路的好壞也是有一定的關系,每次測出的資料結果也不一樣,但理論上來講,執行緒在I/O密集的操作性是要高于行程的,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/220844.html
標籤:Python
上一篇:技術點15:Filter過濾器
