訊息發送流程

DefaultMQProducer-發送入口
訊息發送的入口在DefaultMQProducer類中的send方法,這個方法有很多多載方法,對應的方法引數也各不一樣,
SendResult send(Message msg);
SendResult send(Message msg,long timeout);
SendResult send(Message msg,SendCallback sendCallback,long timeout);
SendResult send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout);
SendResult send(Message msg,MessageQueueSelector selector, Object arg,SendCallback sendCallback,long timeout);
......省略更多
重點引數說明一下:
| 引數 | 型別 | 說明 |
|---|---|---|
| msg | Message | 訊息物件 |
| mq | MessageQueue | 目標訊息佇列,通過它獲得要傳遞訊息到指定的訊息佇列 |
| sendCallback | SendCallback | 異步模式在發送完成(成功或不成功)時的回呼 |
| selector | MessageQueueSelector | 訊息佇列選擇器 |
| arg | Object | 該引數與訊息佇列選擇器配合使用 |
| timeout | long | 發送超時時間 |
DefaultMQProduceImpl-訊息發送實作類
send 方法對應的實作直接委托給DefaultMQProduceImpl來實作具體細節,具體有兩種默認的發送訊息的實作,一個是同步發送,一個是異步發送,兩者區別很簡單,異步相對于同步方法來說多了一個自定義回呼SendCallback實作,看一下兩者的對比如下:
/**
* DEFAULT SYNC
*/
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
/**
* DEFAULT ASYNC
*/
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
上面兩個方法呼叫的底層都是一個,即sendDefaultImpl,重點引數說明一下,
| 引數 | 型別 | 說明 |
|---|---|---|
| msg | Message | 訊息物件 |
| communicationModemq | CommunicationMode | 通信方式,同步或者異步 |
| sendCallback | SendCallback | 異步模式在發送完成(成功或不成功)時的回呼 |
| timeout | long | 發送超時時間 |
sendDefaultImpl-發送訊息底層方法
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout) {
//確認Producer客戶端狀態是否ok-(ServieceState.RUNNING)
this.makeSureStateOK();
/**
* 訊息校驗,包含訊息的topic校驗和訊息體的校驗,
* topic校驗包含以下幾點,topic的名字,長度以及是否為不準用的topic(SCHEDULE_TOPIC_XXXX)
* 訊息體校驗 訊息體是不是空和訊息體的長度
*/
Validators.checkMessage(msg, this.defaultMQProducer);
//本次呼叫的id
final long invokeID = random.nextLong();
//本地呼叫開始時間
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
/**
* 嘗試去查找Topic的訂閱資訊
* 1.從topicPublishInfoTable快取中查找
* 2.如果快取為空,則從nameserver拉取配置
*/
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//若通信模式為同步模式,則失敗重試次數=3次(默認2次+1),若為異步模式則為1次
int timesTotal = communicationMode == CommunicationMode.SYNC
? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
//記錄每一次重試時候發送訊息目標Broker名字的陣列
String[] brokersSent = new String[timesTotal];
//進行重試次數回圈發送訊息邏輯
for (; times < timesTotal; times++) {
//獲取broker名字,第一次為null,第二次為上次選擇的broker名字
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//根據Topic訂閱資訊+上一次broker名字選擇一個訊息佇列
// 有失敗重試策略,默認使用 RoundRobin 演算法,可以通過 DefaultMQProducer#setSendLatencyFaultEnable 設定啟用 LatencyFault 策略
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
//如果佇列不為空
if (mqSelected != null) {
mq = mqSelected;
//將本次選中的訊息佇列的broker名字添加到陣列中
brokersSent[times] = mq.getBrokerName();
//重新賦值呼叫開始時間
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
//發送訊息的花費時間
long costTime = beginTimestampPrev - beginTimestampFirst;
//如果設定的超時時間小于花費時間,則進行超時標記,并break中斷,超時時間默認是3000毫秒
if (timeout < costTime) {
callTimeout = true;
break;
}
//發送訊息到選中的佇列中
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo,
timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 當設定啟用 LatencyFault 策略時,更新 FaultItem
//根據發送所耗費的時間決定是不是要將該broker加入到故障串列中
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
//如果是SYNC同步模式下發送失敗可進行重試,ASYNC/ONEWAY模式下直接回傳null
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
}
validateNameServerSetting();
}
這里對整個方法的邏輯步驟進行一個梳理:
- 檢查Producer客戶端狀態是否是運行狀態,校驗訊息的合法性,生成全域呼叫id(用于打日志)和相關的時間戳,
- 根據訊息的Topic獲取對應的訊息訂閱資訊,獲取邏輯在
tryToFindTopicPublishInfo,這里會從快取map里獲取,如果沒有則回去namerServer拉取相應的訂閱資訊,稍后也會對該方法進行分析, - 根據不同的訊息發送模式,計算訊息失敗重試的次數(同步模式是2+1,異步模式是1次,客戶端默認為
retryTimesWhenSendFailed=2) - 根據選擇的brokerName,選擇一個訊息佇列
MessageQueue,這里選擇佇列的方法是selectOneMessageQueue,稍后進行分析, - 選擇好佇列之后,記錄對應的brokerName,對重試次數范圍內以及發送超時時間之內進行進行訊息發送,發送的邏輯在
sendKernelImpl, - 發送之后進行容錯策略的更新,更新邏輯在
updateFaultItem,之后進行分析, - 根據不同的訊息通信模式進行處理,只有同步模式有處理邏輯,在同步邏輯發送失敗的時候如果開啟了
retryAnotherBrokerWhenNotStoreOK(失敗重試發送到另外的broker),則進行重試,這里的重試還是跟上面的重試有區別的,這里是呼叫內部發送實作并且有回傳SendResult的情況,但是發送結果不是成功,
下面針對上述重要步驟的重要邏輯分別進行分析
1.嘗試去查找Topic訂閱資訊tryToFindTopicPublishInfo
//Topic訂閱資訊快取Map
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//從Topic快取Map中根據topic獲取相應的訂閱資訊
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//如果快取為慷訓者訂閱資訊messageQueueList為空
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
//創建一個TopicPublishInfo放入快取中
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//根據topic從nameServer拉取最新的topic路由資訊,并更新快取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
//從快取中獲取訂閱資訊
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
//如果topic訂閱資訊包含路由資訊并且messageQueueList不為空,則直接回傳
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//重新從nameServer拉取最新的topic路由資訊,并直接回傳
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
方法中關于updateTopicRouteInfoFromNameServer方法決議可以參考我上一篇的文章(),
2.選擇訊息佇列selectOneMessageQueue
該方法設計延遲故障的邏輯,下一篇進行詳細分析,邏輯簡單過一下,
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//呼叫mq故障策略的selectOneMessageQueue
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//省略延遲故障的邏輯
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//如果lastBrokerName為空,說明第一次正常發送
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
//如果lastBrokerName不為空,說明進行了重試
for (int i = 0; i < this.messageQueueList.size(); i++) {
//獲取一個佇列(brokerName!=lastBrokerName)的訊息佇列
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
//如果還沒有獲取到訊息對接,則再進行一次選擇
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
//獲取一個亂數,并進行取模
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
//獲取MessageQueue
return this.messageQueueList.get(pos);
}
3.發送訊息準備作業sendKernelImpl
sendKernelImpl方法內主要是構建發送網路請求的請求頭和一些鉤子方法,
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) {
long beginStartTime = System.currentTimeMillis();
//根據brokerName從broker集群資訊brokerAddrTable快取中獲取brokerId=0的地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//如果快取不存在
if (null == brokerAddr) {
//嘗試去從nameServer拉取配置,更新brokerAddrTable快取Map,并回傳地址
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
//是否使用vip埠
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
//訊息內容
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
//MessageBatch批量訊息生成時已經設定uniqId,如果是單個訊息,則再生成一次
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
//獲取clientConfig的nameSpace資訊
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
//訊息標識
int sysFlag = 0;
boolean msgBodyCompressed = false;
//嘗試對訊息進行壓縮,批量訊息不支持壓縮
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
//獲取訊息的失誤屬性
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
//如果是事務訊息,則添加標記
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//鉤子方法呼叫
if (hasCheckForbiddenHook()) {
//......省略準備邏輯
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) {
//......省略準備邏輯
this.executeSendMessageHookBefore(context);
}
//構建發送訊息請求投
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
//如果訊息是重試訊息
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
//獲取訊息重新消費次數
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
//將重新消費次數set到請求頭中
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
//獲取訊息的最大重試次數
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
// 將重試次數set到請求頭中,并清空屬性
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
//訊息通信模式,同步模式/異步模式/oneway
switch (communicationMode) {
//異步模式
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
//異步的方式發送訊息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
//同步的方式發送訊息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
//如果有發送訊息的鉤子,則進行呼叫
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
4.發送訊息MQClientAPIImpl#sendMessage
發送訊息的邏輯最后是通過MQClientAPIImpl來實作的,這個類之前在將MQClientInstance啟動時創建,這個類是MQ內部一些操作的api的實作,包括發送,消費訊息和admin控制臺的一些操作指令的實作,以及一些網路請求的處理,直接看最終實作邏輯,
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//根據訊息的型別和模式選擇不同的請求code,然后封裝到RemotingCommand
RemotingCommand request = null;
//獲取訊息的型別
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
//是否為reply訊息
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
// 該類的 field 全為 a,b,c,d 等,可以加速 FastJson 反序列化
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
// 根據 request code 創建 RPC 請求物件
// 該設計是通過型別碼的形式,來標識不同型別的請求
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
//設定訊息體
request.setBody(msg.getBody());
//訊息通信模式,同步模式/異步模式/oneway
switch (communicationMode) {
//單向,沒有回傳
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
//異步
case ASYNC:
final AtomicInteger times = new AtomicInteger();
//訊息準備時間
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
//如果超時了,就報錯
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
//同步
case SYNC:
//訊息準備時間
long costTimeSync = System.currentTimeMillis() - beginStartTime;
//如果超時了,就報錯
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
這里只對同步方法的邏輯進行簡單的分析:
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
//同步發送的邏輯,這里就是通過對Netty客戶端的呼叫來發送訊息
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
//回傳資訊的處理,包含事務訊息相關事務id,訊息的偏移量等的決議
return this.processSendResponse(brokerName, msg, response,addr);
}
invokeSync的邏輯是在NettyRemotingClient類中進行實作和擴展的,該類主要是對Netty的一些擴展實作和內部邏輯封裝,作用是對網路請求的一些操作,
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
//訊息發送時間戳
long beginStartTime = System.currentTimeMillis();
//根據broker的地址獲取Netty的channel物件
final Channel channel = this.getAndCreateChannel(addr);
//如果channel不為空并且活躍的
if (channel != null && channel.isActive()) {
try {
//事前鉤子方法呼叫
doBeforeRpcHooks(addr, request);
//花費時間
long costTime = System.currentTimeMillis() - beginStartTime;
//如果超時了,則報錯
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
//呼叫Netty進行訊息發送
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//事后鉤子方法呼叫
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
整個訊息發送邏輯已經大概的講完,因細節比較多,可自行查看對應原始碼,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423263.html
標籤:其他
上一篇:pandas使用query函式和sample函式、使用query函式篩選dataframe中的特定資料行并使用sample函式獲取指定個數的隨機抽樣資料
下一篇:【Kafka】kafka Removed ??? expired offsets in ??? milliseconds.
