一: 安裝rocketmq與啟動
虛擬機環境分配4G記憶體,linux centos7
下載連接 http://rocketmq.apache.org/dowloading/releases/
解壓后進入bin目錄,修改記憶體配置,否則啟動失敗(默認配置記憶體8G,記憶體不夠會啟動失敗)
# /home/admin/rocketmqall4.7.0/bin
vi runbroker.sh
# 設定小些
# JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn4g"
# 關閉防火墻或者自行開放埠,避免網路不通
systemctl disable firewalld
# 后臺啟動 nameserver 默認埠9876
nohup sh mqnamesrv &
# 后臺啟動 mqbroker 默認埠 10911
nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &
#踩坑記錄
#客戶端生產者使用時問題
#報錯說 no router info of this topic
#則通過手動創建 topic, 注意 引數格式 ip:port
sh mqadmin updateTopic -b localhost:10911 -t DemoTopic -n localhost:9876
#手動創建topic時 報錯 簽名演算法問題(擴展包沒找到)
#rocketMQ:unable to calculate a request signature. error=Algorithm HmacSHA1 not available
cd ~/rocketmqall4.7.0/bin
vi tools.sh
#在 ${JAVA_HOME}/jre/lib/ext 后加上ext檔案夾的絕對路徑,
#如JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:/usr/java/jdk1.8.0_65/jre/lib/ext"
#再次創建 topic.
#這樣就可以基本使用了
#rocketmq web控制臺擴展在文末描述
二: 運行樣例測驗下
引入客戶端依賴包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
1、Producer端發送同步訊息
這種可靠性同步地發送方式使用的比較廣泛,比如:重要的訊息通知,短信通知,
public class MySyncProducer { public static void main(String[] args) throws Exception { // 實體化訊息生產者Producer DefaultMQProducer producer = new DefaultMQProducer("my_SyncProducer"); // 設定NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動Producer實體 producer.start(); for (int i = 0; i < 10000; i++) { // 創建訊息,并指定Topic,Tag和訊息體 Message msg = new Message("my_TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 發送訊息到一個Broker SendResult sendResult = producer.send(msg); // 通過sendResult回傳訊息是否成功送達 System.out.printf("%s%n", sendResult); } // 如果不再發送訊息,關閉Producer實體, Thread.sleep(500000); producer.shutdown(); } }
觀察產生的訊息資料 跑了2次產生2w條訊息
2w條分布在4個訊息佇列中

消費者消費
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 實體化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_SyncProducer"); // 設定NameServer的地址 consumer.setNamesrvAddr("192.168.1.114:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息 consumer.subscribe("my_TopicTest", "*"); // 注冊回呼實作類來處理從broker拉取回來的訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 標記該訊息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實體 consumer.start(); System.out.printf("Consumer Started.%n"); } }
消費詳情

2、Producer端發送異步訊息
異步訊息通常用在對回應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的回應
public class AsyncProducer { public static void main(String[] args) throws Exception { // 實體化訊息生產者Producer DefaultMQProducer producer = new DefaultMQProducer("my_AsyncProducer"); // 設定NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動Producer實體 producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 1000; // 根據訊息數量實體化倒計時計算器 final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount); for (int i = 0; i < messageCount; i++) { final int index = i; // 創建訊息,并指定Topic,Tag和訊息體 Message msg = new Message("my_asyncTopic", "TagA", "AUTO_CREATE_TOPIC_KEY", "Hello world2".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback接收異步回傳結果的回呼 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } // 等待5s countDownLatch.await(5, TimeUnit.SECONDS); // 如果不再發送訊息,關閉Producer實體, producer.shutdown(); } }
3、Producer端單向發送訊息
這種方式主要用在不特別關心發送結果的場景,例如日志發送,
public class OnewayProducer { public static void main(String[] args) throws Exception{ // 實體化訊息生產者Producer DefaultMQProducer producer = new DefaultMQProducer("my_OnewayProducer"); // 設定NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動Producer實體 producer.start(); for (int i = 0; i < 10000; i++) { // 創建訊息,并指定Topic,Tag和訊息體 Message msg = new Message("my_OnewayTopic" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 發送單向訊息,沒有任何回傳結果 producer.sendOneway(msg); } // 如果不再發送訊息,關閉Producer實體, producer.shutdown(); } }
4、 消費訊息
消費指定生產組producerGroup及指定topic的訊息
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 實體化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_SyncProducer"); // 設定NameServer的地址 consumer.setNamesrvAddr("192.168.1.114:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息 consumer.subscribe("my_TopicTest", "*"); // 注冊回呼實作類來處理從broker拉取回來的訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 標記該訊息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實體 consumer.start(); System.out.printf("Consumer Started.%n"); } }
5、 順序訊息樣例
訊息生產到一個佇列那么FIFO,全域中都是有序的消費.如果是分片到多個佇列,每個佇列中都是有序的,磁區有序.
順序訊息的產生
public class OrderProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("my_OrderProducer"); producer.setNamesrvAddr("192.168.1.114:9876"); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; // 訂單串列 List<OrderStep> orderList = new OrderProducer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 10; i++) { // 加個時間前綴 String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("my_OrderProducer_Topic", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; //根據訂單id選擇發送queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//訂單id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 訂單的步驟 */ private static class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } /** * 生成模擬訂單資料 */ private List<OrderStep> buildOrders() { List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }OrderProducer

順序消費訊息
public class ConsumerInOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_OrderProducer"); consumer.setNamesrvAddr("192.168.1.114:9876"); /** * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("my_OrderProducer_Topic", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); int anInt = random.nextInt(); for (MessageExt msg : msgs) { // 可以看到每個queue有唯一的consume執行緒來消費, 訂單對每個queue(磁區)有序 System.out.println(anInt+" consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模擬業務邏輯處理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }ConsumerInOrder
磁區順序消費

6、延時訊息樣例
先啟動消費者等待延時訊息
public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 實體化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_ScheduledMessageConsumer"); consumer.setNamesrvAddr("192.168.1.114:9876"); // 訂閱Topics consumer.subscribe("my_Scheduled_Topic", "*"); // 注冊訊息監聽者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者 consumer.start(); } }ScheduledMessageConsumer
發送延時訊息
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // 實體化一個生產者來產生延時訊息 DefaultMQProducer producer = new DefaultMQProducer("my_ScheduledMessage"); producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動生產者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("my_Scheduled_Topic", ("Hello scheduled message " + i).getBytes()); // 設定延時等級3,這個訊息將在10s之后發送(現在只支持固定的幾個時間,詳看delayTimeLevel) message.setDelayTimeLevel(3); // 發送訊息 producer.send(message); } // 關閉生產者 producer.shutdown(); } }ScheduledMessageProducer
延遲時間結束后訊息才放入佇列被消費者消費,消費時間比發送時間晚,
使用場景:如電商里,提交了一個訂單就可以發送一個延時訊息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存,(redis中有個key失效時間,失效事件類似)
使用限制: 時間不是任意的,private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
7、訊息事務樣例
事務訊息共有三種狀態,提交狀態、回滾狀態、中間狀態:
- TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此訊息,
- TransactionStatus.RollbackTransaction: 回滾事務,它代表該訊息將被洗掉,不允許被消費,
- TransactionStatus.Unknown: 中間狀態,它代表需要檢查訊息佇列來確定狀態,
創建事務性生產者
使用 TransactionMQProducer類創建生產者,并指定唯一的 ProducerGroup,就可以設定自定義執行緒池來處理這些檢查請求,執行本地事務后、需要根據執行結果對訊息佇列進行回復,回傳的事務狀態是以上三種,
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("my_TransactionMQProducer"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); // 設定NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("my_transaction_msg_topic", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); }TransactionProducer
事務性訊息的監聽
static class TransactionListenerImpl implements TransactionListener{ private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value =https://www.cnblogs.com/wangrq/p/ transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }TransactionListenerImpl
事務訊息使用上的限制
- 事務訊息不支持延時訊息和批量訊息,
- 為了避免單個訊息被檢查太多次而導致半佇列訊息累積,我們默認將單個訊息的檢查次數限制為 15 次,但是用戶可以通過 Broker 組態檔的
transactionCheckMax引數來修改此限制,如果已經檢查某條訊息超過 N 次的話( N =transactionCheckMax) 則 Broker 將丟棄此訊息,并在默認情況下同時列印錯誤日志,用戶可以通過重寫AbstractTransactionCheckListener類來修改這個行為, - 事務訊息將在 Broker 組態檔中的引數 transactionMsgTimeout 這樣的特定時間長度之后被檢查,當發送事務訊息時,用戶還可以通過設定用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該引數優先于
transactionMsgTimeout引數, - 事務性訊息可能不止一次被檢查或消費,
- 提交給用戶的目標主題訊息可能會失敗,目前這依日志的記錄而定,它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務訊息不丟失、并且事務完整性得到保證,建議使用同步的雙重寫入機制,
- 事務訊息的生產者 ID 不能與其他型別訊息的生產者 ID 共享,與其他型別的訊息不同,事務訊息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者
8、批量訊息樣例
9、過濾訊息樣例
0、OpenMessaging樣例
官方鏈接 https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
三:RocketMQ-Console-Ng擴展控制臺
下載 : https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
修改組態檔 namesrvAddr地址后打包運行
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.1.jar
訪問: http://localhost:8080/

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/162672.html
標籤:Java
