作為程式員,要時刻記得去學習,
這一講主要是結合rocketmq的原生api,進行學習,
- 首先去官網下載rocketmq原始碼,參照原始碼,建立生產者,消費者,

生產者生產模式:
- producer同步發送訊息
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.11:9876;192.168.0.13:9876");
producer.start();
for (int i = 0; i < 20; i++) {
try {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
- producer異步發送
package org.apache.rocketmq.example.simple;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducer {
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("192.168.0.11:9876;192.168.0.13:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("等待他們執行完,我在執行~~~~~~");
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
- prdoducer單向發送
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.11:9876;192.168.0.13:9876");
producer.start();
for (int i = 0; i < 20; i++) {
try {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
producer.sendOneway(msg);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
現在來講講三種方式有什么區別:
- 同步發送send(msg)方法有回傳值,但是需要關注介面呼叫的回傳值,也可以認為如果呼叫程序的某一刻是阻塞的,

異步發送send(msg,sendResultBack),這種做法主要是不需要每次主動等待呼叫的結果,可以去處理其它邏輯,
- demo里面用了CountDownLatch:主要是讓一個執行緒等待其他執行緒執行完后在執行,
- 主要是為了模擬異步發送的效果,maIn執行緒先列印這個輸出,但是發送的回呼是在后面完成的,證明了,main沒有故意等待訊息發送的成功與否,而執行緒去做了其他的事情,
發送失敗成功與否都會回呼,然后呼叫countDownLatch.countDown()方法,messageCount 減一
直到messageCount =0,此時才會繼續執行producer.shutDown();
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);

- 單向發送表示:只管發,不關心結果
producer.sendOneway(msg);
- 消費者消費模式:
消費者模式一般有pull和push:
首先看push模式
官方提供的消費push消費demo:
package org.apache.rocketmq.example.sample;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//DefaultMQPushConsumer 官方提供的消費類
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
//設定一些引數
consumer.setNamesrvAddr("192.168.0.11:9876;192.168.0.13:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
//注入監聽器,訊息推送過來,消費訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
再看pull模式:
package org.apache.rocketmq.example.sample;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("192.168.0.11:9876;192.168.0.13:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/258928.html
標籤:java
上一篇:Java三大特征詳解
下一篇:Arraylist擴容機制理解
