定義tcp server協議的資料結構
這里使用了libuv為基礎,來制作高性能的服務器,而作為服務器,我們必須支撐一種或者多種協議,這里使用類似于modbus協議的,資料結構被寫死了,并不靈活,以0x69開頭,0x16為結尾,如果需要適合其他協議,需要改寫代碼,一種方法是使用js來做腳本,或者json來做,用js更為簡單,并且可以以執行腳本的方式來執行一些初始化的指令,
//協議頭部和回呼函式設定
typedef struct tcp_settings
{
urls url;
//datatype 不同型別
//
char datatype = 0;
unsigned char head = 0x69; //0x69 = 105
//如果有頭部起始位置一定為0 一個位元組如0x69
char headoffset = 0;
//起始頭部為0 或者 1個位元組 .2 .3 .4
char headlength = 0;
//從第一個位元組開始是id,從0開始算
char idoffset = 1;
//4個位元組為id長度,設備id的長度
char idlength = 4;
//命令長度
char cmdlength = 1;
//命令偏移量
char cmdoffset = 5;
//代表包長內容的只有一個位元組
char content_len = 1;
//代表包長度的是第6個位元組為起始位置,從0開始算
char content_offset = 6;
char includeht = 0;
//是否包含crc校驗和end
char includecrcend = 0;
// 0 是沒有校驗,2是2個位元組crc16
char crclen = 2;
char headfieldslen = 0;
char timestamp_offset = 0;
char timestamp_len = 0;
char type_offset = 0;
char type_len = 0;
char memo_offset = 0;
char memo_len = 0;
//一個結束位元組,如果為零則忽略end
char end_start = 1;
//end_start 如果為零則忽略
unsigned char end = 0x16;
/*保留
big endian 0
little endian 1
*/
char bl = enum_big;
/*計算出來的長度
初始化為-1,
否則大于0
*/
int hlen_calc = -1;
uv_loop_t * uv_loop = NULL;
}tcp_settings;
主代碼
要達到高性能的代碼,
1、少量的拷貝 盡量少拷貝資料
2、執行緒池 使用執行緒池處理資料
3、異步 使用異步方式接收鏈接和接收資料
libuv支持了執行緒池的處理方式,以及本身就是異步方式的api,給我們提供了很大的方便性,
定義執行緒池資料結構
typedef struct thread_work {
thread_work(client_t* cli, tcp_unit * unit) :
request(),
client(cli),
data(unit),
error(false) {
//保存資料指標,傳到處理執行緒
request.data = this;
}
uint32_t id =0;
client_t* client = NULL;
//把資料接過來進行處理
tcp_unit * data = NULL;
uv_work_t request;
bool error;
}thread_work;
code
#include "uv.h"
#include "protocol.h"
#include "util.h"
#include <map>
#include <iostream>
using namespace std;
#define MAX_WRITE_HANDLES 1000
//考慮quick js
class tcp_server
{
//包頭和包體的
tcp_settings _set;
//第二代資料結構,把決議放到腳本里,不要用c++決議,只接收資料
tcp_settings2 _set2;
//tcpserver
uv_tcp_t * _server = NULL;
public:
tcp_server(){}
~tcp_server() {}
protected:
//客戶端斷開連接
static void on_close(uv_handle_t* handle) {
//客戶端下線
client_t* client = (client_t*)handle->data;
client->clean();
delete client;
//client_offline(client->deviceid);
}
static void alloc_cb(uv_handle_t * handle, size_t suggested_size, uv_buf_t* buf) {
int buflen = 0;
int headlen = 0;
int bodylen = 0;
client_t* client = (client_t*)handle->data;
char *head = &(client->head[0]); //資料接收的頭部
char *pos = head + client->recvlen; //位置指向資料已經接收的下一個位元組
//得到頭部長度
#ifdef _USE_SETTING1
headlen = get_headlen(client->config);
#else
headlen = get_headlen2(client->config2);
#endif
if (client->status == enum_head) //接收頭部位元組
{
buflen = headlen - client->recvlen;
*buf = uv_buf_init(pos, buflen);
}
else //接收資料部分位元組
{
//得到包體長度
#ifdef _USE_SETTING1
bodylen = get_bodylen(client->config, head);
#else
bodylen = get_bodylen2(client->config2, head);
#endif
//printf("the body len is %d\n", bodylen);
if (bodylen > 0) {
if (client->buffer_data == NULL) {
//總長度
//printf("create memory\n");
client->buffer_data = new tcp_unit();
//加上頭部長度
client->buffer_data->headlen = headlen;
client->buffer_data->data = new char[bodylen + headlen];
client->buffer_data->tlen = bodylen + headlen;
//資料接收的長度加上頭部的長度,開始接收資料體
client->buffer_data->recvlen = headlen;
//拷貝頭部
memcpy(client->buffer_data->data, head, headlen);
*buf = uv_buf_init(client->buffer_data->data + headlen, bodylen);
}
else {
//前面加了頭部
char * pos = client->buffer_data->data + client->buffer_data->recvlen;
buflen = client->buffer_data->tlen - client->buffer_data->recvlen;
*buf = uv_buf_init(pos, buflen);
}
}
else { //否則沒有包體,只有包頭
client->buffer_data->tlen = bodylen;
client->buffer_data->recvlen = 0;
}
}
}
//
static void worker(uv_work_t* req) {
thread_work * rb = (thread_work *)req->data;
tcp_unit *tu = rb->data;
tcp_server *server =(tcp_server*)rb->client->data;
server->on_data(tu);
}
static void after_worker(uv_work_t* req,int status) {
thread_work *work = static_cast<thread_work *>(req->data);
tcp_unit * tu = work->data;
free(tu->data);
free(tu);
free(work);
}
static int tcp_parser_execute(client_t* client, char *data, int size)
{
int headlen = 0;
#ifdef _USE_SETTING1
headlen = get_headlen(client->config);
#else
headlen = get_headlen2(client->config2);
#endif
if (client->status == enum_head)
{
client->recvlen += size;
//如果頭部位元組已經接收完畢
if (headlen == client->recvlen)
{
client->status = enum_body; //開始接收包體資料
//頭部已經接收完畢則發生事件
//繼承加入可以發生事件,如加入串列
tcp_server * server = (tcp_server *)client->data;
server->on_headers_complete(client);
}
}
else if (client->status == enum_body)
{
client->buffer_data->recvlen += size;
if (client->buffer_data->tlen == client->buffer_data->recvlen) {//資料已經接收完畢
//包頭資料初始化
client->recvlen = 0;
client->status = enum_head; //開始重新接收包頭
if (client->buffer_data->tlen > 0) {
//發生事件
tcp_server * server = (tcp_server *)client->data;
server->on_message_complete(client);
thread_work * work = new thread_work(client,client->buffer_data);
work->id = client->deviceid;
client->buffer_data = NULL;
//訊息體接收結束,交給執行緒池處理
int status = uv_queue_work(client->config->uv_loop,
&work->request,
worker,
after_worker);
CHECK(status, "uv_queue_work");
}
}
}
return client->recvlen;
}
static void on_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t * buf) {
ssize_t parsed;
client_t* client = (client_t*)tcp->data;
if (nread >= 0) {
parsed = (ssize_t)tcp_parser_execute(
client, buf->base, nread);
if (parsed < 0) {
LOG_ERROR("parse error");
//tcp_parser *parser = &client->parser;
uv_close((uv_handle_t*)&client->tcp, on_close);
}
}
else {
if (nread != UV_EOF) {
UVERR(nread, "read");
}
uv_close((uv_handle_t*)&client->tcp, on_close);
}
}
static void on_connect(uv_stream_t* server_handle, int status) {
CHECK(status, "connect");
printf("connected!\n");
tcp_server * server = (tcp_server *)server_handle->data;
client_t* client = (client_t*)calloc(1, sizeof(client_t));
client->config = &(server->_set);
//這里還沒有改成讀json檔案,后面停用腳本檔案
client->config2 = &(server->_set2);
client->data = server;
uv_loop_t * uv_loop = client->config->uv_loop;
uv_tcp_init(uv_loop, &client->tcp);
client->tcp.data = client;
int r = uv_accept(server_handle, (uv_stream_t*)&client->tcp);
if (r == 0) {
uv_read_start((uv_stream_t*)&client->tcp, alloc_cb, on_read);
}
else {
CHECK(r, "accept");
uv_close((uv_handle_t*)(&client->tcp), on_close);
}
}
public:
//頭部接收完畢
virtual int on_headers_complete(void *param) {
return 0;
}
virtual int on_message_complete(void *param) {
return 0;
}
//收到完整一幀資料
virtual int on_data(tcp_unit * data) {
return 0;
}
int start(const char * ip, uint16_t port) {
int r = uv_tcp_init(_set.uv_loop, _server);
//保存用戶資料,是tcp server 本身指標
_server->data = this; // &_set;
CHECK(r, "tcp_init");
r = uv_tcp_keepalive(_server, 1, 60);
CHECK(r, "tcp_keepalive");
struct sockaddr_in address;
r = uv_ip4_addr(ip, port, &address);
CHECK(r, "ip4_addr");
r = uv_tcp_bind(_server, (const struct sockaddr*)&address, 0);
CHECK(r, "tcp_bind");
r = uv_listen((uv_stream_t*)_server, MAX_WRITE_HANDLES, on_connect);
CHECK(r, "uv_listen");
//tcp_servers.push(port, server);
return 0;
}
int tcp_init(const char * configfile,uv_loop_t * uv_loop)
{
//fix me ,here is not use configfile
_server = new uv_tcp_t();
_set.uv_loop = uv_loop;
#if 0
if (read_config_protocol(configfile, _set) != 0)
{
cout << "error read config file "<<configfile << endl;
uninit();
return -1;
}
//
_set.hlen_calc = get_headlen(&_set);
printf("the head len is %d\n", _set.hlen_calc);
#endif
return 0;
}
void uninit()
{
free(_server);
}
//得到所有的客戶端ip地址和流量等資料
int getclients()
{
return 0;
}
client_t *getclient(uint32_t deviceid) {
return NULL;
}
};
主要代碼按照包頭–》包體的方式接收,接收完包頭,接收包體,然后進入下一個回圈,
呼叫
未完待續
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/290743.html
標籤:其他
