主頁 > 後端開發 > 【RocketMQ】順序訊息實作原理

【RocketMQ】順序訊息實作原理

2022-11-22 07:11:25 後端開發

全域有序
在RocketMQ中,如果使訊息全域有序,可以為Topic設定一個訊息佇列,使用一個生產者單執行緒發送資料,消費者端也使用單執行緒進行消費,從而保證訊息的全域有序,但是這種方式效率低,一般不使用,

區域有序
假設一個Topic分配了兩個訊息佇列,生產者在發送訊息的時候,可以對訊息設定一個路由ID,比如想保證一個訂單的相關訊息有序,那么就使用訂單ID當做路由ID,在發送訊息的時候,通過訂單ID對訊息佇列的個數取余,根據取余結果選擇訊息佇列,這樣同一個訂單的資料就可以保證發送到一個訊息佇列中,消費者端使用MessageListenerOrderly處理有序訊息,這就是RocketMQ的區域有序,保證訊息在某個訊息佇列中有序,

接下來看RoceketMQ原始碼中提供的順序訊息例子(稍微做了一些修改):

生產者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 創建生產者
            DefaultMQProducer producer = new DefaultMQProducer("生產者組");
            // 啟動
            producer.start();
            // 創建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成訂單ID
                int orderId = i % 10;
                // 創建訊息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 獲取訂單ID
                        Integer id = (Integer) arg;
                        // 對訊息佇列個數取余
                        int index = id % mqs.size();
                        // 根據取余結果選擇訊息要發送給哪個訊息佇列
                        return mqs.get(index);
                    }
                }, orderId); // 這里傳入了訂單ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消費者

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 創建消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消費者組");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱主題
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 注冊訊息監聽器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 列印訊息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

從例子中可以看出生產者在發送訊息的時候,通過訂單ID作為路由資訊,將同一個訂單ID的訊息發送到了同一個訊息佇列中,保證同一個訂單ID的訊息有序,那么消費者端是如何保證訊息的順序讀取呢?接下來就去看下原始碼,

順序訊息實作原理

在【RocketMQ】訊息的拉取一文中講到,消費者在啟動時會呼叫DefaultMQPushConsumerImpl的start方法:

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    
    /**
     * 默認的訊息推送實作類
     */
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    
    /**
     * 啟動
     */
    @Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 啟動消費者
        this.defaultMQPushConsumerImpl.start();
        // ...
    }
}

DefaultMQPushConsumerImpl的start方法中,對訊息監聽器型別進行了判斷,如果型別是MessageListenerOrderly表示要進行順序消費,此時使用ConsumeMessageOrderlyServiceConsumeMessageService進行實體化,然后呼叫它的start方法進行啟動:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    // 訊息消費service
    private ConsumeMessageService consumeMessageService;
  
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // ...
            
                // 如果是順序消費
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    // 設定順序消費標記
                    this.consumeOrderly = true;
                    // 創建consumeMessageService,使用的是ConsumeMessageOrderlyService
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    // 并發消費使用ConsumeMessageConcurrentlyService
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                // 啟動ConsumeMessageService
                this.consumeMessageService.start();

                // ...
                break;
          // ...
        }
        // ...
    }
}

加鎖定時任務

進入到ConsumeMessageOrderlyService的start方法中,可以看到,如果是集群模式,會啟動一個定時加鎖的任務,周期性的對訂閱的訊息佇列進行加鎖,具體是通過呼叫RebalanceImpl的lockAll方法實作的:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    public void start() {
      
        // 如果是集群模式
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 周期性的執行加鎖方法
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                    }
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }
  
    public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            // 進行加鎖
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }
}

為什么集群模式下需要加鎖?
因為廣播模式下,訊息佇列會分配給消費者下的每一個消費者,而在集群模式下,一個訊息佇列同一時刻只能被同一個消費組下的某一個消費者進行,所以在廣播模式下不存在競爭關系,也就不需要對訊息佇列進行加鎖,而在集群模式下,有可能因為負載均衡等原因將某一個訊息佇列分配到了另外一個消費者中,因此在集群模式下就要加鎖,當某個訊息佇列被鎖定時,其他的消費者不能進行消費,

訊息佇列加鎖

RebalanceImpllockAll方法中,首先從處理隊串列中獲取當前消費者訂閱的所有訊息佇列MessageQueue資訊,回傳資料是一個MAP,key為broker名稱,value為broker下的訊息佇列,接著對MAP進行遍歷,處理每一個broker下的訊息佇列:

  1. 獲取broker名稱,根據broker名稱查找broker的相關資訊;
  2. 構建加鎖請求,在請求中設定要加鎖的訊息佇列,然后將請求發送給broker,表示要對這些訊息佇列進行加鎖;
  3. 加鎖請求回傳的回應結果中包含了加鎖成功的訊息佇列,此時遍歷加鎖成功的訊息佇列,將訊息佇列對應的ProcessQueue中的locked屬性置為true表示該訊息佇列已加鎖成功;
  4. 處理加鎖失敗的訊息佇列,如果回應中未包含某個訊息佇列的資訊,表示此訊息佇列加鎖失敗,需要將其對應的ProcessQueue物件中的locked屬性置為false表示加鎖失敗;
public abstract class RebalanceImpl {
    public void lockAll() {
        // 從處理隊串列中獲取broker對應的訊息佇列,key為broker名稱,value為broker下的訊息佇列
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
        // 遍歷訂閱的訊息佇列
        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            // broker名稱
            final String brokerName = entry.getKey();
            // 獲取訊息佇列
            final Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty())
                continue;
            // 根據broker名稱獲取broker資訊
            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                // 構建加鎖請求
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                // 設定消費者組
                requestBody.setConsumerGroup(this.consumerGroup);
                // 設定ID
                requestBody.setClientId(this.mQClientFactory.getClientId());
                // 設定要加鎖的訊息佇列
                requestBody.setMqSet(mqs);

                try {
                    // 批量進行加鎖,回傳加鎖成功的訊息佇列
                    Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                    // 遍歷加鎖成功的佇列
                    for (MessageQueue mq : lockOKMQSet) {
                        // 從處理隊串列中獲取對應的處理佇列物件
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        // 如果不為空,設定locked為true表示加鎖成功
                        if (processQueue != null) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }
                            // 設定加鎖成功標記
                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    // 處理加鎖失敗的訊息佇列
                    for (MessageQueue mq : mqs) {
                        if (!lockOKMQSet.contains(mq)) {
                            ProcessQueue processQueue = this.processQueueTable.get(mq);
                            if (processQueue != null) {
                                // 設定加鎖失敗標記
                                processQueue.setLocked(false);
                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }
}

在【RocketMQ】訊息的拉取一文中講到,消費者需要先向Broker發送拉取訊息請求,從Broker中拉取訊息,拉取訊息請求構建在RebalanceImpl的updateProcessQueueTableInRebalance方法中,拉取訊息的回應結果處理在PullCallback的onSuccess方法中,接下來看下順序消費時在這兩個程序中是如何處理的,

拉取訊息

上面已經知道,在使用順序訊息時,會周期性的對訂閱的訊息佇列進行加鎖,不過由于負載均衡等原因,有可能給當前消費者分配新的訊息佇列,此時可能還未來得及通過定時任務加鎖,所以消費者在構建訊息拉取請求前會再次進行判斷,如果processQueueTable中之前未包含某個訊息佇列,會先呼叫lock方法進行加鎖,lock方法的實作邏輯與lockAll基本一致,如果加鎖成功構建拉取請求進行訊息拉取,如果加鎖失敗,則跳過繼續處理下一個訊息佇列:

public abstract class RebalanceImpl {
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        // ...
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        // 遍歷佇列集合
        for (MessageQueue mq : mqSet) {
            // 如果processQueueTable之前不包含當前的訊息佇列
            if (!this.processQueueTable.containsKey(mq)) {
                // 如果是順序消費,呼叫lock方法進行加鎖,如果加鎖失敗不往下執行,繼續處理下一個訊息佇列
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }
                // ... 
                // 如果偏移量大于等于0
                if (nextOffset >= 0) {
                    // 放入處理隊串列中
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        // 如果之前不存在,構建PullRequest,之后對請求進行處理,進行訊息拉取
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // 添加訊息拉取請求
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }
  
    public boolean lock(final MessageQueue mq) {
        // 獲取broker資訊
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            // 構建加鎖請求
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            // 設定要加鎖的訊息佇列
            requestBody.getMqSet().add(mq);

            try {
                // 發送加鎖請求
                Set<MessageQueue> lockedMq =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    // 如果加鎖成功設定成功標記
                    if (processQueue != null) {
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }
                boolean lockOK = lockedMq.contains(mq);
                log.info("the message queue lock {}, {} {}",
                    lockOK ? "OK" : "Failed",
                    this.consumerGroup,
                    mq);
                return lockOK;
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, e);
            }
        }

        return false;
    }
}

順序訊息消費

PullCallbackonSuccess方法中可以看到,如果從Broker拉取到訊息,會呼叫ConsumeMessageService的submitConsumeRequest方法將訊息提交到ConsumeMessageService中進行消費:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void pullMessage(final PullRequest pullRequest) {
        // ...
        // 拉取訊息回呼函式
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    // 處理拉取結果
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
                    // 判斷拉取結果
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // ...
                            // 如果未拉取到訊息
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                // 將拉取請求放入到阻塞佇列中再進行一次拉取
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                // ...
                                // 如果拉取到訊息,將訊息提交到ConsumeMessageService中進行消費
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
                                // ...
                            }
                        // ...
                    }
                }
            }
        };
    }
}

前面知道順序消費時使用的是ConsumeMessageOrderlyService,首先在ConsumeMessageOrderlyService的建構式中可以看到
初始化了一個訊息消費執行緒池,也就是說順序消費時也是開啟多執行緒進行消費的:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerOrderly messageListener) {
        // ...
        // 設定訊息消費執行緒池
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl(consumeThreadPrefix));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    }
}

接下來看submitConsumeRequest方法,可以看到構建了ConsumeRequest物件,將拉取的訊息提交到了訊息消費執行緒池中進行消費:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
   
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            // 構建ConsumeRequest
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
    
}

消費時的訊息佇列鎖

ConsumeRequestConsumeMessageOrderlyService的內部類,它有兩個成員變數,分別為MessageQueue訊息佇列和它對應的處理佇列ProcessQueue物件,
在run方法中,對訊息進行消費,處理邏輯如下:

  1. 判斷ProcessQueue是否被洗掉,如果被洗掉終止處理;
  2. 呼叫messageQueueLock的ftchLockObject方法獲取訊息佇列的物件鎖,然后使用synchronized進行加鎖,這里加鎖的原因是因為順序消費使用的是執行緒池,可以設定多個執行緒同時進行消費,所以某個執行緒在進行訊息消費的時候要對訊息佇列加鎖,防止其他執行緒并發消費,破壞訊息的順序性
  3. 如果是廣播模式、或者當前的訊息佇列已經加鎖成功(Locked置為true)并且加鎖時間未過期,開始對拉取的訊息進行遍歷:
  • 如果是集群模式并且訊息佇列加鎖失敗,呼叫tryLockLaterAndReconsume稍后重新進行加鎖;
  • 如果是集群模式并且訊息佇列加鎖時間已經過期,呼叫tryLockLaterAndReconsume稍后重新進行加鎖;
  • 如果當前時間距離開始處理的時間超過了最大消費時間,呼叫submitConsumeRequestLater稍后重新進行處理;
  • 獲取批量消費訊息個數,從ProcessQueue獲取訊息內容,如果訊息獲取不為空,添加訊息消費鎖,然后呼叫messageListener的consumeMessage方法進行訊息消費;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
 
   class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue; // 訊息佇列對應的處理佇列
        private final MessageQueue messageQueue; // 訊息佇列

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            // 處理佇列如果已經被置為洗掉狀態,跳過不進行處理
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            // 獲取訊息佇列的物件鎖
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            // 物件訊息佇列的物件鎖加鎖
            synchronized (objLock) {
                // 如果是廣播模式、或者當前的訊息佇列已經加鎖成功并且加鎖時間未過期
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) {
                        // 判斷processQueue是否洗掉
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }
                        // 如果是集群模式并且processQueue的加鎖失敗
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            // 稍后進行加鎖
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }
                        // 如果是集群模式并且訊息佇列加鎖時間已經過期
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            // 稍后進行加鎖
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        long interval = System.currentTimeMillis() - beginTime;
                        // 如果當前時間距離開始處理的時間超過了最大消費時間
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            // 稍后重新進行處理
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }
                        // 批量消費訊息個數
                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        // 獲取訊息內容
                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                        if (!msgs.isEmpty()) {
                            // ...
                            try {
                                // 加消費鎖
                                this.processQueue.getConsumeLock().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }
                                // 消費訊息
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue), e);
                                hasException = true;
                            } finally {
                                // 釋放訊息消費鎖
                                this.processQueue.getConsumeLock().unlock();
                            }
                            // ...
                            ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

    }
}

MessageQueueLock中使用了ConcurrentHashMap存盤每個訊息佇列對應的物件鎖,物件鎖實際上是一個Object類的物件,從Map中獲取訊息佇列的物件鎖時,如果物件鎖不存在,則新建一個Object物件,并放入Map集合中:

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        // 獲取訊息佇列對應的物件鎖,也就是一個Object型別的物件
        Object objLock = this.mqLockTable.get(mq);
        // 如果獲取尾款
        if (null == objLock) {
            // 創建物件
            objLock = new Object();
            // 加入到Map中
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

訊息消費鎖

ProcessQueue中持有一個訊息消費鎖,消費者呼叫consumeMessage進行訊息前,會添加消費鎖,上面已經知道在處理拉取到的訊息時就已經呼叫messageQueueLock的fetchLockObject方法獲取訊息佇列的物件鎖然后使用syncronized對其加鎖,那么為什么在消費之前還要再加一個消費鎖呢?

public class ProcessQueue {
    // 訊息消費鎖
    private final Lock consumeLock = new ReentrantLock();

    public Lock getConsumeLock() {
        return consumeLock;
    }
}

這里講一個小技巧,如果在查看原始碼的時候對某個方法有疑問,可以查看一下這個方法在哪里被呼叫了,結合呼叫處的代碼處理邏輯進行猜測,
那么就來看下getConsumeLock在哪里被呼叫了,可以看到除了C的run方法中呼叫了之外,RebalancePushImpl中的removeUnnecessaryMessageQueue方法也呼叫了getConsumeLock方法:

removeUnnecessaryMessageQueue方法從名字上可以看出,是移除不需要的訊息佇列,RebalancePushImpl是與負載均衡相關的類,所以猜測有可能在負載均衡時,需要移除某個訊息佇列,那么消費者在進行消費的時候就要獲取ProcessQueue的consumeLock進行加鎖,防止正在消費的程序中,消費佇列被移除:

public class RebalancePushImpl extends RebalanceImpl {
   @Override
    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        // 如果是順序消費并且是集模式
        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            try {
                // 進行加鎖
                if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
                    try {
                        return this.unlockDelay(mq, pq);
                    } finally {
                        pq.getConsumeLock().unlock();
                    }
                } else {
                    log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                        mq,
                        pq.getTryUnlockTimes());

                    pq.incTryUnlockTimes();
                }
            } catch (Exception e) {
                log.error("removeUnnecessaryMessageQueue Exception", e);
            }

            return false;
        }
        return true;
    }
}

不過在消費者在消費訊息前已經對佇列進行了加鎖,負載均衡的時候為什么不使用佇列鎖而要使用消費鎖?

這里應該是為了減小鎖的粒度,因為消費者在對訊息佇列加鎖后,還進行了一系列的判斷,校驗都成功之后從處理佇列中獲取訊息內容,之后才開始消費訊息,如果負載均衡使用訊息佇列鎖就要等待整個程序完成才有可能加鎖成功,這樣顯然會降低性能,而如果使用訊息消費鎖,就可以減少等待的時間,并且消費者在進行訊息消費前也會判斷ProcessQueue是否被移除,所以只要保證consumeMessage方法在執行的程序中,ProcessQueue不被移除即可,

總結

目前一共涉及了三把鎖,它們分別對應不同的情況:

向Broker申請的訊息佇列鎖

集群模式下一個訊息佇列同一時刻只能被同一個消費組下的某一個消費者進行,為了避免負載均衡等原因引起的變動,消費者會向Broker發送請求對訊息佇列進行加鎖,如果加鎖成功,記錄到訊息佇列對應的ProcessQueue中的locked變數中,它是boolean型別的:

public class ProcessQueue {
    private volatile boolean locked = false;
}

消費者處理拉取訊息時的訊息佇列鎖

消費者在處理拉取到的訊息時,由于可以開啟多執行緒進行處理,所以處理訊息前通過MessageQueueLock中的mqLockTable獲取到了訊息佇列對應的鎖,鎖住要處理的訊息佇列,這里加訊息佇列鎖主要是處理多執行緒之間的競爭:

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

訊息消費鎖

消費者在呼叫consumeMessage方法之前會加消費鎖,主要是為了避免在消費訊息時,由于負載均衡等原因,ProcessQueue被洗掉:


public class ProcessQueue {
    private final Lock consumeLock = new ReentrantLock();
}

參考
丁威、周繼鋒《RocketMQ技術內幕》

RocketMQ版本:4.9.3

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/537687.html

標籤:其他

上一篇:OpenGL Windows 搭建環境(MFC版本)

下一篇:day20-web開發會話技術02

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more