前言
本篇文章會首先介紹MQ的概念,及核心概念及核心設計來理解出為什么需要訊息中間件,訊息中間間的作用,然后會接著介紹常用的activeMq的安裝,以及在spring中集成,一些比較常用的命令,
無論是在傳統還是互聯網中,都是協作的,系統之間有聯系,系統之間需要交流的方式分為兩種同步呼叫和異步呼叫,對應起來馬上回應和延時回應
對于同步和異步效果對比 上來說:
- 同步-串行:

下單過后 需要等待所有服務完成,才能完成資料
缺點:耗時比較長 180ms
- 異步-并行

利用執行緒池異步方式處理,降低處理時間
缺點是:高耦合、需要執行緒池管理, 寫訂單服務代碼 需要把所有的物流服務,,,都和訂單服務耦合起來
- 異步-MQ

對應起來mq系統做通知得作用,所有服務訂閱,通知需要得服務
最后達到異步解耦得效果
通過訊息中間件可以達到異步解耦所有系統得效果,
訊息中間件MQ
概述
MQ(Message Queue)訊息佇列,是基礎資料結構中“先進先出”的一種資料結構,一般用來解決應用解耦,異步訊息,流量削峰等問題,實作高性能,高可用,可伸縮和最終一致性架構,
特點:
- 利用 高效可靠得訊息傳遞機制 進行平臺無關的資料交流;
- 并基于資料通信來進行分布式系統的集成;
- 通過提供訊息傳遞和訊息排隊模型,它可以在分布式環境下擴展行程間通信;
訊息中間件的應用場景
跨系統資料傳遞、高并發流量削峰(緩沖)、資料異步處理、系統解耦,,,等等
常用的訊息中間件
ActiveMq、RabbitMq、Kafka、RoctetMq
對于message-Queue訊息中間件中的訊息對應的是資料物件,有可能是服務這些,
為什么要用Queue:也是來源于佇列的特性先進先出,不用堆疊這些結構,訊息是有順序,有時間先后的,
訊息中間件設計
本質
一種具備接收請求、保存資料、發送資料等功能的網路應用,和一般網路應用程式的區別是它負責資料的接收和傳遞,所以性能一般高于普通程式,
核心構成
協議、持久化機制、訊息分發機制、高可用設計、高可靠設計,
它因為需要接受保存發送資料,有了幾個核心機制, 所有的中間件都會包括這幾個構成,
協議
協議是計算機之間通信時共同遵從一組約定,都遵從相同的約定,計算機之間才能相互交流,是對資料格式和計算機之間交換資料時必須遵守的規則的正式描述,
協議三要素:
- 語法:即資料與控制資訊的結構或格式,
- 語意:即需要發出何種控制資訊,完成何種動作以及做出何種回應;
- 時序:即事件實作順序的詳細描述,
也就是計算機之間相互交流約定的術語,
協議三要素舉例:
語法: http規定了請求報文和回應報文的具體格式,
語意:客戶端主動發起的操作稱為請求;
時序:— 個請求對應— 個回應
至于在訊息中間件中不用http,大家肯定能想到,http的訊息頭也好,還是cookie等等欄位,太繁雜了,導致效率不高,而訊息中間件業務場景比較專一,所以不需要這種重協議,不用什么都考慮到,
AMQP協議
AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制,Erlang中的實作有RabbitMQ等,
特性:
事務支持、持久化支持、出生金融行業、在可靠訊息處理上具備天然的優勢,

MQTT協議
MQTT協議訊息佇列遙測傳輸是IBM開發的一個即時通訊協議,物聯網系統架構中的重要組成部分,
MQTT(訊息佇列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發布/訂閱范式的訊息協議,它作業在 TCP/IP協議族上,是為硬體性能低下的遠程設備以及網路狀況糟糕的情況下而設計的發布/訂閱型訊息協議,為此,它需要一個訊息中間件 ,
MQTT是一個基于客戶端-服務器的訊息發布/訂閱傳輸協議,MQTT協議是輕量、簡單、開放和易于實作的,這些特點使它適用范圍非常廣泛,在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT),其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用,
特性:
輕量、結構簡單、傳輸快、沒有事務支持、沒有持久化相關設計
應用場景:
適用于計算能力有限、低帶寬、網路不穩定的場景,

Open message協議
Open message是近一兩年由阿里發起、與雅虎、滴滴出行、Streamlio等公司共同參與創立的分布式訊息中間件、流處理領域的應用開發標準,是國內首個在全球范圍內發起的分布式訊息領域國際標準,
特性:
結構簡單、決議快、有事務設計、有持久化設計

Kafka協議
kafka協議是基于TCP的二進制協議,訊息內部是通過長度來分隔,由一些基本資料型別組成,Kafka專有協議
特性
結構簡單、決議快、無事務設計、有持久化設計,

持久化
持久化是將程式資料在持久狀態和瞬時狀態間轉換的機制,通俗的講,就是瞬時資料(比如記憶體中的資料,是不能永久保存的)持久化為持久資料(比如持久化至資料庫中,能夠長久保存),

常用的持久化方式就兩種:
一是在磁盤中,二是在檔案系統中,

大部分訊息中間支持的是檔案系統中,在資料庫中支持比較重,并且繁雜
訊息分發
產生訊息分發策略的原因
也是在于 消費的方式 服務在去處理時,是多種多樣的

需要不同系統上進行消費,然后以及在業務場景二中,并不需要這么繁雜

需要一個確認機制,
常見的分發策略

- 發布訂閱: 發一次資料,被所有訂閱的服務都收到
- 輪詢分發: 發一條訊息,根據服務進行分發資料,有些是按權重進行輪詢
- 公平分發:一般是權重進行公平分發
- 重發:重發概念,需要有一個有一個訊息確認機制,來保證資料安全
- 訊息拉取:由服務決定,而不是進行分發,服務想要時去拉取
高可用
“高可用性”(High Availability)通常來描述一個系統經過專門的設計,從而減少停工時間,而保持其服務的高度可用性,
在這里描述的是指產品在規定的條件和規定的時刻或時間區間內處于可執行規定功能狀態的能力,當業務量大時,一臺訊息中間件服務器可能無法滿足需求,所以需要訊息中間件能夠部署集群,達到高可用的目的,
主從方案共享資料的方式

通過共享資料方式來保證資料的一致性,
Master-Slave主從同步部署方式

也是只能在主節點進行操作資料
Blocker-Cluster多主從集群同步部署方式

Blocker-Cluster多主從集群轉發部署方式

將資料進行分開,
高可靠
最主要資料不能丟失
高可靠性是指系統可以無故障地持續運行,比如— 個系統從來不崩潰、報錯,或者崩潰、
報錯的幾率較低,那就是高可靠,
在高并發業務場景下,如果不能保證系統的高可靠,那造成的損失將會非常嚴重,
- 保證訊息中間件的高可靠性,可以從以下幾方面考慮
- 訊息傳輸可靠:通過協議來保證系統間資料決議的正確性,
- 訊息存盤可靠:通過持久化來保證訊息的存盤可靠性
ActiveMQ
ActiveMQ是Apache出品的,比較經典的訊息中間件,
官網地址:activemq.apache.org
Apache ActiveMQ? is the most popular open source, multi-protocol, Java-based messaging
server. It supports industry standard protocols so users get the benefits of client choices across a
broad range of languages and platforms. Connectivity from C, C++, Python, .Net, and more is
available. Integrate your multi-platform applications using the ubiquitous AMQP protocol.
Exchange messages between your web applications using STOMP over websockets. Manage your
IoT devices using MQTT. Support your existing JMS infrastructure and beyond. ActiveMQ offers
the power and flexibility to support any messaging use-case.
- 用AMQP工業標準協議進行多平臺應用集成;
- Web應用可基于websocket用STOMP協議與ActiveMQ直接互動
- 物聯網設備用MQTT協議
- 基于JMS的已有基礎設施也支持
- 還有更多…
ActiveMQ Artemis 是高性能目標為非阻塞的版本,基于事件驅動,這里介紹是 ActiveMQ 5 "Classic" 經典版本
ActiveMQ作為一個老牌的訊息中間件,其提供了詳細的官方檔案,
active mq官方檔案
在官方檔案中
包含了一些學習目錄檔案,包含了使用


以及包含了 特性 以及使用說明等等
ActiveMQ常用的應用場景
- 用AMQP工業標準協議進行多平臺應用集成;
- Web應用可基于websocket的stomp協議與ActiveMQ直接互動,
- 物聯網設備MQTT協議
- 基于JMS的已有基礎設施也支持
- 還有一些更多
ActiveMQ安裝
ActiveMQ是用JAVA開發的,跨平臺的,開箱即可使用
- 虛擬機軟體:Oracle VM VirtualBox 下載地址:https://www.virtualbox.org/wiki/Downloads
- Linux: Centos 7 CentOS-7-x86_64-Minimal-1810.iso 阿里云鏡像下載地址:https://mirrors.aliyun.com/centos/7.6.1810/isos/x86_64/
- Jdk 8 jdk-8u221-linux-x64.rpm
安裝包對應下載:https://activemq.apache.org/components/classic/download/

也可在linux機器上直接下載:
wget –c http://mirror.bit.edu.cn/apache/activemq/5.15.9/apache-activemq-5.15.9- bin.tar.gz
安裝
- 創建安裝目錄
mkdir /usr/activemq
- 解壓安裝包到安裝目錄
tar -zxvf apache-activemq-5.15.9-bin.tar.gz -C /usr/activemq
- 為方便配置時書寫,創建軟鏈接
ln -s /usr/activemq/apache-activemq-5.15.9 /usr/activemq/latest
- 熟悉activemq的目錄構成:
cd /usr/activemq/latest/bin
./activemq console
./activemq start
INFO: Loading '/usr/activemq/apache-activemq-5.15.9//bin/env' INFO: Using java '/usr/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/usr/activemq/apache-activemq- 5.15.9//data/activemq.pid' (pid '14887')
Apache ActiveMQ 5.15.9 (localhost, ID:ntbk11111-50816-1428933306116-0:1) started | org.apache.activemq.broker.BrokerService | main
[root@localhost latest]# jps 25778 activemq.jar 25805 Jps
./activemq stop
./activemq
瀏覽用法說明資訊
#Web管理埠默認為8161,通訊埠默認為61616
firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --zone=public --add-port=61616/tcp --permanent
systemctl restart firewalld.service
systemctl stop firewalld
systemctl disable firewalld
- 以普通用戶activemq 身份來運行
useradd activemq
chown -R activemq:users /usr/activemq
- 創建全域默認的組態檔,并配置activemq
cp /usr/activemq/latest/bin/env /etc/default/activemq
sed -i '~s/^ACTIVEMQ\_USER=""/ACTIVEMQ\_USER="activemq"/'
/etc/default/activemq
vim /etc/default/activemq
# Active MQ installation dirs # ACTIVEMQ_HOME="<Installationdir>/"
# ACTIVEMQ_BASE="$ACTIVEMQ_HOME" # ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf"
# ACTIVEMQ_DATA="$ACTIVEMQ_BASE/data" # ACTIVEMQ_TMP="$ACTIVEMQ_BASE/tmp"
# Set jvm memory configuration (minimal/maximum amount of memory) ACTIVEMQ_OPTS_MEMORY="-Xms64M -Xmx1G"
chmod 644 /etc/default/activemq
-
安裝啟動腳本
ln -snf /usr/activemq/latest/bin/activemq /etc/init.d/activemq
-
激活啟動服務
# RHEL
chkconfig --add activemq
chkconfig activemq on
systemctl enable activemq
- 手動啟動服務
systemctl start activemq
activeMQ管理臺
自帶的管理臺,在瀏覽器中訪問http://服務IP:8161/admin即可進入
- broker 展示的實體 MQ的資訊

- queue 佇列中資料資訊 ,包含入隊 訊息,出隊資訊,以及 消費者資訊
- topics 發布訂閱的主題資訊

- 訂閱者 subscribers

以及后面的訂閱者網路連接等等,都是需要使用到的
send a jms message 控制臺
ActiveMQ 配置

這里面包含 webapps 和組態檔,以及 data 日志檔案 以及資料檔案地,
組態檔
web服務控制臺 的 組態檔 jetty.xml

- 打開有埠號等的配置,沿用spring的配置,包含安全配置,埠號配置


- real.properties 配置對應的用戶資訊

以及 group.properties 對于訊息的配置 , log4j.properties 配置日志 檔案,
ActiveMQ使用
直接添加maven依賴即可,其實作在基本的使用方式都是采用這種方式,從而達到引入 引入activemq-all.jar 的目的
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
使用方式
創建一個消費者,進行 請求消費資料,這是最原始的,但我們一般不使用這種方式
try {
// brokerURL
// http://activemq.apache.org/connection-configuration-uri.html
// 1、創建連接工廠
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、創建連接物件
conn = connectionFactory.createConnection(); conn.start();
// 一定要啟動
// 3、創建會話(可以創建一個或者多個session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、創建訊息消費目標(Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、創建訊息消費者 http://activemq.apache.org/destination- options.html
consumer = session.createConsumer(destination);
// 6、接收訊息(沒有訊息就持續等待)
Message message = consumer.receive();
if (message instanceof TextMessage) {
System.out.println("收到文本訊息:" + ((TextMessage) message).getText());
} else {
System.out.println(message);
}consumer.close();
session.close();
conn.close();
以及生產者
public class Producer {
public static void main(String[] args) {
new ProducerThread("tcp://mq.study.com:61616", "queue1").start();
new ProducerThread("tcp://mq.study.com:61616", "queue1").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、創建連接工廠
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connectionFactory.setUseAsyncSend(true);
// 2、創建連接
conn = connectionFactory.createConnection();
conn.start(); // 一定要start
// 3、創建會話(可以創建一個或者多個session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、創建訊息發送目標 (Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、用目的地創建訊息生產者
MessageProducer producer = session.createProducer(destination);
// 設定遞送模式(持久化 / 不持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、創建一條文本訊息
String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
+ System.currentTimeMillis();
TextMessage message = session.createTextMessage(text);
// 7、通過producer 發送訊息
System.out.println("Sent message: " + text);
CountDownLatch cdl = new CountDownLatch(1);
((ActiveMQMessageProducer) producer).send(message, new AsyncCallback() {
@Override
public void onException(JMSException exception) {
// TODO Auto-generated method stub
}
@Override
public void onSuccess() {
try {
System.out.println(Thread.currentThread().getName() + " 異步發送完成:messageId: "
+ message.getJMSMessageID() + " " + text);
} catch (JMSException e) {
}
}
});
// cdl.await();
// 8、 清理、關閉連接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
在topic服務器上,會將訊息單獨發送 ,為每個消費者都維護了一個佇列,

在使用程序中
- 在發布訂閱模式下,如果在訂閱在發布之后,不應該收到訊息,
- 持久訂閱 MQ登記有名的訂閱,消費者 掛了,會持久等待
spring boot 中使用ActiveMQ
參考包中
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置activemq broker連接引數 (application.yml)
spring:
activemq:
broker-url: tcp://mq.study.com:61616
#user: admin
#password: secret
使用方式很簡單
@Autowired
private JmsTemplate jmsTemplate;
@PostConstruct
public void sendMessage() {
// Send a message with a POJO - the template reuse the message converter
System.out.println("Sending an email message.");
jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello"));
}
public static void main(String[] args) {
SpringApplication.run(Producer.class, args);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/333623.html
標籤:其他
