一、前言
相信看到這篇文章,大家也應該知道ActiveMQ是一個訊息中間件,主要特點就是異步處理,用來減少回應時間和解耦,主要的使用場景就是將比較耗時而且不需要即時同步回傳結果的操作作為訊息放入訊息佇列,由于使用了訊息佇列,只要保證訊息格式不變,訊息的發送方和接收方并不需要彼此聯系,也不需要受對方的影響,即解耦合,
二、理解ActiveMQ
怎樣理解ActiveMQ呢?ActiveMQ有兩種訊息傳遞模型,一種是點對點模型,一種是發布-訂閱模型,我個人比較喜歡拿微信作為物件來代入進行理解,點對點模型,就好比我給你發送微信資訊,一對一,我發送給你資訊之后,你在忙的時候,就肯定沒有理我(額,除非~這種關系應該不存在),等你閑下來的時候,再看我給你的資訊,發布-訂閱模型,就好比微信公眾號,一對多,只要有人訂閱了我的公眾號,如果我發送了一條推文,那么凡是訂閱了我公眾號的人,就都會收到我發的這條推文,
好了,廢話不多說,下面直接用SpringBoot來整合ActiveMQ,
三、佇列整合
1、新建一個maven工程,引入SpringBoot和ActiveMQ依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.chen</groupId>
<artifactId>chen</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>Springboot_mq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、application.yml 核心配置
server:
port: 8086
spring:
activemq:
broker-url: tcp://192.168.203.129:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false代表佇列,true代表主題
queue: queue01 # 自定義命名佇列
3、配置佇列bean,相當于Spring容器的bean標簽,注意@EnableJms注解,開啟JMS的適配規則,
package com.chen.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
@Component
@EnableJms // 開啟JMS適配
public class ConfigBean {
@Value("${queue}")
private String myQueue; // 注入組態檔中的queue
@Bean
public ActiveMQQueue queue() {
return new ActiveMQQueue(myQueue);
}
}
4、生產者發送訊息
package com.chen.produce;
import java.util.UUID;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 呼叫一次一個資訊發出
public void produceMessage() {
jmsMessagingTemplate.convertAndSend(queue, "這是一條訊息");
}
}
5、消費者接收訊息,我這里直接使用@JmsListener注解進行監聽佇列,底層是由MQ的監聽器實作的,還有另外一種接收訊息的方法,就是MQ的receive()方法,
package com.chen.consummer;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Queue_consummer {
@JmsListener(destination = "${queue}") // 注解監聽
public void receive(TextMessage textMessage) throws Exception{
System.out.println("消費者收到訊息:"+textMessage.getText());
}
}
6、接下來利用Junit進行一個單元測驗,直接運行單元方法,呼叫生產者進行發送訊息
package com.chen;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import com.chen.produce.Queue_Produce;
@SpringBootTest(classes = SpringbootMqApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Resource
private Queue_Produce queue_produce;
@Test
public void testSend() throws Exception {
queue_produce.produceMessage();
}
}
可以在ActiveMQ圖形化界面看到,生產者生產了一條訊息,被消費者消費了一條訊息


7、增加定時投遞功能,利用Spring自帶的@Scheduled注解實作定時任務,每隔一段時間生產一個訊息,注意使用這個注解,需要在主啟動類開啟這個功能,不然無效啟用
package com.chen.produce;
import java.util.UUID;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 呼叫一次一個資訊發出
public void produceMessage() {
jmsMessagingTemplate.convertAndSend(queue, "這是一條訊息");
}
// 帶定時投遞的業務方法
@Scheduled(fixedDelay = 3000) // 每3秒自動呼叫
public void produceMessageScheduled() {
jmsMessagingTemplate.convertAndSend(queue, "這是一條定時投遞的訊息,標記:" + UUID.randomUUID().toString().substring(0, 6));
System.out.println("投遞完成");
}
}
package com.chen;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling // 開啟對定時任務的支持
public class SpringbootMqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootMqApplication.class, args);
}
}
四、整合主題
1、重新新建一個maven專案,依賴和第三整合佇列一樣,配置application.yml,注意需要將false改為true,代表使用的是主題
server:
port: 8088
spring:
activemq:
broker-url: tcp://192.168.203.129:61616
user: admin
password: admin
jms:
pub-sub-domain: true
topic: topic01
2、配置主題bean
package com.chen.config;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ConfigBean {
@Value("${topic}")
private String topicName;
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
}
3、同樣是生產者
package com.chen.produce;
import java.util.UUID;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 3000)
public void produceTopic() {
jmsMessagingTemplate.convertAndSend(topic, "這是一條主題訊息,標記:" + UUID.randomUUID().toString().substring(0, 6));
}
}
4、消費者
package com.chen.consummer;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Topic_Consummer {
@JmsListener(destination = "${topic}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println("消費者接收訂閱的主題訊息:"+textMessage.getText());
}
}
5、因為我這里使用了定時任務,所以直接啟用主啟動類就行,會自動呼叫生產者每隔三秒生產一條訊息
package com.chen;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class SpringbootMqTopicApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootMqTopicApplication.class, args);
}
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/243292.html
標籤:其他
上一篇:2020-12-31
下一篇:感覺技術跟不上了,建議多學習
