上一講【RocketMQ】訊息的拉取
訊息消費
當RocketMQ進行訊息消費的時候,是通過ConsumeMessageConcurrentlyService的submitConsumeRequest方法,將訊息提交到執行緒池中進行消費,具體的處理邏輯如下:
- 如果本次訊息的個數小于等于批量消費的大小
consumeBatchSize,構建消費請求ConsumeRequest,直接提交到執行緒池中進行消費即可 - 如果本次訊息的個數大于批量消費的大小
consumeBatchSize,說明需要分批進行提交,每次構建consumeBatchSize個訊息提交到執行緒池中進行消費 - 如果出現拒絕提交的例外,呼叫
submitConsumeRequestLater方法延遲進行提交
RocketMQ訊息消費是批量進行的,如果一批訊息的個數小于預先設定的批量消費大小,直接構建消費請求將消費任務提交到執行緒池處理即可,否則需要分批進行提交,
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 如果訊息的個數小于等于批量消費的大小
if (msgs.size() <= consumeBatchSize) {
// 構建消費請求
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// 加入到消費執行緒池中
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 遍歷訊息
for (int total = 0; total < msgs.size(); ) {
// 創建訊息串列,大小為consumeBatchSize,用于批量提交使用
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
// 加入到訊息串列中
msgThis.add(msgs.get(total));
} else {
break;
}
}
// 創建ConsumeRequest
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
// 加入到消費執行緒池中
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
// 如果出現拒絕提交例外,延遲進行提交
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
}
消費任務運行
ConsumeRequest是ConsumeMessageConcurrentlyService的內部類,實作了Runnable介面,在run方法中,對消費任務進行了處理:
-
判斷訊息所屬的處理佇列
processQueue是否處于洗掉狀態,如果已被洗掉,不進行處理 -
重置訊息的重試主題
因為延遲訊息的主題在后續處理的時候被設定為SCHEDULE_TOPIC_XXXX,所以這里需要重置,
-
如果設定了訊息消費鉤子函式,執行
executeHookBefore鉤子函式 -
獲取訊息監聽器,呼叫訊息監聽器的consumeMessage進行訊息消費,并回傳訊息的消費結果狀態,狀態有兩種分別為CONSUME_SUCCESS和RECONSUME_LATER
CONSUME_SUCCESS:表示訊息消費成功,
RECONSUME_LATER:表示消費失敗,稍后延遲重新進行消費,
-
獲取消費的時長,判斷是否超時
-
如果設定了訊息消費鉤子函式,執行
executeHookAfter鉤子函式 -
再次判斷訊息所屬的處理佇列是否處于洗掉狀態,如果不處于洗掉狀態,呼叫
processConsumeResult方法處理消費結果
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue; // 處理佇列
private final MessageQueue messageQueue; // 訊息佇列
@Override
public void run() {
// 如果處理佇列已被洗掉
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// 獲取訊息監聽器
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
// 重置訊息重試主題名稱
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
// 如果設定了鉤子函式
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
// ...
// 執行鉤子函式
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
// 設定消費開始時間戳
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// 通過訊息監聽器的consumeMessage進行訊息消費,并回傳消費結果狀態
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
}
// 計算消費時長
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
// 出現例外
returnType = ConsumeReturnType.EXCEPTION;
} else {
// 回傳NULL
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 判斷超時
returnType = ConsumeReturnType.TIME_OUT; // 回傳型別置為超時
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 如果延遲消費
returnType = ConsumeReturnType.FAILED; // 回傳類置為失敗
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 如果成功狀態
returnType = ConsumeReturnType.SUCCESS; // 回傳型別為成功
}
// ...
// 如果消費狀態為空
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
// 狀態置為延遲消費
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 如果設定了鉤子函式
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
// 執行executeHookAfter方法
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
if (!processQueue.isDropped()) {
// 處理消費結果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
}
}
// 重置訊息重試主題
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
// 獲取消費組的重試主題:%RETRY% + 消費組名稱
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
for (MessageExt msg : msgs) {
// 獲取訊息的重試主題名稱
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
// 如果重試主題不為空并且與消費組的重試主題一致
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
// 設定重試主題
msg.setTopic(retryTopic);
}
if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
}
}
// 消費結果狀態
public enum ConsumeConcurrentlyStatus {
/**
* 消費成功
*/
CONSUME_SUCCESS,
/**
* 消費失敗,延遲進行消費
*/
RECONSUME_LATER;
}
處理消費結果
一、設定ackIndex
ackIndex的值用來判斷失敗訊息的個數,在processConsumeResult方法中根據消費結果狀態進行判斷,對ackIndex的值進行設定,前面可知消費結果狀態有以下兩種:
- CONSUME_SUCCESS:訊息消費成功,此時ackIndex設定為訊息大小 - 1,表示訊息都消費成功,
- RECONSUME_LATER:訊息消費失敗,回傳延遲消費狀態,此時ackIndex置為-1,表示訊息都消費失敗,
二、處理消費失敗的訊息
廣播模式
廣播模式下,如果訊息消費失敗,只將失敗的訊息列印出來不做其他處理,
集群模式
開啟for回圈,初始值為i = ackIndex + 1,結束條件為i < consumeRequest.getMsgs().size(),上面可知ackIndex有兩種情況:
- 消費成功:ackIndex值為訊息大小-1,此時ackIndex + 1的值等于訊息的個數大小,不滿足for回圈的執行條件,相當于訊息都消費成功,不需要進行失敗的訊息處理,
- 延遲消費:ackIndex值為-1,此時ackIndex+1為0,滿足for回圈的執行條件,從第一條訊息開始遍歷到最后一條訊息,呼叫
sendMessageBack方法向Broker發送CONSUMER_SEND_MSG_BACK訊息,如果發送成功Broker會根據延遲等級,放入不同的延遲佇列中,到達延遲時間后,消費者將會重新進行拉取,如果發送失敗,加入到失敗訊息串列中,稍后重新提交消費任務進行處理,
三、移除訊息,更新拉取偏移量
以上步驟處理完畢后,首先呼叫removeMessage從處理佇列中移除訊息并回傳拉取訊息的偏移量,然后呼叫updateOffset更新拉取偏移量,
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
// 獲取ackIndex
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS: // 如果消費成功
// 如果ackIndex大于等于訊息的大小
if (ackIndex >= consumeRequest.getMsgs().size()) {
// 設定為訊息大小-1
ackIndex = consumeRequest.getMsgs().size() - 1;
}
// 計算消費成功的的個數
int ok = ackIndex + 1;
// 計算消費失敗的個數
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER: // 如果延遲消費
// ackIndex置為-1
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
// 判斷消費模式
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // 廣播模式
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING: // 集群模式
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// 遍歷消費失敗的訊息
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
// 獲取訊息
MessageExt msg = consumeRequest.getMsgs().get(i);
// 向Broker發送延遲訊息
boolean result = this.sendMessageBack(msg, context);
// 如果發送失敗
if (!result) {
// 消費次數+1
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
// 加入失敗訊息串列中
msgBackFailed.add(msg);
}
}
// 如果不為空
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// 稍后重新進行消費
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 從處理佇列中移除訊息
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 更新拉取偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
}
發送CONSUMER_SEND_MSG_BACK訊息
延遲級別
RocketMQ的延遲級別對應的延遲時間常量定義在MessageStoreConfig的messageDelayLevel變數中:
public class MessageStoreConfig {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
延遲級別與延遲時間對應關系:
延遲級別0 ---> 對應延遲時間1s,也就是延遲1秒后消費者重新從Broker拉取進行消費
延遲級別1 ---> 延遲時間5s
延遲級別2 ---> 延遲時間10s
...
以此類推,最大的延遲時間為2h
在sendMessageBack方法中,首先從背景關系中獲取了延遲級別(ConsumeConcurrentlyContext中可以看到,延遲級別默認為0),并對主題加上Namespace,然后呼叫defaultMQPushConsumerImpl的sendMessageBack發送訊息:
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
// 獲取延遲級別
int delayLevel = context.getDelayLevelWhenNextConsume();
// 對主題添加上Namespace
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
// 向Broker發送訊息
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
}
// 并發消費背景關系
public class ConsumeConcurrentlyContext {
/**
* -1,不進行重試,加入DLQ佇列
* 0, Broker控制重試頻率
* >0, 客戶端控制
*/
private int delayLevelWhenNextConsume = 0; // 默認為0
}
DefaultMQPushConsumerImp的sendMessageBack方法中又呼叫了MQClientAPIImpl的consumerSendMessageBack方法進行發送:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
// 獲取Broker地址
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
// 呼叫consumerSendMessageBack方法發送訊息
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
// ...
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
}
在MQClientAPIImpl的consumerSendMessageBack方法中,可以看到設定的請求型別是CONSUMER_SEND_MSG_BACK,然后設定了訊息的相關資訊,向Broker發送請求:
public class MQClientAPIImpl {
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
// 創建請求頭
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
// 設定請求型別為CONSUMER_SEND_MSG_BACK
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
// 設定消費組
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
// 設定訊息物理偏移量
requestHeader.setOffset(msg.getCommitLogOffset());
// 設定延遲級別
requestHeader.setDelayLevel(delayLevel);
// 設定訊息ID
requestHeader.setOriginMsgId(msg.getMsgId());
// 設定最大消費次數
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
// 向Broker發送請求
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
}
Broker對請求的處理
Broker對CONSUMER_SEND_MSG_BACK型別的請求在SendMessageProcessor中,處理邏輯如下:
- 根據消費組獲取訂閱資訊配置,如果獲取為空,記錄錯誤資訊,直接回傳
- 獲取消費組的重試主題,然后從重試佇列中隨機選取一個佇列,并創建
TopicConfig主題配置資訊 - 根據訊息的物理偏移量從commitlog中獲取訊息
- 判斷訊息的消費次數是否大于等于最大消費次數 或者 延遲等級小于0:
- 如果條件滿足,表示需要把訊息放入到死信佇列DLQ中,此時設定DLQ佇列ID
- 如果不滿足,判斷延遲級別是否為0,如果為0,使用3 + 訊息的消費次數作為新的延遲級別
- 新建訊息MessageExtBrokerInner,設定訊息的相關資訊,此時相當于生成了一個全新的訊息(會設定之前訊息的ID),會重新添加到CommitLog中,訊息主題的設定有兩種情況:
- 達到了加入DLQ佇列的條件,此時主題為DLQ主題(%DLQ% + 消費組名稱),訊息之后會添加到選取的DLQ佇列中
- 未達到DLQ佇列的條件,此時主題為重試主題(%RETRY% + 消費組名稱),之后重新進行消費
- 呼叫
asyncPutMessage添加訊息,詳細程序可參考之前的文章【訊息的存盤】
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// 處理請求
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
// 處理請求
return this.asyncConsumerSendMsgBack(ctx, request);
default:
// ...
}
}
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
// ...
// 根據消費組獲取訂閱資訊配置
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
// 如果為空,直接回傳
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return CompletableFuture.completedFuture(response);
}
// ...
// 獲取消費組的重試主題
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// 從重試佇列中隨機選取一個佇列
int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 創建TopicConfig主題配置資訊
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
//...
// 根據訊息物理偏移量從commitLog檔案中獲取訊息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
// 獲取訊息的重試主題
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
// 延遲等級獲取
int delayLevel = requestHeader.getDelayLevel();
// 獲取最大消費重試次數
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
Integer times = requestHeader.getMaxReconsumeTimes();
if (times != null) {
maxReconsumeTimes = times;
}
}
// 判斷訊息的消費次數是否大于等于最大消費次數 或者 延遲等級小于0
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 獲取DLQ主題
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
// 選取一個佇列
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
// 創建DLQ的topicConfig
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0);
// ...
} else {
// 如果延遲級別為0
if (0 == delayLevel) {
// 更新延遲級別
delayLevel = 3 + msgExt.getReconsumeTimes();
}
// 設定延遲級別
msgExt.setDelayTimeLevel(delayLevel);
}
// 新建訊息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic); // 設定主題
msgInner.setBody(msgExt.getBody()); // 設定訊息
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // 設定訊息屬性
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt); // 設定佇列ID
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 設定消費次數
// 原始的訊息ID
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
// 設定訊息ID
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
// 添加重試訊息
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
// ...
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});
}
}
延遲訊息處理
由【訊息的存盤】文章可知,訊息添加會進入到asyncPutMessage方法中,首先獲取了事務型別,如果未使用事務或者是提交事務的情況下,對延遲時間級別進行判斷,如果延遲時間級別大于0,說明訊息需要延遲消費,此時做如下處理:
-
判斷訊息的延遲級別是否超過了最大延遲級別,如果超過了就使用最大延遲級別
-
獲取
RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定義的常量,值為SCHEDULE_TOPIC_XXXX:public class TopicValidator { // ... public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"; } -
根據延遲級別選取對應的佇列,一般會把相同延遲級別的訊息放在同一個佇列中
-
備份之前的TOPIC和佇列ID
-
更改訊息佇列的主題為
RMQ_SYS_SCHEDULE_TOPIC,所以延遲訊息的主題最終被設定為RMQ_SYS_SCHEDULE_TOPIC,放在對應的延遲佇列中進行處理
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// ...
// 獲取事務型別
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 如果未使用事務或者提交事務
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 判斷延遲級別
if (msg.getDelayTimeLevel() > 0) {
// 如果超過了最大延遲級別
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 獲取RMQ_SYS_SCHEDULE_TOPIC
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 根據延遲級別選取對應的佇列
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 備份之前的TOPIC和佇列ID
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 設定SCHEDULE_TOPIC
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// ...
}
}
拉取進度持久化
RocketMQ消費模式分為廣播模式和集群模式,廣播模式下消費進度保存在每個消費者端,集群模式下消費進度保存在Broker端,
廣播模式
更新進度
LocalFileOffsetStore中使用了一個ConcurrentMap型別的變數offsetTable存盤訊息佇列對應的拉取偏移量,KEY為訊息佇列,value為該訊息佇列對應的拉取偏移量,
在更新拉取進度的時候,從offsetTable中獲取當前訊息佇列的拉取偏移量,如果為空,則新建并保存到offsetTable中,否則獲取之前已經保存的偏移量,對值進行更新,需要注意這里只是更新了offsetTable中的資料,并沒有持久化到磁盤,持久化的操作在persistAll方法中:
public class LocalFileOffsetStore implements OffsetStore {
// offsetTable:KEY為訊息佇列,value為該訊息佇列的拉取偏移量
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
// 獲取之前的拉取進度
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
// 如果之前不存在,進行創建
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
// 如果不為空
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
// 更新拉取偏移量
offsetOld.set(offset);
}
}
}
}
}
加載進度
由于廣播模式下消費進度保存在消費者端,所以需要從本地磁盤加載之前保存的消費進度檔案,
LOCAL_OFFSET_STORE_DIR:消費進度檔案所在的根路徑
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
在LocalFileOffsetStore的建構式中可以看到,對拉取偏移量的保存檔案路徑進行了設定,為LOCAL_OFFSET_STORE_DIR + 客戶端ID + 消費組名稱 + offsets.json,從名字上看,消費進度的資料格式是以JSON的形式進行保存的:
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator + "offsets.json";
在load方法中,首先從本地讀取 offsets.json檔案,并序列化為OffsetSerializeWrapper物件,然后將保存的消費進度加入到offsetTable中:
public class LocalFileOffsetStore implements OffsetStore {
// 檔案路徑
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
private final String storePath;
// ...
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
// 設定拉取進度檔案的路徑
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";
}
@Override
public void load() throws MQClientException {
// 從本地讀取拉取偏移量
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
// 加入到offsetTable中
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
AtomicLong offset = mqEntry.getValue();
log.info("load consumer's offset, {} {} {}",
this.groupName,
mqEntry.getKey(),
offset.get());
}
}
}
// 從本地加載檔案
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
String content = null;
try {
// 讀取檔案
content = MixAll.file2String(this.storePath);
} catch (IOException e) {
log.warn("Load local offset store file exception", e);
}
if (null == content || content.length() == 0) {
return this.readLocalOffsetBak();
} else {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
// 序列化
offsetSerializeWrapper =
OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception, and try to correct", e);
return this.readLocalOffsetBak();
}
return offsetSerializeWrapper;
}
}
}
OffsetSerializeWrapper
OffsetSerializeWrapper中同樣使用了ConcurrentMap,從磁盤的offsets.json檔案中讀取資料后,將JSON轉為OffsetSerializeWrapper物件,就可以通過OffsetSerializeWrapper的offsetTable獲取到之前保存的每個訊息佇列的消費進度,然后加入到LocalFileOffsetStore的offsetTable中:
public class OffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
return offsetTable;
}
public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
this.offsetTable = offsetTable;
}
}
持久化進度
updateOffset更新只是將記憶體中的資料進行了更改,并未保存到磁盤中,持久化的操作是在persistAll方法中實作的:
- 創建
OffsetSerializeWrapper物件 - 遍歷
LocalFileOffsetStore的offsetTable,將資料加入到OffsetSerializeWrapper的OffsetTable中 - 將
OffsetSerializeWrapper轉為JSON - 呼叫
string2File方法將JSON資料保存到磁盤檔案
public class LocalFileOffsetStore implements OffsetStore {
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;OffsetSerializeWrapper
// 創建
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
// 遍歷offsetTable
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
// 獲取拉取偏移量
AtomicLong offset = entry.getValue();
// 加入到OffsetSerializeWrapper的OffsetTable中
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
// 將物件轉為JSON
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
// 將JSON資料保存到磁盤檔案
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persistAll consumer offset Exception, " + this.storePath, e);
}
}
}
}
集群模式
集群模式下消費進度保存在Broker端,
更新進度
集群模式下的更新進度與廣播模式下的更新型別,都是只更新了offsetTable中的資料:
public class RemoteBrokerOffsetStore implements OffsetStore {
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
// 獲取訊息佇列的進度
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
// 將消費進度保存在offsetTable中
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
// 更新拉取偏移量
offsetOld.set(offset);
}
}
}
}
}
加載
集群模式下加載消費進度需要從Broker獲取,在消費者發送訊息拉取請求的時候,Broker會計算消費偏移量,所以RemoteBrokerOffsetStore的load方法為空,什么也沒有干:
public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void load() {
}
}
持久化
由于集群模式下消費進度保存在Broker端,所以persistAll方法中呼叫了updateConsumeOffsetToBroker向Broker發送請求進行消費進度保存:
public class RemoteBrokerOffsetStore implements OffsetStore {
@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
// 向Broker發送請求更新拉取偏移量
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
unusedMQ.add(mq);
}
}
}
// ...
}
}
持久化的觸發
MQClientInstance在啟動定時任務的方法startScheduledTask中注冊了定時任務,定時呼叫persistAllConsumerOffset對拉取進度進行持久化,persistAllConsumerOffset中又呼叫了MQConsumerInner的persistConsumerOffset方法:
public class MQClientInstance {
private void startScheduledTask() {
// ...
// 注冊定時任務,定時持久化拉取進度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 持久化
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// ...
}
private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
// 呼叫persistConsumerOffset進行持久化
impl.persistConsumerOffset();
}
}
}
DefaultMQPushConsumerImpl是MQConsumerInner的一個子類,以它為例可以看到在persistConsumerOffset方法中呼叫了offsetStore的persistAll方法進行持久化:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void persistConsumerOffset() {
try {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
// 拉取進度持久化
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}
}
總結

參考
丁威、周繼鋒《RocketMQ技術內幕》
RocketMQ版本:4.9.3
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/500290.html
標籤:Java
上一篇:day06-Java流程控制
下一篇:Java面向物件(八)
