前言:MQ做應用解耦,流量削峰 這些是常識,RabbitMQ是實作了高級訊息佇列協議(AMQP)的開源訊息代理軟體(亦稱面向訊息的中間件),RabbitMQ服務器是用Erlang語言撰寫的,而集群和故障轉移是構建在開放電信平臺框架上的,所有主要的編程語言均有與代理介面通訊的客戶端庫
常用的主流的MQ有四個
ActiveMQ:
Apache下的一個子專案,使用Java完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實作,少量代碼就可以高效地實作高級應用場景,可插拔的傳輸協議支持,比如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports,ActiveMQ支持常用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等,
Kafka:
Apache下的一個子專案,使用scala實作的一個高性能分布式Publish/Subscribe訊息佇列系統,具有以下特性:
快速持久化:通過磁盤順序讀寫與零拷貝機制,可以在O(1)的系統開銷下進行訊息持久化;
高吞吐:在一臺普通的服務器上既可以達到10W/s的吞吐速率;
高堆積:支持topic下消費者較長時間離線,訊息堆積量大;
完全的分布式系統:Broker、Producer、Consumer都原生自動支持分布式,依賴zookeeper自動實作復雜均衡;
支持Hadoop資料并行加載:對于像Hadoop的一樣的日志資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案,
RocketMQ:
阿里系下開源的一款分布式、佇列模型的訊息中間件,原名Metaq,3.0版本名稱改為RocketMQ,是阿里參照kafka設計思想使用java實作的一套mq,同時將阿里系內部多款mq產品(Notify、metaq)進行整合,只維護核心功能,去除了所有其他運行時依賴,保證核心功能最簡化,在此基礎上配合阿里上述其他開源產品實作不同場景下mq的架構,目前主要多用于訂單交易系統,
具有以下特點:
能夠保證嚴格的訊息順序
提供針對訊息的過濾功能
提供豐富的訊息拉取模式
高效的訂閱者水平擴展能力
實時的訊息訂閱機制
億級訊息堆積能力
RabbitMQ:
使用Erlang撰寫的一個開源的訊息佇列,本身支持很多的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的非常重量級,更適合于企業級的開發,同時實作了Broker架構,核心思想是生產者不會將訊息直接發送給佇列,訊息在發送給客戶端時先在中心佇列排隊,對路由(Routing),負載均衡(Load balance)、資料持久化都有很好的支持,多用于進行企業級的ESB整合,
RabbitMQ支持AMQP協議:
本次是是使用RabbitMQ
安裝兩種方式 Linux 和win
第一種:
我把Erlang的安裝包和rabbitmq都放到網盤了下載不下來的自提
網盤:https://pan.baidu.com/s/17mwHs3mupk16VhMkTTicrg
密碼:sbl1
安裝需要Erlang語言
下載地址:
https://www.erlang.org/downloads
下載不來去git下載
github:https://github.com/erlang/otp/releases/tag/OTP-23.2.3
win64位

安裝





rabbitmq 安裝
下載地址
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.12-beta.1

安裝一路下一步


安裝好之后到安裝sbin的目錄下 打開cmd命名 安裝可視化插件

命令
rabbitmq-plugins enable rabbitmq_management


安裝之后
本地啟動:
http://localhost:15672/
可視化頁面的埠默認就是:15672
操作埠是:5672

賬號:guest
密碼:guest

第二種Linux安裝 大致跟win一致
我個人就用Docker了安裝簡單比較快:
拉取鏡像 指定版本,該版本包含了web控制頁面
docker pull rabbitmq:management

啟動鏡像:
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
查看容器

賬號密碼都是默認:guest

頁面概要:
overview
Ready:待消費的訊息總數,
Unacked:待應答的訊息總數,
Total:總數 Ready+Unacked,
所有佇列的消費情況,速率=(num1-num0)/(s1-s0) num1:s1時刻的個數,num0:s0時刻的個數,
Publish:producter pub訊息的速率,
Publisher confirm:broker確認pub訊息的速率,
Deliver(manual ack):customer手動確認的速率,
Deliver( auto ack):customer自動確認的速率,
Consumer ack:customer正在確認的速率,
Redelivered:正在傳遞’redelivered’標志集的訊息的速率,
Get (manual ack):回應basic.get而要求確認的訊息的傳輸速率,
Get (auto ack):回應于basic.get而發送不需要確認的訊息的速率,
Return:將basic.return發送給producter的速率,
Disk read:queue從磁盤讀取訊息的速率,
Disk write:queue從磁盤寫入訊息的速率,

Connections
Virtual host:所屬的虛擬主機,
Name:名稱,
User name:使用的用戶名,
State:當前的狀態,running:運行中;idle:空閑,
SSL/TLS:是否使用ssl進行連接,
Protocol:使用的協議,
Channels:創建的channel的總數,
From client:每秒發出的資料包,
To client:每秒收到的資料包,

Channels
channel:名稱,
Virtual host:所屬的虛擬主機,
User name:使用的用戶名,
Mode:渠道保證模式, 可以是以下之一,或者不是:C: confirm,T:transactional(事務),
State :當前的狀態,running:運行中;idle:空閑,
Unconfirmed:待confirm的訊息總數,
Prefetch:設定的prefetch的個數,
Unacker:待ack的訊息總數,
publish:producter pub訊息的速率,
confirm:producter confirm訊息的速率,
deliver/get:consumer 獲取訊息的速率,
ack:consumer ack訊息的速率,

Exchanges
Virtual host:所屬的虛擬主機,
Name:名稱,
Type:exchange type,具體的type可以查看RabbitMq系列之一:基礎概念,
Features:功能, 可以是以下之一,或者不是:D: 持久化,T:Internal,存在改功能表示這個exchange不可以被client用來推送訊息,僅用來進行exchange和exchange之間的系結,否則可以推送訊息也可以系結,
Message rate in:訊息進入的速率,
Message rate out:訊息出去的速率,

Queues
Virtual host:所屬的虛擬主機,
Name:名稱,
Features:功能, 可以是以下之一,或者不是:D: 持久化,
State:當前的狀態,running:運行中;idle:空閑,
Ready:待消費的訊息總數,
Unacked:待應答的訊息總數,
Total:總數 Ready+Unacked,
incoming:訊息進入的速率,
deliver/get:訊息獲取的速率,
ack:訊息應答的速率,

Admin
Name:名稱,
Tags:角色標簽,只能選取一個,
Can access virtual hosts:允許進入的vhost,
Has password:設定了密碼,
administrator (超級管理員)
可登陸管理控制臺(啟用management plugin的情況下),可查看所有的資訊,并且可以對用戶,策略(policy)進行操作,
monitoring(監控者)
可登陸管理控制臺(啟用management plugin的情況下),同時可以查看rabbitmq節點的相關資訊(行程數,記憶體使用情況,磁盤使用情況等)
policymaker(策略制定者)
可登陸管理控制臺(啟用management plugin的情況下), 同時可以對policy進行管理,
management(普通管理者)
僅可登陸管理控制臺(啟用management plugin的情況下),無法看到節點資訊,也無法對策略進行管理,
none(其他)
無法登陸管理控制臺,通常就是普通的生產者和消費者,

創建交換器
direct :點對點
direct :比如是Exchanges 交換器是 名字test.direct 的也是路由的key,如果和Binding中的Binding的名字一樣的話,訊息就會發送到對應的對應的佇列中,路由的鍵與佇列的名字完全一致,就是完全匹配,如果一個佇列系結的到交換機要求鍵為test.direct 的話,那交換器只轉發名字為test.direct 的訊息別的佇列不會受到 ,別的是接受不到了 這是單波模式

fanout:廣播模式
fanout:每個發到fanout型別交換器的訊息都會分到所有的系結的對列上去,fanout交換器不處路由鍵,只是簡單的將佇列系結到交換器上,每個發發送到交換器的訊息都會被轉發到與該交換器系結的所有佇列上,fanout發送訊息是最快的

topic:主題模式
topic:topic交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要系結到一個模式上,它將路由鍵和系結鍵的字串切分成單詞,這些單詞之間用點隔開它同樣也會識別兩個通配符:符號“#和符號*”,#匹配個或多個單詞,*匹配一個單詞,

交換機添加好 創建訊息佇列
Queues
穿件訊息前先了解一下Exchanges和Binding的角色
首先生產者把訊息發送到Exchanges上訊息最終訊息到佇列唄消費者消費,而Binding決定交換器的訊息應該發送給那個佇列
創建佇列

我這里創建了4個佇列分是

接下來勁行系結
Exchanges交換機系結

測驗direct 系結可四個佇列

測驗 發送訊息

結果:只有一個符合

test.fanout:也系結這四個

測驗:發送訊息

結果:四個同事都被滿足了

test.topic:系結規則

測驗:

結果

測驗

結果:

和專案結合:
使用boot
pom依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml
現在代碼操作的埠就是5672了不是15672切記
server:
port: 8081
spring:
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
當然要在主啟動類加上rabbitmq開啟注解
@EnableRabbit
@SpringBootApplication
@EnableRabbit
public class SpringBootRabbitmq {
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmq.class,args);
}
}
使用rabbitmq的話要對訊息序列化不然可能會亂編碼 需要一個RabbitmqConfig 解決
@Configuration
public class RabbitmqConfig {
//Rabbitmq json序列化
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
rabbitmq封裝了一個工具類RabbitTemplate
點進去就可以看到這個方法,這個就經常用的發送訊息的方法

我就用點對點測驗
public void test(User user){
rabbitTemplate.convertAndSend("test.direct","mrtang",user);
}

接受:
public void test1(){
Object test = rabbitTemplate.receiveAndConvert("mrtang");
System.out.println(test);
}
當然一注解就可以搞定的
queues=“路由規則” 就可以實時接受資訊了 客戶端發送,消費端就可以拿到訊息了
@RabbitListener(queues = "mrtang")

創建廣播模式
路由不用寫 因為他系結的全有佇列都能收到,接受和單點一樣
public void testFanout(User user){
rabbitTemplate.convertAndSend("test.fanout","",user);
}
創建交換器和佇列代碼創建
AmqpAdmin創建 交換器 路由系結 和佇列
@Autowired
private AmqpAdmin amqpAdmin;
創建Exchange規則
public void exchanges(){
//創建單點
amqpAdmin.declareExchange(new DirectExchange("directExchange"));
//創建廣播
amqpAdmin.declareExchange(new FanoutExchange("fanoutExchange"));
//創建 主題
amqpAdmin.declareExchange(new TopicExchange("topicExchange"));
}
創建佇列
public void queue(){
//創建佇列 testQueue名稱 true 持久化
amqpAdmin.declareQueue(new Queue("testQueue",true));
}
系結
public void binding(){
//系結規則
/**
* testQueue 佇列名稱
* Binding.DestinationType.QUEUE 是系結佇列型別
* fanoutExchange 交換器 名稱
* testQueue 路由key
*/
amqpAdmin.declareBinding(new Binding("testQueue",Binding.DestinationType.QUEUE,"fanoutExchange","testQueue",null));
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/256844.html
標籤:其他
