mqtt服務,是一個訊息中間件,目前在物聯網領域使用非常廣泛,
mqtt服務器的搭建,也有很多方式,最直接的就是使用docker啟動emqx鏡像,
docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx
通過url訪問webui,http://192.168.56.100:18303 用戶名密碼:admin/public

mqtt與springboot結合
引入maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
</parent>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
設定組態檔application.yml
mqtt:
serverURIs: tcp://192.168.226.100:1883
username: admin
password: public
client:
id: ${random.value}
topic: topic_default
設定配置類 MqttConfig.java
package com.xxx.mqttapp.config;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@IntegrationComponentScan
@Configuration
public class MqttConfig {
public static final Logger log = LoggerFactory.getLogger(MqttConfig.class);
public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";
public static final String INPUT_CHANNEL = "mqttInputChannel";
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.serverURIs}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
@PostConstruct
public void init() {
log.debug("username:{} password:{} hostUrl:{} clientId :{} ",
this.username, this.password, this.hostUrl, this.clientId, this.defaultTopic);
}
@Bean
public MqttPahoClientFactory clientFactory() {
final MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{hostUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
return factory;
}
@Bean(value = OUTBOUND_CHANNEL)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler mqttOutbound() {
final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
handler.setDefaultQos(1);
handler.setDefaultRetained(false);
handler.setDefaultTopic(defaultTopic);
handler.setAsync(false);
handler.setAsyncEvents(false);
return handler;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clientId + "_inbound", clientFactory(), defaultTopic);
adapter.setCompletionTimeout(3000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public MessageHandler handler() {
return message -> {
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
log.info("topic: {},payload : {}", topic,message.getPayload().toString());
};
}
}
添加發送介面MqttGateway.java
package com.xxx.mqttapp.service;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import com.xxx.mqttapp.config.MqttConfig;
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttGateway {
public void sendToMqtt(@Header(MqttHeaders.TOPIC)String topic,String payload);
}
定義發送控制器
package com.xxx.mqttapp.web;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.xxx.mqttapp.service.MqttGateway;
@RequestMapping("/test")
@RestController
public class TestController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendMessage")
public Map<String, Object> sendMessage(String topic,String payload){
Map<String, Object> result = new HashMap<String, Object>();
mqttGateway.sendToMqtt(topic, payload);
result.put("topic", topic);
result.put("payload", payload);
return result;
}
}
springboot啟動類
package com.xxx.mqttapp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class App {
public static void main( String[] args ){
SpringApplication.run(App.class, args);
}
}
啟動springboot應用,測驗:

訊息發送成功,控制臺列印訊息如下:

這個測驗,不是直接發送訊息到mqtt服務器,而是借助web介面,發送http請求,后臺接收,然后呼叫生產者發送方法,
消費者直接配置了一個MqttPahoMessageDrivenChannelAdapter,并訂閱defaultTopic主題,他需要設定一個channel和一個handler,所以,生產者一發送訊息過來,消費者立馬就消費掉了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/342302.html
標籤:其他
