主頁 > 軟體工程 > 使用執行緒的并行TCP連接

使用執行緒的并行TCP連接

2021-11-06 17:17:38 軟體工程

我正在嘗試構建一個使用執行緒打開并行 TCP 套接字的系統。我的執行緒是使用訊息佇列 IPC 觸發的,因此每次資料包到達訊息佇列時,一個執行緒“喚醒”,打開與遠程服務器的 TCP 連接并發送資料包。我的問題是,在 Wireshark 中,我可以看到使用執行緒而不是一個連接發送檔案所需的時間更短,但吞吐量沒有改變。
我的問題是:

  1. 如何驗證我的執行緒并行作業?
  2. 我該如何改進這段代碼?, 3. 如何使用一個執行緒打開多個套接字?

我正在使用虛擬機來運行多執行緒客戶端。我使用的 IDE 是 Clion ,語言是 C。我的代碼:

#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<string.h>
#include <arpa/inet.h>
#include <unistd.h> // for close
#include<pthread.h>
#include <math.h>
#include<malloc.h>
#include<signal.h>
#include<stdbool.h>
#include<sys/types.h>
#include<linux/if_packet.h>
#include<netinet/in.h>
#include<netinet/if_ether.h>    // for ethernet header
#include<netinet/ip.h>      // for ip header
#include<netinet/udp.h>     // for udp header
#include<netinet/tcp.h>
#include <byteswap.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <assert.h>
#include <time.h>
#define QUEUE_NAME "/ServerDan_Queue"
#define QUEUE_PERM 0660
#define MAX_MESSAGES 10 //Max size = 10
#define MAX_MSG_SIZE 4105 //Max size = 8192B
#define MSG_BUFFER_SIZE MAX_MSG_SIZE 10
#define BSIZE 1024
#define Nbytes 4096
#define ElorServer_addr "192.168.1.54"
///params:
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct sockaddr_in server;
struct stat obj;
int sock;
int k, size, status;
int i = 0;

typedef struct frag
{
    int packet_number;
    int seq;
    uint8_t data[4096];
    bool lastfrag;
} fragma;

void * middlemanThread(void *arg)
{
    ///========================================///
    ///**** Waiting for message queue trigger!:///
    ///=======================================///
    long id = (long)arg;
    id =1;
    mqd_t qd; //queue descriptor
    //open the queue for reading//
    qd= mq_open(QUEUE_NAME,O_RDONLY);
    assert(qd != -1);
    struct mq_attr attr;
    assert(mq_getattr(qd,&attr) != -1);
    uint8_t *income_buf = calloc(attr.mq_msgsize,1);
    uint8_t *cast_buf = calloc(attr.mq_msgsize,1);
    assert(income_buf);
    fragma frag;
    struct timespec timeout;
    clock_gettime(CLOCK_REALTIME,&timeout);
    timeout.tv_sec =50;
    //bool closesoc =false;
    printf("Waiting for messages ..... \n\n");
    while(1){
        ///========================================///
        ///**** Open message queue fo receive:///
        ///=======================================///

        if((mq_timedreceive(qd,income_buf,attr.mq_msgsize,0,&timeout))<0){
            printf("Failed to receive message for 50 sec \n");
            //closesoc =true;
            pthread_exit(NULL);
        }
        else{
            cast_buf = income_buf;
            printf("Received successfully , your msg :\n");
            frag.packet_number = *cast_buf;
            cast_buf = (cast_buf   sizeof(int));
            frag.seq = *cast_buf;
            cast_buf = (cast_buf   sizeof(int));
            memccpy(frag.data,((fragma*)cast_buf)->data,0,Nbytes);
            cast_buf = cast_buf   Nbytes;
            frag.lastfrag = *cast_buf;
            uint8_t * data = frag.data;
        }
        pthread_mutex_lock(&lock);

        ///========================================///
        ///**** Connecting to Server and send Frament:///
        ///=======================================///

        int size = sizeof(( fragma *)income_buf)->packet_number   sizeof(( fragma *)income_buf)->seq   sizeof(( fragma *)income_buf)->data   sizeof(( fragma *)income_buf)->lastfrag;
        printf("In thread\n");
        int clientSocket;
        struct sockaddr_in serverAddr;
        socklen_t addr_size;

        // Create the socket.
        clientSocket = socket(PF_INET, SOCK_STREAM, 0);

        //Configure settings of the server address
        // Address family is Internet
        serverAddr.sin_family = AF_INET;

        //Set port number, using htons function
        serverAddr.sin_port = htons(8081);

        //Set IP address to localhost
        serverAddr.sin_addr.s_addr = inet_addr("192.168.14.149");
        memset(serverAddr.sin_zero, '\0', sizeof serverAddr.sin_zero);

        //Connect the socket to the server using the address
        addr_size = sizeof serverAddr;
        connect(clientSocket, (struct sockaddr *) &serverAddr, addr_size);
        if(send(clientSocket , income_buf , size,0) < 0)
        {
            printf("Send failed\n");
        }
        printf("Trhead Id : %ld \n" , id);
        printf("Packet number : %d \n Seq = %d  \n lasfrag = %d\n\n",frag.packet_number,frag.seq,(int)frag.lastfrag);
        pthread_mutex_unlock(&lock);
        //if(closesoc)
        close(clientSocket);
        usleep(20000);
    }
}
int main(){
    int i = 0;
    pthread_t tid[5];

    while(i< 5)
    {
        if( pthread_create(&tid[i], NULL, middlemanThread, (void*)i) != 0 )
            printf("Failed to create thread\n");
        i  ;
    }
    sleep(2);
    i = 0;
    while(i< 5)
    {
        pthread_join(tid[i  ],NULL);
        printf("Thread ID : %d:\n",i);
    }
    return 0;
}

uj5u.com熱心網友回復:

因此每次資料包到達訊息佇列時,一個執行緒“喚醒”,打開與遠程服務器的 TCP 連接并發送資料包

如果您完全關心速度或效率,請不要這樣做。使用 TCP 套接字可以做的最昂貴的事情是初始連接。您正在進行 3 次握手只是為了發送一條訊息!

然后,您在執行整個操作時持有一個全域互斥鎖 - 這也是您程式中最慢的操作。

目前的設計是有效的單執行緒的,但在最復雜和最昂貴的可能途徑。

我可以看到使用執行緒而不是一個連接發送檔案所需的時間更短,但吞吐量沒有改變

我不知道您實際測量的是什么,而且您也完全不清楚。什么是檔案?一個片段?多個片段?與您的 MTU 相比有多大?您是否檢查過片段實際上是以正確的順序收到的,因為在我看來,唯一可能的并行性是可能中斷的地方。

如何為單個檔案實作更低的延遲和不變的吞吐量?

如何驗證我的執行緒并行作業?

如果您在wireshark 中看到多個具有不同源埠的TCP 連接,并且它們的資料包是交錯的,則您具有有效的并行性。這不太可能,因為您使用全域互斥鎖明確禁止它!

在wireshark中檢查吞吐量的最佳方法是什么?

別。使用wireshark檢查資料包,使用服務器確定吞吐量。這就是結果真正重要的地方。

3.并行TCP的概念是否假設增加吞吐量?

如果您不知道它的用途,為什么要實作所有這些復雜性?

單個執行緒(正確編碼且沒有虛假互斥抖動)很有可能使您的網路飽和,所以:不。擁有多個 I/O 執行緒通常是為了方便地劃分您的邏輯和狀態(即,每個執行緒擁有一個客戶端,或者不同執行緒中有不同的不相關 I/O 子系統),而不是性能。


如果您想從訊息佇列中取出資料包并將它們發送到 TCP,那么高效的方法是:

  1. 使用單個執行緒來執行此操作(您的程式可能有其他執行緒在執行其他操作 - 如果可能,請避免與它們同步)
  2. 打開到服務器的單個持久 TCP 連接,不要為每個片段連接/關閉它
  3. 就是這樣。它比您擁有的要簡單得多,并且會表現得更好。

您實際上可以讓一個執行緒處理多個不同的連接,但我看不出這對您的情況有任何用處,因此請保持簡單。

uj5u.com熱心網友回復:

以下是部分答案:

3.并行TCP的概念是否假設增加吞吐量?

有點。這實際上取決于瓶頸是什么。

第一個可能的瓶頸是擁塞控制。TCP 發送方對一次可以發送的資料包數量有限制(在接收到第一個資料包的 ACK 之前),稱為擁塞視窗。這個數字應該從小處開始,并隨著時間的推移而增長。此外,如果一個資料包丟失,這個數字會減少一半,然后緩慢增長,直到發生下一次丟棄。然而,限制是針對每個 TCP 連接的,因此如果您將資料分布在多個并行連接上,則整體擁塞視窗(所有流的所有視窗的總和)將增長得更快,而下降的數量則更少。(這是一個總結,要詳細了解擁塞控制是如何作業的,這是一個很大的話題)。無論您是否使用執行緒,這都應該發生。您可以在一個執行緒中打開多個連接,并達到相同的效果。

第二個可能的瓶頸是作業系統中的網路處理。AFAIK 這是從 10Gb 連接開始的問題。也許是 1Gb,但可能不是。TCP 處理發生在作業系統而不是您的應用程式中。如果處理由作業系統在處理器之間傳播(應該有啟用此功能的引數),您可能會獲得更好的性能,并且由于快取可能會獲得更好的性能。

如果您從磁盤讀取檔案,您的磁盤 IO 也很可能成為瓶頸。在這種情況下,我認為在不同執行緒之間傳播發送資料實際上沒有幫助。

轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/350162.html

標籤:C linux 多线程 插座 通讯协议

上一篇:閱讀時是否需要鎖定非執行緒安全的集合?

下一篇:c -如何將值從父執行緒或子執行緒中的子執行緒下的值傳遞給C

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • Git本地庫既關聯GitHub又關聯Gitee

    創建代碼倉庫 使用gitee舉例(github和gitee差不多) 1.在gitee右上角點擊+,選擇新建倉庫 ? 2.選擇填寫倉庫資訊,然后進行創建 ? 3.服務端已經準備好了,本地開始作準備 (1)Git 全域設定 git config --global user.name "成鈺" git c ......

    uj5u.com 2020-09-10 05:04:14 more
  • CODING DevOps 代碼質量實戰系列第二課,相約周三

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。**《DevOps 代碼質量實戰(PHP 版)》**為 CODING DevOps 代碼質量實戰系列的第二課,同時也是本系列的 PHP ......

    uj5u.com 2020-09-10 05:07:43 more
  • 推薦Scrum書籍

    推薦Scrum書籍 直接上干貨,推薦書籍清單如下(推薦有順序的哦) Scrum指南 Scrum精髓 Scrum敏捷軟體開發 Scrum捷徑 硝煙中的Scrum和XP : 我們如何實施Scrum 敏捷軟體開發:Scrum實戰指南 Scrum要素 大規模Scrum:大規模敏捷組織的設計 用戶故事地圖 用 ......

    uj5u.com 2020-09-10 05:07:45 more
  • CODING DevOps 代碼質量實戰系列最后一課,周四發車

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。 **《DevOps 代碼質量實戰(Java 版)》**為 CODING DevOps 代碼質量實戰系列的最后一課,同時也是本系列的 ......

    uj5u.com 2020-09-10 05:07:52 more
  • 敏捷軟體工程實踐書籍

    Scrum轉型想要做好,第一步先了解并真正落實Scrum,那么我推薦的Scrum書籍是要看懂并實踐的。第二步是團隊的工程實踐要做扎實。 下面推薦工程實踐書單: 重構:改善既有代碼的設計 決議極限編程 : 擁抱變化 代碼整潔代碼 程式員的職業素養 修改代碼的藝術 撰寫可讀代碼的藝術 測驗驅動開發 : ......

    uj5u.com 2020-09-10 05:07:55 more
  • Jenkins+svn+nginx實作windows環境自動部署vue前端專案

    前面文章介紹了Jenkins+svn+tomcat實作自動化部署,現在終于有空抽時間出來寫下Jenkins+svn+nginx實作自動部署vue前端專案。 jenkins的安裝和配置已經在前面文章進行介紹,下面介紹實作vue前端專案需要進行的哪些額外的步驟。 注意:在安裝jenkins和nginx的 ......

    uj5u.com 2020-09-10 05:08:49 more
  • CODING DevOps 微服務專案實戰系列第一課,明天等你

    CODING DevOps 微服務專案實戰系列第一課**《DevOps 微服務專案實戰:DevOps 初體驗》**將由 CODING DevOps 開發工程師 王寬老師 向大家介紹 DevOps 的基本理念,并探討為什么現代開發活動需要 DevOps,同時將以 eShopOnContainers 項 ......

    uj5u.com 2020-09-10 05:09:14 more
  • CODING DevOps 微服務專案實戰系列第二課來啦!

    近年來,工程專案的結構越來越復雜,需要接入合適的持續集成流水線形式,才能滿足更多變的需求,那么如何優雅地使用 CI 能力提升生產效率呢?CODING DevOps 微服務專案實戰系列第二課 《DevOps 微服務專案實戰:CI 進階用法》 將由 CODING DevOps 全堆疊工程師 何晨哲老師 向 ......

    uj5u.com 2020-09-10 05:09:33 more
  • CODING DevOps 微服務專案實戰系列最后一課,周四開講!

    隨著軟體工程越來越復雜化,如何在 Kubernetes 集群進行灰度發布成為了生產部署的”必修課“,而如何實作安全可控、自動化的灰度發布也成為了持續部署重點關注的問題。CODING DevOps 微服務專案實戰系列最后一課:**《DevOps 微服務專案實戰:基于 Nginx-ingress 的自動 ......

    uj5u.com 2020-09-10 05:10:00 more
  • CODING 儀表盤功能正式推出,實作作業資料可視化!

    CODING 儀表盤功能現已正式推出!該功能旨在用一張張統計卡片的形式,統計并展示使用 CODING 中所產生的資料。這意味著無需額外的設定,就可以收集歸納寶貴的作業資料并予之量化分析。這些海量的資料皆會以圖表或串列的方式躍然紙上,方便團隊成員隨時查看各專案的進度、狀態和指標,云端協作迎來真正意義上 ......

    uj5u.com 2020-09-10 05:11:01 more
最新发布
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:41:12 more
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:35:34 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:05:44 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:00:18 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:20:31 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:55 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:18:51 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:00 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:17:55 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:12:06 more