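Contents
- Questions
- Broker receive flow diagram
- Receiving messages
- 1. Startup entry point `NettyRemotingServer.start()`
- 2. Creating the Netty server channel
- 3. Core Netty receive handler `NettyServerHandler`
- 3.1 Request handling `processRequestCommand`
- 3.2 Send-message request processor `SendMessageProcessor`
- 3.3 Message storage `DefaultMessageStore`
Questions
1. After a Producer sends a message, how does the Broker receive it?
2. After the Broker receives the message, how does it process it?
Broker receive flow diagram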

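Receiving messages
In the earlier article "RocketMQ Source Code Analysis: Broker, the Broker Startup Process" we saw that the broker is started through BrokerStartup.start(). RocketMQ uses Netty for its low-level communication, so the broker also receives messages through Netty and then processes them.
1. Startup entry point NettyRemotingServer.start()
NettyRemotingServer.start() is invoked as part of BrokerStartup.start(): the broker controller's start() method, shown below, starts the message store first and then the remoting server.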
public void start() throws Exception {
if (this.messageStore != null) {
this.messageStore.start();
}
if (this.remotingServer != null) {
//NettyRemotingServer.start()
this.remotingServer.start();
}
}
2. Creating the Netty server channel
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
prepareSharableHandlers
private void prepareSharableHandlers() {
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
}
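3. Core Netty receive handler NettyServerHandler
NettyServerHandler extends Netty's SimpleChannelInboundHandler (and is therefore a ChannelHandler), so it receives every RemotingCommand decoded from a producer's connection. Readers who want more background can study the Netty framework on their own; below we focus on the processing flow of NettyServerHandler.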
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
// Handle the received message
processMessageReceived(ctx, msg);
}
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
// Handle a request command
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
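3.1 Request handling processRequestCommand
This method looks up, in the processorTable map, the processor that was registered for the request code through registerProcessor during broker startup, and then runs that processor's handling logic.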
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// Look up the processor and executor for the request code; for send requests this is SendMessageProcessor
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// Get the remote address
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// Run the pre-processing RPC hooks
doBeforeRpcHooks(remoteAddr, cmd);
final RemotingResponseCallback callback = response -> {
doAfterRpcHooks(remoteAddr, cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
// Process the request asynchronously
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
// Process the request synchronously (SendMessageProcessor)
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
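} catch (Throwable e) {
// Error handling is omitted in this excerpt.
}
}
};
// Omitted in this excerpt: the Runnable is then submitted to the executor held in pair.getObject2().
}
}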
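The lookup-with-fallback step above can be sketched in plain Java. This is a minimal model under stated assumptions, not the RocketMQ implementation: ProcessorTableSketch, Entry, dispatch and the request code 10 are hypothetical names chosen for illustration, and a Function<String, String> stands in for NettyRequestProcessor.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical, simplified model of a processor table; not the RocketMQ classes.
class ProcessorTableSketch {
    // A processor paired with the executor that runs it,
    // loosely mirroring Pair<NettyRequestProcessor, ExecutorService>.
    static final class Entry {
        final Function<String, String> processor;
        final ExecutorService executor;

        Entry(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new ConcurrentHashMap<>();
    private final Entry defaultEntry;

    ProcessorTableSketch(Entry defaultEntry) {
        this.defaultEntry = defaultEntry;
    }

    void register(int requestCode, Entry entry) {
        processorTable.put(requestCode, entry);
    }

    // Finds the entry for the request code, falls back to the default entry,
    // and runs the processor on that entry's executor.
    CompletableFuture<String> dispatch(int requestCode, String body) {
        Entry matched = processorTable.get(requestCode);
        Entry entry = (matched == null) ? defaultEntry : matched;
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(body), entry.executor);
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            ProcessorTableSketch table =
                new ProcessorTableSketch(new Entry(body -> "default:" + body, pool));
            // 10 is an arbitrary request code used only in this sketch.
            table.register(10, new Entry(body -> "send:" + body, pool));
            String known = table.dispatch(10, "hello").join();
            String unknown = table.dispatch(99, "other").join();
            return known + "," + unknown;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Pairing each processor with its own executor lets different request types be handled on separate thread pools, so a slow request type need not hold up the others.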
3.2 Send-message request processor SendMessageProcessor
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
response = asyncProcessRequest(ctx, request).get();
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
// Retry message sent back by a consumer
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
// Normal message sent by a producer
default:
// Build the RequestHeader object from the request
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
// Handle a batch message
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// Handle a single message
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
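The way processRequest delegates to asyncProcessRequest and blocks on the result can be illustrated with a small, self-contained sketch. The class SyncOverAsyncSketch and its string-based request and response are assumptions made for illustration; only the CompletableFuture calls are standard JDK API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of a synchronous method wrapping an asynchronous one.
class SyncOverAsyncSketch {
    // Async handler: completes with null for an empty request (comparable to a
    // header that failed to parse), otherwise with an acknowledgement string.
    static CompletableFuture<String> asyncProcess(String request) {
        if (request == null || request.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.supplyAsync(() -> "ACK:" + request);
    }

    // Synchronous entry point: delegates to the async handler and blocks on get().
    static String process(String request) {
        try {
            return asyncProcess(request).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return null;
        } catch (ExecutionException e) {
            return null; // the excerpt above logs the error and returns a null response
        }
    }

    public static void main(String[] args) {
        System.out.println(process("m1"));
        System.out.println(process(""));
    }
}
```

Blocking on get() keeps the synchronous contract but occupies the calling thread until the future completes. In the callback shown in 3.1, a null response means nothing is written back to the channel.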
Let us look closely at how a single message is handled.
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// Initialize the response
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
// Get the message body
final byte[] body = request.getBody();
// Queue id the message is sent to
int queueIdInt = requestHeader.getQueueId();
// Topic configuration for the message
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
// If the queue id is negative, pick a random queueId
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
// Copy the data into a MessageExtBrokerInner
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
// Handle retry and dead-letter queues
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
MessageAccessor.setProperties(msgInner, origProps);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
} else {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
}
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Transaction messages go through the transaction path
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
// Check whether this broker accepts transaction messages
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// Normal messages are stored through the MessageStore
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
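The handling of the WAIT property in asyncSendMessage (remove it, serialize the remaining properties, then put it back) can be sketched as follows. Everything here is a hypothetical stand-in: the class PropertiesStringSketch, the separator characters and the serialization format are assumptions for illustration and should not be read as RocketMQ's actual wire format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of "serialize without one property, then restore it".
class PropertiesStringSketch {
    // Assumed separators for this sketch only.
    static final char NAME_VALUE_SEPARATOR = 1;
    static final char PROPERTY_SEPARATOR = 2;
    static final String WAIT_KEY = "WAIT";

    // Serializes every entry as name, separator, value, separator.
    static String toPropertiesString(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append(e.getKey()).append(NAME_VALUE_SEPARATOR)
              .append(e.getValue()).append(PROPERTY_SEPARATOR);
        }
        return sb.toString();
    }

    // Serializes without the WAIT entry, then restores the entry so that
    // later in-memory checks on the map still see it.
    static String serializeWithoutWait(Map<String, String> props) {
        String wait = props.remove(WAIT_KEY);
        String serialized = toPropertiesString(props);
        if (wait != null) {
            props.put(WAIT_KEY, wait);
        }
        return serialized;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("KEYS", "order-1");
        props.put(WAIT_KEY, "true");
        String stored = serializeWithoutWait(props);
        System.out.println("stored length: " + stored.length());
        System.out.println("map still has WAIT: " + props.containsKey(WAIT_KEY));
    }
}
```

The stored string is shorter for every message, while the in-memory map keeps the flag for the code that reads it later.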
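3.3 Message storage DefaultMessageStore
For now it is enough to know that messages are ultimately stored through the CommitLog; the message storage module will be studied and explained in a later article.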
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
return putResultFuture;
}
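To summarize the single-message flow:
- Initialize the response;
- Get the target queue and the topic configuration; if queueId < 0, pick a queue at random;
- Build the message object;
- Handle retry and dead-letter messages: for RETRY-type messages, if the maximum number of consumption attempts is exceeded, the topic is changed to "%DLQ%" + group name, that is, the message goes to the dead-letter queue;
- For a transaction message, check whether the broker is configured to reject transaction messages;
- Use the MessageStore component to store the message in a local file; only the CommitLog file is written here, while the ConsumeQueue and IndexFile files are built asynchronously by background threads;
- Handle the message storage result.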
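Two of the steps above, the random queue choice and the switch to a dead-letter topic, can be sketched in a few lines. This is an illustration under assumptions: the class and method names are hypothetical, the "%RETRY%" prefix for retry topics is assumed, and the comparison against the maximum (>= here) is also an assumption. Only the "%DLQ%" + group name rule comes from the summary itself.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of queue selection and dead-letter topic naming.
class QueueAndDlqSketch {
    static final String RETRY_PREFIX = "%RETRY%"; // assumed retry-topic prefix
    static final String DLQ_PREFIX = "%DLQ%";     // prefix named in the summary

    // Keeps the requested queue id, or picks a random one in
    // [0, writeQueueNums) when the requested id is negative.
    static int resolveQueueId(int requestedQueueId, int writeQueueNums) {
        if (requestedQueueId < 0) {
            return ThreadLocalRandom.current().nextInt(writeQueueNums);
        }
        return requestedQueueId;
    }

    // Returns the topic to store under: a retry-topic message that has used up
    // its consumption attempts is redirected to the dead-letter topic of its group.
    static String resolveTopic(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic != null && topic.startsWith(RETRY_PREFIX)) {
            String groupName = topic.substring(RETRY_PREFIX.length());
            if (reconsumeTimes >= maxReconsumeTimes) { // threshold rule is an assumption
                return DLQ_PREFIX + groupName;
            }
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(resolveQueueId(-1, 4));
        System.out.println(resolveTopic("%RETRY%groupA", 16, 16));
        System.out.println(resolveTopic("TopicTest", 99, 16));
    }
}
```

Ordinary topics pass through unchanged regardless of the retry count; only topics carrying the retry prefix are candidates for the dead-letter queue.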
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/423724.html
