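In a recent project I used MQTT to implement publish/subscribe messaging for an IoT application. MQTT is lightweight and uses little bandwidth, which makes it popular in IoT; it can even run over 2G networks. The MQTT broker used here is apache-apollo-1.7.1.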
Installation:
1. Download apache-apollo-1.7.1
URL: http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/
2. Unzip it, go to the D:\java\apache-apollo-1.7.1\bin directory, and run:
apollo.cmd create mybroker

3. Go to the bin directory of the broker you just created, mybroker/bin, and run:
apollo-broker.cmd run

Success!
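Once `apollo-broker.cmd run` is up, one quick way to confirm that the broker is listening is to open a TCP connection to its connector port. The sketch below is not part of Apollo: the class name `BrokerPortCheck` and the 2-second timeout are illustrative assumptions, and 61613 is the port of the default `tcp` connector in `etc/apollo.xml`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Apollo): checks whether something accepts TCP connections.
final class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable: treat as not reachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 61613 is assumed to be the "tcp" connector port from etc/apollo.xml.
        boolean up = isReachable("127.0.0.1", 61613, 2000);
        System.out.println(up ? "Broker port is open" : "Broker port is not reachable");
    }
}
```

An open port only shows that a listener exists; it does not prove that MQTT logins work.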
Here is the directory structure:


Main configuration files:
etc/apollo.xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version
2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
applicable law or agreed to in writing, software distributed under
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and
limitations under the License.
-->
<!--
For more information on how configure this file please
reference:
http://activemq.apache.org/apollo/versions/1.7.1/website/documentation/user-manual.html
-->
<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
<notes>
The default configuration with tls/ssl enabled.
</notes>
<log_category console="console" security="security" connection="connection" audit="audit"/>
<authentication domain="apollo"/>
<!-- Give admins full access -->
<access_rule allow="admins" action="*"/>
<access_rule allow="*" action="connect" kind="connector"/>
<virtual_host id="mybroker">
<!--
You should add all the host names that this virtual host is known as
to properly support the STOMP 1.1 virtual host feature.
-->
<host_name>mybroker</host_name>
<host_name>localhost</host_name>
<host_name>127.0.0.1</host_name>
<!-- Uncomment to disable security for the virtual host -->
<!-- <authentication enabled="false"/> -->
<access_rule allow="users" action="connect create destroy send receive consume"/>
<!-- You can delete this element if you want to disable persistence for this virtual host -->
<leveldb_store directory="${apollo.base}/data"/>
</virtual_host>
<web_admin bind="http://127.0.0.1:61680"/>
<web_admin bind="https://127.0.0.1:61681"/>
<connector id="tcp" bind="tcp://0.0.0.0:61613" connection_limit="2000"/>
<connector id="tls" bind="tls://0.0.0.0:61614" connection_limit="2000"/>
<connector id="ws" bind="ws://0.0.0.0:61623" connection_limit="2000"/>
<connector id="wss" bind="wss://0.0.0.0:61624" connection_limit="2000"/>
<key_storage file="${apollo.base}/etc/keystore" password="password" key_password="password"/>
</broker>
etc/users.properties
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The list of users that can login. This file supports both plain text or
# encrypted passwords. Here is an example what an encrypted password
# would look like:
#
# admin=ENC(Cf3Jf3tM+UrSOoaKU50od5CuBa8rxjoL)
#
admin=Admin@1234
The username and password are admin/Admin@1234.
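The `user=password` lines in `etc/users.properties` follow the standard Java properties format, so the file can be inspected with `java.util.Properties`. The following is only a sketch: `UsersFile` and its methods are hypothetical names, and the `ENC(...)` check merely mirrors the example in the file's own comment rather than any Apollo API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for reading text in the users.properties format.
final class UsersFile {

    /** Parses user=password lines; lines starting with '#' are comments and are skipped. */
    static Properties parse(String text) {
        Properties users = new Properties();
        try {
            users.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("could not read users text", e);
        }
        return users;
    }

    /** True if the stored value is wrapped as ENC(...), the form the file uses for encrypted passwords. */
    static boolean isEncrypted(String value) {
        return value != null && value.startsWith("ENC(") && value.endsWith(")");
    }

    public static void main(String[] args) {
        Properties users = parse("# sample content\nadmin=Admin@1234\n");
        for (String name : users.stringPropertyNames()) {
            System.out.println(name + " encrypted=" + isEncrypted(users.getProperty(name)));
        }
    }
}
```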
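Log in to the admin console at: http://127.0.0.1:61680/console/index.html

Enter the admin/Admin@1234 credentials from above to open the admin console, where you can see the subscribed topics and other information:

Here is a bat script for starting the MQTT broker quickly: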
@echo off
title mqtt-server
set ENV_HOME="E:\Program Files\apache-apollo-1.7.1-windows-distro\apache-apollo-1.7.1\bin\mybroker\bin"
E:
color 0a
cd %ENV_HOME%
apollo-broker.cmd run
exit
Integrating with Spring Boot 2
Without further ado, here is the code:
1. Maven pom.xml configuration
<!-- MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2. The MqttConfig configuration class
This class provides the configuration for MQTT publishing and subscribing.
package com.dechnic.framework.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* @description:
* @author:houqd
* @time: 2021/8/9 11:17
*/
@Configuration
@IntegrationComponentScan
public class MqttConfig {
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.url}")
    private String hostUrl;
    @Value("${spring.mqtt.client.id}")
    private String clientId;
    @Value("${spring.mqtt.client.subScribTopics}")
    private String[] subScribTopics;
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;
    @Value("${spring.mqtt.completionTimeOut}")
    private int completionTimeOut; // completion timeout in milliseconds, passed to the inbound adapter

    @Bean
    public MqttConnectOptions getMqttConnectionOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectionOptions());
        return factory;
    }

    /**
     * Outbound (sending) channel
     * @return
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // @ServiceActivator marks this handler as the consumer of messages to publish over MQTT;
    // inputChannel names the channel it receives those messages from.
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    /**
     * Inbound (receiving) channel
     * @return
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // Configures the client and the topics it listens to
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(), subScribTopics);
        adapter.setCompletionTimeout(completionTimeOut);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Handles the messages that arrive on the inbound channel
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                // Last segment of the topic; for a topic without "/" this is the whole topic
                String type = topic.substring(topic.lastIndexOf("/") + 1);
                if ("hello".equalsIgnoreCase(type)) {
                    System.out.println("hello,XX," + message.getPayload().toString());
                } else if ("hello1".equalsIgnoreCase(type)) {
                    System.out.println("hello1,XX," + message.getPayload().toString());
                }
            }
        };
    }
}
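The inbound handler derives a `type` value from the last segment of the received topic. That string logic can be pulled out and checked without a broker; the following is a minimal sketch in which `TopicUtil` and `lastSegment` are hypothetical names, not part of the project.

```java
// Hypothetical helper that isolates the topic-parsing step of the inbound handler.
final class TopicUtil {

    /** Returns the part of the topic after the last '/', or the whole topic if it has no '/'. */
    static String lastSegment(String topic) {
        // lastIndexOf returns -1 when '/' is absent, so substring(0) yields the whole topic.
        return topic.substring(topic.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        System.out.println(lastSegment("Hello1"));
        System.out.println(lastSegment("devices/room1/hello"));
    }
}
```

Note that a topic ending in `/` yields an empty segment, which matches neither branch of the handler.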
3. application.yml configuration
# Data source configuration
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    druid:
      # Master data source
      master:
        url: jdbc:mysql://localhost:3306/ds_db_ctl?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: 123456
      # Slave data source
      slave:
        # Slave data source switch, disabled by default
        enabled: false
        url:
        username:
        password:
      # Initial number of connections
      initialSize: 5
      # Minimum number of connections in the pool
      minIdle: 10
      # Maximum number of connections in the pool
      maxActive: 20
      # Maximum time to wait when acquiring a connection
      maxWait: 60000
      # Interval between checks for idle connections that should be closed, in milliseconds
      timeBetweenEvictionRunsMillis: 60000
      # Minimum time a connection stays in the pool, in milliseconds
      minEvictableIdleTimeMillis: 300000
      # Maximum time a connection stays in the pool, in milliseconds
      maxEvictableIdleTimeMillis: 900000
      # Query used to check whether a connection is valid
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      webStatFilter:
        enabled: true
      statViewServlet:
        enabled: true
        # Whitelist; leave empty to allow all access
        allow:
        url-pattern: /druid/*
        # Console username and password
        login-username: ruoyi
        login-password: 123456
      filter:
        stat:
          enabled: true
          # Slow SQL logging
          log-slow-sql: true
          slow-sql-millis: 1000
          merge-sql: true
        wall:
          config:
            multi-statement-allow: true
  # redis configuration
  redis:
    # Host
    host: localhost
    # Port, 6379 by default
    port: 6379
    # Database index
    database: 0
    # Password
    password: asdffdsa
    # Connection timeout
    timeout: 10s
    lettuce:
      pool:
        # Minimum number of idle connections in the pool
        min-idle: 0
        # Maximum number of idle connections in the pool
        max-idle: 8
        # Maximum number of connections in the pool
        max-active: 8
        # Maximum blocking wait time of the pool (a negative value means no limit)
        max-wait: -1ms
  mqtt:
    username: admin
    password: Admin@1234
    url: tcp://127.0.0.1:61613
    client:
      id: mqttId
      subScribTopics: Hello,Hello1
    default:
      topic: topic
    completionTimeOut: 3000
# Logging configuration
logging:
  level:
    com.dechnic: info
    org.springframework: warn
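The `subScribTopics: Hello,Hello1` entry is injected into a `String[]` field, so the comma-separated value ends up as two topics. The plain-Java sketch below only approximates that splitting for illustration. `TopicList` is a hypothetical class, and trimming and dropping empty entries are assumptions of this sketch; Spring's own conversion may behave differently in edge cases.

```java
import java.util.Arrays;

// Hypothetical helper: approximates how a comma-separated property becomes a topic array.
final class TopicList {

    /** Splits on commas, trims each entry, and drops empty entries. */
    static String[] split(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(split("Hello,Hello1")));
    }
}
```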
4. MQTT publishing gateway interface
package com.dechnic.framework.service;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateWay {
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
5. Testing
package com.dechnic.apix.controller;
import com.dechnic.framework.service.MqttGateWay;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @description:
* @author:houqd
* @time: 2021/8/9 10:47
*/
@RestController
@RequestMapping("/test")
public class TestMqttController {
    @Autowired
    private MqttGateWay mqttGateWay;

    @RequestMapping("/sendMqtt.do")
    public String sendMqtt(String sendData, String topic) {
        mqttGateWay.sendToMqtt(sendData, topic);
        return "OK";
    }
}
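The endpoint above can also be called from code instead of a GUI client. The following is a minimal sketch using the JDK 11+ `java.net.http.HttpClient`. The base URL `http://localhost:8080` is an assumption (the post does not state the application's port), and `SendMqttClient` and `buildUri` are hypothetical names.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /test/sendMqtt.do endpoint.
final class SendMqttClient {

    /** Builds the request URI, URL-encoding both query parameters. */
    static URI buildUri(String baseUrl, String sendData, String topic) {
        String query = "sendData=" + URLEncoder.encode(sendData, StandardCharsets.UTF_8)
                + "&topic=" + URLEncoder.encode(topic, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/test/sendMqtt.do?" + query);
    }

    public static void main(String[] args) throws Exception {
        // Assumed base URL; adjust it to wherever the Spring Boot application listens.
        URI uri = buildUri("http://localhost:8080", "hello from Java", "Hello1");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The parameter names `sendData` and `topic` match the controller method's arguments, which is how Spring binds them from the query string.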
Publish to topic Hello1 with Postman:

The subscribed message then shows up in the backend:

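Summary:
1. Add the jars through Maven, and pull in the MQTT configuration with the @Configuration and @IntegrationComponentScan annotations on the MqttConfig class.
2. Configure the channels first, then build the sending and receiving sides on top of them: the MessageHandler and the MessageProducer.
3. MqttPahoMessageDrivenChannelAdapter is an implementation of MessageProducer.
4. The @ServiceActivator(inputChannel = "mqttInputChannel") annotation marks a method as a message-handling method. inputChannel names the channel the messages to handle come from; outputChannel names the channel to which the method's result is sent.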
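To make the channel idea in the summary concrete, here is a toy model in plain Java: a channel holds one handler and passes each sent message to it synchronously, on the sender's thread. This is an illustrative sketch only. `ToyChannel` and `ChannelDemo` are hypothetical names, and Spring Integration's `DirectChannel` does more than this (for example, it can balance messages across several subscribers).

```java
import java.util.function.Consumer;

// Toy model of a point-to-point channel; not Spring Integration's implementation.
final class ToyChannel<T> {
    private Consumer<T> handler;

    /** Registers the handler that consumes messages sent to this channel. */
    void subscribe(Consumer<T> handler) {
        this.handler = handler;
    }

    /** Hands the message to the handler synchronously, on the caller's thread. */
    void send(T message) {
        if (handler == null) {
            throw new IllegalStateException("channel has no subscriber");
        }
        handler.accept(message);
    }
}

final class ChannelDemo {
    public static void main(String[] args) {
        // Plays the role of mqttInputChannel with its message-handling method.
        ToyChannel<String> inputChannel = new ToyChannel<>();
        inputChannel.subscribe(payload -> System.out.println("received: " + payload));

        // Plays the role of the inbound adapter pushing a message into the channel.
        inputChannel.send("hello");
    }
}
```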
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/293046.html
