主頁 > 後端開發 > rocketMQ訊息佇列簡介及其實體

rocketMQ訊息佇列簡介及其實體

2023-06-15 07:36:29 後端開發

一、RocketMQ 核心的四大組件:

Producer:就是訊息生產者,可以集群部署,它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪臺 Broker Master上,然后再與其建立長連接,支持多種負載平衡模式發送訊息,

Consumer:訊息消費者,也可以集群部署,它也會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要訊息的 Topic 存在哪臺 Broker Master、Slave上,然后它們建立長連接,支持集群消費和廣播消費訊息,

Broker:主要負責訊息的存盤、查詢消費,支持主從部署,一個 Master 可以對應多個 Slave,Master 支持讀寫,Slave 只支持讀,Broker 會向集群中的每一臺 NameServer 注冊自己的路由資訊,

NameServer:類似Zookeeper,是一個很簡單的 Topic 路由注冊中心,支持 Broker 的動態注冊和發現,保存 Topic 和 Borker 之間的關系,通常也是集群部署,但是各 NameServer 之間不會互相通信, 各 NameServer 都有完整的路由資訊,即無狀態,

二、rocketmq基本作業流程:

1、先啟動 NameServer 集群,各 NameServer 之間無任何資料互動,Broker在啟動的時候會注冊自己配置的Topic資訊到NameServer集群的每一臺機器中,即每一個NameServer均有該broker的Topic路由配置資訊,并向所有 NameServer 定期(每 30s)發送心跳包,包括:IP、Port、TopicInfo;NameServer 也會定期掃描 Broker 存活串列,如果超過 120s 沒有心跳則移除此 Broker 相關資訊,代表下線,

2、這樣每個 NameServer 就知道集群所有 Broker 的相關資訊,此時 Producer 上線會根據組態檔中的NameServer 地址自動連接一個NameServer ;每 30s 會從連接的 NameServer 獲取 Topic 和 Broker 的映射關系存在本地記憶體中,從 NameServer 就可以得知它要發送的某 Topic 訊息在哪個 Broker 上,和對應的 Broker (Master 角色的)建立長連接,發送訊息,

3、Consumer 上線也可以從 NameServer 得知它所要接收的 Topic 是哪個 Broker ,和對應的 Master、Slave 建立連接,接收訊息,

可以理解為如下:

name server:注冊中心

broker:訊息處理

procucer:生成訊息

consumer:消費訊息

每個組件都可以部署成集群模式進行水平擴展,
訊息由topic區分訊息型別(一級分類):如訂單訊息,物流訊息等
tag為二級分類
message queue為訊息型別下的訊息佇列,
用于并行發送和接受訊息,

四、基礎
分布式事務:
對于分布式事務,通俗地說就是,一次操作由若干分支操作組成,這些分支操作分屬不同應用,分布在不同服務器上,分布式事務需要保證這些分支操作要么全部成功,要么全部失敗,分布式事務與普通事務一樣,就是為了保證操作結果的一致性,

事務訊息:
RocketMQ提供了類似X/Open XA的分布式事務功能,通過事務訊息能達到分布式事務的最終一致,XA是一種分布式事務解決方案,一種分布式事務處理模式,

半事務訊息:
暫不能投遞的訊息,發送方已經成功地將訊息發送到了Broker,但Broker未收到最終確認指令,此時該訊息被標記成“暫不能投遞”狀態,即不能被消費者看到,處于該種狀態下的訊息即半事務訊息,

本地事務狀態:
Producer回呼操作執行的結果為本地事務狀態,其會發送給TC,而TC會再發送給TM,TM會根據TC發送來的本地事務狀態來決定全域事務確認令,

// 描述本地事務執行狀態 public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事務執行成功
ROLLBACK_MESSAGE, // 本地事務執行失敗
UNKNOW, // 不確定,表示需要進行回查以確定本地事務的執行結果
}

RocketMQ中的訊息回查設定:
關于訊息回查,有三個常見的屬性設定,它們都在broker加載的組態檔中設定,例如:

transactionTimeout=20,指定TM在 20 秒內應將最終確認狀態發送給TC,否則引發訊息回查,默認為 60 秒
transactionCheckMax=5,指定最多回查 5 次,超過后將丟棄訊息并記錄錯誤日志,默認 15 次,
transactionCheckInterval=10,指定設定的多次訊息回查的時間間隔為 10 秒,默認為 60 秒,
五、Topic與Broker的關系:

  • Borker中有一個或多個Topic
  • Topic中有一個或多個MessageQueue

Topic可以自動創建和手動創建;

1、手動創建也叫預先創建,就是在使用Topic之前就創建,可以通過命令列或者通過RocketMQ的管理界面(可視化控制臺)創建Topic,

/**
* 創建topic,引數分別是:borker的名稱,topic的名稱,queue的數量
* broker要和虛擬機broker.conf組態檔中brokername的名字一致
* newTopic的名字隨便起,queueNum8的意思是新建的訊息佇列數為8個
*/
producer.createTopic("broker_haoke_im","my-topic",8);

2、自動創建就是設定了autoCreateTopicEnable =true;

TBW102 是啥用的?就是一個接受自動創建主題的 Broker, 啟動會把這個默認Topic(主題)的Broker登記到 NameServer,這樣當 Producer 發送新 Topic 的訊息時候就得知哪個 Broker 可以自動創建主題,然后發往那個 Broker,

而 Broker 接受到這個訊息的時候發現沒找到對應的主題,但是它接受創建新主題,這樣就會創建對應的 Topic 路由資訊,

假設此時發送方還在連續快速的發送訊息,那 NameServer 上其實還沒有關于這個 Topic 的路由資訊,所以有機會讓別的允許自動創建的 Broker 也創建對應的 Topic 路由資訊,這樣集群里的 Broker 就能接受這個 Topic 的資訊,達到負載均衡的目的,但也有個別 Broker 可能,沒收到,

如果發送方這一次發了之后 30s 內一個都不發,之前的那個 Broker 隨著心跳把這個路由資訊更新到 NameServer 了,那么之后發送該 Topic 訊息的 Producer 從 NameServer 只能得知該 Topic 訊息只能發往之前的那臺 Broker ,這就不均衡了,如果這個新主題訊息很多,那臺 Broker 負載就很高了,

所以不建議線上開啟允許自動創建主題,即 autoCreateTopicEnable 引數,

Tags的使用

tag(標簽): 標簽可以被認為是對topic的進一步細化,一般在相同業務模塊中通過引入標簽來標記不同用途的訊息,區分相同topic下不同種類的訊息,生產到哪個topic的哪個tag下,消費者也是從topic的哪個tag進行消費,即實作訊息的過濾,

建議一個應用一個 Topic,利用 tages 來標記不同業務,因為 tages 設定比較靈活,且一個應用一個 Topic 很清晰,能直觀的辨別,

Keys的使用

如果有訊息業務上的唯一標識,請填寫到 keys 欄位中,方便日后的定位查找,

queue(佇列): queue是訊息的物理管理單位,而topic是邏輯管理單位,一個topic下可以有多個queue,默認自動創建是4個,手動創建是8個

 

六、下面以windows服務器為例演示使用rocketmq如下:

1、下載rocketmq的安裝包:https://rocketmq.apache.org/zh/download

2、下載rocketmq儀表盤(也就是可視化操作界面,是一個完整的java專案可以用idea運行)

3、修改conf/broker.conf配置在末尾添加如下配置(IP使用自己的),并保存,

brokerIP1=192.168.31.199

namesrvAddr=192.168.31.199:9876

4、配置ROCKET_HOME環境變數,路徑使用下載路徑;path中配置%ROCKET_HOME%\bin即可

5、啟動Namesrv

在rocketmq檔案的bin目錄下,進入cmd使用如下命令:start mqnamesrv.cmd

6、啟動Broker:start mqbroker.cmd -n 127.0.0.1:9876  autoCreateTopicEnable=true  (也就是說,producer使用RocketMQTemplate發送的訊息,就算Booker上的topic之前不存在,rocket也會幫我們創建好)

7、將儀表盤專案匯入idea,然后打開application.properties檔案修改rocket.config.namesrvAddr=localhost:9876;

8、啟動儀表盤專案:瀏覽器輸入http://localhost:8080/#/即可看到可視化界面;

9、java代碼創建生產者和消費者:

創建普通springboot專案,添加依賴

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>

10、修改組態檔

# 應用名稱
spring:
application:
name: rocket-producer
# 應用服務 WEB 訪問埠
server:
port: 8002
rocketmq:
name-server: localhost:9876
producer:
group: my-group

11、創建測驗代碼

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class SendMessage {
@Resource
private RocketMQTemplate rocketMQTemplate;


@Scheduled(fixedRate = 5000)
public void run(){
//發送訊息
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");

}
}

12、創建消費者專案(同上)

消費端測驗代碼:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener<String> {

/**
*需要注意的是,onMessage()封裝了ACK機制,消費者往外拋例外時,RocketMQ認為消費失敗,重新發送該條訊息,否則默認消費成功
*/

@Override
public void onMessage(String s) {
System.out.println(s);
}
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555164.html

標籤:Java

上一篇:jvm垃圾回收及記憶體模型

下一篇:返回列表

標籤雲
其他(160974) Python(38226) JavaScript(25495) Java(18239) C(15237) 區塊鏈(8270) C#(7972) AI(7469) 爪哇(7425) MySQL(7251) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5875) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4593) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2435) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1984) 功能(1967) HtmlCss(1966) Web開發(1951) C++(1940) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1881) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • rocketMQ訊息佇列簡介及其實體

    一、RocketMQ 核心的四大組件: Producer:就是訊息生產者,可以集群部署。它會先和 NameServer 集群中的隨機一臺建立長連接,得知當前要發送的 Topic 存在哪臺 Broker Master上,然后再與其建立長連接,支持多種負載平衡模式發送訊息。 Consumer:訊息消費者 ......

    uj5u.com 2023-06-15 07:36:29 more
  • jvm垃圾回收及記憶體模型

    1、了解垃圾回收之前,必須先了解記憶體模型 2、垃圾回收區域 a、 首先要標記垃圾,找出垃圾 b、Java垃圾回收(一)_java 垃圾回收_頭發慢點掉的小馬的博客-CSDN博客 垃圾回收器 方法區不需要連續的記憶體,可以選擇固定大小或者可擴展。并且還可以選擇不實作垃圾收集。相對而言,垃圾收集行為在這個 ......

    uj5u.com 2023-06-15 07:29:58 more
  • [ARM匯編]計算機原理與數制基礎—1.1.4 邏輯運算

    在計算機中,邏輯運算是對二進制資料進行操作的基礎。邏輯運算主要包括以下幾種:與(AND)、或(OR)、非(NOT)和異或(XOR)。接下來,我們將詳細介紹這幾種邏輯運算的原理及其應用。 #### 與(AND)運算 與運算的規則如下: - 0 AND 0 = 0 - 0 AND 1 = 0 - 1 A ......

    uj5u.com 2023-06-14 10:33:29 more
  • servlet介紹和如何在idea中寫一個servlet程式

    ## 1. Servlet ### 1.1 Servlet簡介 Servlet(Server Applet)是Java Servlet的簡稱,稱為小服務程式或服務連接器,用Java撰寫的服務器端程式,具有獨立于平臺和協議的特性,主要功能在于互動式地瀏覽和生成資料,生成動態Web內容。 - 把實作了S ......

    uj5u.com 2023-06-14 10:33:17 more
  • 消失的死鎖:從 JSF 執行緒池滿到 JVM 初始化原理剖析

    在一次上線時,按照正常流程上線后,觀察了線上報文、介面可用率十分鐘以上,未出現例外情況,結果在上線一小時后突然收到jsf執行緒池耗盡的報警,并且該應用一共有30臺機器,只有一臺機器出現該問題,迅速下線該機器的jsf介面,恢復線上。然后開始排查問題。 ......

    uj5u.com 2023-06-14 10:33:08 more
  • 驅動開發:內核ShellCode執行緒注入

    還記得`《驅動開發:內核LoadLibrary實作DLL注入》`中所使用的注入技術嗎,我們通過`RtlCreateUserThread`函式呼叫實作了注入DLL到應用層并執行,本章將繼續探索一個簡單的問題,如何注入`ShellCode`代碼實作反彈Shell,這里需要注意一般情況下`RtlCreat... ......

    uj5u.com 2023-06-14 10:32:59 more
  • servlet介紹和如何在idea中寫一個servlet程式

    ## 1. Servlet ### 1.1 Servlet簡介 Servlet(Server Applet)是Java Servlet的簡稱,稱為小服務程式或服務連接器,用Java撰寫的服務器端程式,具有獨立于平臺和協議的特性,主要功能在于互動式地瀏覽和生成資料,生成動態Web內容。 - 把實作了S ......

    uj5u.com 2023-06-14 10:31:48 more
  • 消失的死鎖:從 JSF 執行緒池滿到 JVM 初始化原理剖析

    在一次上線時,按照正常流程上線后,觀察了線上報文、介面可用率十分鐘以上,未出現例外情況,結果在上線一小時后突然收到jsf執行緒池耗盡的報警,并且該應用一共有30臺機器,只有一臺機器出現該問題,迅速下線該機器的jsf介面,恢復線上。然后開始排查問題。 ......

    uj5u.com 2023-06-14 10:31:40 more
  • 驅動開發:內核ShellCode執行緒注入

    還記得`《驅動開發:內核LoadLibrary實作DLL注入》`中所使用的注入技術嗎,我們通過`RtlCreateUserThread`函式呼叫實作了注入DLL到應用層并執行,本章將繼續探索一個簡單的問題,如何注入`ShellCode`代碼實作反彈Shell,這里需要注意一般情況下`RtlCreat... ......

    uj5u.com 2023-06-14 10:25:37 more
  • vs2022 wxWidgets

    # 下載 https://www.wxwidgets.org/downloads/ 下載壓縮包即可 ![image](https://img2023.cnblogs.com/blog/916065/202306/916065-20230614040303993-2082032985.png) # 編 ......

    uj5u.com 2023-06-14 08:34:12 more