我正在嘗試構建一個使用執行緒打開并行 TCP 套接字的系統。我的執行緒是使用訊息佇列 IPC 觸發的,因此每次資料包到達訊息佇列時,一個執行緒“喚醒”,打開與遠程服務器的 TCP 連接并發送資料包。我的問題是,在 Wireshark 中,我可以看到使用執行緒而不是一個連接發送檔案所需的時間更短,但吞吐量沒有改變。
我的問題是:
- 如何驗證我的執行緒并行作業?
- 我該如何改進這段代碼?, 3. 如何使用一個執行緒打開多個套接字?
我正在使用虛擬機來運行多執行緒客戶端。我使用的 IDE 是 Clion ,語言是 C。我的代碼:
#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h> // for ethernet header
#include<netinet/ip.h> // for ip header
#include<netinet/udp.h> // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE 10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;
typedef struct frag
{
int packet_number;
int seq;
uint8_t data[4096];
bool lastfrag;
} fragma;
void * middlemanThread(void *arg)
{
///========================================///
///**** Waiting for message queue trigger!:///
///=======================================///
long id = (long)arg;
id =1;
mqd_t qd; //queue descriptor
//open the queue for reading//
qd= mq_open(QUEUE_NAME,O_RDONLY);
assert(qd != -1);
struct mq_attr attr;
assert(mq_getattr(qd,&attr) != -1);
uint8_t *income_buf = calloc(attr.mq_msgsize,1);
uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
assert(income_buf);
fragma frag;
struct timespec timeout;
clock_gettime(CLOCK_REALTIME,&timeout);
timeout.tv_sec =50;
//bool closesoc =false;
printf("Waiting for messages ..... \n\n");
while(1){
///========================================///
///**** Open message queue fo receive:///
///=======================================///
if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0){
printf("Failed to receive message for 50 sec \n");
//closesoc =true;
pthread_exit(NULL);
}
else{
cast_buf = income_buf;
printf("Received successfully , your msg :\n");
frag.packet_number = *cast_buf;
cast_buf = (cast_buf sizeof(int));
frag.seq = *cast_buf;
cast_buf = (cast_buf sizeof(int));
memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
cast_buf = cast_buf Nbytes;
frag.lastfrag = *cast_buf;
uint8_t * data = frag.data;
}
pthread_mutex_lock(&lock);
///========================================///
///**** Connecting to Server and send Frament:///
///=======================================///
int size = sizeof(( fragma *)income_buf)->packet_number sizeof(( fragma *)income_buf)->seq sizeof(( fragma *)income_buf)->data sizeof(( fragma *)income_buf)->lastfrag;
printf("In thread\n");
int clientSocket;
struct sockaddr_in serverAddr;
socklen_t addr_size;
// Create the socket.
clientSocket = socket(PF_INET, SOCK_STREAM, 0);
//Configure settings of the server address
// Address family is Internet
serverAddr.sin_family = AF_INET;
//Set port number, using htons function
serverAddr.sin_port = htons(8081);
//Set IP address to localhost
serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
memset(serverAddr.sin_zero, '\0', sizeof serverAddr.sin_zero);
//Connect the socket to the server using the address
addr_size = sizeof serverAddr;
connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
if(send(clientSocket , income_buf , size,0) < 0)
{
printf("Send failed\n");
}
printf("Trhead Id : %ld \n" , id);
printf("Packet number : %d \n Seq = %d \n lasfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
pthread_mutex_unlock(&lock);
//if(closesoc)
close(clientSocket);
usleep(20000);
}
}
int main(){
int i = 0;
pthread_t tid[5];
while(i< 5)
{
if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
printf("Failed to create thread\n");
i ;
}
sleep(2);
i = 0;
while(i< 5)
{
pthread_join(tid[i ],NULL);
printf("Thread ID : %d:\n",i);
}
return 0;
}
uj5u.com熱心網友回復:
因此每次資料包到達訊息佇列時,一個執行緒“喚醒”,打開與遠程服務器的 TCP 連接并發送資料包
如果您完全關心速度或效率,請不要這樣做。使用 TCP 套接字可以做的最昂貴的事情是初始連接。您正在進行 3 次握手只是為了發送一條訊息!
然后,您在執行整個操作時持有一個全域互斥鎖 - 這也是您程式中最慢的操作。
目前的設計是有效的單執行緒的,但在最復雜和最昂貴的可能途徑。
我可以看到使用執行緒而不是一個連接發送檔案所需的時間更短,但吞吐量沒有改變
我不知道您實際測量的是什么,而且您也完全不清楚。什么是檔案?一個片段?多個片段?與您的 MTU 相比有多大?您是否檢查過片段實際上是以正確的順序收到的,因為在我看來,唯一可能的并行性是可能中斷的地方。
如何為單個檔案實作更低的延遲和不變的吞吐量?
如何驗證我的執行緒并行作業?
如果您在wireshark 中看到多個具有不同源埠的TCP 連接,并且它們的資料包是交錯的,則您具有有效的并行性。這不太可能,因為您使用全域互斥鎖明確禁止它!
在wireshark中檢查吞吐量的最佳方法是什么?
別。使用wireshark檢查資料包,使用服務器確定吞吐量。這就是結果真正重要的地方。
3.并行TCP的概念是否假設增加吞吐量?
如果您不知道它的用途,為什么要實作所有這些復雜性?
單個執行緒(正確編碼且沒有虛假互斥抖動)很有可能使您的網路飽和,所以:不。擁有多個 I/O 執行緒通常是為了方便地劃分您的邏輯和狀態(即,每個執行緒擁有一個客戶端,或者不同執行緒中有不同的不相關 I/O 子系統),而不是性能。
如果您想從訊息佇列中取出資料包并將它們發送到 TCP,那么高效的方法是:
- 使用單個執行緒來執行此操作(您的程式可能有其他執行緒在執行其他操作 - 如果可能,請避免與它們同步)
- 打開到服務器的單個持久 TCP 連接,不要為每個片段連接/關閉它
- 就是這樣。它比您擁有的要簡單得多,并且會表現得更好。
您實際上可以讓一個執行緒處理多個不同的連接,但我看不出這對您的情況有任何用處,因此請保持簡單。
uj5u.com熱心網友回復:
以下是部分答案:
3.并行TCP的概念是否假設增加吞吐量?
有點。這實際上取決于瓶頸是什么。
第一個可能的瓶頸是擁塞控制。TCP 發送方對一次可以發送的資料包數量有限制(在接收到第一個資料包的 ACK 之前),稱為擁塞視窗。這個數字應該從小處開始,并隨著時間的推移而增長。此外,如果一個資料包丟失,這個數字會減少一半,然后緩慢增長,直到發生下一次丟棄。然而,限制是針對每個 TCP 連接的,因此如果您將資料分布在多個并行連接上,則整體擁塞視窗(所有流的所有視窗的總和)將增長得更快,而下降的數量則更少。(這是一個總結,要詳細了解擁塞控制是如何作業的,這是一個很大的話題)。無論您是否使用執行緒,這都應該發生。您可以在一個執行緒中打開多個連接,并達到相同的效果。
第二個可能的瓶頸是作業系統中的網路處理。AFAIK 這是從 10Gb 連接開始的問題。也許是 1Gb,但可能不是。TCP 處理發生在作業系統而不是您的應用程式中。如果處理由作業系統在處理器之間傳播(應該有啟用此功能的引數),您可能會獲得更好的性能,并且由于快取可能會獲得更好的性能。
如果您從磁盤讀取檔案,您的磁盤 IO 也很可能成為瓶頸。在這種情況下,我認為在不同執行緒之間傳播發送資料實際上沒有幫助。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/350162.html
