1.IO概述
input 和 output: 是在記憶體中存在的資料交換操作
記憶體和磁盤交換: 檔案讀寫, 列印
記憶體和網路交換: recv send recvfrom, sendto
IO密集型程式: 程式中執行大量的IO操作,而較少的需要CPU運行,消耗CPU資源少,運行周期往往比較長
CPU密集型程式: 程式中執行大量的CPU運算,而較少的需要IO操作,消耗CPU資源多
2.IO分類(阻塞/非阻塞)
阻塞IO: 默認形態,是效率最低的一種IO
1.因為等待某種條件達成再繼續運行,例如: accept recv input
2.處理IO事件時耗時較長也會產生阻塞,例如: 檔案的讀寫程序,網路資料的傳輸程序
非阻塞IO: 通過修改IO物件使其變為非阻塞狀態,通常用回圈來不斷判斷阻塞條件,需要消耗更多的CPU但是在一定程度上提高了IO效率
IO多路復用: 通過同時監控多個IO事件,當那個IO事件就緒就執行那個IO事件,形成并發效果,也被稱作事件驅動IO
信號驅動IO: 準備資料階段(用戶行程非阻塞),復制資料階段(用戶行程阻塞)
1.當呼叫read函式的時候,準備資料的程序中用戶執行緒不阻塞,用戶執行緒可以去做其他事情
2.等到資料準備完了,用戶行程會收到一個SIGIO信號,然后可以在信號處理函式中處理資料
異步IO: Python實作不了,但是tornado框架自帶異步IO
1.阻塞IO,非阻塞IO,多路復用IO,信號驅動IO都是同步IO,都需等待將資料從內核緩沖區拷貝到用戶記憶體,會阻塞行程
2.異步IO是異步IO,當用戶發起一個操作后就立即去做其他事情了,內核會等待,檢測資料直到資料準備完成
3.然后將資料拷貝到用戶記憶體(無需先拷貝到內核緩沖區),這一切完成后會給用戶行程發送一個信號告知操作完成了
3.非阻塞IO-效率比阻塞IO高
1.非阻塞IO設定方法
import socket s = socket.socket() # 默認會創建流式套接字 # 修改套接字設定的阻塞狀態 # 引數(bool型別): 默認為True,設定為False則為非阻塞 s.setblocking()
2.TCP服務端-非阻塞IO
import socket import time def main(): # 創建資料流套接字 tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設定埠重用 tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 系結埠,運行局域網內的客戶端連接 tcp_server_socket.bind(("0.0.0.0", 7890)) # 讓默認的套接字由主動變為被動監聽 listen tcp_server_socket.listen(128) # 設定套接字為非阻塞狀態 tcp_server_socket.setblocking(False) # 回圈目的: 多次呼叫accept等待客戶端連接,為多個客服端服務 while True: print("等待一個新的客戶端的到來...") try: new_client_socket, client_addr = tcp_server_socket.accept() except BlockingIOError: time.sleep(1) print(time.ctime()) continue print("客戶端' %s '已經到來" % str(client_addr)) # 設定客戶端字為非阻塞狀態 new_client_socket.setblocking(False) # 回圈目的: 為同一個客服端服務多次 while True: try: # 接收客戶端發送過來的請求 recv_data = https://www.cnblogs.com/tangxuecheng/p/new_client_socket.recv(1024) # recv解堵塞的兩種方式: 1.客戶端發送過來資料,2.客戶端呼叫了close if recv_data: print("客戶端' %s '發送過來的請求是: %s" % (str(client_addr), recv_data.decode("utf-8"))) # 回送資料給客戶端表示回應客戶端的請求 send_data = https://www.cnblogs.com/tangxuecheng/p/"----ok----Request accepted" new_client_socket.send(send_data.encode("utf-8")) else: break except BlockingIOError: time.sleep(1) print(time.time()) continue # 關閉accept回傳的套接字 new_client_socket.close() print("已經為 %s 客戶端已經服務完畢" % str(client_addr)) # 關閉監聽套接字 tcp_server_socket.close() if __name__ == "__main__": main()
3.TCP客戶端-非阻塞IO
import socket def main(): # 創建資料流套接字 tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接服務器 server_ip = input("請輸入要連接的服務器的ip:") serve_port = int(input("請輸入要連接的服務器的port:")) server_addr = (server_ip, serve_port) tcp_client_socket.connect(server_addr) # 發送資料 send_data = https://www.cnblogs.com/tangxuecheng/p/input("請輸入要發生的資料:") tcp_client_socket.send(send_data.encode("utf-8")) # 接收服務器發送過來的資料 recv_data = https://www.cnblogs.com/tangxuecheng/p/tcp_client_socket.recv(1024) print("接收到的資料為:%s" % recv_data.decode("utf-8")) # 關閉套接字 tcp_client_socket.close() if __name__ == "__main__": main()
4.超時檢測(超時等待)-效率比非阻塞IO高
1.超時檢測設定
import socket s = socket.socket() # 默認會創建流式套接字 # 設定套接字的超時檢測,所謂的超時檢測即對原本阻塞的函式進行設定,使其不再始終阻塞,而是阻塞等待一定時間后自動回傳 # 引數: 超時時間,在規定時間中如果正常接收阻塞則繼續執行否則產生timeout例外 s.settimeout(5) # 設定超時時間為5秒
2.TCP服務端-超時檢測
import socket import traceback def main(): # 創建資料流套接字 tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設定埠重用 tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 系結埠,運行局域網內的客戶端連接 tcp_server_socket.bind(("0.0.0.0", 7890)) # 讓默認的套接字由主動變為被動監聽 listen tcp_server_socket.listen(128) # 設定套接字超時等待時間為5秒 tcp_server_socket.settimeout(5) # 回圈目的: 多次呼叫accept等待客戶端連接,為多個客服端服務 while True: print("等待一個新的客戶端的到來...") try: new_client_socket, client_addr = tcp_server_socket.accept() except Exception: traceback.print_exc() continue print("客戶端' %s '已經到來" % str(client_addr)) # 設定客戶端字的超時等待時間為5秒 new_client_socket.settimeout(5) # 回圈目的: 為同一個客服端服務多次 while True: try: # 接收客戶端發送過來的請求 recv_data = https://www.cnblogs.com/tangxuecheng/p/new_client_socket.recv(1024) # recv解堵塞的兩種方式: 1.客戶端發送過來資料,2.客戶端呼叫了close if recv_data: print("客戶端' %s '發送過來的請求是: %s" % (str(client_addr), recv_data.decode("utf-8"))) # 回送資料給客戶端表示回應客戶端的請求 send_data = https://www.cnblogs.com/tangxuecheng/p/"----ok----Request accepted" new_client_socket.send(send_data.encode("utf-8")) else: break except Exception: traceback.print_exc() continue # 關閉accept回傳的套接字 new_client_socket.close() print("已經為 %s 客戶端已經服務完畢" % str(client_addr)) # 關閉監聽套接字 tcp_server_socket.close() if __name__ == "__main__": main()
3.TCP客戶端-超時檢測
import socket def main(): # 創建資料流套接字 tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接服務器 server_ip = input("請輸入要連接的服務器的ip:") serve_port = int(input("請輸入要連接的服務器的port:")) server_addr = (server_ip, serve_port) tcp_client_socket.connect(server_addr) # 發送資料 send_data = https://www.cnblogs.com/tangxuecheng/p/input("請輸入要發生的資料:") tcp_client_socket.send(send_data.encode("utf-8")) # 接收服務器發送過來的資料 recv_data = https://www.cnblogs.com/tangxuecheng/p/tcp_client_socket.recv(1024) print("接收到的資料為:%s" % recv_data.decode("utf-8")) # 關閉套接字 tcp_client_socket.close() if __name__ == "__main__": main()
5.IO多路復用-效率比超時等待高
1.IO多路復用概述
1.IO多路復用的定義: 同時監控多個IO事件,當那個IO事件就緒就執行那個IO事件,形成并發效果
2.使用IO多路復用的注意點
1.在處理IO程序中不應該發生死回圈(即某個IO單獨占用服務器)
2.IO多路復用是單行程程式,是一個并發程式
3.IO多路復用有較高的IO執行效率
2.IO多路復用-select模塊中的三個方法(select,pool,epool)
1.三個方法的適用平臺
select: 適用于Windows Linux Unix MacOS
poll: 適用于Linux Unix MacOS
epoll: 適用于Linux Unix
2.select方法-采用輪詢機制,32位機檔案描述符只能開1024,64位機檔案描述符只能開2048
r,w,x = select(rlist, wlist, xlist, [timeout])
功能: 監控IO事件,阻塞等待IO事件發生
引數:
rlist: 串列,存放要監控等待處理的IO,例如客戶端主動連接,或客戶端發送訊息
wlist: 串列,存放希望主動處理的IO,例如回送訊息給客戶端
xlist: 串列,存放如果發生例外需要處理的IO
timeout: 數字,超時檢測,默認一直阻塞
回傳值:
r: 串列,rlist中準備就緒的IO
w: 串列,wlist中準備就緒的IO
x: 串列,xlist中準備就緒的IO
3.poll方法-采用輪詢機制突破了檔案描述符只能開1024的限制(1G記憶體大概能開10萬個事件去監聽),效率上和select差不多
1.創建poll物件
p = select.poll()
2.加入關注的IO
p.register(s) # 添加IO關注
p.unregister(s) # 從關注IO中洗掉
3.使用poll函式監控
events = p.poll()
功能: 阻塞等待register的事件只要有任意準備就緒即回傳
回傳值: events # [(fileno, event), (), ()]; fileno: 檔案描述符; event: 準備就緒的事件
4.處理發生的IO事件
# poll的IO事件
POLLIN # 可讀rlist
POLLOUT # 可寫wlist
POLLUP # 斷開連接
POLLERR # 例外xlist
POLLPRI # 緊急處理
POLLVAL # 無效資料
4.epoll方法: 采用了回呼機制同樣也突破了檔案描述符只能開1024的限制(1G記憶體大概能開10萬個事件去監聽)
1.效率上比poll和select稍高,但只能用于Linux和Unix平臺
2.epoll既可以采用水平觸發,也可以采用邊緣觸發,而select和pool只支持水平觸發
3.epoll實作原理提流程圖: https://www.processon.com/view/link/5efd699c7d9c08442043e5b8
4.水平觸發: 如果檔案描述符已經就緒可以非阻塞的執行IO操作了,此時會觸發通知,允許在任意時刻重復檢測IO的狀態,
沒有必要每次描述符就緒后盡可能多的執行IO,select和pool就屬于水平觸發
5.邊緣觸發: 如果檔案描述符自上次狀態改變后有新的IO活動到來,此時會觸發通知,在收到一個IO事件通知后要盡可能多的執行IO操作,
因為如果在一次通知中沒有執行完IO那么就需要等到下一次新的IO活動到來才能獲取到就緒的描述符,信號驅動式IO就屬于邊緣觸發
3.TCP服務端-select方法實作IO多路復用
import socket import sys import traceback import select def main(): # 創建資料流套接字 tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設定埠重用 tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 系結埠,運行局域網內的客戶端連接 tcp_server_socket.bind(("0.0.0.0", 7890)) # 讓默認的套接字由主動變為被動監聽 listen tcp_server_socket.listen(128) # 設定套接字超時等待時間為5秒 tcp_server_socket.settimeout(5) # 創建一個串列存放要監控等待處理的IO rlist = [tcp_server_socket] # 創建一個串列存放希望主動處理的IO wlist = list() # 創建一個串列存放如果發生例外需要處理的IO xlist = [tcp_server_socket] # 回圈目的: 多次呼叫select監控客戶端連接事件,阻塞等待客戶端連接事件發生 while True: print("監控是否有新的客戶端的到來...") # 引數說明: rlist: 串列,存放要監控等待處理的IO; wlist: 串列,存放希望主動處理的IO # 引數說明: xlist: 串列,存放如果發生例外需要處理的IO; timeout: 數字,超時檢測,默認一直阻塞 # 回傳值說明: rs: 串列,rlist中準備就緒的IO; ws: 串列,wlist中準備就緒的IO; xs: 串列,xlist中準備就緒的IO # wlist有內容,select會解除阻塞立即回傳 rs, ws, xs = select.select(rlist, wlist, xlist, 5) # 設定超時等待時間為5秒 # 遍歷準備就緒的IO串列 for r in rs: # 有客戶端連接時 if r is tcp_server_socket: new_client_socket, client_addr = r.accept() print("客戶端' %s '已經到來" % str(client_addr)) # 將為客戶端服務的套接字加入監控串列rlist rlist.append(new_client_socket) else: try: # 已經連接的客戶端發送訊息時 recv_data = https://www.cnblogs.com/tangxuecheng/p/r.recv(1024) if recv_data: # r.getsockname()可以獲取套接字系結的地址 print("客戶端' %s '發送過來的請求是: %s" % (str(r.getsockname()), recv_data.decode("utf-8"))) # 服務器回送訊息放到主動處理串列wlist wlist.append(r) else: # 如果客戶端是呼叫了close斷開了連接,那么需要從監控串列中洗掉為客戶端服務的套接字 rlist.remove(r) print("已經為 %s 客戶端已經服務完畢" % str(r.getsockname())) # 關閉為客戶端服務的套接字 r.close() except Exception: traceback.print_exc() # 遍歷需要服務器主動處理的IO串列 for w in ws: # 回送資料給客戶端表示回應客戶端的請求 send_data = https://www.cnblogs.com/tangxuecheng/p/"----ok----Request accepted" w.send(send_data.encode("utf-8")) # 把為客戶端服務的套接字從主動處理的串列中洗掉 wlist.remove(w) # 遍歷例外請求的IO串列 for x in xs: # 如果監聽套接字發生了例外 if x is tcp_server_socket: # 關閉例外的監聽套接字 x.close() # 退出程式 sys.exit(1) if __name__ == "__main__": main()
4.TCP客戶端-select方法實作IO多路復用
import socket def main(): # 創建資料流套接字 tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接服務器 server_ip = input("請輸入要連接的服務器的ip:") serve_port = int(input("請輸入要連接的服務器的port:")) server_addr = (server_ip, serve_port) tcp_client_socket.connect(server_addr) # 發送資料 send_data = https://www.cnblogs.com/tangxuecheng/p/input("請輸入要發生的資料:") tcp_client_socket.send(send_data.encode("utf-8")) # 接收服務器發送過來的資料 recv_data = https://www.cnblogs.com/tangxuecheng/p/tcp_client_socket.recv(1024) print("接收到的資料為:%s" % recv_data.decode("utf-8")) # 關閉套接字 tcp_client_socket.close() if __name__ == "__main__": main()
5.TCP服務器-pool方法實作IO多路復用
import socket import select import traceback import sys def main(): # 創建資料流套接字 tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設定埠重用 tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 系結埠,運行局域網內的客戶端連接 tcp_server_socket.bind(("0.0.0.0", 7890)) # 讓默認的套接字由主動變為被動監聽 listen tcp_server_socket.listen(128) # 設定套接字超時等待時間為5秒 tcp_server_socket.settimeout(5) # 創建一個IO事件地圖 fdmap = {tcp_server_socket.fileno(): tcp_server_socket} # 創建poll物件 epool_list = select.poll() # 將監聽套接對應的fd注冊到poll中進行監聽,如果fd已經注冊過,則會發生例外 epool_list.register(tcp_server_socket, (select.POLLIN | select.POLLERR)) # 回圈目的: 等待新的客戶端連接或者已連接的客戶端發送過來資料 while True: print("開始事件監控") events = epool_list.poll() for fd, event in events: if fd == tcp_server_socket.fileno(): new_client_socket, client_addr = tcp_server_socket.accept() print("客戶端' %s '已經到來" % str(client_addr)) epool_list.register(new_client_socket, select.POLLIN) # 記錄這個資訊 fdmap[new_client_socket.fileno()] = new_client_socket elif event & select.POLLIN: try: # 已經連接的客戶端發送訊息時 recv_data = https://www.cnblogs.com/tangxuecheng/p/fdmap[fd].recv(1024) if not recv_data: # 在poll中注銷客戶端的資訊 epool_list.unregister(fd) print("已經為客戶端已經服務完畢") # 關閉客戶端的檔案句柄 fdmap[fd].close() # 在字典中洗掉與已關閉客戶端相關的資訊 del fdmap[fd] else: print("客戶端發送過來的請求是: %s" % (recv_data.decode("utf-8"))) # 回送資料給客戶端表示回應客戶端的請求 send_data = https://www.cnblogs.com/tangxuecheng/p/"----ok----Request accepted" fdmap[fd].send(send_data.encode("utf-8")) except Exception: traceback.print_exc() elif event & select.POLLERR: # 如果監聽套接字發生了例外 if fd is tcp_server_socket.fileno(): # 關閉例外的監聽套接字 fdmap[fd].close() # 退出程式 sys.exit(1) if __name__ == "__main__": main()
6.TCP客戶端-pool方法實作IO多路復用
import socket def main(): # 創建資料流套接字 tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接服務器 server_ip = input("請輸入要連接的服務器的ip:") serve_port = int(input("請輸入要連接的服務器的port:")) server_addr = (server_ip, serve_port) tcp_client_socket.connect(server_addr) # 發送資料 send_data = https://www.cnblogs.com/tangxuecheng/p/input("請輸入要發生的資料:") tcp_client_socket.send(send_data.encode("utf-8")) # 接收服務器發送過來的資料 recv_data = https://www.cnblogs.com/tangxuecheng/p/tcp_client_socket.recv(1024) print("接收到的資料為:%s" % recv_data.decode("utf-8")) # 關閉套接字 tcp_client_socket.close() if __name__ == "__main__": main()
7.TCP服務器-epool方法實作IO多路復用
import socket import select import traceback import sys def main(): # 創建資料流套接字 tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設定埠重用 tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 系結埠,運行局域網內的客戶端連接 tcp_server_socket.bind(("0.0.0.0", 7890)) # 讓默認的套接字由主動變為被動監聽 listen tcp_server_socket.listen(128) # 設定套接字超時等待時間為5秒 tcp_server_socket.settimeout(5) # 創建一個IO事件地圖 fdmap = {tcp_server_socket.fileno(): tcp_server_socket} # 創建poll物件 epool_list = select.poll() # 將監聽套接對應的fd注冊到poll中進行監聽,如果fd已經注冊過,則會發生例外 epool_list.register(tcp_server_socket, (select.EPOLLIN | select.EPOLLERR)) # 回圈目的: 等待新的客戶端連接或者已連接的客戶端發送過來資料 while True: print("開始事件監控") events = epool_list.poll() for fd, event in events: if fd == tcp_server_socket.fileno(): new_client_socket, client_addr = tcp_server_socket.accept() print("客戶端' %s '已經到來" % str(client_addr)) epool_list.register(new_client_socket, select.EPOLLIN) # 記錄這個資訊 fdmap[new_client_socket.fileno()] = new_client_socket elif event & select.EPOLLIN: try: # 已經連接的客戶端發送訊息時 recv_data = https://www.cnblogs.com/tangxuecheng/p/fdmap[fd].recv(1024) if not recv_data: # 在poll中注銷客戶端的資訊 epool_list.unregister(fd) print("已經為客戶端已經服務完畢") # 關閉客戶端的檔案句柄 fdmap[fd].close() # 在字典中洗掉與已關閉客戶端相關的資訊 del fdmap[fd] else: print("客戶端發送過來的請求是: %s" % (recv_data.decode("utf-8"))) # 回送資料給客戶端表示回應客戶端的請求 send_data = https://www.cnblogs.com/tangxuecheng/p/"----ok----Request accepted" fdmap[fd].send(send_data.encode("utf-8")) except Exception: traceback.print_exc() elif event & select.EPOLLERR: if fd is tcp_server_socket.fileno(): # 關閉例外的監聽套接字 fdmap[fd].close() # 退出程式 sys.exit(1) if __name__ == "__main__": main()
8.TCP客戶端-epool方法實作IO多路復用
import socket def main(): # 創建資料流套接字 tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接服務器 server_ip = input("請輸入要連接的服務器的ip:") serve_port = int(input("請輸入要連接的服務器的port:")) server_addr = (server_ip, serve_port) tcp_client_socket.connect(server_addr) # 發送資料 send_data = https://www.cnblogs.com/tangxuecheng/p/input("請輸入要發生的資料:") tcp_client_socket.send(send_data.encode("utf-8")) # 接收服務器發送過來的資料 recv_data = https://www.cnblogs.com/tangxuecheng/p/tcp_client_socket.recv(1024) print("接收到的資料為:%s" % recv_data.decode("utf-8")) # 關閉套接字 tcp_client_socket.close() if __name__ == "__main__": main()
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/10160.html
標籤:Python
