分布式系統架構-samgr/source系統服務開發框架基礎代碼message.c講解
- 本篇概述
- 代碼框架
- 訊息管理流程圖
- message.h
- 關鍵結構體
- 函式宣告
- message.c
- 發送請求與回應
- 接收訊息與釋放訊息
- 共享
- 知識補充
本篇概述
本篇主要講解message.h和message_inner.h以及message.c的部分代碼,這三個檔案代碼核心在于訊息(msg)的收發,通過請求與回應的方式,利用訊息交換進行外部介面對服務于功能的呼叫管理,請求與回應像是一對相生相融的兄弟,共同決定了系統的安全運作,
代碼框架
message
├── 關鍵結構體
│ └── Identity
│ └── Request
│ └── Response
│ └── Exchange
├── 宏定義
│ └── Handler
│ └── Exchange
├── 訊息功能函式
│ └── SAMGR_SendRequest
│ └── SAMGR_SendResponse
│ └── SAMGR_MsgRecv
│ └── SAMGR_FreeMsg
│ └── SharedSend
│ └── FreeReference
│ └── SAMGR_SendSharedRequest
│ └── SAMGR_SendSharedDirectRequest
│ └── SAMGR_SendResponseByIdentity
訊息管理流程圖

message.h
關鍵結構體
關鍵的結構體包括:
- Identity:它決定了請求和回應對應服務或功能的id,
- Request :其中包括了請求的訊息id與資料內容,
- Response :主要包含了回應的內容,
- ==Exchange ==:這里尤為強調exchange,因為它是訊息管理的基礎,函式對請求與回應的功能性呼叫都是以exchange結構體內容為基礎的,
/**
* @brief Identifies a service and feature.
* 概述:標識服務和特性
* You can use this structure to identity a {@link IUnknown} feature to which messages will be
* sent through the asynchronous function of {@link IUnknown}. \n
* 你可以使用這個結構去標識一個功能,這個功能的信號將通過異步功能IUnknown發送
*/
struct Identity {
/** Service ID 服務ID*/
int16 serviceId;
/** Feature ID 功能ID*/
int16 featureId;
/** Message queue ID 訊息佇列ID*/
MQueueId queueId;
};
/**
* @brief Defines a request.
* 概述:定義一個請求
* You can use this structure to define the request that will be sent to a feature through the
* asynchronous function of {@link IUnknown}. \n
* 你可以使用這個框架來定義將通過異步IUnknown介面發送給一個功能的請求
* Request, which is data that is packed to send to a feature. \n
* 請求是被打包發送給一個feature(功能、特征)的資料
* If the data is not empty and the length is not 0, the system automatically releases the data. \n
* 如果資料不為空且長度不為0,系統將自動釋放資料
*/
struct Request {
/** Message ID 訊息ID*/
int16 msgId;
/** Data length 訊息長度*/
int16 len;
/** Data content 資料內容*/
void *data;
/** Message value, which is defined by developers 被開發者定義的訊息值*/
uint32 msgValue;
};
/**
* @brief Defines a response.
* 概述:定義一個回應
* This structure is used to send a response after the message processing function of a service
* or feature processes a request. \n
* 這個框架是用來在一個服務或功能處理一個請求的訊息處理功能之后發送的一個回應
* If the data is not empty and the length is not 0, the system automatically releases the data. \n
* 如果資料不為空且長度不為0,系統將自動釋放資料
*/
struct Response {
/** Data content 回應的資料內容*/
void *data;
/** Data length 回應的資料長度*/
int16 len;
};
struct Exchange {
Identity id; /**< The target service or feature identity. 目標服務或功能的id*/
Request request;
Response response;
short type; /**< The exchange type. 交換型別*/
Handler handler; /**< async response or immediately request callback function 異步回應或立即請求的回呼函式*/
uint32 *sharedRef; /**< use to share the request and response for saving memory 用于共享請求和回應以節約記憶體*/
};
函式宣告
其中,最重要的函式莫過于:SAMGR_SendRequest()和SAMGR_SendResponse(),它們的注解與宣告如下:
/**
* @brief Sends a request to a service or feature of a specified identity.
* 概述:向一個服務或功能的特定id發送一個請求
* This function is called by a service to send messages to its own features through the
* asynchronous function of {@link IUnknown}. \n
*
* @param identity Indicates the pointer to the ID of the feature or service that processes
* the message.
* identity代表指向一個服務或功能的id,它會處理這條訊息
* @param request Indicates the pointer to the request.
* request代表指向請求的指標
* @param handler Indicates the function handling the response. If the value is <b>NULL</b>,
* no response is required.
* handler代表處理回應的功能,如果這個值為空則沒有回應被請求
* @return Returns <b>EC_SUCCESS</b> if the request is sent successfully; returns other error codes
* if the request fails to be sent. The caller needs to release the memory applied in the request.
* 如果請求被成功發送將會回傳EC_SUCCESS,回傳錯誤代碼如果請求發送失敗,呼叫者需要釋放請求應用的記憶體空間
* @since 1.0
* @version 1.0
*/
int32 SAMGR_SendRequest(const Identity *identity, const Request *request, Handler handler);
/**
* @brief Sends a response after processing a request.
* 概述:在處理請求后發送一個回應
* This function is called to send a response after processing a request by {@link MessageHandle}
* of a service or {@link OnMessage} of a feature. \n
*
* @param request Indicates the pointer to the original request.
* request代表指向原始請求的指標
* @param response Indicates the pointer to the response content.
* response代表回應內容的指標
* @return Returns <b>EC_SUCCESS</b> if the response is sent successfully; returns other error
* codes if the response fails to be sent.
* 如果回應被成功發送將會回傳EC_SUCCESS,回傳錯誤代碼如果請求發送失敗
* @attention
* <ul><li>This function can be called only in {@link MessageHandle} or {@link OnMessage}. </li>
* 這個函式只能在MessageHandle或OnMessage中被呼叫
* <li>The request must be the original one passed from {@link MessageHandle} or
* {@link OnMessage}. Otherwise, a memory exception occurs. </li>
* 請求必須是最初的從MessageHandle或OnMessage發送來的,否則記憶體例外便會發生
* <li> When the caller sends a request, the <b>handler</b> callback function must be carried. </li>
* 當呼叫者發送一個請求時,handler回呼函式必須被攜帶
* <li>The response is sent to the message queue of the service to which the requester belongs
* for processing. Therefore, the requester should wait for the response in non-blocking mode. </li></ul>
* 回應被發送到請求者所屬的服務的訊息佇列進行處理,因此,請求者應該以非阻塞模式等待回應,
* @since 1.0
* @version 1.0
*/
int32 SAMGR_SendResponse(const Request *request, const Response *response);
我們將其余函式宣告代碼也列在下面供讀者參考:
//訊息接收
int32 SAMGR_MsgRecv(MQueueId queueId, uint8 *interMsg, uint32 size);
/**
* The function just release the Message->data and Message->sharedRef(use free), not release the msg entry.
* 這個函式僅僅釋放Message->data和Message->sharedRef(use free),不釋放訊息入口
* If you alloc the msg on the heep, you should release it by yourself, you`d better alloc on the stack.
* 如果你在堆上分配了訊息空間,你需要自行釋放它,你最好在堆疊中分配訊息空間
* The function will be called automatically.
* 這個函式將被自動呼叫
* Do not call this function manually, except the SM_SendRequest return error!
* 不要人為地呼叫這個函式,除非SM_SendRequest回傳錯誤代碼
**/
//訊息釋放
int32 SAMGR_FreeMsg(Exchange *exchange);
/**
* @brief Sends a request to multiple services or features to save memory.
*
* This function is used to publish topics for the {@link Broadcast} service to broadcast messages. \n
*
* @param identity Indicates the pointer to the IDs of services or features, to which requests
* are sent.
* @param request Indicates the pointer to the request.
* @param token Indicates the pointer to reference counting.
* @param handler Indicates the function handling the response. If the value is <b>NULL</b>,
* no response is required.
* @retval Returns the token if the request is sent successfully; returns <b>NULL</b> if the
* request fails to be sent.
* @attention
* <ul><li>Ensure that the thread specified by <b>identity</b> processes the message after
* all messages are sent. Common practice: Add a lock before sending a request and add
* the same lock during processing. </li>
* <li>If <b>NULL</b> is returned, the caller needs to release the memory of the request. </li></ul>
* @since 1.0
* @version 1.0
*/
uint32 *SAMGR_SendSharedRequest(const Identity *identity, const Request *request, uint32 *token, Handler handler);
/**
* @brief Sends a request and response of a caller to the feature thread. The handler is directly
* called to process the request and response without using the message processing functions.
* (Customized function for the broadcast service)
*
* This function is used to publish topics for the {@link Broadcast} service to broadcast messages. \n
* The value of reference counting is incremented by one each time this function is called. \n
*
* @param id Indicates the pointer to the IDs of services or features, to which the request and
* response are sent.
* @param request Indicates the pointer to the request.
* @param resp Indicates the pointer to the response.
* @param ref Indicates the reference counting.
* @param handler Indicates the function for handling the request and response. This parameter
* cannot be <b>NULL</b>.
* @return Returns <b>EC_SUCCESS</b> if the request and response are sent successfully; returns
* other error codes if the request and response fail to be sent.
* @attention
* <ul><li>Ensure that the thread specified by <b>identity</b> processes the message after all
* messages are sent. Common practice: Add a lock before sending a request and add the same lock
* during processing. </li>
* <li>If <b>NULL</b> is returned, the caller needs to release the memory of the request and
* response. </li>
* <li>If the response changes each time when a request is sent, ensure that the response
* will not be released. (Set <b>len</b> to <b>0</b>, the <b>data</b> of response will be
* the resident memory.) </li></ul>
* @since 1.0
* @version 1.0
*/
int32 SAMGR_SendSharedDirectRequest(const Identity *id, const Request *req, const Response *resp, uint32 **ref,
Handler handler);
/**
* @brief Sends a response to a specified service or feature after processing the original request.
* (Customized function for <b>bootstrap</b>)
*
* This function is called to send a response after processing a request by {@link MessageHandle}
* of a service or {@link OnMessage} of a feature. \n
* This function can be customized to implement phased startup of different types of services. \n
*
* @param id Indicates the pointer to the ID of a service or feature. The response is sent to the
* thread of the service or feature for processing.
* @param request Indicates the pointer to the original request.
* @param response Indicates the pointer to the response content.
* @return Returns <b>EC_SUCCESS</b> if the response is sent successfully; returns other error
* codes if the response fails to be sent.
* @attention
* <ul><li>This function can be called only in <b>MessageHandle</b> or <b>OnMessage</b>. </li>
* <li>The request must be the original one passed from <b>MessageHandle</b> or <b>OnMessage</b>.
* Otherwise, a memory exception occurs. </li>
* <li> When the caller sends a request, the <b>handler</b> callback function must be carried. </li>
* <li>The response is sent to the message queue of a specified ID for processing. Therefore,
* wait for the response in non-blocking mode. </li></ul>
* @since 1.0
* @version 1.0
*/
int32 SAMGR_SendResponseByIdentity(const Identity *id, const Request *request, const Response *response);
static int32 SharedSend(MQueueId queueId, Exchange *exchange, int initRef);
static BOOL FreeReference(Exchange *exchange);
message.c
發送請求與回應
詳細代碼及注解如下:
//定義發送請求函式
int32 SAMGR_SendRequest(const Identity *identity, const Request *request, Handler handler)
{
if (request == NULL || identity == NULL) {
return EC_INVALID;
}
//初始化訊息交換結構體
Exchange exchange = {*identity, *request, {NULL, 0}, MSG_NON, handler, NULL};
//將交換訊息需要的目標服務或功能id的佇列id置空
exchange.id.queueId = NULL;
//如果處理器的值非空則重新賦值佇列id以及交換型別
if (handler != NULL) {
exchange.id.queueId = SAMGR_GetCurrentQueueID();
exchange.type = MSG_CON;
}
//將id佇列中的執行資訊壓入exchange中
return QUEUE_Put(identity->queueId, &exchange, 0, DONT_WAIT);
}
//定義發送回應函式
int32 SAMGR_SendResponse(const Request *request, const Response *response)
{
// we need send the default the con message or not?
if (request == NULL) {
return EC_INVALID;
}
//獲取交換請求物件
Exchange *exchange = GET_OBJECT(request, Exchange, request);
//交換型別一定要是需要被確認的請求
if (exchange->type != MSG_CON) {
return EC_INVALID;
}
if (exchange->handler == NULL) {
return EC_SUCCESS;
}
//對交換型別、回應資料及長度進行初始化
exchange->type = MSG_ACK;
exchange->response.data = NULL;
exchange->response.len = 0;
//將回應的內容賦值到exchange結構體中去
if (response != NULL) {
exchange->response = *response;
}
// If there is no task queue, we will call the response processor in current task.
//如果沒有任務佇列,則呼叫當前任務中的回應處理器,
if (exchange->id.queueId == NULL) {
exchange->handler(&exchange->request, &exchange->response);
return EC_SUCCESS;
}
// Send back to the origin to process the task.
// 如果上述情況都不滿足則進行發送分享,這里的含義是將任務發送回原點以處理任務,即重置任務,
int32 ret = SharedSend(exchange->id.queueId, exchange, 1);
if (ret != EC_SUCCESS) {
exchange->handler(&exchange->request, &exchange->response);
(void)FreeReference(exchange);
}
return EC_SUCCESS;
}
接收訊息與釋放訊息
詳細代碼及注解如下:
//定義訊息接收函式
int32 SAMGR_MsgRecv(MQueueId queueId, uint8 *interMsg, uint32 size)
{
if (queueId == NULL || interMsg == NULL || size == 0) {
return EC_INVALID;
}
//將傳入的訊息復制入地址為0x00的位置
if (memset_s(interMsg, size, 0x00, size) != EOK) {
return EC_FAILURE;
}
//在執行佇列ID中加入接收到的訊息資訊
return QUEUE_Pop(queueId, interMsg, 0, WAIT_FOREVER);
}
//定義釋放訊息
int32 SAMGR_FreeMsg(Exchange *exchange)
{
//檢查exchange的索引是否需要置空,呼叫函式以達成free索引的功能
if (!FreeReference(exchange)) {
return EC_SUCCESS;
}
//檢查請求并置空請求資料
if (exchange->request.len > 0) {
SAMGR_Free(exchange->request.data);
exchange->request.data = NULL;
}
//檢查回應并置空回應資料
if (exchange->response.len > 0) {
SAMGR_Free(exchange->response.data);
exchange->response.data = NULL;
}
return EC_SUCCESS;
}
共享
我們可以利用共享訊息來節省一定的時間效率與空間效率,詳細代碼及注解如下:
//發送共享請求
uint32 *SAMGR_SendSharedRequest(const Identity *identity, const Request *request, uint32 *token, Handler handler)
{
if (identity == NULL || request == NULL) {
return NULL;
}
Exchange exchange = {*identity, *request, {NULL, 0}, MSG_NON, handler, token};
exchange.type = (handler == NULL) ? MSG_NON : MSG_CON;
exchange.id.queueId = NULL;
int32 err = SharedSend(identity->queueId, &exchange, 0);
if (err != EC_SUCCESS) {
HILOG_ERROR(HILOG_MODULE_SAMGR, "SharedSend [%p] failed(%d)!", identity->queueId, err);
(void)FreeReference(&exchange);
}
return exchange.sharedRef;
}
//發送共享直接請求
int32 SAMGR_SendSharedDirectRequest(const Identity *id, const Request *req, const Response *resp, uint32 **ref,
Handler handler)
{
if (handler == NULL || ref == NULL) {
return EC_INVALID;
}
Exchange exchange = {0};
if (req != NULL) {
exchange.request = *req;
}
if (resp != NULL) {
exchange.response = *resp;
}
exchange.handler = handler;
exchange.sharedRef = *ref;
exchange.type = MSG_DIRECT;
exchange.id = *id;
exchange.id.queueId = NULL;
int32 err = SharedSend(id->queueId, &exchange, 0);
if (err != EC_SUCCESS) {
HILOG_ERROR(HILOG_MODULE_SAMGR, "SharedSend [%p] failed(%d)!", id->queueId, err);
(void)FreeReference(&exchange);
}
*ref = exchange.sharedRef;
return err;
}
//通過id發送回應
int32 SAMGR_SendResponseByIdentity(const Identity *id, const Request *request, const Response *response)
{
// we need send the default the con message or not?
if (request == NULL || id == NULL) {
return EC_INVALID;
}
Exchange *exchange = GET_OBJECT(request, Exchange, request);
if (exchange->type == MSG_NON) {
return EC_INVALID;
}
exchange->id.queueId = id->queueId;
return SAMGR_SendResponse(request, response);
}
//定義共享發送函式
static int32 SharedSend(MQueueId queueId, Exchange *exchange, int initRef)
{
/* if the msg data and response is NULL, we just direct copy, no need shared the message. */
//如果訊息資料和回應是空值,我們只需要直接復制,不需要分享訊息
if ((exchange->request.data == NULL || exchange->request.len <= 0) &&
(exchange->response.data == NULL || exchange->response.len <= 0)) {
return QUEUE_Put(queueId, exchange, 0, DONT_WAIT);
//這里是直接將訊息佇列id復制入exchange執行task的id中
}
/* 1.add reference */
MUTEX_GlobalLock();//加入全域互斥鎖
if (exchange->sharedRef == NULL) {
//加入參考
exchange->sharedRef = (uint32*)SAMGR_Malloc(sizeof(uint32));
if (exchange->sharedRef == NULL) {
MUTEX_GlobalUnlock();
return EC_NOMEMORY;
}
*(exchange->sharedRef) = initRef;
}
//地址移1位
(*(exchange->sharedRef))++;
MUTEX_GlobalUnlock();
//將任務佇列中的id加入exchange中
return QUEUE_Put(queueId, exchange, 0, DONT_WAIT);
}
//釋放exchange中的索引值
static BOOL FreeReference(Exchange *exchange)
{
if (exchange == NULL) {
return FALSE;
}
BOOL needFree = TRUE;
/* 檢查共享索引 */
MUTEX_GlobalLock();
if (exchange->sharedRef != NULL) {
//共享索引地址移1位,但不能為0,因此當出現0時需要及時進行重置
if (*(exchange->sharedRef) > 0) {
(*(exchange->sharedRef))--;
}
if (*(exchange->sharedRef) > 0) {
needFree = FALSE;
}
}
MUTEX_GlobalUnlock();
if (needFree) {
SAMGR_Free(exchange->sharedRef);
exchange->sharedRef = NULL;
}
return needFree;
}
知識補充
在exchange結構體中,我們對The exchange type.即交換型別有詳細的列舉型別定義:
enum ExchangeType {
MSG_EXIT = -1,//退出
MSG_NON = 0,//不需要確認
MSG_CON = 1,//需要確認
MSG_ACK = 2, //應答訊息
MSG_SYNC = 3,//同步訊息
MSG_DIRECT = 4,//直接交換
};
事實上,訊息的傳導,即回應請求模式是基于COAP協議的,
詳情請見:COAP協議簡介,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/342155.html
標籤:其他
上一篇:nginx安裝http2.0協議
