1.activeMQ的主要功能
- 實作高可用、高伸縮、高性能、易用和安全的企業級面向訊息服務的系統
- 異步訊息的消費和處理
- 控制訊息的消費順序
- 可以和Spring/springBoot整合簡化編碼
- 配置集群容錯的MQ集群
2.activeMQ安裝
下載地址:http://activemq.apache.org/components/classic/download/
這里筆者是下載的linux版的:

因為activeMQ底層是使用java撰寫的,所以需要安裝jdk,這個請移步我之前的博客:
https://www.cnblogs.com/pluto-charon/p/11746636.html
安裝activeMq:
# 安裝apache
[root@localhost ~]# yum install ttpd
# 下載的apache-activemq并上傳到linux的home下,解壓
[root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz
# 進入到bin目錄下
[root@localhost home]# cd /apache-activemq-5.16.0/bin
# 啟動
[root@localhost bin]# ./activemq start
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517')
# activemq的默認埠是61616,查看是否啟動的三種方式
# 第一種
[root@localhost bin]# ps -ef |grep activemq
# 第二種
[root@localhost bin]# netstat -ano|grep 61616
tcp6 0 0 :::61616 :::* LISTEN off (0.00/0/0)
# 第三種
[root@localhost bin]# lsof -i:61616
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 7517 root 132u IPv6 39926 0t0 TCP *:61616 (LISTEN)
# 帶日志的啟動方式
[root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log
[root@localhost bin]# cd ..
# 可以看到,啟動日志都已經記錄到日志里了
[root@localhost apache-activemq-5.16.0]# cat myrunmq.log
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787')
# 關閉activemq
[root@localhost bin]# ./activemq stop
前臺訪問的埠是8161,在查看前臺時,要關閉linux和windows的防火墻:
# 關閉linux防火墻
[root@localhost apache-activemq-5.16.0]# systemctl stop firewalld
在訪問之前,需要修改conf目錄下的jetty.xml,將下面的host修改成自己的ip,以及修改用戶名和密碼,
<bean id="jettyPort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="https://www.cnblogs.com/pluto-charon/p/127.0.0.1"/>
<property name="port" value="https://www.cnblogs.com/pluto-charon/p/8161"/>
</bean>
# 用戶名和密碼可修改可不修改,默認為admin/admin
<bean id="securityConstraint" >
<property name="name" value="https://www.cnblogs.com/pluto-charon/p/BASIC" />
<property name="roles" value="https://www.cnblogs.com/pluto-charon/p/user,admin" />
<!-- set authenticate=false to disable login -->
<property name="authenticate" value="https://www.cnblogs.com/pluto-charon/p/true" />
</bean>
修改完成之后重啟activemq
[root@localhost bin]# ./activemq restart
查看,地址為192.168.189.150:8161

到這里就說明activemq安裝成功了,
3.JMS
JMS(java message service)是一個用于提供訊息服務的技術規范,他制定了在整個訊息服務提供程序中的所有資料結構和互動流程,當兩個程式使用jms進行通信時,他們并不是直接相連的,而是通過一個共同的訊息收發服務連接起來的,達到解耦的效果,jms為標準訊息協議和訊息服務提供了一組通用的介面,包括創建、發送、讀取訊息等,

1 JMS的優勢:
異步:客戶端不用發送請求,JMS自動將訊息發送給客戶端
可靠:JMS保證訊息只傳遞一次
2.JMS的四大組件:
-
JMS provider:實作了jms介面和規范的訊息中間件
-
JMS producer:訊息生產者,創建和發送JMS訊息的客戶端應用
-
JMS consumer:訊息消費者,接受和處理JMS訊息的客戶端應用
-
JMS message:由訊息頭、訊息屬性、訊息體組成
訊息頭(在send方法之前,通過setXXX()設定):
JMSDestination:訊息發送的目的地,主要是指Queue(點對點傳送模型)和Topic(發布訂閱模型)
JMSDeliverMode:訊息是否持久
JMSExpiration:設定訊息過期時間
JMSPriority:訊息優先級,0-4被稱為普通訊息,5-9是加急訊息,默認為4
JMSMessageID:唯一識別每個訊息的標識,由MQ產者或者自己設定
訊息屬性:除訊息頭以外的值,如識別,去重,重點標注等方法,如textMessage.setStringProperty("c1","VIP");
訊息體:
TextMessage:普通字串
MapMessage:map型別,其中key為String型別,而值為java的基本型別
BytesMessage:二進制陣列訊息
StreamMessage:java資料流訊息,用個標準流來順序填充和讀取
ObjectMessage:物件訊息,包含一個可序列化的java物件
3.JMS的傳送模型:
-
點對點訊息傳送模型:應用程式由訊息佇列、發送者、接收者組成,每個訊息發送給一個特殊的訊息佇列,該佇列保存了所有發送給它的訊息,處理消費掉的和已過期的訊息
點對點訊息傳送的特性:
1.每個訊息只有一個接收者
2.訊息發送者和接收者沒有時間依賴性
3.當訊息發送者發送訊息時,無論接收者程式在不在運行,都能發送訊息
4.當接收者收到訊息時,會發送確認收到通知
-
發布訂閱訊息傳遞模型:發布者發布一個訊息,該訊息通過topic傳遞給所有訂閱的客戶端,發布者和訂閱者彼此不知道對方,是匿名的且可以動態發布和訊息訂閱,
發布訂閱訊息傳遞的特性:
1.一個訊息可以傳遞給多個訂閱者
2.發布者和訂閱者有時間依賴性
3.為了緩和嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱
4.生產者代碼實作
1.引入jar包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.0</version>
</dependency>
2.生產者代碼
package activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* @className: Jmsproducer
* @description: activemq生產者
* @author: charon
* @create: 2020-12-27 22:36
*/
public class JmsProducer {
/** 宣告activemq的地址 */
private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
/** 佇列名 */
private static final String QUEUE_NAME = "queue01";
/**
* @param args 引數
*/
public static void main(String[] args) throws JMSException {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 獲得連接
Connection conn = activeMQConnectionFactory.createConnection();
conn.start();
// 創建會話
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 創建佇列
Queue queue = session.createQueue(QUEUE_NAME);
// 創建訊息的生產者
MessageProducer messageProducer = session.createProducer(queue);
// 創建訊息
for (int i = 0; i < 5; i++) {
// 訊息體
TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】條訊息");
// 訊息頭
// textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT));
// 訊息屬性
// textMessage.setStringProperty("c1","VIP");
messageProducer.send(textMessage);
}
// 關閉資源
messageProducer.close();
session.close();
conn.close();
}
}
運行代碼在瀏覽器上查看,可以看到queue01里面有5條訊息:

- Number Of Pending Messages:等待消費的訊息 這個是當前未出佇列的數量,可以理解為總接收數-總出佇列數
- Number Of Consumers:消費者的數量
- Messages Enqueued:進入佇列的訊息 進入佇列的總數量,包括出佇列的, 這個數量只增不減
- Messages Dequeued:出了佇列的訊息 可以理解為是消費這消費掉的數量
5.消費者代碼實作
package activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;
/**
* @className: JmsConsumer
* @description: activeMq的消費者
* @author: charon
* @create: 2020-12-28 08:10
*/
public class JmsConsumer {
/** 宣告activemq的地址 */
private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
/** 佇列名 */
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 獲得連接
Connection conn = activeMQConnectionFactory.createConnection();
conn.start();
// 創建會話
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 創建佇列
Queue queue = session.createQueue(QUEUE_NAME);
// 創建訊息的生產者
MessageConsumer messageConsumer = session.createConsumer(queue);
// 同步方式,生產環境并不適用,這種方式將阻塞知道獲得并回傳第一條訊息
// while (true){
// TextMessage textMessage =(TextMessage) messageConsumer.receive();
// if(null!=textMessage){
// System.out.println("---消費者收到訊息:"+textMessage.getText());
// }else{
// break;
// }
// }
// 異步方式,創建監聽,在又訊息到達時,呼叫listener的onMessage方法,
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println("--消費者接受到訊息:"+textMessage);
}
}
});
System.in.read();
// 關閉資源
messageConsumer.close();
session.close();
conn.close();
}
}
運行消費者的代碼,應該我上面生產者的代碼運行了兩次,所以訊息有10條,

6.activeMQ集群搭建
在這里,筆者使用的基于Zookeeper+levelDb搭建的activeMq集群,為了避免單點故障,使用一主兩從的架構,使用Zookeeper集群注冊所有的ActiveMQ Broker但只有其中一個Broker可以提供服務,它被視為master,也就是說如果master因為故障而不能提供服務,Zookeeper會從SLave中選舉出一個Broker充當master,
我這邊的zookeeper集群已經搭建好了,150和151是follower,152是leader,
# 每臺服務器上安裝activeMq,同時在集群環境下,activemq的jetty.xml檔案重的host要改成0.0.0.0
# 修改activeMq.xml,注釋掉kahadb這個配置,actviemq默認的是kahadb,并且添加leveldb
[root@localhost conf]# vi activemq.xml
<!-- <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter> -->
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
<!--實體間的通信地址-->
bind="tcp://0.0.0.0:62222"
<!--zookeeper的地址-->
zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181"
<!--修改為每個服務器的節點的ip-->
hostname="192.168.189.152"
sync="local_disk"
zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
# 啟動三個節點的activemq
[root@localhost bin]# ./activemq restart
# 查看 連接zookeeper客戶端
[root@localhost bin]# zkCli.sh
[zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores
[00000000022, 00000000020, 00000000021]
# 訪問
[zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020
{"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"}
[zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
[zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
從上面可以看到,只有00000000020這個幾點的elected里面有值,表明它被選舉為master節點了,
在瀏覽器上依次訪問:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161
只有192.168.189.150:8161可以訪問成功,因為只有master節點可以對外提供訪問,所以只有一個節點能訪問到,那么它就是master節點,
第二種查看的方式:
查看activemq的日志,最后一行,可以看到,MasterLevelDBStore即為master節點,SlaveLevelDBStore即為slave節點,


第三種查看的方式為使用zookeeper的可視化工具,
由于activeMq集群是基于zookeeper集群實作的,所以要注意一下三點:
- activeMQ的客戶端只能訪問master的Broker,其它處于Slave的Broker不能訪問,所以客戶端連接的Broker應該使用failover協議
- 當一個activeMQ節點掛掉或者一個Zookeeper節點掛掉,activeMQ服務正常運轉,但是如果僅剩一個activeMQ節點,由于不能選舉Master,所以activeMQ不能正常運行;(一個就不成集群了)
- 同理,如果Zookeeper僅剩一個節點是活動的,不管activeMQ是都存活或者說不管activeMQ個節點是否存活,activeMQ不能正常提供服務,必須依賴于Zookeeper集群服務,
7.集群代碼實作
集群的代碼和上面單機的代碼大致是一直的,就只需要修改一個activemq的地址,
/** 宣告集群中activemq的地址,使用failover協議,隨機 */
private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";
8.activemq的高級特性
1.訊息發送方式
默認情況下,非持久化的訊息是異步發送的,持久化的訊息是同步發送的,但是在開啟事務的情況下,訊息都是異步發送的,效率會有2個數量級的提升,所以在發送持久化訊息時,請開啟事務模式,
2.儲存機制
在通常情況下,非持久化的訊息時存盤在記憶體中的,持久化訊息時存盤在檔案中的,他們的最大限制在組態檔中的
所以盡量不要用非持久化檔案,如果非要用的化,可以將臨時檔案的限制調大,同時,非持久化的訊息要及時處理,不要堆積,或者啟動事務,啟動事務后,commit()會等待服務器的訊息回傳,也不會導致訊息丟失了,
3.死信佇列
一條訊息在被重發多次后(默認是6次),將會被ActiveMQ移入死信佇列;說白了就是例外訊息的歸并處理的集合,主要是處理失敗的訊息,可以在activeMQ.DLQ這個佇列中查看,
4.重復訊息,冪等性呼叫
在網路延遲的情況洗啊,可能會造成MQ重試,可能會造成重復消費,如果訊息是做資料庫的插入操作,給這個訊息做一個唯一主鍵,那么就算出現重復消費的情況,因為唯一主鍵,會造成主鍵沖突,避免資料庫出現臟資料,如果是第三方消費,可以在每條資料里面加一個全域唯一的id,如果訊息消費了,就將訊息存在redis中,在消費訊息之前將id到redis中查詢一下,判斷是否消費過,如果沒有消費過,就處理,如果消費過了,就不處理了,
參考網址:
https://blog.csdn.net/weixin_34122548/article/details/91929810?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/244118.html
標籤:Java
