一、RocketMQ 核心的四大組件:
Producer:就是訊息生產者,可以集群部署,它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪臺 Broker Master上,然后再與其建立長連接,支持多種負載平衡模式發送訊息,
Consumer:訊息消費者,也可以集群部署,它也會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要訊息的 Topic 存在哪臺 Broker Master、Slave上,然后它們建立長連接,支持集群消費和廣播消費訊息,
Broker:主要負責訊息的存盤、查詢消費,支持主從部署,一個 Master 可以對應多個 Slave,Master 支持讀寫,Slave 只支持讀,Broker 會向集群中的每一臺 NameServer 注冊自己的路由資訊,
NameServer:類似Zookeeper,是一個很簡單的 Topic 路由注冊中心,支持 Broker 的動態注冊和發現,保存 Topic 和 Borker 之間的關系,通常也是集群部署,但是各 NameServer 之間不會互相通信, 各 NameServer 都有完整的路由資訊,即無狀態,
二、rocketmq基本作業流程:
1、先啟動 NameServer 集群,各 NameServer 之間無任何資料互動,Broker在啟動的時候會注冊自己配置的Topic資訊到NameServer集群的每一臺機器中,即每一個NameServer均有該broker的Topic路由配置資訊,并向所有 NameServer 定期(每 30s)發送心跳包,包括:IP、Port、TopicInfo;NameServer 也會定期掃描 Broker 存活串列,如果超過 120s 沒有心跳則移除此 Broker 相關資訊,代表下線,
2、這樣每個 NameServer 就知道集群所有 Broker 的相關資訊,此時 Producer 上線會根據組態檔中的NameServer 地址自動連接一個NameServer ;每 30s 會從連接的 NameServer 獲取 Topic 和 Broker 的映射關系存在本地記憶體中,從 NameServer 就可以得知它要發送的某 Topic 訊息在哪個 Broker 上,和對應的 Broker (Master 角色的)建立長連接,發送訊息,
3、Consumer 上線也可以從 NameServer 得知它所要接收的 Topic 是哪個 Broker ,和對應的 Master、Slave 建立連接,接收訊息,
可以理解為如下:
name server:注冊中心broker:訊息處理
procucer:生成訊息
consumer:消費訊息
每個組件都可以部署成集群模式進行水平擴展,
訊息由topic區分訊息型別(一級分類):如訂單訊息,物流訊息等
tag為二級分類
message queue為訊息型別下的訊息佇列,
用于并行發送和接受訊息,
四、基礎
分布式事務:
對于分布式事務,通俗地說就是,一次操作由若干分支操作組成,這些分支操作分屬不同應用,分布在不同服務器上,分布式事務需要保證這些分支操作要么全部成功,要么全部失敗,分布式事務與普通事務一樣,就是為了保證操作結果的一致性,
事務訊息:
RocketMQ提供了類似X/Open XA的分布式事務功能,通過事務訊息能達到分布式事務的最終一致,XA是一種分布式事務解決方案,一種分布式事務處理模式,
半事務訊息:
暫不能投遞的訊息,發送方已經成功地將訊息發送到了Broker,但Broker未收到最終確認指令,此時該訊息被標記成“暫不能投遞”狀態,即不能被消費者看到,處于該種狀態下的訊息即半事務訊息,
本地事務狀態:
Producer回呼操作執行的結果為本地事務狀態,其會發送給TC,而TC會再發送給TM,TM會根據TC發送來的本地事務狀態來決定全域事務確認令,
// 描述本地事務執行狀態 public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事務執行成功
ROLLBACK_MESSAGE, // 本地事務執行失敗
UNKNOW, // 不確定,表示需要進行回查以確定本地事務的執行結果
}
RocketMQ中的訊息回查設定:
關于訊息回查,有三個常見的屬性設定,它們都在broker加載的組態檔中設定,例如:
transactionTimeout=20,指定TM在 20 秒內應將最終確認狀態發送給TC,否則引發訊息回查,默認為 60 秒
transactionCheckMax=5,指定最多回查 5 次,超過后將丟棄訊息并記錄錯誤日志,默認 15 次,
transactionCheckInterval=10,指定設定的多次訊息回查的時間間隔為 10 秒,默認為 60 秒,
五、Topic與Broker的關系:
- Borker中有一個或多個Topic
- Topic中有一個或多個MessageQueue
Topic可以自動創建和手動創建;
1、手動創建也叫預先創建,就是在使用Topic之前就創建,可以通過命令列或者通過RocketMQ的管理界面(可視化控制臺)創建Topic,
/**
* 創建topic,引數分別是:borker的名稱,topic的名稱,queue的數量
* broker要和虛擬機broker.conf組態檔中brokername的名字一致
* newTopic的名字隨便起,queueNum8的意思是新建的訊息佇列數為8個
*/
producer.createTopic("broker_haoke_im","my-topic",8);
2、自動創建就是設定了autoCreateTopicEnable =true;
TBW102 是啥用的?就是一個接受自動創建主題的 Broker, 啟動會把這個默認Topic(主題)的Broker登記到 NameServer,這樣當 Producer 發送新 Topic 的訊息時候就得知哪個 Broker 可以自動創建主題,然后發往那個 Broker,
而 Broker 接受到這個訊息的時候發現沒找到對應的主題,但是它接受創建新主題,這樣就會創建對應的 Topic 路由資訊,
假設此時發送方還在連續快速的發送訊息,那 NameServer 上其實還沒有關于這個 Topic 的路由資訊,所以有機會讓別的允許自動創建的 Broker 也創建對應的 Topic 路由資訊,這樣集群里的 Broker 就能接受這個 Topic 的資訊,達到負載均衡的目的,但也有個別 Broker 可能,沒收到,
如果發送方這一次發了之后 30s 內一個都不發,之前的那個 Broker 隨著心跳把這個路由資訊更新到 NameServer 了,那么之后發送該 Topic 訊息的 Producer 從 NameServer 只能得知該 Topic 訊息只能發往之前的那臺 Broker ,這就不均衡了,如果這個新主題訊息很多,那臺 Broker 負載就很高了,
所以不建議線上開啟允許自動創建主題,即 autoCreateTopicEnable 引數,
Tags的使用
tag(標簽): 標簽可以被認為是對topic的進一步細化,一般在相同業務模塊中通過引入標簽來標記不同用途的訊息,區分相同topic下不同種類的訊息,生產到哪個topic的哪個tag下,消費者也是從topic的哪個tag進行消費,即實作訊息的過濾,
建議一個應用一個 Topic,利用 tages 來標記不同業務,因為 tages 設定比較靈活,且一個應用一個 Topic 很清晰,能直觀的辨別,
Keys的使用
如果有訊息業務上的唯一標識,請填寫到 keys 欄位中,方便日后的定位查找,
queue(佇列): queue是訊息的物理管理單位,而topic是邏輯管理單位,一個topic下可以有多個queue,默認自動創建是4個,手動創建是8個

六、下面以windows服務器為例演示使用rocketmq如下:
1、下載rocketmq的安裝包:https://rocketmq.apache.org/zh/download
2、下載rocketmq儀表盤(也就是可視化操作界面,是一個完整的java專案可以用idea運行)
3、修改conf/broker.conf配置在末尾添加如下配置(IP使用自己的),并保存,
brokerIP1=192.168.31.199
namesrvAddr=192.168.31.199:9876
4、配置ROCKET_HOME環境變數,路徑使用下載路徑;path中配置%ROCKET_HOME%\bin即可
5、啟動Namesrv
在rocketmq檔案的bin目錄下,進入cmd使用如下命令:start mqnamesrv.cmd
6、啟動Broker:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true (也就是說,producer使用RocketMQTemplate發送的訊息,就算Booker上的topic之前不存在,rocket也會幫我們創建好)
7、將儀表盤專案匯入idea,然后打開application.properties檔案修改rocket.config.namesrvAddr=localhost:9876;
8、啟動儀表盤專案:瀏覽器輸入http://localhost:8080/#/即可看到可視化界面;
9、java代碼創建生產者和消費者:
創建普通springboot專案,添加依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
10、修改組態檔
# 應用名稱
spring:
application:
name: rocket-producer
# 應用服務 WEB 訪問埠
server:
port: 8002
rocketmq:
name-server: localhost:9876
producer:
group: my-group
11、創建測驗代碼
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class SendMessage {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedRate = 5000)
public void run(){
//發送訊息
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
}
}
12、創建消費者專案(同上)
消費端測驗代碼:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener<String> {
/**
*需要注意的是,onMessage()封裝了ACK機制,消費者往外拋例外時,RocketMQ認為消費失敗,重新發送該條訊息,否則默認消費成功
*/
@Override
public void onMessage(String s) {
System.out.println(s);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555164.html
標籤:Java
上一篇:jvm垃圾回收及記憶體模型
下一篇:返回列表
