一、前言
上一篇我們介紹了 MQTT 協議格式以及相關的特性:一文讀懂物聯網 MQTT 協議之基礎特性篇,這一篇我們就來實戰一番,理論得與實踐結合,方能吃透 MQTT,
我的那個讀者還提到了講一下 Mosquitto,這是一款開源訊息代理軟體,提供輕量級的,支持可發布/可訂閱的的訊息推送模式,使設備對設備之間的短訊息通信變得簡單,比如現在應用廣泛的低功耗傳感器,手機、嵌入式計算機、微型控制器等移動設備,
老周這就來帶大家在 CentOS 上搭建 Mosquitto 服務器,
二、搭建準備
Mosquitto 安裝版本:Mosquitto1.4.4
Mosquitto 各版本下載地址:https://mosquitto.org/files/source/
MQTT 協議參考網站:MQTT 3.1.1
libwebsockets下載地址:https://github.com/warmcat/libwebsockets/releases
CentOS 版本:CentOS 7.8.2003

2.1 軟體準備
從官網獲取安裝包:
wget http://mosquitto.org/files/source/mosquitto-1.4.14.tar.gz
2.2 安裝
tar -zxvf mosquitto-1.4.14.tar.gz
cd mosquitto-1.4.14
2.3 修改組態檔
config.mk 包括了多個選項, 可按需關倍訓開啟,但一旦開啟則需要先安裝對應的模塊,
vim config.mk
| 選項 | 說明 | make時的出錯資訊 |
|---|---|---|
| WITH_SRV | 啟用c-areas庫的支持,一個支持異步DNS查找的庫,見http://c-ares.haxx.se | missing ares.h |
| WITH_UUID | 啟用lib-uuid支持,支持為每個連接的客戶端生成唯一的uuid, | missing uuid.h |
| WITH_WEBSOCKETS | 啟用websocket支持,需安裝libwebsockets,對于需要使用websocket協議的應用開啟, | missing libwebsockets.h |
WITH_SRV:=yes
WITH_UUID:=yes
WITH_WEBSOCKETS:=yes
2.3.1 安裝 c-areas
yum install c-ares-devel -y
2.3.2 安裝 lib-uuid
yum install uuid-devel -y
yum install libuuid-devel -y
2.3.3 安裝 libwebsockets
cd ~
wget https://github.com/warmcat/libwebsockets/archive/v3.2.1.tar.gz
tar zxvf v3.2.1.tar.gz
cd libwebsockets-3.2.1
mkdir build
cd build
cmake .. -DLIB_SUFFIX=64
make install
ldconfig
cd mosquitto-1.4.14
yum install openssl-devel -y
2.4 編譯和安裝
make && make install
執行編譯 make 命令的時候,如果你的終端出現:

那就把把 WITH_WEBSOCKETS 從 yes 改成 no 后,就可以成功編譯了,
WITH_WEBSOCKETS:=yes
改成
WITH_WEBSOCKETS:=no
如果你的應用不需要 websocket 協議,可以把這個引數給設定 no 關掉,
如果終端出現的是這樣:

那么恭喜你,Mosquitto 安裝成功了,
2.5 說明
程式檔案將默認安裝到以下位置
| 路徑 | 程式檔案 |
|---|---|
| /usr/local/sbin | mosquiotto server |
| /etc/mosquitto | configuration |
| /usr/local/bin | utility command |
修正鏈接庫路徑
由于作業系統版本及架構原因,很容易出現安裝之后的鏈接庫無法被找到,如啟動 mosquitto 客戶端可能出現找不到 libmosquitto.so.1 檔案,因此需要添加鏈接庫路徑:
vim /etc/ld.so.conf.d/liblocal.conf
在檔案中添加以下內容:
/usr/local/lib64
/usr/local/lib
# 重繪
ldconfig
三、 Mosquitto Server 啟動與測驗
3.1 啟動
3.1.1 mosquitto 默認以 mosquitto 用戶啟動
可以通過組態檔修改,需添加用戶:
groupadd mosquitto
useradd -g mosquitto mosquitto
3.1.2 修改組態檔
mv /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf
# =================================================================
# General configuration
# =================================================================
# 客戶端心跳的間隔時間
#retry_interval 20
# 系統狀態的重繪時間
#sys_interval 10
# 系統資源的回收時間,0表示盡快處理
#store_clean_interval 10
# 服務行程的PID
#pid_file /var/run/mosquitto.pid
# 服務行程的系統用戶
#user mosquitto
# 客戶端心跳訊息的最大并發數
#max_inflight_messages 10
# 客戶端心跳訊息快取佇列
#max_queued_messages 100
# 用于設定客戶端長連接的過期時間,默認永不過期
#persistent_client_expiration
# =================================================================
# Default listener
# =================================================================
# 服務系結的IP地址
#bind_address
# 服務系結的埠號
#port 1883
# 允許的最大連接數,-1表示沒有限制
#max_connections -1
# cafile:CA證書檔案
# capath:CA證書目錄
# certfile:PEM證書檔案
# keyfile:PEM密鑰檔案
#cafile
#capath
#certfile
#keyfile
# 必須提供證書以保證資料安全性
#require_certificate false
# 若require_certificate值為true,use_identity_as_username也必須為true
#use_identity_as_username false
# 啟用PSK(Pre-shared-key)支持
#psk_hint
# SSL/TSL加密演算法,可以使用“openssl ciphers”命令獲取
# as the output of that command.
#ciphers
# =================================================================
# Persistence
# =================================================================
# 訊息自動保存的間隔時間
#autosave_interval 1800
# 訊息自動保存功能的開關
#autosave_on_changes false
# 持久化功能的開關
persistence true
# 持久化DB檔案
persistence_file mosquitto.db
# 持久化DB檔案目錄
persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4種日志模式:stdout、stderr、syslog、topic
# none 則表示不記日志,此配置可以提升些許性能
log_dest none
# 選擇日志的級別(可設定多項)
#log_type error
#log_type warning
#log_type notice
#log_type information
# 是否記錄客戶端連接資訊
#connection_messages true
# 是否記錄日志時間
#log_timestamp true
# =================================================================
# Security
# =================================================================
# 客戶端ID的前綴限制,可用于保證安全性
#clientid_prefixes
# 允許匿名用戶
#allow_anonymous true
# 用戶/密碼檔案,默認格式:username:password
#password_file
# PSK格式密碼檔案,默認格式:identity:key
#psk_file
# pattern write sensor/%u/data
# ACL權限配置,常用語法如下:
# 用戶限制:user <username>
# 話題限制:topic [read|write] <topic>
# 正則限制:pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# 允許服務之間使用“橋接”模式(可用于分布式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# 設定橋接的客戶端ID
#clientid
# 橋接斷開時,是否清除遠程服務器中的訊息
#cleansession false
# 是否發布橋接的狀態資訊
#notifications true
# 設定橋接模式下,訊息將會發布到的話題地址
# $SYS/broker/connection/<clientid>/state
#notification_topic
# 設定橋接的keepalive數值
#keepalive_interval 60
# 橋接模式,目前有三種:automatic、lazy、once
#start_type automatic
# 橋接模式automatic的超時時間
#restart_timeout 30
# 橋接模式lazy的超時時間
#idle_timeout 60
# 橋接客戶端的用戶名
#username
# 橋接客戶端的密碼
#password
# bridge_cafile:橋接客戶端的CA證書檔案
# bridge_capath:橋接客戶端的CA證書目錄
# bridge_certfile:橋接客戶端的PEM證書檔案
# bridge_keyfile:橋接客戶端的PEM密鑰檔案
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile
關于詳細配置可參考:http://mosquitto.org/man/mosquitto-conf-5.html
3.1.3 設定用戶名和密碼
將組態檔中 #allow_anonymous true 去掉注釋,設定為 false,#password_file 去掉注釋并添加密碼檔案保存的位置:
allow_anonymous false
password_file /etc/mosquitto/pwfile.example
mosquitto_passwd -c /etc/mosquitto/pwfile.example 用戶名
之后需輸入兩次密碼
注意如果想添加用戶
mosquitto_passwd -b /etc/mosquitto/pwfile.example 用戶名 密碼
同樣連續會提示連續輸入兩次密碼,注意第二次創建用戶時不用加 -c 如果加 -c 會把第一次創建的用戶覆寫,
3.1.4 啟動 mosquitto
mosquitto -c /etc/mosquitto/mosquitto.conf -d
成功將啟動并監聽 1883 埠
3.2 測驗
新建兩個 shell 視窗 A/B
A 訂閱主題:
mosquitto_sub -t 主題名 -h 主機IP -u 用戶名 -P 密碼
例如:mosquitto_sub -t topic-riemann -h localhost -u mosquitto -P mosquitto
B 推送訊息:
mosquitto_pub -t 主題名 -h 主機IP -m "訊息內容" -u 用戶名 -P 密碼
例如:mosquitto_pub -t topic-riemann -h localhost -m "hello,mqtt" -u mosquitto -P mosquitto
3.3 可能遇到的問題
如果你出現這個錯誤:
mosquitto_sub: error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory
解決方法:
編譯完 mosquitto 之后,進入到 lib 目錄下,將編譯之后的 libmosquitto.so.1 拷貝到目錄 /usr/local/lib下,執行如下命令:
cp libmosquitto.so.1 /usr/local/lib
然后再執行命令:
sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
ldconfig
3.4 測驗結果


四、Java 實作 Mosquitto 客戶端
4.1 專案結構圖

4.2 添加 pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.2.5.RELEASE</version>
</dependency>
</dependencies>
4.3 application.yml
mqtt:
host: tcp://服務器IP:1883
clientId: client_${random.value}
topic: test/system/module/biz
qoslevel: 1
username: mosquitto
password: mosquitto
timeout: 10000
keepalive: 20
server:
port: 8888
4.4 MqttConfig
/**
* @author: 微信公眾號【老周聊架構】
*/
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.host}")
private String hostUrl;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
// 連接超時
@Value("${mqtt.timeout}")
private int completionTimeout;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
DirectChannel directChannel = new DirectChannel();
return directChannel;
}
// 接收通道
@Bean
public MessageChannel mqttInputChannel() {
DirectChannel directChannel = new DirectChannel();
return directChannel;
}
// 配置client,監聽的topic
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound",
mqttClientFactory(), "test/#");
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
// 通過通道獲取資料
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = (String)message.getHeaders().get("mqtt_receivedTopic");
log.info("主題:{},訊息接收到的資料:{}", topic, message.getPayload());
};
}
}
4.5 MqttGateWay
/**
* @author: 微信公眾號【老周聊架構】
*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateWay {
// 定義多載方法,用于訊息發送
void sendToMqtt(String payload);
// 指定topic進行訊息發送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
4.6 MqttController 控制類
/**
* @author: 微信公眾號【老周聊架構】
*/
@Slf4j
@RestController
@RequestMapping("/api")
public class MqttController {
@Autowired
MqttGateWay mqttGateWay;
@PostMapping("/publish")
public String publish(@RequestHeader(value = "toplic") String toplic , String message) {
log.info(String.format("topic: %s, message: %s", toplic, message));
mqttGateWay.sendToMqtt(toplic, message);
return "success";
}
}
4.7 MqttApplication 啟動類
/**
* @author: 微信公眾號【老周聊架構】
*/
@SpringBootApplication
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class, args);
}
}
4.8 啟動 mosquitto 服務器
mosquitto -c /etc/mosquitto/mosquitto.conf -d
4.9 利用 IDEA 的 HTTP Client 模擬 HTTP 請求


4.10 測驗結果
IDEA 控制臺接收到該主題的訊息:

shell 終端顯示也收到了訂閱了該主題的訊息:

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/291727.html
標籤:其他
上一篇:學習筆記之51單片機鍵盤篇(非編碼鍵盤與編碼鍵盤、非編碼鍵盤的掃描方式、獨立鍵盤、矩陣鍵盤)
下一篇:掃雷的實作
