一.scoket基本介紹
1.scoket簡介(以下是來自chatgpt回答)
1)Socket(套接字)是計算機網路中用于描述主機之間通信的一種機制,它定義了一種標準的介面,
使得應用程式可以利用網路傳輸層提供的服務(如TCP或UDP)進行通信,
2)Socket的作用是在網路應用程式之間提供資料傳輸服務,通過Socket,應用程式可以將資料發送
到網路上的另一個應用程式,并從網路上的另一個應用程式接收資料,
3)Socket還提供了一種機制,使得應用程式可以接收來自網路上的請求,并對這些請求進行回應,
常見的網路應用程式,如Web服務器、郵件服務器、FTP服務器等都使用Socket技術,此外,許多P2P
(點對點)應用程式,如BitTorrent、eMule等也使用Socket技術,Socket還可以用于實作各種網路應用
程式,如網路游戲、聊天程式等,
2.建立TCP/IP連接
1)創建一個scoket物件
socket_stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
第一個引數表示通信協議型別
AF_INET:適用于網路通信
AF_UNIX:適用于本地行程間通信
第二個引數表示創建什么連接
SOCK_STREAM:表示tcp/ip連線
SOCK_DGRAM:表示udp連接
2)與服務端建立連接
# 建立連接 ip_port = ("ip", "port") # 連接的服務端ip和埠 socket_stream.connect(ip_port) # 建立連接,出錯不回傳錯誤碼 socket_stream.connect_ex(ip_port) # 出錯時回傳錯誤碼,不拋例外
connect()和connect_ex()選一個即可
3)發送資料
# 發送資料 data = https://www.cnblogs.com/lihongtaoya/p/'hello' socket_stream.send(data.encode("utf-8")) # 發送TCP資料,當send在待發送資料量大于己端快取區剩余空間時,資料丟失,不會發完 # or socket_stream.sendall(data.encode("utf-8")) # 發送完整的TCP資料(回圈呼叫send,sendall在待發送資料量大于己端快取區剩余空間時,資料不丟失,回圈呼叫send直到發完)
當發送的資料較大時建議使用sendall發送
4)接收TCP資料
# 接收TCP資料,1024(變數)表示每次最多接受1M位元組的資料,recv()函式是一個阻塞函式,沒有要接收的資料時,會一直等待,直到接收到資料或出現錯誤才會回傳 response_data = https://www.cnblogs.com/lihongtaoya/p/socket_stream.recv(1024)
5)關閉連接
# 關閉連接 socket_stream.close()
3.建立UDP連接
1)創建scoket物件
client_dgram = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
2)發送資料
# 發送資料 ip_port = ('127.0.0.1', 8080) data = 'Hello' client_dgram.sendto(data.encode("utf-8"), ip_port)
這里發送用的是sendto()方法
3)接收資料
# 接收UDP資料資料 data = https://www.cnblogs.com/lihongtaoya/p/client_dgram.recvfrom(4096)
4)關閉連接
# 關閉連接 client_dgram.close()
二.使用scoket撰寫聊天程式
以下部分代碼參考:https://www.yuque.com/imhelloworld/nov9az/bffcea259d3c96fb17a130acebc12801
聊天程式的話涉及兩個端,既server和click,這里直接貼代碼了,相關注釋都是代碼里
1)server
import socket import threading # 服務器IP地址和埠號 SERVER_IP = 'IP地址' SERVER_PORT = 埠號 # 創建一個socket物件 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 系結IP地址和埠號 server_socket.bind((SERVER_IP, SERVER_PORT)) # 設定最大連接數,超過10后拒絕,0的話表示不接受連接,直接拒絕 server_socket.listen(10) # 客戶端串列 clients = [] def handle_client(client_socket): while True: try: # 接收客戶端發送的訊息 message = client_socket.recv(10240).decode('utf-8') print("列印", message) if message == 'quert': client_socket.send(message.encode('utf-8')) else: # 廣播訊息給所有客戶端 for c in clients: if c != client_socket: c.send(message.encode('utf-8')) except: # 發生例外時關閉連接 client_socket.close() clients.remove(client_socket) print("發送例外了") break # 回圈監聽客戶端連接 while True: print('Waiting for client connections...') # 接受客戶端連接請求,新的客戶端請求時創建一個新的socket,用于處理客戶端的請求,原有的socket繼續監聽其他客戶端的連接請求; # 函式是一個阻塞函式,當沒有客戶端連接請求時,會一直等待,直到有客戶端連接請求到達 client_socket, client_address = server_socket.accept() print('客戶socket', client_socket, 'ip和埠', client_address) clients.append(client_socket) # 創建一個新的執行緒處理客戶端連接 client_thread = threading.Thread(target=handle_client, args=(client_socket, )) client_thread.start()
2)click
import socket import threading import time # 服務器IP地址和埠號 SERVER_IP = 'IP地址' SERVER_PORT = 埠號 # 創建一個socket物件 client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 連接服務器 client_socket.connect((SERVER_IP, SERVER_PORT)) def receive(): while True: try: # 接收服務器發送的訊息 message = client_socket.recv(1024).decode('utf-8') if message == 'quert': client_socket.close() break print('\n對方回答:', message) except: # 發生例外時關閉連接 client_socket.close() break def send(): while True: # 獲取用戶輸入的訊息 message = input() if message == 'quert': client_socket.close() print('連接斷開,通信結束') break else: # 發送訊息給服務器 client_socket.send(message.encode('utf-8')) # 創建兩個執行緒,分別用于接收和發送訊息 receive_thread = threading.Thread(target=receive) send_thread = threading.Thread(target=send) # 啟動執行緒 receive_thread.start() send_thread.start()
這里需要注意的是,既然是聊天程式,那么肯定涉及兩個click,兩個click代碼一致即可,運行時先運行server,后運行click,之后就可以在Terminal下建立對話了,
以上是click在同一臺電腦上,若不在同一臺電腦的,建立對話需在同一個局域網下,ip地址在dos視窗下輸入ipconfig\all即可查看,埠號可隨便填個,只要不被占用就行,

三.scoket+chatgpt建立搜索對話框
1)獲取chatgpt搜索介面,這里就直接貼代碼了
def api(params, tokened="token"): url = "https://api.aigcfun.com/api/v1/text?key=" + tokened data = {"messages": [{"role": "system", "content": "請以markdown的形式回傳答案"}, {"role": "user", "content": params}], "tokensLength": 32, "model": "gpt-3.5-turbo"} response_data = requests.post(url=url, json=data) return response_data.json()["choices"][0]["text"]
params為用戶問的問題,return回傳的是chatgpt回答的答案
2)獲取到chatgpt的回答內容后回傳給服務端
def receive(): while True: try: # 接收服務器發送的訊息 message = client_socket.recv(1024).decode('utf-8') data_message = api(message) while data_message == None: # 由于chatgpt有時回應的比較慢,這里需添加個回圈判斷 continue # if data_message == '您今日使用次數已達上限,請明日再試!': # data_message = api(params=message, tokened=token()) client_socket.send(data_message.encode('utf-8')) # 發送chatgpt回答的問題給服務端 except: # 發生例外時關閉連接 client_socket.close() break
3)目前chatpgt雖是免費的,但每天都有次數限制,所以可以請求獲取token的介面來獲取新的token,這里就不做演示了(hhhhhh.........)

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/548775.html
標籤:Python
