為了更好的閱讀體驗,請點擊這里
本文大部分內容翻譯自 Chapter 1 - Basics,原因是之前翻譯的版本太老了,不得不親自披掛上陣拿機器翻譯一下,只截取了部分自己可能用得到的,所以如果有看不太懂的地方,去翻一下原網頁吧,QWQ
附贈 libzmq 的 api 介面函式說明 一份,
一、基礎函式
-
int zmq_recv (void *socket, void *buf, size_t len, int flags);
zmq_recv()函式應從 socket 引數參考的套接字接收訊息,并將其存盤在 buf 引數參考的緩沖區中,任何超過 len 引數指定長度的位元組都將被截斷,如果指定套接字上沒有可用訊息,則 zmq_recv() 函式將阻塞,直到請求得到滿足, flags 引數是下面定義的標志的組合: 如果 len 為零,則 buf 引數可以為 null, -
int zmq_send (void *socket, void *buf, size_t len, int flags);
zmq_send()函式應排隊從 buf 和 len 引數參考的緩沖區創建的訊息, -
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
zmq_setsockopt()函式應將 option_name 引數指定的選項設定為 option_value 引數指向的值,用于 socket 引數指向的 ?MQ 套接字,option_len 引數是以位元組為單位的選項值的大小,對于采用“字串”型別值的選項,提供的位元組資料應該不包含零位元組,或者以單個零位元組結尾(終止 ASCII NUL 字符), -
void *zmq_ctx_new ();
創建新的 ZeroMQ context,如果成功的話,這個函式會為新創建的背景關系回傳一個 opaque 句柄,否則回傳NULL, -
void *zmq_socket (void *context, int type);
這個函式使用具體的背景關系創建一個 zmq 套接字并為新創建的套接字回傳一個 opaque 句柄,type引數指定了套接字的型別,這決定了在套接字上的對話語意,
新創建的套接字開始是未系結的,同時不與任何端點有聯系,為了做到一個訊息跟著套接字流動這樣的操作,必須用zmq_connect()連接至少一個端點,或者是至少一個端點必須被使用zmq_bind()創建為了傳入的連接, -
int zmq_bind (void *socket, const char *endpoint);在一個套接字上接受傳入的連接,這個函式系結套接字到一個區域的端點然后在這個端點上接受傳入的連接,
endpoint引數是一個包含了transport://跟隨著一個地址的字串,transport指定了是用什么協議,地址決定了系結到哪個地址,
該函式支持以下協議:- tcp
- ipc
- inproc
- pgm, epgm
- vmci
-
int zmq_connect (void *socket, const char *endpoint);從套接字上創建發出的連接,這個函式把套接字連接到端點上然后在那個端點上接受傳入的連接,
endpoint引數是一個包含了transport://跟隨著一個地址的字串,transport指定了是用什么協議,地址決定了連接到哪個地址,
該函式支持的協議同上,
由于其他語言的字串格式不像 C 的格式是 str\0 而是 str 的,因此需要手動添加最后的終止符,因此原教程撰寫了比較方便使用的方法,原文中建立了一個規則,即 ZeroMQ 字串是指定長度的,并且在沒有尾隨 null 的情況下在線發送,在最簡單的情況下(我們將在示例中這樣做),一個 ZeroMQ 字串巧妙地映射到一個 ZeroMQ 訊息幀,
在 C 中,接收 ZeroMQ 字串并將其作為有效的 C 字串傳遞給應用程式:
// Receive ZeroMQ string from socket and convert into C string
// Chops string at 255 chars, if it's longer
static char *s_recv (void *socket) {
char buffer [256];
int size = zmq_recv (socket, buffer, 255, 0);
if (size == -1)
return NULL;
if (size > 255)
size = 255;
buffer [size] = \0;
/* use strndup(buffer, sizeof(buffer)-1) in *nix */
return strdup (buffer);
}
這形成了一個方便的輔助函式,本著讓我們可以重用有利可圖的東西的精神,讓我們撰寫一個類似的 s_send 函式,以正確的 ZeroMQ 格式發送字串,并將其打包到一個我們可以重用的頭檔案中,
結果是 zhelpers.h,這個頭檔案可以讓我們可以用 C 撰寫更甜美和更短的 ZeroMQ 應用程式,這是一個相當長的源代碼,并且只能在 C 中使用,所以請 在閑暇時閱讀它,
二、幾種協議
1. request-reply
從一個 Hello World 示例開始,我們將制作一個客戶端和一個服務器,客戶端向服務器發送“Hello”,服務器回復“World”,這是 C 中的服務器,它在埠 5555 上打開一個 ZeroMQ 套接字,讀取其上的請求,并用“World”回復每個請求:
// Hello World server
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main (void)
{
// Socket to talk to clients
void *context = zmq_ctx_new ();
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555");
assert (rc == 0);
while (1) {
char buffer [10];
zmq_recv (responder, buffer, 10, 0);
printf ("Received Hello\n");
sleep (1); // Do some 'work'
zmq_send (responder, "World", 5, 0);
}
return 0;
}
// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main (void)
{
printf ("Connecting to hello world server...\n");
void *context = zmq_ctx_new ();
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
char buffer [10];
printf ("Sending Hello %d...\n", request_nbr);
zmq_send (requester, "Hello", 5, 0);
zmq_recv (requester, buffer, 10, 0);
printf ("Received World %d\n", request_nbr);
}
zmq_close (requester);
zmq_ctx_destroy (context);
return 0;
}
如果您終止服務器 (Ctrl-C) 并重新啟動它,客戶端將無法正常恢復,從崩潰的行程中恢復并不是那么容易,
2. pub-sub
一個推送天氣更新的示例,其中包含郵政編碼、溫度和相對濕度,我們將生成隨機值,就像真實的氣象站一樣,
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
#include "zhelpers.h"
int main (void)
{
// Prepare our context and publisher
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
assert (rc == 0);
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = randof (100000);
temperature = randof (215) - 80;
relhumidity = randof (50) + 10;
// Send message to all subscribers
char update [20];
sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send (publisher, update);
}
zmq_close (publisher);
zmq_ctx_destroy (context);
return 0;
}
這種更新流沒有開始也沒有結束,就像一個永無止境的廣播,
這是客戶端應用程式,它偵聽更新流并獲取與指定郵政編碼有關的任何內容,默認情況下是紐約市,因為這是開始任何冒險的好地方:
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
#include "zhelpers.h"
int main (int argc, char *argv [])
{
// Socket to talk to server
printf ("Collecting updates from weather server...\n");
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");
assert (rc == 0);
// Subscribe to zipcode, default is NYC, 10001
const char *filter = (argc > 1)? argv [1]: "10001 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
filter, strlen (filter));
assert (rc == 0);
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
char *string = s_recv (subscriber);
int zipcode, temperature, relhumidity;
sscanf (string, "%d %d %d",
&zipcode, &temperature, &relhumidity);
total_temp += temperature;
free (string);
}
printf ("Average temperature for zipcode '%s' was %dF\n",
filter, (int) (total_temp / update_nbr));
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
}

請注意,當您使用 SUB 套接字時,您必須使用 zmq_setsockopt() 和 ZMQ_SUBSCRIBE 設定訂閱,如此代碼所示,如果您不設定任何訂閱,您將不會收到任何訊息,這是初學者常犯的錯誤,訂閱者可以設定多個訂閱,這些訂閱被加在一起,也就是說,如果更新匹配任何訂閱,訂閱者就會收到它,訂戶也可以取消特定的訂閱,訂閱通常(但不總是)是一個可列印的字串,請參閱 zmq_setsockopt() 以了解其作業原理,
PUB-SUB 套接字對是異步的,客戶端在一個回圈中執行 zmq_recv()(或者一次,如果這就是它僅需要呼叫一次的話),嘗試向 SUB 套接字發送訊息將導致錯誤,同樣,該服務器端會根據需要經常執行 zmq_send() ,但不得在 PUB 套接字上執行 zmq_recv(),
理論上,對于 ZeroMQ 套接字,哪一端連接和哪一端系結并不重要,但是,在實踐中存在未記錄(原文:undocumented)的差異,文章稍后會談到,現在,除非您的網路設計使接下來的操作不可能,請系結 PUB 并連接 SUB,
關于 PUB-SUB 套接字,還有一件更重要的事情需要了解:您無法準確知道訂閱者何時開始接收訊息,即使你啟動一個訂閱者,等待一段時間,然后再啟動發布者,訂閱者也總是會錯過發布者發送的第一條訊息,這是因為當訂閱者連接到發布者時(需要很短但非零的時間),發布者可能已經在發送訊息了,
這種“反應遲鈍”的癥狀經常影響到足夠多的人,因此我們將對其進行詳細解釋,請記住,ZeroMQ 執行異步 I/O,即在后臺執行,假設您有兩個節點按以下順序執行此操作:
- 訂閱者連接到端點并接收和計算訊息,
- 發布者系結到端點并立即發送 1,000 條訊息,
那么訂閱者很可能不會收到任何東西,你會眨眼(blink),檢查你是否設定了正確的過濾器,然后再試一次,訂閱者仍然不會收到任何東西,
建立 TCP 連接涉及往返握手,這需要幾毫秒,具體取決于您的網路和對等點之間的躍點數,在那個時候,ZeroMQ 可以發送很多訊息,為了便于討論,假設建立連接需要 5 毫秒,并且同一鏈路每秒可以處理 1M 訊息,在訂閱者連接到發布者的 5 毫秒內,發布者僅需 1 毫秒即可發送這些 1K 訊息,
在 第 2 章 - 套接字和模式中,我們將解釋如何同步發布者和訂閱者,以便在訂閱者真正連接并準備好之前您不會開始發布資料,有一個簡單而愚蠢的方法可以延遲發布者,那就是休眠(sleep),但是,不要在實際應用程式中這樣做,因為它非常脆弱,而且不優雅且緩慢,使用睡眠向自己證明發生了什么,然后等待這章內容 第 2 章 - 套接字和模式,看看如何正確地做到這一點,
同步的替代方法是簡單地假設發布的資料流是無限的,沒有起點也沒有終點,還假設訂戶不關心在它啟動之前發生了什么,這就是我們構建天氣客戶端示例的方式,
因此,客戶端訂閱了它選擇的郵政編碼并收集了該郵政編碼的 100 個更新,如果郵政編碼是隨機分布的,這意味著來自服務器的大約一千萬次更新,您可以啟動客戶端,然后啟動服務器,客戶端將繼續作業,您可以根據需要隨時停止和重新啟動服務器,客戶端將繼續作業,當客戶端收集到它的一百個更新時,它計算平均值,列印它,然后退出,
關于發布-訂閱(pub-sub)模式的一些要點:
- 訂閱者可以連接到多個發布者,每次使用一個連接呼叫,然后資料將到達并交錯(“公平排隊”),這樣就沒有一個發布者會淹沒其他發布者,
- 如果發布者沒有連接的訂閱者,那么它將簡單地丟棄所有訊息,
- 如果您使用的是 TCP 而訂閱者速度很慢,則訊息將在發布者上排隊,稍后我們將研究如何使用“高水位線”來保護發布者免受這種情況的影響,
- 從 ZeroMQ v3.x 開始,當使用連接協議(
tcp:@< *>@*或ipc:@<>@)時,過濾發生在發布者端,使用epgm:@<//>@協議,過濾發生在訂閱者端,在 ZeroMQ v2.x 中,所有過濾都發生在訂閱者端,
3. Divide and Conquer(并行計算)
注意,Ventilator 是呼吸機,worker 是工人,sink 是水槽,我不太會翻譯這三個詞,因此均使用英文代替,下文中如果出現了這三種中文翻譯證明是機翻忘刪了233

Ventilator 生成 100 個任務,每個任務都有一條訊息告訴 worker 休眠一定毫秒數:
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
#include "zhelpers.h"
int main (void)
{
void *context = zmq_ctx_new ();
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers...\n");
// The first message is "0" and signals start of batch
s_send (sink, "0");
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("Total expected cost: %d msec\n", total_msec);
zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
這是 worker 的應用程式,它收到一條訊息,休眠該秒數,然后發出已完成的信號:
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
#include "zhelpers.h"
int main (void)
{
// Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
printf ("%s.", string); // Show progress
fflush (stdout);
s_sleep (atoi (string)); // Do the work
free (string);
s_send (sender, ""); // Send results to sink
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}
下面是 sink 的應用程式,它收集了 100 個任務,然后計算整個處理所花費的時間,因此如果有多個任務,我們可以確認這些 worker 確實在并行運行:
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
#include "zhelpers.h"
int main (void)
{
// Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// Wait for start of batch
char *string = s_recv (receiver);
free (string);
// Start our clock now
int64_t start_time = s_clock ();
// Process 100 confirmations
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if (task_nbr % 10 == 0)
printf (":");
else
printf (".");
fflush (stdout);
}
// Calculate and report duration of batch
printf ("Total elapsed time: %d msec\n",
(int) (s_clock () - start_time));
zmq_close (receiver);
zmq_ctx_destroy (context);
return 0;
}
讓我們更詳細地看一下這段代碼的某些方面:
- workers 將上游連接到 ventilator,將下游連接到 sink,這意味著您可以任意添加 worker,如果 workers 系結到他們的端點,您將需要(a)更多端點(b)每次添加 worker 時修改 ventilator 和/或 sink,我們說呼吸機和水槽是我們架構的穩定部分,而工人是它的動態部分,
- 我們必須將批處理的開始與所有正在運行的 worker 同步,這是 ZeroMQ 中一個相當常見的陷阱,沒有簡單的解決方案,
zmq_connect方法需要一定的時間,因此,當一組 workers 連接到 ventilator 時,第一個成功連接的 worker 將在短時間內收到大量訊息,而其他 worker 也在連接,如果您不以某種方式同步批處理的最開頭,系統將根本不會并行運行,您可以嘗試取消 ventilator 中的等待,看看會發生什么, - ventilator 的 PUSH 套接字將任務平均分配給 workers(假設他們在當前批次開始分發之前都已連接),這稱為負載平衡,我們將再次更詳細地討論它,
- sink 的 PULL 套接字平均地收集來自 workers 的結果,這稱為公平排隊,

管道模式還表現出“慢連接”綜合癥,導致人們指責 PUSH 套接字沒有正確地進行負載平衡,如果您正在使用 PUSH 和 PULL,并且您的一個作業人員收到的訊息比其他作業人員多得多,那是因為 PULL 套接字的加入速度比其他套接字快,并且在其他作業人員設法連接之前獲取了很多訊息,如果你想要適當的負載平衡,你可能想看看 第 3 章 - 高級請求-回復模式中的負載平衡模式,
三、使用 ZeroMQ 編程
獲得正確的背景關系(context)
ZeroMQ 應用程式總是從創建背景關系開始,然后使用它來創建套接字,在 C 中,它是zmq_ctx_new()呼叫,您應該在您的流程中創建和使用一個背景關系,從技術上講,背景關系是單個行程中所有套接字的容器,并充當行程內套接字的傳輸,這是在一個行程中連接執行緒的最快方式,如果在運行時一個行程有兩個背景關系,它們就像單獨的 ZeroMQ 實體,如果這正是你想要的,好的,但除此之外請記住:
在行程開始時呼叫 zmq_ctx_new() 一次,在結束時呼叫 zmq_ctx_destroy() 一次,
如果您正在使用 fork() 系統呼叫,請在 fork 之后,然后在子行程代碼的開頭執行 zmq_ctx_new(),一般來說,您會在子行程中做有趣的(ZeroMQ)事情,而在父行程中做無聊的流程管理,
干凈地退出
使用 C 時,您必須在使用完物件后小心地釋放它們,否則會導致記憶體泄漏、應用程式不穩定以及通常的惡果,
記憶體泄漏是一回事,但 ZeroMQ 對于退出應用程式的方式非常挑剔,原因是技術性的和痛苦的,但結果是如果你讓任何套接字保持打開狀態,zmq_ctx_destroy() 函式將永遠掛起(hang),并且即使您關閉所有套接字,且如果有待定(pending)的連接或發送,zmq_ctx_destroy() 默認情況下也會永遠等待,除非您在關閉它們之前在這些套接字上設定 LINGER 為零,
我們需要擔心的 ZeroMQ 物件是訊息、套接字和背景關系,幸運的是它非常簡單,至少在簡單的程式中是這樣:
- 盡可能使用
zmq_send()和zmq_recv(),因為它避免了使用zmq_msg_t物件的需要, - 如果您確實使用
zmq_msg_recv(),請始終在完成后立即通過呼叫zmq_msg_close()釋放接收到的訊息, - 如果您要打開和關閉大量套接字,這可能表明您需要重新設計您的應用程式,在某些情況下,除非您銷毀背景關系,否則套接字句柄不會被釋放,
- 退出程式時,關閉套接字,然后呼叫
zmq_ctx_destroy(),這破壞了背景關系(destroy context),
至少對于 C 開發是這樣,在具有自動物件銷毀功能的語言中,套接字和背景關系將在您離開作用域時被銷毀,如果您使用例外,則必須在類似于“最終”塊的地方進行清理,這與任何資源一樣,
如果您正在進行多執行緒作業,它會變得比這更復雜,首先,不要嘗試從多個執行緒使用同一個套接字,請不要解釋為什么您認為這會非常有趣,只是請不要這樣做,接下來,您需要關閉每個有正在進行的請求的套接字,正確的方法是設定一個較低的 LINGER 值(1 秒),然后關閉套接字,如果您的語言系結在您銷毀背景關系時沒有自動為您執行此操作,我建議您發送一個補丁,
最后,破壞背景關系,這將導致附加執行緒(即共享相同背景關系)中的任何阻塞接識訓輪詢或發送回傳錯誤,捕獲該錯誤,然后設定 linger on,關閉該執行緒中的套接字,然后退出,不要兩次破壞相同的背景關系,主執行緒中的 zmq_ctx_destroy 將阻塞,直到它知道的所有套接字都安全關閉,
瞧!它非常復雜和痛苦,以至于任何稱職的語言系結作者都會自動執行此操作并使套接字關閉舞蹈變得不必要,
四、為什么需要 ZeroMQ
ZeroMQ:一個高效、可嵌入的庫,可以解決應用程式在網路上變得非常靈活所需的大部分問題,而且成本不高,
具體來說:
- 它在后臺執行緒中異步處理 I/O,它們使用無鎖資料結構與應用程式執行緒通信,因此并發 ZeroMQ 應用程式不需要鎖、信號量或其他等待狀態,
- 組件可以動態地來來去去,ZeroMQ 會自動重新連接,這意味著您可以按任何順序啟動組件,您可以創建“面向服務的架構”(SOA),其中服務可以隨時加入和離開網路,
- 它會在需要時自動對訊息進行排隊,它智能地執行此操作,在排隊之前將訊息推送到盡可能靠近接收方的位置,
- 它有處理過滿佇列(稱為“高水位線”)的方法,當佇列已滿時,ZeroMQ 會自動阻止發送者或丟棄訊息,具體取決于您正在執行的訊息傳遞型別(所謂的“模式”),
- 它允許您的應用程式通過任意傳輸相互通信:TCP、多播、行程內、行程間,您無需更改代碼即可使用不同的傳輸方式,
- 它使用取決于訊息傳遞模式的不同策略來安全地處理慢速/阻塞的讀取器,
- 它允許您使用各種模式(例如請求-回復和發布-訂閱)來路由訊息,這些模式是您創建拓撲、網路結構的方式,
- 它允許您創建代理以通過單個呼叫排隊、轉發或捕獲訊息,代理可以降低網路互連的復雜性,
- 它使用在線路上的簡單框架,完全按照發送時的方式傳遞整個訊息,如果您寫了一條 10k 的訊息,您將收到一條 10k 的訊息,
- 它不會對訊息強加任何格式,它們是從零到千兆位元組大的 blob,當您想要表示資料時,您可以選擇其他一些產品,例如 msgpack、Google 的協議緩沖區等,
- 它通過在有意義的情況下自動重試來智能地處理網路錯誤,
- 它可以減少您的碳排放,用更少的 CPU 做更多的事情意味著你的盒子使用更少的電力,你可以讓你的舊盒子使用更長時間,Al Gore 會喜歡 ZeroMQ,
實際上 ZeroMQ 做的遠不止這些,它對您開發具有網路功能的應用程式的方式具有顛覆性的影響,從表面上看,它是一個受套接字啟發的 API,您可以在其上執行 zmq_recv() 和 zmq_send(),但是訊息處理很快成為中心回圈,您的應用程式很快就會分解為一組訊息處理任務,它優雅而自然,它可以擴展:這些任務中的每一個都映射到一個節點,并且節點通過任意傳輸相互通信,一個行程中的兩個節點(節點是一個執行緒)、一個盒子上的兩個節點(節點是一個行程)或一個網路中的兩個節點(節點是一個盒子)——它們都是一樣的,沒有應用程式代碼更改,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/550803.html
標籤:其他
下一篇:返回列表
