MQTT--mosquitto實作發布與訂閱
- 一、MQTT是什么?
- 二、mosquitto
- 1.Ubuntu下安裝mosquitto
- 2.測驗mosquitto訂閱與發布
- 3.使用mosquitto庫函式實作上述訂閱與發布
- 三 、使用MQTT實作從樹莓派上獲得的溫度上傳到服務器中的資料庫中,
- 1.發布端:
- 2.訂閱端:
- 3.ds18d20.h
- 4.ds18b20.c
一、MQTT是什么?
MQTT簡介:https://www.runoob.com/w3cnote/mqtt-intro.html
二、mosquitto
mosquitto是一款實作了訊息推送協議 MQTT v3.1 的開源訊息代理軟體,提供輕量級的,支持可發布/可訂閱的的訊息推送模式,使設備對設備之間的短訊息通信變得簡單,
在實驗中使用mosquitto庫函式來實作訂閱與發布,
mosquitto庫函式原始碼:https://mosquitto.org/api/files/mosquitto-h.html
1.Ubuntu下安裝mosquitto
- wget下載原始碼包
wget http://mosquitto.org/files/source/mosquitto-1.5.5.tar.gz
2.解壓
tar -xzvf mosquitto-1.5.5.tar.gz
3.進入目錄
cd mosquitto-1.5.5/
4.編譯
make
5.運行安裝
sudo make install
可能出現問題及解決方法
【1】編譯找不到openssl/ssl.hsudo apt-get install libssl-dev
【2】編譯程序g++命令未找到:sudo apt-get install g++
【3】編譯程序找不到ares.hsudo apt-get install libc-ares-dev
【4】編譯程序找不到uuid/uuid.hsudo apt-get install uuid-dev
【5】使用程序中找不到libmosquitto.so.1 error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory
【解決方法】——修改libmosquitto.so位置
創建鏈接sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
更新元件sudo ldconfig
參考:https://blog.csdn.net/caijiwyj/article/details/86671211
2.測驗mosquitto訂閱與發布
1.mosquitto broker配置
查看mosquitto組態檔在mosquitto檔案夾下vim mosquitto.conf
mosquitto組態檔默認即可,如有需求可按需修改,
2.啟動
mosquitto [-c config file] [ -d | --daemon ] [-p port number] [-v]
引數說明:
-c 后面跟的是啟動mosquitto可以調整的引數,比如是否開啟基本認證,埠是什么,SSL單向和雙向的認證配置等等,
-d 表示MQTT mosquitto將在后臺運行,
-p 代表當前的mosquitto服務實體啟動以后,其監聽埠號,這個配置的覆寫[-c config file] 指定的組態檔中的埠,
-v 代碼除錯模式(verbose)在終端輸出更多的資訊 ,
3.發布
mosquitto_sub 命令引數說明
1. -c 設定‘clean session’為無效狀態,這樣一直保持訂閱狀態,即便是已經失去連接,如果再次連接仍舊能夠接收的斷開期間發送的訊息,
2. -d 列印debug資訊
3. -h 指定要連接的域名 默認為localhost
4. -i 指定clientId
5. -I 指定clientId前綴
6. -k keepalive 每隔一段時間,發PING訊息通知broker,仍處于連接狀態, 默認為60秒,
7. -q 指定希望接收到QoS為什么的訊息 默認QoS為0
8. -R 不顯示陳舊的訊息
9. -t 訂閱topic
10. -v 列印訊息
11. --will-payload 指定一個訊息,該訊息當客戶端與broker意外斷開連接時發出,該引數需要與--will-topic一起使用
12. --will-qos Will的QoS值,該引數需要與--will-topic一起使用
13. --will-retain 指定Will訊息被當做一個retain訊息(即訊息被廣播后,該訊息被保留起來),該引數需要與--will-topic一起使用
14. --will-topic 用戶發送Will訊息的topic
4.訂閱
mosquitto_pub 命令引數說明
1. -d 列印debug資訊
2. -f 將指定檔案的內容作為發送訊息的內容
3. -h 指定要連接的域名 默認為localhost
4. -i 指定要給哪個clientId的用戶發送訊息
5. -I 指定給哪個clientId前綴的用戶發送訊息
6. -m 訊息內容
7. -n 發送一個空(null)訊息
8. -p 連接埠號
9. -q 指定QoS的值(0,1,2)
10. -t 指定topic
11. -u 指定broker訪問用戶
12. -P 指定broker訪問密碼
13. -V 指定MQTT協議版本
14. --will-payload 指定一個訊息,該訊息當客戶端與broker意外斷開連接時發出,該引數需要與--will-topic一起使用
15. --will-qos Will的QoS值,該引數需要與--will-topic一起使用
16. --will-retain 指定Will訊息被當做一個retain訊息(即訊息被廣播后,該訊息被保留起來),該引數需要與--will-topic一起使用
17. --will-topic 用戶發送Will訊息的topic
5.關閉
# 查看mosquitto服務行程ID
ps -aux | grep mosquitto
# 殺掉服務行程
# PID 為上一步查找的ID
kill -9 PID
例如:
第一個終端

新建一個終端

3.使用mosquitto庫函式實作上述訂閱與發布
之前給過mosquitto庫的API:https://mosquitto.org/api/files/mosquitto-h.html
下面介紹常用的一些函式,剩下的API讀者可依據上述鏈接自行學習,
int mosquitto_lib_init(void)
//功能:使用mosquitto庫函式前,要先初始化,使用之后就要清除,清除函式;int mosquitto_lib_cleanup();
//回傳值:MOSQ_ERR_SUCCESS 總是
int mosquitto_lib_cleanup(void)
//功能:使用完mosquitto函式之后,要做清除作業,
//回傳值: MOSQ_ERR_SUCCESS 總是
struct mosquitto *mosquitto_new( const char * id, bool clean_session, void * obj )
/*功能:創建一個新的mosquitto客戶端實體,新建客戶端,
引數:①id :用作客戶端ID的字串,如果為NULL,將生成一個隨機客戶端ID,如果id為NULL,clean_session必須為true,
②clean_session:設定為true以指示代理在斷開連接時清除所有訊息和訂閱,設定為false以指示其保留它們,客戶端將永遠不會在斷開連接時丟棄自己的傳出訊息,呼叫mosquitto_connect或mosquitto_reconnect將導致重新發送訊息,使mosquitto_reinitialise將客戶端重置為其原始狀態,如果id引數為NULL,則必須將其設定為true,
簡言之:就是斷開后是否保留訂閱資訊true/false
③obj: 用戶指標,將作為引數傳遞給指定的任何回呼,(回呼引數)
回傳:成功時回傳結構mosquitto的指標,失敗時回傳NULL,詢問errno以確定失敗的原因:
ENOMEM記憶體不足,
EINVAL輸入引數無效,
void mosquitto_destroy( struct mosquitto * mosq )
/*功能:釋放客戶端
引數:mosq: struct mosquitto指標
void mosquitto_connect_callback_set(struct mosquitto * mosq, void (*on_connect)(struct mosquitto *mosq, void *obj, int rc) )
/*功能:連接確認回呼函式,當代理發送CONNACK訊息以回應連接時,將呼叫此方法,
引數:mosq: struct mosquitto指標
void (*on_connect)(struct mosquitto *mosq , void *obj, int rc) 回呼函式 (引數:
mosq: struct mosquitto指標
obj:mosquitto_new中提供的用戶資料
rc: 連接回應的回傳碼,其中有:
0-成功
1-連接被拒絕(協議版本不可接受)
2-連接被拒絕(識別符號被拒絕)
3-連接被拒絕(經紀人不可用)
4-255-保留供將來使用
)
void mosquitto_disconnect_callback_set( struct mosquitto *mosq,void (*on_disconnect)( struct mosquitto *mosq,void *obj, int rc) );
/*功能:斷開連接回呼函式,當代理收到DISCONNECT命令并斷開與客戶端的連接,將呼叫此方法,
引數:mosq: struct mosquitto指標
void (*on_connect)(struct mosquitto *mosq , void *obj, int rc) 回呼函式 (引數:
mosq: struct mosquitto指標
obj:mosquitto_new中提供的用戶資料
rc:0表示客戶端已經呼叫mosquitto_disconnect,任何其他值,表示斷開連接時意外的,)
int mosquitto_connect( struct mosquitto * mosq, const char * host, int port, int keepalive )
/*功能: 連接到MQTT代理/服務器(主題訂閱要在連接服務器之后進行)
引數:①mosq : 有效的mosquitto實體,mosquitto_new()回傳的mosq.
②host : 服務器ip地址
③port:服務器的埠號
④keepalive:保持連接的時間間隔, 單位秒,如果在這段時間內沒有其他訊息交換,則代理應該將PING訊息發送到客戶端的秒數,
回傳:MOSQ_ERR_SUCCESS 成功,
MOSQ_ERR_INVAL 如果輸入引數無效,
MOSQ_ERR_ERRNO 如果系統呼叫回傳錯誤,變數errno包含錯誤代碼
int mosquitto_disconnect( struct mosquitto * mosq )
/*功能:斷開與代理/服務器的連接,
回傳:
MOSQ_ERR_SUCCESS 成功,
MOSQ_ERR_INVAL 如果輸入引數無效,
MOSQ_ERR_NO_CONN 如果客戶端未連接到代理,
int mosquitto_publish( struct mosquitto * mosq, int * mid, const char * topic, int payloadlen, const void * payload, int qos, bool retain )
/*功能:主題發布的函式
引數:①mosq:有效的mosquitto實體,客戶端
②mid:指向int的指標,如果不為NULL,則函式會將其設定為該特定訊息的訊息ID,然后可以將其與發布回呼一起使用,以確定何時發送訊息,請注意,盡管MQTT協議不對QoS = 0的訊息使用訊息ID,但libmosquitto為其分配了訊息ID,以便可以使用此引數對其進行跟蹤,
③topic:要發布的主題,以null結尾的字串
④payloadlen:有效負載的大小(位元組),有效值在0到268,435,455之間;主題訊息的內容長度
⑤payload: 主題訊息的內容,指向要發送的資料的指標,如果payloadlen >0,則它必須時有效的存盤位置,
⑥qos:整數值0、1、2指示要用于訊息的服務質量,
⑦retain:設定為true以保留訊息,
回傳:
MOSQ_ERR_SUCCESS 成功,
MOSQ_ERR_INVAL 如果輸入引數無效,
MOSQ_ERR_NOMEM 如果發生記憶體不足的情況,
MOSQ_ERR_NO_CONN 如果客戶端未連接到代理,
MOSQ_ERR_PROTOCOL 與代理進行通信時是否存在協議錯誤,
MOSQ_ERR_PAYLOAD_SIZE 如果payloadlen太大,
MOSQ_ERR_MALFORMED_UTF8 如果主題無效,則為UTF-8
MOSQ_ERR_QOS_NOT_SUPPORTED 如果QoS大于代理支持的QoS,
MOSQ_ERR_OVERSIZE_PACKET 如果結果包大于代理支持的包,
int mosquitto_subscribe( struct mosquitto * mosq, int * mid, const char * sub, int qos )
/*功能:訂閱主題函式
引數:①mosq:有效的mosquitto實體,客戶端
②mid: 指向int的指標,如果不為NULL,則函式會將其設定為該特定訊息的訊息ID,然后可以將其與訂閱回呼一起使用,以確定何時發送訊息,;主題的訊息ID
③sub: 主題名稱,訂閱模式,
④qos : 此訂閱請求的服務質量,
回傳值:
MOSQ_ERR_SUCCESS 成功,
MOSQ_ERR_INVAL 如果輸入引數無效,
MOSQ_ERR_NOMEM 如果發生記憶體不足的情況,
MOSQ_ERR_NO_CONN 如果客戶端未連接到代理,
MOSQ_ERR_MALFORMED_UTF8 如果主題無效,則為UTF-8
MOSQ_ERR_OVERSIZE_PACKET 如果結果包大于代理支持的包,
void mosquitto_message_callback_set( struct mosquitto * mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *) )
/*功能:訊息回呼函式,收到訂閱的訊息后呼叫,
引數:①mosq: 有效的mosquitto實體,客戶端,
②on_message 回呼函式,格式如下:void callback(struct mosquitto * mosq,void * obj,const struct mosquitto_message * message)
回呼的引數:
①mosq:進行回呼的mosquitto實體
②obj: mosquitto_new中提供的用戶資料
③message: 訊息資料,回呼完成后,庫將釋放此變數和關聯的記憶體,客戶應復制其所需要的任何資料,
struct mosquitto_message{
int mid;//訊息序號ID
char *topic; //主題
void *payload; //主題內容 ,MQTT 中有效載荷
int payloadlen; //訊息的長度,單位是位元組
int qos; //服務質量
bool retain; //是否保留訊息
};
int mosquitto_loop_forever( struct mosquitto * mosq, int timeout, int max_packets )
/*功能:此函式在無限阻塞回圈中為你呼叫loop(),對于只想在程式中運行MQTT客戶端回圈的情況,這很有用,如果服務器連接丟失,它將處理重新連接,如果在回呼中呼叫mosqitto_disconnect()它將回傳,
引數:①mosq: 有效的mosquitto實體,客戶端
②timeout: 超時之前,在select()呼叫中等待網路活動的最大毫秒數,設定為0以立即回傳,設定為負可使用默認值為1000ms,
③max_packets: 該引數當前未使用,應設為為1,以備來兼容
回傳值:
MOSQ_ERR_SUCCESS 成功,
MOSQ_ERR_INVAL 如果輸入引數無效,
MOSQ_ERR_NOMEM 如果發生記憶體不足的情況,
MOSQ_ERR_NO_CONN 如果客戶端未連接到代理,
MOSQ_ERR_CONN_LOST 如果與代理的連接丟失,
MOSQ_ERR_PROTOCOL 與代理進行通信時是否存在協議錯誤,
MOSQ_ERR_ERRNO 如果系統呼叫回傳錯誤,變數errno包含錯誤代碼
int mosquitto_loop_stop( struct mosquitto * mosq, bool force )
/*功能:網路事件阻塞回收結束處理函式,這是執行緒客戶端介面的一部分,呼叫一次可停止先前使用mosquitto_loop_start創建的網路執行緒,該呼叫將一直阻塞,直到網路執行緒結束,為了使網路執行緒結束,您必須事先呼叫mosquitto_disconnect或將force引數設定為true,
引數:①mosq :有效的mosquitto實體
②force:設定為true強制取消執行緒,如果為false,則必須已經呼叫mosquitto_disconnect,
回傳:
MOSQ_ERR_SUCCESS 成功,
MOSQ_ERR_INVAL 如果輸入引數無效,
MOSQ_ERR_NOT_SUPPORTED 如果沒有執行緒支持,
int mosquitto_loop_start( struct mosquitto * mosq )
/*功能:網路事件回圈處理函式,通過創建新的執行緒不斷呼叫mosquitto_loop() 函式處理網路事件,不阻塞
回傳:
MOSQ_ERR_SUCCESS 成功,
MOSQ_ERR_INVAL 如果輸入引數無效,
MOSQ_ERR_NOT_SUPPORTED 如果沒有執行緒支持,
我本人遇到的難點是用戶指標viod *obj的使用,我在專案原始碼中有使用到,另一個就是函式指標(其指向回呼函式)的使用,建議閱讀這篇文章(對于理解mosquitto原始碼也很有幫助):https://blog.csdn.net/weixin_53361650/article/details/119581149
下面寫兩段簡單的代碼來初步建立使用mosquitto庫函式編程的基本流程:
流程圖:

發布端原始碼:
/*********************************************************************************
*
* Filename: pub.c
* Description: This file
*
* Version: 1.0.0(26/09/21)
* Author: lastbreath <2631336290@qq.com>
* ChangeLog: 1, Release initial version on "26/09/21 12:46:58"
*
********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include "mosquitto.h"
#include <errno.h>
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
static int running = 1;
void my_connect_callback(struct mosquitto *mosq,void *obj,int rc)
{
printf("Call the function: my_connect_callback.\n");
}
void my_disconnect_callback(struct mosquitto *mosq,void *obj,int rc)
{
printf("Call the function: my_disconnect_callback.\n");
running = 0;
}
void my_publish_callback(struct mosquitto *mosq,void *obj,int mid)
{
printf("Call the function: my_publish_callback.\n");
}
int main (int argc, char **argv)
{
int rv;
struct mosquitto *mosq;
char buff[MSG_MAX_SIZE];
mosquitto_lib_init();
mosq = mosquitto_new("pub_test",true,NULL);
if(mosq == NULL)
{
printf("New pub_test error: %s\n",strerror(errno));
mosquitto_lib_cleanup();
return -1;
}
mosquitto_connect_callback_set(mosq,my_connect_callback);
mosquitto_disconnect_callback_set(mosq,my_disconnect_callback);
mosquitto_publish_callback_set(mosq,my_publish_callback);
rv = mosquitto_connect(mosq,HOST,PORT,KEEP_ALIVE);
if(rv)
{
printf("Connect server error: %s\n",strerror(errno));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
printf("Start!\n");
int loop = mosquitto_loop_start(mosq);
if(loop)
{
printf("mosquitto loop error: %s\n",strerror(errno));
return 1;
}
while(fgets(buff,MSG_MAX_SIZE,stdin)!=NULL)
{
mosquitto_publish(mosq,NULL,"test",strlen(buff)+1,buff,0,0);
memset(buff,0,sizeof(buff));
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
printf("End!\n");
return 0;
}
訂閱端原始碼:
/*********************************************************************************
* Copyright: (C) 2021 lastbreath<2631336290@qq.com>
* All rights reserved.
*
* Filename: sub.c
* Description: This file
*
* Version: 1.0.0(09/30/2021)
* Author: lastbreath <2631336290@qq.com>
* ChangeLog: 1, Release initial version on "09/30/2021 08:40:22 PM"
*
********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mosquitto.h"
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
static int running = 1;
void my_connect_callback(struct mosquitto *mosq,void *obj,int rc)
{
printf("Call the function:on_connect\n");
if(rc)
{
printf("on_connect error!\n");
exit(1);
}
else
{
if(mosquitto_subscribe(mosq,NULL,"test",2))
{
printf("Set the topic error!\n");
exit(1);
}
}
}
void my_disconnect_callback(struct mosquitto *mosq,void *obj,int rc)
{
printf("Call the function: my_disconnect_callback\n");
running = 0;
}
void my_subscribe_callback(struct mosquitto *mosq,void *obj,int mid,int qos_count,const int *granted_qos)
{
printf("Call the function: on_subscribe\n");
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{
printf("Call the function: on_message\n");
printf("Recieve a message of %s: %s\n.",(char *)msg->topic,(char *)msg->payload);
if(0 == strcmp(msg->payload,"quit")){
mosquitto_disconnect(mosq);
}
}
int main (int argc, char **argv)
{
int ret;
struct mosquitto *mosq;
ret = mosquitto_lib_init();
mosq = mosquitto_new("sub_test",true,NULL);
if(mosq == NULL)
{
printf("New sub_test error!\n");
mosquitto_lib_cleanup();
return -1;
}
mosquitto_connect_callback_set(mosq,my_connect_callback);
mosquitto_disconnect_callback_set(mosq,my_disconnect_callback);
mosquitto_subscribe_callback_set(mosq,my_subscribe_callback);
mosquitto_message_callback_set(mosq,my_message_callback);
ret = mosquitto_connect(mosq,HOST,PORT,KEEP_ALIVE);
if(ret)
{
printf("Connect server error!\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
printf("Start!\n");
while(running)
{
mosquitto_loop(mosq,-1,1);
//mosquitto_loop_start(mosq);
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
printf("End!\n");
return 0;
}
makefie
all:
gcc -o sub sub.c -lmosquitto
gcc -o pub pub.c -lmosquitto
sub:
gcc -o sub sub.c -lmosquitto
pub:
gcc -o pub pub.c -lmosquitto
clean:
-rm sub pub
運行結果:

三 、使用MQTT實作從樹莓派上獲得的溫度上傳到服務器中的資料庫中,
到這里就可以將上述框架放入專案代碼中了,只需要修改之前專案中相關socket代碼部分即可,
1.發布端:
/*********************************************************************************
* Copyright: (C) 2021 lastbreath<2631336290@qq.com>
* All rights reserved.
*
* Filename: mqttpub.c
* Description: This file
*
* Version: 1.0.0(08/08/2021)
* Author:
*
* lastbreath <2631336290@qq.com>
* ChangeLog: 1, Release initial version on "08/08/2021 05:08:41 PM"
*
********************************************************************************/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/resource.h>
#include <netdb.h>
#include "mosquitto.h"
#include <libgen.h>
#include "ds18b20.h"
#include <time.h>
#include <signal.h>
#define BUF_SZIE 1024
#define ARRY_SIZE(x) (sizeof(x)/sizeof(x[0]))
#define KEEP_ALIVE 60
int g_stop = 0;
static inline void print_usage(char *progname);
void sig_handler(int sig_num)
{
if(sig_num == SIGUSR1)
g_stop = 1;
}
int main (int argc, char **argv)
{
char *progname = basename(argv[0]);
int port;
char *ip = NULL;
char *hostname = NULL;
char *topic = NULL;
struct hostent *hostentp;
struct mosquitto *mosq = NULL;
int daemon_run = 0;
int opt;
int rv;
float temperature = 0;
char buffer[BUF_SZIE];
time_t t;
struct tm *timep;
int log_fd;
int connect_flag = 1;
struct option options[] =
{
{"daemon",no_argument,NULL,'d'},
{"topic", required_argument,NULL,'t'},
{"hostname", required_argument,NULL,'n'},
{"ip", required_argument, NULL, 'i'},
{"port",required_argument,NULL,'p'},
{"help",no_argument,NULL,'h'},
{NULL,0,NULL,0}
};
while((opt = getopt_long(argc,argv,"dhp:t:i:n:",options,NULL)) != -1)
{
switch(opt)
{
case 't':
topic = optarg;
break;
case 'i':
ip = optarg;
break;
case 'n':
hostname = optarg;
break;
case 'd':
daemon_run = 1;
break;
case 'p':
port = atoi(optarg);
break;
case 'h':
print_usage(argv[0]);
return 0;
default:
break;
}
}
if(!port)
{
print_usage(progname);
return -1;
}
if(!hostname && !ip)
{
print_usage(progname);
return -1;
}
if(hostname)
{
hostentp = gethostbyname(hostname);
if(!hostentp)
{
printf("Failed to get host by name: %s\n",strerror(errno));
return -2;
}
printf("hostname: %s\n",hostentp->h_name);
ip = inet_ntoa(*(struct in_addr *)hostentp->h_addr);
printf("address: %s\n",ip);
}
if(!topic)
{
topic = "temperature";
}
if(daemon_run)
{
printf("program %s running in backgrund\n", progname);
if( (log_fd = open("client.log", O_CREAT|O_RDWR, 0666)) < 0)
{
printf("open() failed:%s\n", strerror(errno)) ;
return -2;
}
dup2(log_fd, STDOUT_FILENO) ;
dup2(log_fd, STDERR_FILENO) ;
daemon(1,1);
}
signal(SIGUSR1,sig_handler);
mosquitto_lib_init();
mosq = mosquitto_new(NULL,true,NULL);
if(!mosq)
{
printf("mosquitto_new() failed: %s\n",strerror(errno));
mosquitto_lib_cleanup();
return -1;
}
printf("Create mosquitto successfully!\n");
if(mosquitto_connect(mosq,ip,port,KEEP_ALIVE) != MOSQ_ERR_SUCCESS)
{
printf("mosquitto_connect() failed: %s\n",strerror(errno));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
printf("connect %s:%d successfully!\n",ip,port);
if(mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS)
{
printf("mosquitto_loop_start() failed: %s\n",strerror(errno));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
}
while(!g_stop)
{
if(ds18b20_get_temperature(&temperature) < 0)
{
printf("ds18b20_get_temperature failed.\n");
return -3;
}
time(&t);
char * str = ctime(&t);//此字串后有回車換行及‘/n’,所以要去掉
char tempbuff[25];
memset(tempbuff,0,25);
memcpy(tempbuff,str,24);
snprintf(buffer,ARRY_SIZE(buffer),"%s%f",tempbuff,temperature);
printf("%s\n",buffer);
if(mosquitto_publish(mosq,NULL,topic,strlen(buffer),buffer,0,0) != MOSQ_ERR_SUCCESS)
{
printf("mosquitto_publish() failed: %s\n",strerror(errno));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
}else
{
printf("Publish information of temperature Ok!\n") ;
}
sleep(30);
}
close(log_fd);
return 0;
}
void print_usage(char *progname)
{
printf("%s usage:\n",progname);
printf("-p(--port): sepcify server listen port.\n");
printf("-h(--Help): print this help information.\n");
printf("-d(--daemon): set program running on background.\n");
printf("\nExample: %s -d -p 8889\n",progname);
}
2.訂閱端:
/*********************************************************************************
* Copyright: (C) 2021 lastbreath<2631336290@qq.com>
* All rights reserved.
*
* Filename: mqttsub.c
* Description: This file
*
* Version: 1.0.0(08/08/2021)
* Author:
*
* lastbreath <2631336290@qq.com>
* ChangeLog: 1, Release initial version on "08/08/2021 05:08:41 PM"
*
********************************************************************************/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/resource.h>
#include <mysql.h>
#include "mosquitto.h"
#include <signal.h>
#include <libgen.h>
#include <netdb.h>
#define MSG_MAX_SZIE 1024
#define ARRY_SIZE(x) (sizeof(x)/sizeof(x[0]))
#define KEEP_ALIVE 60
static inline void print_usage(char *progname);
int dboperate(char *buffer);
int g_stop = 0;
void my_message_calllback(struct mosquitto *mosq,void *obj,const struct mosquitto_message *msg);
void sig_handler(int sig_num)
{
if(sig_num == SIGUSR1)
g_stop = 1;
}
int main (int argc, char **argv)
{
char *progname = basename(argv[0]);
char *ip = NULL;
char *hostname = NULL;
char *topic = NULL;
struct hostent *hostentp;
struct mosquitto *mosq = NULL;
int connect_flag = 1;
int log_fd;
int port;
int daemon_run = 0;
int opt;
char buffer[MSG_MAX_SZIE];
//char *obj = buffer;
struct option options[] =
{
{"daemon",no_argument,NULL,'d'},
{"port",required_argument,NULL,'p'},
{"topic", required_argument,NULL,'t'},
{"hostname", required_argument,NULL,'n'},
{"ip", required_argument, NULL, 'i'},
{"help",no_argument,NULL,'h'},
{NULL,0,NULL,0}
};
while((opt = getopt_long(argc,argv,"dhp:t:i:n:",options,NULL)) != -1)
{
switch(opt)
{
case 't':
topic = optarg;
break;
case 'i':
ip = optarg;
break;
case 'n':
hostname = optarg;
break;
case 'd':
daemon_run = 1;
break;
case 'p':
port = atoi(optarg);
break;
case 'h':
print_usage(argv[0]);
return 0;
default:
break;
}
}
if(!port)
{
print_usage(progname);
return -1;
}
if(!hostname && !ip)
{
print_usage(progname);
return -1;
}
if(hostname)
{
hostentp = gethostbyname(hostname);
if(!hostname)
{
printf("Failed to get host by name: %s\n",strerror(errno));
return -2;
}
printf("hostname: %s\n",hostentp->h_name);
ip = inet_ntoa(*(struct in_addr *)hostentp->h_addr);
printf("address: %s\n",ip);
}
if(!topic)
{
topic = "temperature";
}
if(daemon_run)
{
printf("program %s running in background\n",progname);
if((log_fd = open("server.log",O_CREAT|O_RDWR,0666)) < 0)
{
printf("open() server.log failed: %s\n",strerror(errno));
return -2;
}
dup2(log_fd,STDOUT_FILENO);
dup2(log_fd,STDERR_FILENO);
daemon(1,1);
}
signal(SIGUSR1,sig_handler);
mosquitto_lib_init();
mosq = mosquitto_new(NULL,true,(void *)buffer);//obj 用戶指標的使用在這里
if(!mosq)
{
printf("failed to new mosquitto: %s\n",strerror(errno));
mosquitto_lib_cleanup();
return -1;
}
printf("new mosq successfully!\n");
mosquitto_message_callback_set(mosq,my_message_calllback);
if(mosquitto_connect(mosq,ip,port,KEEP_ALIVE) != MOSQ_ERR_SUCCESS)
{
printf("mosquitto_connect() failed: %s\n",strerror(errno));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -2;
}
printf("mosq connect successfully!\n");
if((mosquitto_subscribe(mosq,NULL,topic,2)) != MOSQ_ERR_SUCCESS)
{
printf("failed to subscribe from broker: %s\n",strerror(errno));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -3;
}
printf("mosq subscribe successfully!\n");
while(!g_stop)
{
mosquitto_loop(mosq,-1,1);
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
void print_usage(char *progname)
{
printf("%s usage:\n",progname);
printf("-p(--port): sepcify server listen port.\n");
printf("-h(--Help): print this help information.\n");
printf("-d(--daemon): set program running on background.\n");
printf("\nExample: %s -d -p 8889\n",progname);
}
int dboperate(char *buffer)
{
MYSQL mysql,*sock;
MYSQL_RES *res;
MYSQL_FIELD *fd;
MYSQL_ROW row;
char qbuf[1024];
char str1[25];//字符陣列存字串時 要多一個位元組 來存放‘/0’
memset(str1,0,25);
memcpy(str1,buffer,24);
char str2[1024];
memset(str2,0,1024);
memcpy(str2,buffer+24,1024-24);
//printf("%d\n",222);
printf("%s\n",str1);
//printf("%d\n",224);
printf("%s\n",str2);
float t = atof(str2);
mysql_init(&mysql);
sock = mysql_real_connect(&mysql,"localhost","root","12345678","mydb",0,NULL,0);
if(!sock)
{
fprintf(stderr,"Couldn't connect to database!: %s\n",mysql_error(&mysql));
exit(1);
}
memset(qbuf,0,ARRY_SIZE(qbuf));
sprintf(qbuf,"insert temperature(subtime,temperature) values('%s',%f)",str1,t);
if(mysql_query(sock,qbuf))// when mysql_query() excute seccessfully it return 0;
{
fprintf(stderr,"Excute failed!: %s\n",mysql_error(sock));
exit(1);
}
printf("write into database successfully!\n");
return 0;
}
void my_message_calllback(struct mosquitto *mosq,void *obj,const struct mosquitto_message *msg)
{
memset((char *)obj,0,MSG_MAX_SZIE);
memcpy((char *)obj,(char *)msg->payload,MSG_MAX_SZIE);
printf("%s\n",(char *)obj);
dboperate((char *)obj);
}
3.ds18d20.h
#ifndef _DS18_H_
#define _DS18_H_
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <dirent.h>
#include <stdlib.h>
#define PATH "/sys/bus/w1/devices/"
#define BUF_SIZE 1024
extern int ds18b20_get_temperature(float *t);
#endif
4.ds18b20.c
#include "ds18b20.h"
int ds18b20_get_temperature(float *t)
{
DIR *dirp;
struct dirent *direntp;
char path[1024];
int fd;
int rv;
char buffer[BUF_SIZE];
char *result;
if((dirp = opendir(PATH)) == NULL)
{
printf("Failed to open the destination directory: %s\n",strerror(errno));
return -1;
}
while((direntp = readdir(dirp)) != NULL)
{
if(strstr(direntp->d_name,"28-") != NULL)
{
break;
}
}
if(direntp == NULL)
{
printf("Failed to find the destination directory: 28-*** .\n");
closedir(dirp);
return -2;
}
memset(path,0,sizeof(path));
strcat(path,PATH);
strcat(path,direntp->d_name);
strcat(path,"/w1_slave");
closedir(dirp);
if((fd = open(path,O_RDONLY)) < 0)
{
printf("Failed to open the destination file.\n");
return -3;
}
memset(buffer,0,sizeof(buffer));
while((rv = read(fd,buffer,sizeof(buffer))) > 0)
{
if((result = strstr(buffer,"t=")) != NULL)
{
result = result + 2;
break;
}
}
if(rv <= 0)
{
printf("Failed to read the temperature.\n");
close(fd);
return -4;
}
*t = atof(result)/1000;
close(fd);
return 0;
}
#if 0
int main()
{
float t;
ds18b20_get_temperature(&t);
printf("%f\n",t);
}
#endif
makefile
all:
@echo ""
@echo "Start compiling..."
@echo ""
gcc -o mqttpub mqttpub.c ds18b20.c -lmosquitto
gcc -o mqttsub mqttsub.c -I/usr/include/mysql -L/usr/lib/mysql -lmosquitto -lmysqlclient
@echo "end"
clean:
rm mqttpub mqttsub
runsub:
./mqttsub -p 1883 -n lastbreath.club
runpub:
./mqttpub -p 1883 -n lastbreath.club
運行截圖:

查看資料庫:

注意:
中間涉及到的資料庫操作參考:(還在寫博客)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/321411.html
標籤:其他
上一篇:小熊派開發筆記-串口讀取RS485輸出的土壤七合一傳感器資料(基于STM32CubeMX)
下一篇:GP8101PAC芯片簡介
