Linux C的MQTT測驗代碼撰寫
做完在同一個主機下的MQTT客戶端實驗後,做億點點改變:我們使用移植好的MQTT介面在Ubuntu下寫一個MQTT的客戶端,然後和Windows下的MQTT.fx工具通信。
-
代碼
#include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTClient.h" //需要在系統中提前安裝好MQTT,可以參考 #define ADDRESS "tcp://192.168.50.81:1883" //根據 MQTT 實際主機地址調整 #define CLIENTID_PUB "ExampleClientPub" #define CLIENTID_SUB "ExampleClientSub" #define QOS 1 #define TIMEOUT 10000L #define PUB_TOPIC "ScratchToSoftWare" #define SUB_TOPIC "SoftWareToScratch" #define NUM_THREADS 2 //執行緒個數,一個用于發布,一個用于訂閱 #define FAN_OFF "{'fan':false}" //表示風扇關閉的json字串 #define FAN_ON "{'fan':true}" //表示風扇打開的json字串 int fan_state = 0; volatile MQTTClient_deliveryToken deliveredtoken; //傳遞給MQTTClient_setCallbacks的回呼函式,訊息發送成功后,呼叫此回呼函式 void delivered(void *context, MQTTClient_deliveryToken dt) { printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt; } //傳遞給MQTTClient_setCallbacks的回呼函式 訊息到達后,呼叫此回呼函式 int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: %.*s\n", message->payloadlen, (char*)message->payload); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } //傳遞給MQTTClient_setCallbacks的回呼函式 連接例外斷開后呼叫此回呼函式 void connlost(void *context, char *cause) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } //實作MQTT的訂閱 void *mqtt_subscribe() { MQTTClient client; //定義一個MQTT客戶端client MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; //初始化client,設定MQTT服務器的地址"tcp://192.168.50.81:1883" 并訂閱"ExampleClientSub" if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID_SUB, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) { printf("Failed to create client, return code %d\n", rc); rc = EXIT_FAILURE; goto exit; } //設定回呼函式, if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS) { printf("Failed to set callbacks, return code %d\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; //連接服務器 
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "Press Q<Enter> to quit\n\n", SUB_TOPIC, CLIENTID_SUB, QOS); //訂閱主題 if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS) { printf("Failed to subscribe, return code %d\n", rc); rc = EXIT_FAILURE; } else { int ch; do { ch = getchar(); } while (ch!='Q' && ch != 'q'); //等待輸入字符 'Q'或'q' if ((rc = MQTTClient_unsubscribe(client, SUB_TOPIC)) != MQTTCLIENT_SUCCESS) //取消訂閱 { printf("Failed to unsubscribe, return code %d\n", rc); rc = EXIT_FAILURE; } } if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) //斷開和服務器的連接 { printf("Failed to disconnect, return code %d\n", rc); rc = EXIT_FAILURE; } destroy_exit: MQTTClient_destroy(&client); //釋放客戶端的資源 exit: return NULL; } //實作MQTT的發布 void *mqtt_publish() { MQTTClient client; //定義一個MQTT客戶端 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; char data[1024]; int rc; //初始化MQTT客戶端 if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID_PUB, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) { printf("Failed to create client, return code %d\n", rc); exit(EXIT_FAILURE); } conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; //連接MQTT服務器 if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.qos = QOS; pubmsg.retained = 0; while(1) { //判斷風扇標志位的值,封裝發送的訊息 if (fan_state == 0) { pubmsg.payload = FAN_ON; pubmsg.payloadlen = strlen(FAN_ON); fan_state = 1; } else { pubmsg.payload = FAN_OFF; pubmsg.payloadlen = strlen(FAN_OFF); fan_state = 0; } //回圈發送訊息,在"ScratchToSoftWare" 這個主題下發布訊息 if ((rc = MQTTClient_publishMessage(client, PUB_TOPIC, &pubmsg, 
&token)) != MQTTCLIENT_SUCCESS) { printf("Failed to publish message, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), data, PUB_TOPIC, CLIENTID_PUB); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); usleep(5000000); } if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) printf("Failed to disconnect, return code %d\n", rc); MQTTClient_destroy(&client); return NULL; } int main(int argc, char **argv) { pthread_t threads[NUM_THREADS]; pthread_create(&threads[0], 0, mqtt_subscribe, NULL); pthread_create(&threads[1], 0, mqtt_publish, NULL); pause(); return 0; } -
編譯命令
$ gcc main.c -lpthread -lpaho-mqtt3c
效果展示

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/293808.html
標籤:其他
