我需要在本地 Windows 機器上的 Java 應用程式和 python 腳本之間傳遞一些資料字串。因此,我決定使用與 TCP 通信的 Java 套接字服務器與 python 客戶端進行通信。Java 創建了兩個執行緒來處理本地主機的埠 9998 和 9999 上的兩個套接字連接。我使用埠 9998 來處理傳入訊息,而使用埠 9999 來處理發送訊息。對于發送/接收的前幾條訊息,我的兩個應用程式運行順利,并且在某些時候,它會在將字串從 Java 發送到 Python 的呼叫上停止。這是我的代碼的一部分:
這個 Java 類處理套接字服務器的創建和通信
public class ServerSocketConnection {
private int port;
private Socket socket;
private ServerSocket serverSocket;
private Logger logger;
private BufferedWriter out;
private BufferedReader in;
public ServerSocketConnection(int port) {
this.port = port;
logger = App.getLogger();
}
// Create a server for a socket connection
public void createServer() {
try {
// Create a server socket
serverSocket = new ServerSocket(port);
// Socket creation
socket = serverSocket.accept();
// Create a print writer
out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
// Create a buffered reader
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
} catch (IOException e) {
logger.severe("Error creating server socket");
}
}
// Close the server socket
public void closeServer() {
try {
serverSocket.close();
} catch (IOException e) {
logger.severe("Error closing server socket");
}
}
public void sendMessage(String message) {
try {
// Sending the byte lenght of the message
byte[] ptext = message.getBytes("UTF-8");
send(String.valueOf(ptext.length));
// Sending the message
send(message);
} catch (IOException e) {
logger.severe("Error sending message:" e.getMessage());
}
}
private void send(String message) throws IOException {
out.write(message);
out.newLine();
out.flush();
}
public String receiveMessage() {
try {
return in.readLine();
} catch (IOException e) {
logger.severe("Error receiving message");
return null;
}
}
這是處理訊息發送的 Java 執行緒。它從其他執行緒之間共享的佇列中獲取要發送的訊息。
public class SendToPlatform implements Runnable {
private static final int PORT = 9999;
private Thread worker;
private AtomicBoolean running;
private AtomicBoolean stopped = new AtomicBoolean(false);
private BlockingQueue<String> queueOut;
private Logger logger;
private ServerSocketConnection serverSocketConnection;
public SendToPlatform(BlockingQueue<String> queueOut, AtomicBoolean running) {
this.queueOut = queueOut;
this.running = running;
this.logger = App.getLogger();
serverSocketConnection = new ServerSocketConnection(PORT);
}
public void run() {
stopped.set(false);
serverSocketConnection.createServer();
while (running.get()) {
socketSender();
}
stopped.set(true);
}
private void socketSender() {
if (!queueOut.isEmpty()) {
String element = null;
try {
element = queueOut.poll(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.severe("SendToPlatform: InterruptedException: " e.getMessage());
}
serverSocketConnection.sendMessage(element);
}
}
}
This is the python thread that is used to receive the message from the Java socket server:
def __socket_reading_no_server(self, queue_input : queue.Queue, is_running : bool):
HOST = "localhost"
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while is_running:
data = s.recv(4)
message_size = int(data.decode('UTF-8').strip())
data = s.recv(min(message_size 2, 1024))
message = data.decode('UTF-8').strip()
queue_input.put(message)
s.close()
And this method is lanched as a thread with these instructions:
input_thread = threading.Thread(target=self.__socket_reading_no_server , args =(self.__queue_input, self.__running, ), daemon=True)
input_thread.start()
By debugging, logging, and using Wireshark to understand the problem in my code, I concluded that I have a recurrent problem with the out.write instruction that blocks while sending the message after around 10 messages are sent correctly. The pending message gets released when I close the socket connection. I tried using PrintWriter and DataOutputStream instead of BufferedWriter, but the same problem occurred. I tried not sending the length of the message before sending the string to adapt the s.recv() size, but the same problem occurred.
I'm new to socket programming, and probably I did something incredibly wrong, but I cannot find where the problem is. Maybe is there a better way to pass data between processes that I'm unaware of that I can use for my needs instead of sockets?
Edits after @absuu answer
After applying the corrections suggested in the answer I still get the same problem of out.write in the send method blocking while trying to write into the socket. I edited my code as follows:
public class ServerSocketConnection {
[...]
public void sendMessage(String message) {
try {
send(message);
} catch (IOException e) {
logger.severe("Error sending message:" e.getMessage());
}
}
private void send(String message) throws IOException {
message = "\r\n";
byte[] ptext = message.getBytes("UTF-8");
out.write(String.format("-",ptext.length));
out.write("\r\n");
out.flush();
out.write(new String(ptext));
logger.info("Data sent");
out.flush();
}
}
I also increased the s.recv size but nothing changed
uj5u.com熱心網友回復:
TL;DR,請參閱下面的代碼更正。
在您提出任何問題之前,您需要注意以下幾點:
- 將服務器的資料編碼從UTF-16更改為UTF-8。 為什么?任何端點之間的資料傳輸都依賴于
consistency,即服務器/客戶端應用程式的資料編碼。當您的服務器 (ServerSocketConnection) 發送使用UTF-16編碼的訊息時,您的客戶端 (__socket_reading_no_server) 正在接收使用UTF-8 編碼的訊息。即使您的客戶端能夠接收來自服務器的所有訊息,它也根本無法識別它們。例如,UTF-16將 string 編碼"5"為 bytes[0,53],對于UTF-8結果是[53](假設大端位元組序)。有關更多詳細資訊,請參閱維基百科。 - 不要使用
out.newLine(). 使用out.write("\r\n")來代替。 為什么?newline()的行為是平臺相關的,這導致分別為類 Unix 作業系統或 Windows 作業系統回傳一或兩個字符。它依賴于line.separator系統屬性,您可以參考Java 檔案了解更多詳細資訊。 - 您
data = s.recv(4)設定了客戶端一次最多讀取 4 個位元組的約束,這很危險。 為什么?因為根據 Python doc,您4不是客戶端接收的實際位元組數,而是要接收的最大資料量。此外,客戶端理論上最多只能接收 9999 位元組(byte1~4:)'9'的下一條傳入訊息。
對于您的問題“...在發送...時阻塞的指令”:不幸的是,這里沒有提供錯誤訊息,我們無法準確推斷整個事情的哪一部分出了問題。但是,我們可以推斷,這更可能是Java實作網路socket的結果,因為你遇到的情況在C網路編程中可能比較少見(即raw sockets),即在執行程序中不會發生阻塞。位元組的連續傳輸,根據系統呼叫的POSIX定義write(注意大多數高級語言最終會呼叫write系統呼叫來發送位元組):
成功完成后,write() 和 pwrite() 應回傳實際寫入與 fildes 關聯的檔案的位元組數。這個數字永遠不會大于 nbyte。否則,將回傳-1并設定 errno 以指示錯誤。
也就是說,在呼叫write將位元組發送到流緩沖區后,它只會回傳 something而不是阻塞。
網路套接字的Java 實作相當復雜,這絕對不是Java 的錯。事實上,如果我們能正確使用套接字,那么晦澀的錯誤就會消失。例如,根據我的測驗,在應用以下更正后,您的應用程式運行良好:
代碼更正:
ServerSocketConnection/byte[] ptext = message.getBytes("UTF-16");->byte[] ptext = message.getBytes("UTF-8");ServerSocketConnection/send(String.valueOf(ptext.length));->send(String.format("-",ptext.length));ServerSocketConnection/out.newLine()->out.write("\r\n")
測驗:
服務器:
BlockingQueue<String> q = new ArrayBlockingQueue<String>(20);
q.add("str 1");
q.add("str 2");
q.add("str 3");
serverSocketConnection.sendMessage(element);
logger.info("element:" element); // debug
########################################################## Server Outputs
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 1
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 2
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 3
客戶:
print("message: %s" % message) # debug
# queue_input.put(message)
########################################################## Client Outputs
message: str 1
message: str 2
message: str 3
編輯:
我要強調的另一件事是,雖然我們實際上并不知道您的應用程式要做什么,但是通過網路套接字可以實作簡單的基于長度的訊息傳輸。也許它不太實用和健壯,但絕對有可能。以下是對您的代碼的一些更詳細的更正:
服務器套接字連接:
// ............. other parts stay unchanged
public void sendMessage(String message) {
try {
int msgLen = message.getBytes("UTF-8").length;
send(String.format("=", msgLen)); // tell client the message size
send(message); // send actual message
} catch (IOException e) {
logger.severe("Error sending message:" e.getMessage());
}
}
private void send(String message) throws IOException {
out.write(message);
out.flush();
}
// ............. other parts stay unchanged
__socket_reading_no_server:
# ............. other parts stay unchanged
while is_running:
data = s.recv(3)
message_size = int(data.decode('UTF-8').strip())
data = s.recv(min(message_size, 1024))
message = data.decode('UTF-8').strip()
print("incoming message:[%s]" % message)
# ............. other parts stay unchanged
非常非常非常重要:
這里我們有 M=3,它限制了您的任何訊息都應該是最多999 個字符的字串!也就是說,如果您希望您的應用程式正常作業,那么您的每條訊息(例如String msg)都應該滿足msg.length <= 999。
uj5u.com熱心網友回復:
這是 Java 客戶端和 Python 服務器的“縮減”實作,它演示了發送任意長度訊息(在本例中為字串)的有效機制:
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
public class Client {
public static void main(String[] args) throws IOException {
String message = "Talking isn't doing. It is a kind of good deed to say well; and yet words are not deeds.";
try (Socket socket = new Socket("localhost", 7070)) {
try (DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
byte[] dts = message.getBytes();
out.writeInt(dts.length);
out.write(dts);
}
}
}
}
請注意客戶端如何在發送實際訊息之前將即將到來的訊息的長度作為 32 位整數發送。
from multiprocessing.connection import Listener
def main():
listener = Listener(address=('0.0.0.0', 7070), family='AF_INET', authkey=None)
with listener.accept() as conn:
print(conn.recv_bytes().decode())
if __name__ == '__main__':
main()
連接類期望使用在客戶端(本例中為 Java)實作的協議接收訊息 - 即,一個 32 位整數,它給出了要遵循的資料量。
我希望這能澄清問題
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/359339.html
標籤:python java multithreading sockets
