主頁 >  其他 > Rocketmq原始碼決議-Producer訊息發送程序(1)

Rocketmq原始碼決議-Producer訊息發送程序(1)

2022-02-02 07:59:09 其他

訊息發送流程

在這里插入圖片描述

DefaultMQProducer-發送入口

訊息發送的入口在DefaultMQProducer類中的send方法,這個方法有很多多載方法,對應的方法引數也各不一樣,

SendResult send(Message msg);
SendResult send(Message msg,long timeout);
SendResult send(Message msg,SendCallback sendCallback,long timeout);
SendResult send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout);
SendResult send(Message msg,MessageQueueSelector selector, Object arg,SendCallback sendCallback,long timeout);

......省略更多

重點引數說明一下:

引數型別說明
msgMessage訊息物件
mqMessageQueue目標訊息佇列,通過它獲得要傳遞訊息到指定的訊息佇列
sendCallbackSendCallback異步模式在發送完成(成功或不成功)時的回呼
selectorMessageQueueSelector訊息佇列選擇器
argObject該引數與訊息佇列選擇器配合使用
timeoutlong發送超時時間

DefaultMQProduceImpl-訊息發送實作類

send 方法對應的實作直接委托給DefaultMQProduceImpl來實作具體細節,具體有兩種默認的發送訊息的實作,一個是同步發送,一個是異步發送,兩者區別很簡單,異步相對于同步方法來說多了一個自定義回呼SendCallback實作,看一下兩者的對比如下:

    /**
     * DEFAULT SYNC
     */
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
   /**
     * DEFAULT ASYNC
     */
    public void send(Message msg,
        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
    }

上面兩個方法呼叫的底層都是一個,即sendDefaultImpl,重點引數說明一下,

引數型別說明
msgMessage訊息物件
communicationModemqCommunicationMode通信方式,同步或者異步
sendCallbackSendCallback異步模式在發送完成(成功或不成功)時的回呼
timeoutlong發送超時時間

sendDefaultImpl-發送訊息底層方法

private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout) {
        //確認Producer客戶端狀態是否ok-(ServieceState.RUNNING)
        this.makeSureStateOK();
		/**
		 * 訊息校驗,包含訊息的topic校驗和訊息體的校驗,
		 *  topic校驗包含以下幾點,topic的名字,長度以及是否為不準用的topic(SCHEDULE_TOPIC_XXXX)
		 *  訊息體校驗 訊息體是不是空和訊息體的長度
		 */ 
        Validators.checkMessage(msg, this.defaultMQProducer);
        //本次呼叫的id
        final long invokeID = random.nextLong();
        //本地呼叫開始時間
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
	    /**
	     * 嘗試去查找Topic的訂閱資訊
	     * 1.從topicPublishInfoTable快取中查找
	     * 2.如果快取為空,則從nameserver拉取配置
	     */
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //若通信模式為同步模式,則失敗重試次數=3次(默認2次+1),若為異步模式則為1次
            int timesTotal = communicationMode == CommunicationMode.SYNC
                             ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            //記錄每一次重試時候發送訊息目標Broker名字的陣列
            String[] brokersSent = new String[timesTotal];
            //進行重試次數回圈發送訊息邏輯
            for (; times < timesTotal; times++) {
                //獲取broker名字,第一次為null,第二次為上次選擇的broker名字
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //根據Topic訂閱資訊+上一次broker名字選擇一個訊息佇列
                // 有失敗重試策略,默認使用 RoundRobin 演算法,可以通過 DefaultMQProducer#setSendLatencyFaultEnable 設定啟用 LatencyFault 策略
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                //如果佇列不為空
                if (mqSelected != null) {
                    mq = mqSelected;
                    //將本次選中的訊息佇列的broker名字添加到陣列中
                    brokersSent[times] = mq.getBrokerName();
                    //重新賦值呼叫開始時間
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                       msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    //發送訊息的花費時間
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                  	//如果設定的超時時間小于花費時間,則進行超時標記,并break中斷,超時時間默認是3000毫秒
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
					//發送訊息到選中的佇列中
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo,
                                                     timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // 當設定啟用 LatencyFault 策略時,更新 FaultItem
                    //根據發送所耗費的時間決定是不是要將該broker加入到故障串列中  
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    //如果是SYNC同步模式下發送失敗可進行重試,ASYNC/ONEWAY模式下直接回傳null
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }

                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                                        times,
                                        System.currentTimeMillis() - beginTimestampFirst,
                                        msg.getTopic(),
                                        Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }
        }
        validateNameServerSetting();
    }

這里對整個方法的邏輯步驟進行一個梳理:

  1. 檢查Producer客戶端狀態是否是運行狀態,校驗訊息的合法性,生成全域呼叫id(用于打日志)和相關的時間戳,
  2. 根據訊息的Topic獲取對應的訊息訂閱資訊,獲取邏輯在tryToFindTopicPublishInfo,這里會從快取map里獲取,如果沒有則回去namerServer拉取相應的訂閱資訊,稍后也會對該方法進行分析,
  3. 根據不同的訊息發送模式,計算訊息失敗重試的次數(同步模式是2+1,異步模式是1次,客戶端默認為retryTimesWhenSendFailed=2
  4. 根據選擇的brokerName,選擇一個訊息佇列MessageQueue,這里選擇佇列的方法是selectOneMessageQueue,稍后進行分析,
  5. 選擇好佇列之后,記錄對應的brokerName,對重試次數范圍內以及發送超時時間之內進行進行訊息發送,發送的邏輯在sendKernelImpl
  6. 發送之后進行容錯策略的更新,更新邏輯在updateFaultItem,之后進行分析,
  7. 根據不同的訊息通信模式進行處理,只有同步模式有處理邏輯,在同步邏輯發送失敗的時候如果開啟了retryAnotherBrokerWhenNotStoreOK(失敗重試發送到另外的broker),則進行重試,這里的重試還是跟上面的重試有區別的,這里是呼叫內部發送實作并且有回傳SendResult的情況,但是發送結果不是成功,

下面針對上述重要步驟的重要邏輯分別進行分析

1.嘗試去查找Topic訂閱資訊tryToFindTopicPublishInfo

	//Topic訂閱資訊快取Map
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
        
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    	//從Topic快取Map中根據topic獲取相應的訂閱資訊
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //如果快取為慷訓者訂閱資訊messageQueueList為空
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            //創建一個TopicPublishInfo放入快取中
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //根據topic從nameServer拉取最新的topic路由資訊,并更新快取
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            //從快取中獲取訂閱資訊
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
		//如果topic訂閱資訊包含路由資訊并且messageQueueList不為空,則直接回傳
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
        	//重新從nameServer拉取最新的topic路由資訊,并直接回傳
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

方法中關于updateTopicRouteInfoFromNameServer方法決議可以參考我上一篇的文章(),

2.選擇訊息佇列selectOneMessageQueue

該方法設計延遲故障的邏輯,下一篇進行詳細分析,邏輯簡單過一下,

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //呼叫mq故障策略的selectOneMessageQueue
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
    
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
 		//省略延遲故障的邏輯
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //如果lastBrokerName為空,說明第一次正常發送
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //如果lastBrokerName不為空,說明進行了重試
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                //獲取一個佇列(brokerName!=lastBrokerName)的訊息佇列
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            //如果還沒有獲取到訊息對接,則再進行一次選擇
            return selectOneMessageQueue();
        }
    }
    public MessageQueue selectOneMessageQueue() {
    	//獲取一個亂數,并進行取模
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        //獲取MessageQueue
        return this.messageQueueList.get(pos);
    }

3.發送訊息準備作業sendKernelImpl

sendKernelImpl方法內主要是構建發送網路請求的請求頭和一些鉤子方法,

    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout)  {
        long beginStartTime = System.currentTimeMillis();
        //根據brokerName從broker集群資訊brokerAddrTable快取中獲取brokerId=0的地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        //如果快取不存在
        if (null == brokerAddr) {
            //嘗試去從nameServer拉取配置,更新brokerAddrTable快取Map,并回傳地址
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
        	//是否使用vip埠
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
			//訊息內容
            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                //MessageBatch批量訊息生成時已經設定uniqId,如果是單個訊息,則再生成一次
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                //獲取clientConfig的nameSpace資訊
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }
				//訊息標識
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                //嘗試對訊息進行壓縮,批量訊息不支持壓縮
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
				//獲取訊息的失誤屬性
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                //如果是事務訊息,則添加標記
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
				//鉤子方法呼叫
                if (hasCheckForbiddenHook()) {
                    //......省略準備邏輯
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                if (this.hasSendMessageHook()) {
                   //......省略準備邏輯
                   this.executeSendMessageHookBefore(context);
                }
				//構建發送訊息請求投
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                //如果訊息是重試訊息
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    //獲取訊息重新消費次數
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                    //將重新消費次數set到請求頭中
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
					//獲取訊息的最大重試次數
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                       // 將重試次數set到請求頭中,并清空屬性
                       requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                //訊息通信模式,同步模式/異步模式/oneway
                switch (communicationMode) {
                    //異步模式
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        //異步的方式發送訊息
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        //同步的方式發送訊息
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
				//如果有發送訊息的鉤子,則進行呼叫
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

4.發送訊息MQClientAPIImpl#sendMessage

發送訊息的邏輯最后是通過MQClientAPIImpl來實作的,這個類之前在將MQClientInstance啟動時創建,這個類是MQ內部一些操作的api的實作,包括發送,消費訊息和admin控制臺的一些操作指令的實作,以及一些網路請求的處理,直接看最終實作邏輯,

    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        //根據訊息的型別和模式選擇不同的請求code,然后封裝到RemotingCommand
        RemotingCommand request = null;
        //獲取訊息的型別
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        //是否為reply訊息
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) { 
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
            	// 該類的 field 全為 a,b,c,d 等,可以加速 FastJson 反序列化
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                // 根據 request code 創建 RPC 請求物件
            	// 該設計是通過型別碼的形式,來標識不同型別的請求
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        //設定訊息體
        request.setBody(msg.getBody());
        //訊息通信模式,同步模式/異步模式/oneway
        switch (communicationMode) {
       		//單向,沒有回傳
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            //異步
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                //訊息準備時間
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                //如果超時了,就報錯
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            //同步
            case SYNC:
                //訊息準備時間
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                //如果超時了,就報錯
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }

這里只對同步方法的邏輯進行簡單的分析:

    private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
		//同步發送的邏輯,這里就是通過對Netty客戶端的呼叫來發送訊息
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        //回傳資訊的處理,包含事務訊息相關事務id,訊息的偏移量等的決議
        return this.processSendResponse(brokerName, msg, response,addr);
    }

invokeSync的邏輯是在NettyRemotingClient類中進行實作和擴展的,該類主要是對Netty的一些擴展實作和內部邏輯封裝,作用是對網路請求的一些操作,

    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        //訊息發送時間戳
        long beginStartTime = System.currentTimeMillis();
        //根據broker的地址獲取Netty的channel物件
        final Channel channel = this.getAndCreateChannel(addr);
        //如果channel不為空并且活躍的
        if (channel != null && channel.isActive()) {
            try {
            	//事前鉤子方法呼叫
                doBeforeRpcHooks(addr, request);
                //花費時間
                long costTime = System.currentTimeMillis() - beginStartTime;
                //如果超時了,則報錯
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                //呼叫Netty進行訊息發送
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                //事后鉤子方法呼叫
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

整個訊息發送邏輯已經大概的講完,因細節比較多,可自行查看對應原始碼,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423263.html

標籤:其他

上一篇:pandas使用query函式和sample函式、使用query函式篩選dataframe中的特定資料行并使用sample函式獲取指定個數的隨機抽樣資料

下一篇:【Kafka】kafka Removed ??? expired offsets in ??? milliseconds.

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more