本文主要介紹使用 JMS 1.1 與 JMS 2.0 API 來操作 ActiveMQ,文中所使用到的軟體版本:Java 1.8.0_191、ActiveMQ "Classic" 5.16.2、ActiveMQ Artemis 2.17.0。
1、Java 操作 ActiveMQ "Classic"
使用 JMS 1.1 的 API 操作 ActiveMQ "Classic"。
1.1、引入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.2</version> </dependency>
1.2、發送訊息
1.2.1、發送到 Queue
public static void sendToQueue() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); //連接池 PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory); Connection connection = pooledConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue("testQueue"); MessageProducer producer = session.createProducer(destination); //訊息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) { TextMessage message = session.createTextMessage("訊息" + i); producer.send(message); System.out.println("已發送的訊息:" + message.getText()); } producer.close(); session.close(); connection.close(); pooledConnectionFactory.stop(); }
1.2.2、發送到 Queue(事務)
/**
 * Sends ten persistent text messages to the queue {@code testQueue} inside one local
 * transaction; either all ten become visible to consumers or none do.
 *
 * <p>If creating the destination/producer, sending, or committing fails, the transaction is
 * rolled back and the error is printed. The connection is always closed.
 *
 * @throws JMSException if connecting, creating the session, rolling back or closing fails
 */
public static void sendToQueueTransaction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        // First argument "true" makes the session transacted; the acknowledge mode is then ignored.
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        try {
            Destination destination = session.createQueue("testQueue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i <= 10; i++) {
                TextMessage message = session.createTextMessage("事務訊息" + i);
                producer.send(message);
                System.out.println("已發送的訊息:" + message.getText());
            }
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
    } finally {
        // Closing the connection closes its sessions and producers, so no null-prone
        // producer.close() is needed here.
        connection.close();
    }
}
1.2.3、發送到 Topic
/**
 * Publishes ten persistent text messages to the topic {@code testTopic}.
 *
 * <p>The connection is closed in a {@code finally} block so it is released even when
 * publishing fails.
 *
 * @throws JMSException if connecting to the broker or publishing a message fails
 */
public static void sendToTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("testTopic");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 10; i++) {
            TextMessage message = session.createTextMessage("訊息" + i);
            producer.send(message);
            System.out.println("已發送的訊息:" + message.getText());
        }
        producer.close();
        session.close();
    } finally {
        // Closing the connection also releases any session/producer still open after a failure.
        connection.close();
    }
}
1.2.4、發送到 Topic(事務)
/**
 * Publishes ten persistent text messages to the topic {@code testTopic} inside one local
 * transaction.
 *
 * <p>If creating the destination/producer, sending, or committing fails, the transaction is
 * rolled back and the error is printed. The connection is always closed.
 *
 * @throws JMSException if connecting, creating the session, rolling back or closing fails
 */
public static void sendToTopicTraction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection connection = activeMQConnectionFactory.createConnection();
    try {
        connection.start();
        // First argument "true" makes the session transacted; the acknowledge mode is then ignored.
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        try {
            Destination destination = session.createTopic("testTopic");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i <= 10; i++) {
                TextMessage message = session.createTextMessage("事務訊息" + i);
                producer.send(message);
                System.out.println("已發送的訊息:" + message.getText());
            }
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
    } finally {
        // Closing the connection closes its sessions and producers as well.
        connection.close();
    }
}
完整代碼:
package com.abc.demo.general.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.jms.pool.PooledConnectionFactory; import javax.jms.*; public class Producer { private static String brokerURL = "tcp://10.40.96.140:61616"; public static void main(String[] args) throws JMSException { sendToQueue(); // sendToQueueTransaction(); // sendToTopic(); // sendToTopicTraction(); } public static void sendToQueue() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); //連接池 PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory); Connection connection = pooledConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue("testQueue"); MessageProducer producer = session.createProducer(destination); //訊息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) { TextMessage message = session.createTextMessage("訊息" + i); producer.send(message); System.out.println("已發送的訊息:" + message.getText()); } producer.close(); session.close(); connection.close(); pooledConnectionFactory.stop(); } /** * 以事務方式發送訊息 * @throws JMSException */ public static void sendToQueueTransaction() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = null; try { Destination destination = session.createQueue("testQueue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) { TextMessage message = session.createTextMessage("事務訊息" + i); 
producer.send(message); System.out.println("已發送的訊息:" + message.getText()); } session.commit(); } catch (JMSException e) { session.rollback(); e.printStackTrace(); } finally { producer.close(); session.close(); connection.close(); } } public static void sendToTopic() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("testTopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 10; i++) { TextMessage message = session.createTextMessage("訊息" + i); producer.send(message); System.out.println("已發送的訊息:" + message.getText()); } producer.close(); session.close(); connection.close(); } public static void sendToTopicTraction() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("testTopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); try { for (int i = 1; i <= 10; i++) { TextMessage message = session.createTextMessage("事務訊息" + i); producer.send(message); System.out.println("已發送的訊息:" + message.getText()); } session.commit(); } catch (JMSException e) { session.rollback(); e.printStackTrace(); } finally { producer.close(); session.close(); connection.close(); } } }Producer.java
1.3、消費者
1.3.1、從 Queue 中消費訊息
public static void recevieFromQueue() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); //連接池 PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory); Connection connection = pooledConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue("testQueue"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(message -> { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受到的訊息:" + textMessage.getText()); textMessage.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } }); }
1.3.2、從 Queue 中消費訊息(事務)
public static void recevieFromQueueTransction() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("testQueue"); MessageConsumer consumer = session.createConsumer(destination); AtomicInteger index = new AtomicInteger(); try { consumer.setMessageListener(message -> { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受到的訊息:" + textMessage.getText()); index.getAndIncrement(); //每10條提交一次 if (index.get() % 10 == 0) { session.commit(); } } catch (JMSException e) { e.printStackTrace(); } }); } catch (JMSException e) { session.rollback(); e.printStackTrace(); } }
1.3.3、從 Topic 中消費訊息
/**
 * Registers an asynchronous, non-durable listener on the topic {@code testTopic} that prints
 * the body of every text message it receives (auto-acknowledge mode).
 *
 * <p>The method returns right after the listener is registered; the connection is left open
 * so that messages keep being delivered.
 *
 * @throws JMSException if connecting to the broker or registering the listener fails
 */
public static void recevieFromTopic() throws JMSException {
    ActiveMQConnectionFactory brokerFactory = new ActiveMQConnectionFactory(brokerURL);
    Connection conn = brokerFactory.createConnection();
    conn.start();

    Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination topic = jmsSession.createTopic("testTopic");
    MessageConsumer topicConsumer = jmsSession.createConsumer(topic);

    MessageListener printer = received -> {
        TextMessage text = (TextMessage) received;
        try {
            System.out.println("接受到的訊息:" + text.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    };
    topicConsumer.setMessageListener(printer);
}
1.3.4、從 Topic 中消費訊息(持久化訂閱+事務)
對於 Topic,使用一般的 MessageConsumer 消費訊息,只能消費訂閱時間之後發布的訊息;JMS 允許訂閱者創建一個可持久化的訂閱(TopicSubscriber),這樣,即使訂閱者宕機,恢復後也能接收宕機期間生產者發布的訊息。持久化訂閱需要先為連接設定 ClientID,並為訂閱指定名稱。
public static void recevieFromTopicDurable() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = activeMQConnectionFactory.createConnection(); connection.setClientID("12345678"); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("testTopic"); TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test"); AtomicInteger index = new AtomicInteger(); topicSubscriber.setMessageListener(message -> { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受到的訊息:" + textMessage.getText()); index.getAndIncrement(); //每10條提交一次 if (index.get() % 10 == 0) { session.commit(); } } catch (JMSException e) { e.printStackTrace(); } }); }
完整代碼:
// Consumer.java
package com.abc.demo.general.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * JMS 1.1 consumer examples for ActiveMQ "Classic": plain and transacted consumption from a
 * queue, a non-durable topic subscription and a durable, transacted topic subscription.
 *
 * <p>Each method returns right after registering its listener and leaves the connection open
 * so that messages keep being delivered.
 */
public class Consumer {
    private static final String brokerURL = "tcp://10.40.96.140:61616";

    public static void main(String[] args) throws JMSException {
        recevieFromQueue();
//        recevieFromQueueTransction();
//        recevieFromTopic();
//        recevieFromTopicDurable();
    }

    /**
     * Prints and explicitly acknowledges every text message from the queue {@code testQueue}.
     *
     * @throws JMSException if connecting to the broker or registering the listener fails
     */
    public static void recevieFromQueue() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        // Connection pool
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
        Connection connection = pooledConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createQueue("testQueue");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接受到的訊息:" + textMessage.getText());
                textMessage.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * Consumes the queue {@code testQueue} in a transacted session, committing once per batch
     * of ten messages and rolling the batch back when processing fails.
     *
     * @throws JMSException if connecting to the broker or registering the listener fails
     */
    public static void recevieFromQueueTransction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("testQueue");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(batchCommittingListener(session, 10));
    }

    /**
     * Prints every text message published to the topic {@code testTopic} while subscribed.
     *
     * @throws JMSException if connecting to the broker or registering the listener fails
     */
    public static void recevieFromTopic() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("testTopic");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接受到的訊息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * Consumes the topic {@code testTopic} through the durable subscription {@code test}
     * (client id {@code 12345678}) in a transacted session, committing once per batch of ten
     * messages and rolling the batch back when processing fails.
     *
     * @throws JMSException if connecting, creating the subscription or registering the listener fails
     */
    public static void recevieFromTopicDurable() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // The client id must be set before the connection is used; it identifies the durable subscription.
        connection.setClientID("12345678");
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("testTopic");
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test");
        topicSubscriber.setMessageListener(batchCommittingListener(session, 10));
    }

    /**
     * Builds a listener that prints each text message and commits {@code session} after every
     * {@code batchSize} messages; on failure it rolls the open batch back and restarts counting.
     */
    private static MessageListener batchCommittingListener(Session session, int batchSize) {
        AtomicInteger index = new AtomicInteger();
        return message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接受到的訊息:" + textMessage.getText());
                if (index.incrementAndGet() % batchSize == 0) {
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
                // The whole uncommitted batch will be redelivered, so start counting afresh.
                index.set(0);
                try {
                    session.rollback();
                } catch (JMSException rollbackFailure) {
                    rollbackFailure.printStackTrace();
                }
            }
        };
    }
}
2、Java 操作 ActiveMQ Artemis
使用 JMS 2.0 的 API 操作 ActiveMQ Artemis。
2.1、引入依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-client-all</artifactId> <version>2.17.0</version> </dependency>
2.2、發送訊息
2.2.1、發送到 Queue
public static void sendToQueue() throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(); JMSProducer producer = context.createProducer(); Destination destination = context.createQueue("testQueue"); //訊息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //延遲投遞 producer.setDeliveryDelay(1000 * 5); //異步發送 producer.setAsync(new CompletionListener() { @Override public void onCompletion(Message message) { System.out.println("訊息發送完成"); } @Override public void onException(Message message, Exception exception) { exception.printStackTrace(); } }); for (int i = 1; i <= 5; i++) { TextMessage message = context.createTextMessage("訊息" + i); producer.send(destination, message); System.out.println("已發送的訊息:" + message.getText()); } context.close(); }
2.2.2、發送到 Queue(事務)
/**
 * Sends ten persistent text messages to the queue {@code testQueue} inside one local
 * transaction.
 *
 * <p>The JMS 2.0 simplified API reports failures of {@code send} and {@code commit} as the
 * unchecked {@link JMSRuntimeException}; the original {@code catch (JMSException)} therefore
 * never rolled back on a send failure. The transaction is now rolled back for those failures
 * and the exception is rethrown to the caller. The context is always closed.
 *
 * @throws JMSException declared for backward compatibility with existing callers
 * @throws JMSRuntimeException if sending or committing fails
 */
public static void sendToQueueTransaction() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    try {
        Destination destination = context.createQueue("testQueue");
        JMSProducer producer = context.createProducer();
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 10; i++) {
            String text = "事務訊息" + i;
            producer.send(destination, context.createTextMessage(text));
            System.out.println("已發送的訊息:" + text);
        }
        context.commit();
    } catch (JMSRuntimeException e) {
        try {
            context.rollback();
        } catch (JMSRuntimeException rollbackFailure) {
            // Keep the original failure as the primary error.
            e.addSuppressed(rollbackFailure);
        }
        throw e;
    } finally {
        context.close();
    }
}
2.2.3、發送到 Topic
/**
 * Publishes ten persistent text messages to the topic {@code testTopic}.
 *
 * <p>The context is managed by try-with-resources so it is closed even when publishing fails.
 *
 * @throws JMSException if reading back the text of a sent message fails
 */
public static void sendToTopic() throws JMSException {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    try (JMSContext context = activeMQConnectionFactory.createContext()) {
        JMSProducer producer = context.createProducer();
        Destination destination = context.createTopic("testTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 10; i++) {
            TextMessage message = context.createTextMessage("訊息" + i);
            producer.send(destination, message);
            System.out.println("已發送的訊息:" + message.getText());
        }
    }
}
2.2.4、發送到 Topic(事務)
/**
 * Publishes five persistent text messages to the topic {@code testTopic} inside one local
 * transaction.
 *
 * <p>The JMS 2.0 simplified API reports failures of {@code send} and {@code commit} as the
 * unchecked {@link JMSRuntimeException}; the original {@code catch (JMSException)} therefore
 * never rolled back on a send failure. The transaction is now rolled back for those failures
 * and the exception is rethrown to the caller. The context is always closed.
 *
 * @throws JMSRuntimeException if sending or committing fails
 */
public static void sendToTopicTraction() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    try {
        JMSProducer producer = context.createProducer();
        Destination destination = context.createTopic("testTopic");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 5; i++) {
            String text = "事務訊息" + i;
            producer.send(destination, context.createTextMessage(text));
            System.out.println("已發送的訊息:" + text);
        }
        context.commit();
    } catch (JMSRuntimeException e) {
        try {
            context.rollback();
        } catch (JMSRuntimeException rollbackFailure) {
            // Keep the original failure as the primary error.
            e.addSuppressed(rollbackFailure);
        }
        throw e;
    } finally {
        context.close();
    }
}
完整代碼:
package com.abc.demo.general.activemq; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import javax.jms.*; public class ProducerJms20 { private static String brokerURL = "tcp://10.40.96.11:61616"; public static void main(String[] args) throws Exception { sendToQueue(); // sendToQueueTransaction(); // sendToTopic(); // sendToTopicTraction(); } public static void sendToQueue() throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(); JMSProducer producer = context.createProducer(); Destination destination = context.createQueue("testQueue"); //訊息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //延遲投遞 producer.setDeliveryDelay(1000 * 5); //異步發送 producer.setAsync(new CompletionListener() { @Override public void onCompletion(Message message) { System.out.println("訊息發送完成"); } @Override public void onException(Message message, Exception exception) { exception.printStackTrace(); } }); for (int i = 1; i <= 5; i++) { TextMessage message = context.createTextMessage("訊息" + i); producer.send(destination, message); System.out.println("已發送的訊息:" + message.getText()); } context.close(); } /** * 以事務方式發送訊息 * @throws JMSException */ public static void sendToQueueTransaction() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); try { Destination destination = context.createQueue("testQueue"); JMSProducer producer = context.createProducer(); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 5; i++) { TextMessage message = context.createTextMessage("事務訊息" + i); producer.send(destination, message); System.out.println("已發送的訊息:" + message.getText()); } context.commit(); } catch (JMSException e) { context.rollback(); e.printStackTrace(); } finally { context.close(); 
} } public static void sendToTopic() throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(); JMSProducer producer = context.createProducer(); Destination destination = context.createTopic("testTopic"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 5; i++) { TextMessage message = context.createTextMessage("訊息" + i); producer.send(destination, message); System.out.println("已發送的訊息:" + message.getText()); } context.close(); } public static void sendToTopicTraction() { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); try { JMSProducer producer = context.createProducer(); Destination destination = context.createTopic("testTopic"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 1; i <= 5; i++) { TextMessage message = context.createTextMessage("事務訊息" + i); producer.send(destination, message); System.out.println("已發送的訊息:" + message.getText()); } context.commit(); } catch (JMSException e) { context.rollback(); e.printStackTrace(); } finally { context.close(); } } }View Code
2.3、消費者
2.3.1、從 Queue 中消費訊息
public static void recevieFromQueue() throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE); Destination destination = context.createQueue("testQueue"); JMSConsumer consumer = context.createConsumer(destination); consumer.setMessageListener(message -> { try { String msg = message.getBody(String.class); System.out.println("接受到的訊息:" + msg); } catch (JMSException e) { e.printStackTrace(); } }); //JMS2.0設定MessageListener是不阻塞執行緒的,通過該方法阻塞執行緒 System.in.read(); }
2.3.2、從 Queue 中消費訊息(事務)
public static void recevieFromQueueTransction() throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); Destination destination = context.createQueue("testQueue"); JMSConsumer consumer = context.createConsumer(destination); AtomicInteger index = new AtomicInteger(); try { consumer.setMessageListener(message -> { try { String msg = message.getBody(String.class); System.out.println("接受到的訊息:" + msg); index.getAndIncrement(); //每10條提交一次 if (index.get() % 10 == 0) { context.commit(); } } catch (JMSException e) { e.printStackTrace(); } }); } catch (Exception e) { context.rollback(); e.printStackTrace(); } System.in.read(); }
2.3.3、從 Topic 中消費訊息
/**
 * Prints the body of every message published to the topic {@code testTopic} while subscribed
 * (non-durable, auto-acknowledge), until a byte can be read from standard input.
 *
 * <p>The context is managed by try-with-resources so the connection is released once the
 * method stops waiting.
 *
 * @throws Exception if creating the context, registering the listener or reading stdin fails
 */
public static void recevieFromTopic() throws Exception {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
    try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
        Topic topic = context.createTopic("testTopic");
        JMSConsumer consumer = context.createConsumer(topic);
        consumer.setMessageListener(message -> {
            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的訊息:" + msg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        // Registering a MessageListener does not block, so wait on stdin to keep consuming.
        System.in.read();
    }
}
2.3.4、從 Topic 中消費訊息(持久化訂閱+事務)
public static void recevieFromTopicDurable() throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); context.setClientID("12345678"); Topic topic = context.createTopic("testTopic"); JMSConsumer consumer = context.createDurableConsumer(topic, "test"); AtomicInteger index = new AtomicInteger(); consumer.setMessageListener(message -> { try { String msg = message.getBody(String.class); System.out.println("接受到的訊息:" + msg); index.getAndIncrement(); //每5條提交一次 if (index.get() % 5 == 0) { context.commit(); } } catch (JMSException e) { e.printStackTrace(); } }); System.in.read(); }
2.3.5、從 Topic 中消費訊息(共享訂閱)
public static void recevieFromTopicShare() throws Exception { //模擬三個消費者 for (int i = 0; i < 3; i++) { new Thread(() -> { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE); Topic topic = context.createTopic("testTopic"); JMSConsumer consumer = context.createSharedConsumer(topic, "testShare"); consumer.setMessageListener(message -> { try { String msg = message.getBody(String.class); System.out.println(Thread.currentThread() + "-接受到的訊息:" + msg); } catch (JMSException e) { e.printStackTrace(); } }); }).start(); } System.in.read(); }
2.3.6、從 Topic 中消費訊息(共享持久訂閱+事務)
public static void recevieFromTopicShareDurable() throws Exception { //模擬三個消費者 for (int i = 0; i < 3; i++) { new Thread(() -> { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL); JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); Topic topic = context.createTopic("testTopic"); JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2"); consumer.setMessageListener(message -> { try { String msg = message.getBody(String.class); System.out.println(Thread.currentThread() + "-接受到的訊息:" + msg); //處理完一條就提交 context.commit(); } catch (JMSException e) { e.printStackTrace(); } }); }).start(); } System.in.read(); }
完整代碼:
package com.abc.demo.general.activemq;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * JMS 2.0 (simplified API) consumer examples for ActiveMQ Artemis: queue, topic, durable,
 * shared and shared-durable subscriptions, with and without local transactions.
 *
 * <p>Every example waits on standard input while its listener runs and closes its
 * {@link JMSContext} afterwards. Transacted listeners roll back when processing or
 * committing fails.
 */
public class ConsumerJms20 {
    private static final String brokerURL = "tcp://10.40.96.11:61616";

    public static void main(String[] args) throws Exception {
        recevieFromQueue();
//        recevieFromQueueTransction();
//        recevieFromTopic();
//        recevieFromTopicDurable();
//        recevieFromTopicShare();
//        recevieFromTopicShareDurable();
    }

    /**
     * Prints the body of every message received from the queue {@code testQueue}.
     *
     * @throws Exception if creating the context, registering the listener or reading stdin fails
     */
    public static void recevieFromQueue() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
            Destination destination = context.createQueue("testQueue");
            JMSConsumer consumer = context.createConsumer(destination);
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的訊息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            // Registering a MessageListener does not block, so wait on stdin to keep consuming.
            System.in.read();
        }
    }

    /**
     * Consumes the queue {@code testQueue} in a transacted context, committing once per batch
     * of ten messages and rolling the batch back when processing or committing fails.
     *
     * @throws Exception if creating the context, registering the listener or reading stdin fails
     */
    public static void recevieFromQueueTransction() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED)) {
            Destination destination = context.createQueue("testQueue");
            JMSConsumer consumer = context.createConsumer(destination);
            AtomicInteger index = new AtomicInteger();
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的訊息:" + msg);
                    // Commit once every 10 messages
                    if (index.incrementAndGet() % 10 == 0) {
                        context.commit();
                    }
                } catch (JMSException | JMSRuntimeException e) {
                    e.printStackTrace();
                    // The whole uncommitted batch will be redelivered, so start counting afresh.
                    index.set(0);
                    rollbackQuietly(context);
                }
            });
            System.in.read();
        }
    }

    /**
     * Prints the body of every message published to the topic {@code testTopic} while subscribed.
     *
     * @throws Exception if creating the context, registering the listener or reading stdin fails
     */
    public static void recevieFromTopic() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createConsumer(topic);
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的訊息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            System.in.read();
        }
    }

    /**
     * Durable subscription + transaction: consumes the topic {@code testTopic} through the
     * durable subscription {@code test} (client id {@code 12345678}), committing once per batch
     * of five messages and rolling the batch back when processing or committing fails.
     *
     * @throws Exception if creating the context or subscription, registering the listener or
     *                   reading stdin fails
     */
    public static void recevieFromTopicDurable() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED)) {
            // The client id must be set before the context is used; it identifies the durable subscription.
            context.setClientID("12345678");
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createDurableConsumer(topic, "test");
            AtomicInteger index = new AtomicInteger();
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的訊息:" + msg);
                    // Commit once every 5 messages
                    if (index.incrementAndGet() % 5 == 0) {
                        context.commit();
                    }
                } catch (JMSException | JMSRuntimeException e) {
                    e.printStackTrace();
                    // The whole uncommitted batch will be redelivered, so start counting afresh.
                    index.set(0);
                    rollbackQuietly(context);
                }
            });
            System.in.read();
        }
    }

    /**
     * Shared subscription: simulates three consumers sharing the non-durable subscription
     * {@code testShare} on the topic {@code testTopic}; each message goes to only one of them.
     *
     * @throws Exception if creating a context, registering a listener or reading stdin fails
     */
    public static void recevieFromTopicShare() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        // Simulate three consumers, one context (connection + session) each
        JMSContext[] contexts = new JMSContext[3];
        try {
            for (int i = 0; i < contexts.length; i++) {
                JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
                contexts[i] = context;
                Topic topic = context.createTopic("testTopic");
                JMSConsumer consumer = context.createSharedConsumer(topic, "testShare");
                consumer.setMessageListener(message -> {
                    try {
                        String msg = message.getBody(String.class);
                        System.out.println(Thread.currentThread() + "-接受到的訊息:" + msg);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            }
            System.in.read();
        } finally {
            closeAll(contexts);
        }
    }

    /**
     * Shared durable subscription + transaction: simulates three consumers sharing the durable
     * subscription {@code testShare2} on the topic {@code testTopic}, each committing after
     * every message and rolling back when processing or committing fails.
     *
     * @throws Exception if creating a context, registering a listener or reading stdin fails
     */
    public static void recevieFromTopicShareDurable() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        // Simulate three consumers, one context (connection + session) each
        JMSContext[] contexts = new JMSContext[3];
        try {
            for (int i = 0; i < contexts.length; i++) {
                JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
                contexts[i] = context;
                Topic topic = context.createTopic("testTopic");
                JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2");
                consumer.setMessageListener(message -> {
                    try {
                        String msg = message.getBody(String.class);
                        System.out.println(Thread.currentThread() + "-接受到的訊息:" + msg);
                        // Commit as soon as one message has been processed
                        context.commit();
                    } catch (JMSException | JMSRuntimeException e) {
                        e.printStackTrace();
                        rollbackQuietly(context);
                    }
                });
            }
            System.in.read();
        } finally {
            closeAll(contexts);
        }
    }

    /** Rolls back the context's transaction, printing (not propagating) a rollback failure. */
    private static void rollbackQuietly(JMSContext context) {
        try {
            context.rollback();
        } catch (JMSRuntimeException rollbackFailure) {
            rollbackFailure.printStackTrace();
        }
    }

    /** Closes every non-null context in the array. */
    private static void closeAll(JMSContext[] contexts) {
        for (JMSContext context : contexts) {
            if (context != null) {
                context.close();
            }
        }
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/297703.html
標籤:Java
