Spring JMS 是基于 Spring 框架的 JMS 訊息解決方案,提供模板化發送和接收訊息的抽象層;本文主要介紹在 SpringBoot 中用 Spring JMS 操作 ActiveMQ,文中使用到的軟體版本:ActiveMQ 5.16.2、SpringBoot 2.4.9、Java 1.8.0_191,
1、SpringBoot 整合 ActiveMQ "Classic"
1.1、引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--jms 連接池--> <dependency> <groupId>org.messaginghub</groupId> <artifactId>pooled-jms</artifactId> </dependency>
1.2、配置(application.yml)
spring: activemq: broker-url: tcp://10.40.100.69:61616 pool: enabled: true max-connections: 5
1.3、配置類
package com.abc.demo.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; @Configuration public class ActiveMQConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Autowired private ConnectionFactory connectionFactory; @Bean public JmsTemplate jmsTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); //開啟事務,需配合@Transactional使用 jmsTemplate.setSessionTransacted(true); return jmsTemplate; } @Bean("jmsMessagingTemplateTransaction") public JmsMessagingTemplate jmsMessagingTemplate() { JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(); jmsMessagingTemplate.setJmsTemplate(jmsTemplate()); return jmsMessagingTemplate; } @Bean("jmsListenerContainerFactoryTopic") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } @Bean("jmsListenerContainerFactoryTopicDurable") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicDurable() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(new ActiveMQConnectionFactory(brokerUrl)); factory.setPubSubDomain(true); //設定持久訂閱 factory.setSubscriptionDurable(true); factory.setClientId("1234"); return factory; } }
1.4、生產者
package com.abc.demo.activemq; import com.abc.demo.util.DateUtil; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.jms.Destination; /** * 生產者,這里使用定時任務來發送訊息 */ @Component public class Producer { private static Logger logger = LoggerFactory.getLogger(Producer.class); private Destination queue = new ActiveMQQueue("testQueue"); private Destination topic = new ActiveMQTopic("testTopic"); @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Scheduled(cron = "0 0/1 * * * ?") public void sendToQueue() { String message = "queue-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } @Scheduled(cron = "30 0/1 * * * ?") public void sendToTopic() { String message = "topic-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } /** * 事務 */ @Transactional @Scheduled(cron = "5 0/1 * * * ?") public void sendToQueueTransaction() { for (int i = 0; i < 5; i++) { String message = "queue-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } } /** * 事務 */ @Transactional @Scheduled(cron = "35 0/1 * * * ?") public void sendToTopicTransaction() { for (int i = 0; i < 5; i++) { String message = "topic-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } } }
1.5、消費者
package com.abc.demo.activemq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; @Component public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); @JmsListener(destination = "testQueue") public void recevieFromQueue(String msg, Session session) throws JMSException { logger.info(session.getTransacted() + "-" + msg); } @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopic") public void recevieFromTopic(String msg) { logger.info(msg); } /** * 持久化訂閱 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicDurable") public void recevieFromTopicDurable(String msg) { logger.info(msg); } }
2、SpringBoot 整合 ActiveMQ Artemis
2.1、引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis</artifactId> </dependency> <!--jms 連接池--> <dependency> <groupId>org.messaginghub</groupId> <artifactId>pooled-jms</artifactId> </dependency>
2.2、配置(application.yml)
spring: artemis: host: 10.40.96.140 port: 61616 pool: enabled: true max-connections: 5
2.3、配置類
package com.abc.demo.activemq.artemis; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; @Configuration public class ActiveMQConfig { @Value("${spring.artemis.host}") private String host; @Value("${spring.artemis.port}") private String port; @Autowired private ConnectionFactory connectionFactory; @Bean public JmsTemplate jmsTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); //開啟事務,需配合@Transactional使用 jmsTemplate.setSessionTransacted(true); return jmsTemplate; } @Bean("jmsMessagingTemplateTransaction") public JmsMessagingTemplate jmsMessagingTemplate() { JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(); jmsMessagingTemplate.setJmsTemplate(jmsTemplate()); return jmsMessagingTemplate; } @Bean("jmsListenerContainerFactoryTopic") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } @Bean("jmsListenerContainerFactoryTopicDurable") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicDurable() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); // factory.setConnectionFactory(new ActiveMQConnectionFactory("tcp://" + host + ":" + port)); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); //設定持久訂閱 factory.setSubscriptionDurable(true); factory.setClientId("1234"); return factory; } @Bean("jmsListenerContainerFactoryTopicShare") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicShare() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); //設定共享訂閱 factory.setSubscriptionShared(true); return factory; } @Bean("jmsListenerContainerFactoryTopicShareDurable") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicShareDurable() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); //設定共享訂閱 factory.setSubscriptionShared(true); //設定持久訂閱 factory.setSubscriptionDurable(true); return factory; } }
2.4、生產者
package com.abc.demo.activemq.artemis; import com.abc.demo.util.DateUtil; import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.jms.Destination; /** * 生產者,這里使用定時任務來發送訊息 */ @Component public class Producer { private static Logger logger = LoggerFactory.getLogger(Producer.class); private Destination queue = new ActiveMQQueue("testQueue"); private Destination topic = new ActiveMQTopic("testTopic"); @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Scheduled(cron = "0 0/1 * * * ?") public void sendToQueue() { String message = "queue-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } @Scheduled(cron = "30 0/1 * * * ?") public void sendToTopic() { String message = "topic-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } /** * 事務 */ @Transactional @Scheduled(cron = "5 0/1 * * * ?") public void sendToQueueTransaction() { for (int i = 0; i < 5; i++) { String message = "queue-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } } /** * 事務 */ @Transactional @Scheduled(cron = "35 0/1 * * * ?") public void sendToTopicTransaction() { for (int i = 0; i < 5; i++) { String message = "topic-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } } }
2.5、消費者
package com.abc.demo.activemq.artemis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; @Component public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); @JmsListener(destination = "testQueue") public void recevieFromQueue(String msg, Session session) throws JMSException { logger.info(session.getTransacted() + "-" + msg); } @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopic") public void recevieFromTopic(String msg) { logger.info(msg); } /** * 持久化訂閱 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicDurable") public void recevieFromTopicDurable(String msg) { logger.info(msg); } /** * 共享訂閱 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShare", subscription = "test") public void recevieFromTopicShare(String msg) { logger.info(msg); } @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShare", subscription = "test") public void recevieFromTopicShare2(String msg) { logger.info(msg); } /** * 共享持久訂閱 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShareDurable", subscription = "test2") public void recevieFromTopicShareDurable(String msg) { logger.info(msg); } }
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/299457.html
標籤:其他
