行程、多行程、行程池
行程總概述
行程
from multiprocessing import Process
import os
# 子行程要執行的代碼
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
多行程(行程池創建)
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(3)
for i in range(4):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
決議:
對Pool物件呼叫join()方法會等待所有子行程執行完畢,呼叫join()之前必須先呼叫close(),呼叫close()之后就不能繼續添加新的Process了,
Parent process 87461.
Waiting for all subprocesses done...
Run task 0 (87462)...
Run task 1 (87463)...
Run task 2 (87464)...
Task 1 runs 1.66 seconds.
Run task 3 (87463)... -----------------> task3在某個行程結束時,在創建
Task 2 runs 2.33 seconds.
Task 0 runs 2.54 seconds.
Task 3 runs 2.83 seconds.
All subprocesses done.
行程之間通信
Process之間肯定是需要通信的,作業系統提供了很多機制來實作行程間的通信,Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換資料,
我們以Queue為例,在父行程中創建兩個子行程,一個往Queue里寫資料,一個從Queue里讀資料:
from multiprocessing import Process, Queue
import os, time, random
# 寫資料行程執行的代碼:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀資料行程執行的代碼:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = https://www.cnblogs.com/changting/archive/2020/09/24/q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父行程創建Queue,并傳給各個子行程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子行程pw,寫入:
pw.start()
# 啟動子行程pr,讀取:
pr.start()
# 等待pw結束:
pw.join()
# pr行程里是死回圈,無法等待其結束,只能強行終止:
pr.terminate()
執行緒總概述
執行緒
import time, threading
# 新執行緒執行的代碼:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
執行緒鎖-執行緒安全(操作同一個變數)
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要獲取鎖:
lock.acquire()
try:
# 放心地改吧:
global balance
balance = balance + n
balance = balance - n
finally:
# 改完了一定要釋放鎖:
lock.release()
執行緒池創建
ThreadPoolExecutor實作
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor
def echo_client(sock, client_addr):
'''
Handle a client connection
'''
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()
def echo_server(addr):
pool = ThreadPoolExecutor(128)
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)
echo_server(('',15000))
手動創建你自己的執行緒池, 通常可以使用一個Queue來輕松實作
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue
def echo_client(q):
'''
Handle a client connection
'''
sock, client_addr = q.get()
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()
def echo_server(addr, nworkers):
# Launch the client workers
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
# Run the server
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))
echo_server(('',15000), 128)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/121167.html
標籤:其他
上一篇:邏輯式編程還有用嗎?--“三維度”邏輯編程語言的設計(2)
下一篇:需求分析第一章
