目錄
一、NameServer 介紹
二、NameServer 功能串列
三、NameServer 架構分析
四、NameServer 工程目錄決議
五、NameServer 啟動流程分析
1) 加載配置
2) initialize()
3) 啟動server
六、NameServer核心原始碼決議
1. 路由注冊
1) broker向NameServer 發送心跳包
2) NameServer 處理心跳包
2. 路由洗掉
3. 路由發現
rocketmq版本: 4.8.0
一、NameServer 介紹
NameServer 是rocketmq核心組件之一,與zookeeper一樣天生具有分布式的特性,在rocketmq中擔當著路由注冊、發現、動態地維護broker相關資訊的角色, NameServer 不提供Master-slave同步機制,但是能夠保證資料的最終一致性,
二、NameServer 功能串列
- 動態路由發現和注冊功能,broker 啟動時,會將brokerAddr 注冊到NameServer里, 路由發現是指客戶端會定時的向NameServer根據topic拉取路由的最新資訊,
- 動態剔除功能,每隔10 s NameServer 會自動掃描所有的broker, 如果有broker失效,那么會從地址串列里將其剔除掉,
三、NameServer 架構分析
下面是 rocketmq 的部署圖

核心原理決議
Broker訊息服務器啟動時會自動向NameServer 注冊資訊,訊息生產者在發送訊息時,會在NameServer的地址串列里通過負載均衡選擇一個Broker進行訊息發送, NameServer 與每臺broker保持長連接,broker會每隔30s向NameServer發送一個心跳包,NameServer每間隔10s查看broker是否存活,如果broker掛掉了,判斷掛掉的邏輯是brokerLiveTable檢測上次的心跳包與當前系統時間的時間差,如果時間戳大于120s, 那么就將broker從服務地址串列里剔除,
這樣設計的目的是降低NameServer 的復雜性, 在訊息發送端提供容錯機制來保證訊息發送的高可用性,
NameServer 可以通過集群來保證高可用性,但在同一時刻有可能獲取到資料是不一致的,因為不提供同步機制,但能夠保證多個節點的最終一致性,NameServer 這樣設計是為了簡單高效,
四、NameServer 工程目錄決議
工程目錄結構以及決議如下:
namesrv
├─ NamesrvController.java // 執行初始化邏輯,加載配置、注冊Processor等
├─ NamesrvStartup.java // NameServer的啟動類, 啟動netty server
├─ kvconfig
│ ├─ KVConfigManager.java // namespace和config配置管理
│ └─ KVConfigSerializeWrapper.java // 將獲取到的配置json序列化
├─ processor
│ ├─ ClusterTestRequestProcessor.java //處理請求型別,
│ └─ DefaultRequestProcessor.java // 默認地請求處理器, 處理資料包
└─ routeinfo
├─ BrokerHousekeepingService.java // 管理netty 的channel
└─ RouteInfoManager.java // 路由管理器,維護topic, broker,
//clusterName, brokerAddr等資訊
通過簡單地分析可以發現netty 是rocketmq 網路通信的核心,掌握netty 的常見用法是非常有必要的,
五、NameServer 啟動流程分析
1) 加載配置
加載 namesrvConfig 和 nettyServerConfig, 如果有手動配置也可以生效, 使用option類封裝引數,在程式運行前添加配置Program arguments, 添加的格式: 例如 -c , -p 等,

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
....
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
2) initialize()
NamesrvController 在執行start()方法前需要做一些準備作業,比如加載配置、創建Netty Server實體、注冊請求處理器、掃描所有的失聯的broker等

具體的解釋如下注釋:
public boolean initialize() {
// 加載k,v 相關配置,含自定義配置,
this.kvConfigManager.load();
// 啟動netty server, 管理channel
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化netty 執行緒池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注冊netty 請求Handler, 可以通過NettyRequestProcessor介面找到其實作類
this.registerProcessor();
// 與broker建立長連接,掃描所有的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 列印所有的config
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 監聽檔案里的配置是否修改
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
如果initialize()方法回傳false, 那么需要檢查一些相關配置是否正確, 回傳true后,就可以執行最后一步controller.start()方法, 該方法表示NameServer正式啟動,
3) 啟動server
接下來看下源代碼分析start()方法做了哪些事

public void start() throws Exception {
// 1. 啟動netty server
this.remotingServer.start();
// 2. 啟動檔案掃描執行緒,監聽核心配置是否修改,
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
可以通過debug發現,首先會進入到NettyRemotingServer類里的start()方法, 該方法實作了nettyServer, 初始化netty的執行緒組和實體化 ServerBootStrap,

然后開啟一個執行緒執行FileWatchService 的run()方法:
啟動成功后,會在控制臺列印 boot success的字樣,

六、NameServer核心原始碼決議
1. 路由注冊
1) broker向NameServer 發送心跳包
找到brokerController的start()方法里,broker 通過 BrokerController.this.
registerBrokerAll(true,false) 方法來向NameServer 發送心跳包,其中使用定時任務 sheduledExecutorService 執行緒池定時發送,

然后進入到doRegisterBrokerAll()方法,找到BrokerOuterApi里的registerBrokerAll()方法, 通過RegiterBrokerRequestHeader類封裝broker相關的資訊, RegiterBrokerRequestHeader 主要屬性如下:
- brokerName: broker名稱,
- brokerAddr: broker的地址,
- cluterName: broker所在集群的名稱,
- haServerAddr: 集群master的地址,
- brokerId: brokerId為0的時候表示該broker為master, 如果大于0,表示該broker為slave,
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// 封裝broker資訊
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// 等待所有的NameServer都含有broker資訊后,才表示執行完畢,
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// 把該broker的資訊注冊到所有的NameServer上,
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
// 默認超時時間為6s, 在BrokerConfig里配有registerBrokerTimeoutMills=6000
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
2) NameServer 處理心跳包
首先DefaultRequestProcessor 網路處理器決議請求型別,請求型別如果為RequestCode.
REGISTER_BROKER, 則最終的請求會到RouteInfoManager里的registerBroker()方法,
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
// 決議資料包
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
} else {
topicConfigWrapper = new TopicConfigSerializeWrapper();
topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
topicConfigWrapper.getDataVersion().setTimestamp(0);
}
// 用RouteInfoManager 注冊broker
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
// 回應broker
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
RouteInfoManager 里的registerBroker方法將broker的資訊最終添加到 clusterAddrTable、brokerAddrTable、brokerLiveTable、filterServerTable里,
2. 路由洗掉
RouteInfoManager 的scanNotActiveBroker ()方法
3. 路由發現
RocketMQ的路由發現是非實時的,當Topic路由發生變化時,NameServer不主動推送給客戶端,而是由客戶端定時拉取主題最新的路由,根據主題拉取最新路由的編碼為: GET_ROUTEINFO_BY_TOPIC
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/287920.html
標籤:其他
上一篇:高可用架構-限流如何實作

