0. 啟動Name Server與 Broker
1. 引入依賴
添加 RocketMQ 客戶端訪問支持,具體版本和安裝的 RocketMQ 版本一致即可,
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
2. 訊息生產者
public class Producer {
public static void main(String[] args) throws Exception {
//創建一個訊息生產者,并設定一個訊息生產者組
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
//指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
//指定最大超時時間,用默認的會報錯
producer.setSendMsgTimeout(60000);
//初始化 Producer,整個應用生命周期內只需要初始化一次
producer.start();
for (int i = 0; i < 100; i++) {
//創建一條訊息物件,指定其主題、標簽和訊息內容
Message msg = new Message(
/* 訊息主題名 */
"topicTest",
/* 訊息標簽 */
"TagA",
/* 訊息內容 */
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//發送訊息并回傳結果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦生產者實體不再被使用則將其關閉,包括清理資源,關閉網路連接等
producer.shutdown();
}
}
使用DefaultMQProducer類來創建了一個訊息生產者,該類建構式入參 producerGroup 是訊息生產者組的名字,無論生產者還是消費者都必須給出 GroupName ,并保證該名字的唯一性,
接下來指定 NameServer 地址和呼叫 start 方法初始化,在整個應用生命周期內只需要呼叫一次 start 方法,
初始化完成后,呼叫 send 方法發送訊息,示例中只是簡單的構造了100條同樣的訊息發送,其實一個 Producer 物件可以發送多個主題多個標簽的訊息,訊息物件的標簽可以為空,send 方法是同步呼叫,只要不拋例外就標識成功,
最后應用退出時呼叫 shutdown 方法清理資源、關閉網路連接,從服務器上注銷自己,通常建議應用在 JBOSS、Tomcat 等容器的退出鉤子里呼叫 shutdown 方法,
3. 訊息消費者
public class Consumer {
public static void main(String[] args) throws Exception {
//創建一個訊息消費者,并設定一個訊息消費者組
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//設定 Consumer 第一次啟動時從佇列頭部開始消費還是佇列尾部開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//訂閱指定 Topic 下的所有訊息
consumer.subscribe("topicTest", "*");
//注冊訊息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默認 list 里只有一條訊息,可以通過設定引數來批量接收訊息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消費者物件在使用之前必須要呼叫 start 初始化
consumer.start();
System.out.println("訊息消費者已啟動");
}
}
用 DefaultMQPushConsumer 類來創建一個訊息消費者,該類建構式入參 consumerGroup 是訊息消費者組的名字,需要保證該名字的唯一性,
? 接下來指定 NameServer 地址和設定消費者應用程式第一次啟動時從佇列頭部開始消費還是佇列尾部開始消費,
接著呼叫 subscribe 方法給消費者物件訂閱指定主題下的訊息,該方法第一個引數是主題名,第二個引數是標簽名,示例表示訂閱了主題名 topic_example_java下所有標簽的訊息,
最主要的是注冊訊息監聽器才能消費訊息,示例中用的是Consumer Push的方式,即設定監聽器回呼的方式消費訊息,默認監聽回呼方法中 List<MessageExt>里只有一條訊息,可以通過設定引數來批量接收訊息,
? 最后呼叫 start 方法初始化,在整個應用生命周期內只需要呼叫一次 start 方法,
4. 測驗
先來運行生產者

再運行消費者

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423266.html
標籤:其他
上一篇:RocketMQ訊息保障
