大家好,我是烤鴨:
這次介紹下nchan,nginx的一個module,
nchan
原始碼: https://github.com/slact/nchan
官網: https://nchan.io/
nginx 配置說明檔案: https://nchan.io/documents/nginxconf2016-slides.pdf
測驗環境搭建
4 臺linux centos 7,都安裝了nginx和nchan,
安裝可以參考下這篇文章,
https://www.cnblogs.com/rongfengliang/p/7866122.html
目前使用的是4臺nginx做測驗,1臺模擬上游的轉發服務器(類似 keepalived),后3臺是安裝了nchan的nginx,用來 pub/sub
看下配置,
nginx_master.conf
#master
upstream ws {
server 192.168.1.1:8080 weight=1 max_fails=2;
server 192.168.1.2:8080 weight=1 max_fails=2;
server 192.168.1.3:8080 weight=1 max_fails=2;
}
server {
listen 80;
server_name test.xxx.xxx.com;
root /usr/local/nginx/chat;
#error_page 404 /404.html;
error_page 500 502 503 504 /50x.html;
location = 50x.html {
root /usr/local/nginx/html;
}
#master
location / {
proxy_pass http://ws;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
access_log logs/barrage.access.log;
error_log logs/nchan_error.log;
}
nginx_salve.conf
nchan_shared_memory_size 256M;
upstream redis_cluster {
nchan_redis_server redis://:password@10.168.1.2:3001;
nchan_redis_server redis://:password@10.168.1.2:3002;
nchan_redis_server redis://:password@10.168.1.3:3001;
nchan_redis_server redis://:password@10.168.1.3:3002;
nchan_redis_server redis://:password@10.168.1.1:3001;
nchan_redis_server redis://:password@10.168.1.1:3002;
# you don't need to specify all the nodes, they will be autodiscovered
# however, it's recommended that you do specify at least a few master nodes.
}
server {
listen 8080;
server_name localhost;
root /usr/local/nginx/chat;
location = /sub {
add_header x-hit-from 127 always;
nchan_subscriber;
nchan_channel_id $arg_vid;
nchan_use_redis on;
nchan_redis_pass redis_cluster;
}
location = /pub{
nchan_publisher;
nchan_channel_id $arg_id;
nchan_redis_pass redis_cluster;
nchan_message_timeout 5m;
}
#..
location = /private/status {
nchan_stub_status;
}
}
websocket測驗網站:
http://www.websocket-test.com/
ws 建聯 sub介面用來接收訊息,vid是nginx配置里的channel_id

postman模擬訊息發送到 nchan, /pub 介面

redis 存訊息資料

如果自己寫前端頁面的話:可以是創建原生的websocket,也可以使用官方提供的NchanSubscriber.js,
正常 websocket:
var ws =new WebScoket("ws://xxx.yyy.com/sub?id=demo")
ws.onMessage=funciton(data){
console.log(data)
}
引入 NchanSubscriber.js:
var sub = new NchanSubscriber('http://xxx.yyy.com/sub?id=demo', 'websocket');
sub.on("message",
function(message, message_metadata) {
alert(message);
});
架構
# 共享記憶體,單機的時候取決于單機的記憶體,redis集群下取決于 單個節點的記憶體
nchan_shared_memory_size 32000M;
單機nginx的worker通過channel的hash把資料同步到當前worker的記憶體,再同步到共享記憶體,同時刷到第二個worker,(關于記憶體操作其實呼叫的是nginx的api)
右邊的redis模擬的場景是不同的nchan節點,共享層用redis來實作,

如果量小的話,簡單的做個im或者聊天室,單機就足夠了,使用的是nginx的機器記憶體,(和 springboot 直接加個 @WebSocket 注解差不多)

支持水平擴展的集群架構:(理論上支持不限數量的橫向擴展,瓶頸在redis集群,得做好容災方案,)
一般單臺機器的連接數在 65535(可以改大),所以即便存盤使用了redis,單機還是有瓶頸的,當然一般看訊息體的大小,可能redis會先崩,所以需要多臺nginx機器和一個超大的redis集群,來扛得住百萬用戶,nginx集群至少20臺才能維持這么多人同時在線,如果要考慮訊息的話,得看訊息內容,預估redis集群大小,

nchan 和 netty
今天有同事問我,關于分發訊息的,nchan和netty有什么區別,
簡單說下netty的實作,
用 ConcurrentMap 維護 key(聊天室id)和channelGroup(每一個用戶連接成功,就會增加一個channel),
當有訊息需要通知的時候需要呼叫方法即可,
channelGroup.writeAndFlush(new TextWebSocketFrame(message));
再看下 nchan的原始碼:
slact/nchan/blob/master/src/subscribers/common.c
ngx_int_t nchan_subscriber_receive_notice(subscriber_t *self, ngx_int_t code, void *data) {
if(code == NCHAN_NOTICE_SUBSCRIBER_INFO_REQUEST) {
// ... 構建需要通知的內容
// 獲取需要通知的 channel_id,從這個函式 nchan_get_subscriber_info_response_channel_id
ngx_str_t *response_channel_id = nchan_get_subscriber_info_response_channel_id(self->request, response_id);
// ... 構建msg物件
cf->storage_engine->publish(response_channel_id, &msg, cf, NULL, NULL);
if(result_allocd) {
ngx_http_complex_value_free(&result);
}
}
return NGX_OK;
}
/src/util/nchan_channel_id.c
ngx_str_t *nchan_get_subscriber_info_response_channel_id(ngx_http_request_t *r, uintptr_t request_id) {
// 全域的 request_ctx 物件,呼叫nginx api獲得,https://www.nginx.com/resources/wiki/extending/api/http/#ngx-http-get-module-ctx
nchan_request_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_nchan_module);
ngx_str_t *chid = ctx->subscriber_info_response_channel_id;
// child 是空的,就新分配記憶體,并給ctx的subscriber_info_response_channel_id賦值
if(!chid) {
// nginx 分配記憶體的函式,https://www.nginx.com/resources/wiki/extending/api/alloc/
chid = ngx_palloc(r->pool, sizeof(ngx_str_t));
if(chid == NULL) {
return NULL;
}
ctx->subscriber_info_response_channel_id = chid;
chid->data = ngx_palloc(r->pool, NCHAN_SUBSCRIBER_INFO_CHANNEL_ID_BUFFER_SIZE);
if(chid->data == NULL) {
ctx->subscriber_info_response_channel_id = NULL;
return NULL;
}
}
u_char *end = ngx_snprintf(chid->data, NCHAN_SUBSCRIBER_INFO_CHANNEL_ID_BUFFER_SIZE, "meta/sr%d", (ngx_int_t )request_id);
chid->len = end - chid->data;
return chid;
}
src/nchan_types.h
看一下 nchan_request_ctx_t 的結構,訂閱者的id和channel_id 都存了,
#define NCHAN_MULTITAG_REQUEST_CTX_MAX 4
typedef struct {
subscriber_t *sub;
nchan_reuse_queue_t *output_str_queue;
nchan_reuse_queue_t *reserved_msg_queue;
nchan_bufchain_pool_t *bcp; //bufchainpool maybe?
ngx_str_t *subscriber_type;
nchan_msg_id_t msg_id;
nchan_msg_id_t prev_msg_id;
ngx_str_t *publisher_type;
ngx_str_t *multipart_boundary;
ngx_str_t *channel_event_name;
ngx_str_t channel_id[NCHAN_MULTITAG_REQUEST_CTX_MAX];
int channel_id_count;
time_t channel_subscriber_last_seen;
int channel_subscriber_count;
int channel_message_count;
ngx_str_t *channel_group_name;
ngx_str_t *request_origin_header;
ngx_str_t *allow_origin;
ngx_int_t subscriber_info_response_id;
ngx_str_t *subscriber_info_response_channel_id;
unsigned sent_unsubscribe_request:1;
unsigned request_ran_content_handler:1;
} nchan_request_ctx_t;
其實是從nginx的全域物件獲取channel_id的,
總結
優點:
-
服務不需要考慮和維護鏈接等,只需要專注處理業務相關邏輯,
-
nchan由于直接在nginx層,性能更好(比起自己搭建pub/sub服務器),
缺點:
- 非應用層的,出現問題不好排查,
- 默認的訊息存盤時間過長(nchan_message_timeout:1h),訊息大量情況下容易拖垮集群,
- 以channelid為key的話,可能導致redis分配不均勻,單個節點壓力過大,影響不止當前的channelid,
- 集群模式依賴redis,需要考慮容災方案,
- 擴展性差,由于不支持多個redis集群,沒法根據特定條件分片,(比如某個聊天室人特別多,單獨一套redis集群,用netty的話可能比較好實作)(特意給作者留言確認了一下,https://github.com/slact/nchan/issues/619)
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/297116.html
標籤:其他
