Linux、Windows安裝ActiveMQ
- 一、下載
- 二、Linux安裝
- 2.1、下載方式
- 2.2、測驗
- 2.3、配置
- 三、Windows安裝
- 四、ActiveMQ集群
- 4.1、為什么使用集群
- 4.2、Master Slave 部署(主從)
- 4.2.1、Share storage master slave(共享存盤)
- 4.2.2、Replicated LevelDB Store(使用ZooKeeper協調多個Broker)
- 4.2.3、KahaDB
一、下載
下載地址:https://activemq.apache.org/components/classic/download/
選擇相應的系統版本

二、Linux安裝
2.1、下載方式
- 下載tar.gz包到本地,然后上傳到服務器
- 通過wget下載
安裝路徑:/opt/home/mq
## 下載
wget https://dlcdn.apache.org/activemq/5.16.3/apache-activemq-5.16.3-bin.tar.gz
## 復制到其它服務器
scp -r /opt/home/mq/apache-activemq-5.16.3-bin.tar.gz root@192.168.0.5:/opt/home/mq/
## 解壓
tar -zxvf apache-activemq-5.16.3-bin.tar.gz
## 切換到apache-activemq-5.16.3/bin/目錄下
cd apache-activemq-5.16.3/bin/
## 啟動
./activemq start
## 停止
./activemq stop
## 重啟
./activemq restart
## 查看狀態
./activemq status
## 檢查埠是否使用
netstat -lnp | grep 8161
netstat -lnp | grep 61616
2.2、測驗
瀏覽器訪問:http://192.168.0.3:8161/

發現無法訪問此網站,防火墻放行埠也無法訪問
2.3、配置
這是因為 Activemq 默認只能當前服務器訪問,如果需要外部訪問,需要修改jetty.xml檔案
## 修改apache-activemq-5.16.3/conf/jetty.xml檔案
vim /opt/home/mq/apache-activemq-5.16.3/conf/jetty.xml
## 找到id為jettyPort的bean 將其中的127.0.0.1 改為 0.0.0.0
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
## 重啟ActiveMQ
./activemq restart
再通過瀏覽器訪問:http://192.168.0.3:8161/
默認用戶名/密碼:admin/admin

三、Windows安裝
下載解壓后,進入bin\win64下,雙擊activemq.bat,啟動

訪問:http://127.0.0.1:8161
選擇Manage ActiveMQ broker(管理ActiveMQ代理),輸入賬號和密碼登陸,
默認用戶名/密碼:admin/admin


四、ActiveMQ集群
集群分為兩種方式:
-
偽集群:集群節點都搭在一臺機器上
-
真集群:集群節點分布在多臺機器上
4.1、為什么使用集群
-
實作高可用,以排除單點故障引起的服務中斷,
-
實作負載均衡,以提升效率為更多的客戶提供服務,
4.2、Master Slave 部署(主從)
通過部署多個broker實體,選舉產生一個master和多個slave,master宕機后由slave接管服務來達到高可用性,Master-Slave的方式雖然能解決多服務熱備的高可用問題,但無法解決負載均衡和分布式的問題,Broker Cluster的部署方式剛好可以解決負載均衡的問題,一般兩者結合使用,
這里主要介紹2種配置方案
- Shared File System Master slave 模式
- JDBC Store Master Slave 模式
4.2.1、Share storage master slave(共享存盤)
此模式中Master和Slave的資料是共享的(相當于共享同一個資料庫),當master失效后,slave會自動接管服務,無需手動進行資料的copy與同步,因為master存盤資料之后,這些資料在任何時候對slave都是可見的,
master與slave之間,通過共享檔案的“排他鎖”或者分布式排他鎖(ZooKeeper)來決定Broker的狀態與角色,獲取鎖權限的Broker作為master,其它的Broker則作為slave,如果master失效,它必將失去鎖權限,那么其它的slave將通過鎖競爭來選舉新master,沒有獲取鎖權限的Broker作為slave,并等待鎖的釋放(間歇性嘗試獲取鎖),
1、Shared File System Master Slave模式(只適合單臺主機部署,不適合多臺主機部署)
這種方式是最常用的模式,架構簡單,可靠實用,我們只需要一個SAN檔案系統即可,使用檔案系統來共享資料檔案,多個Broker共享同一個檔案系統,
<!--
假設在本地啟動兩個broker,組態檔activemq.xml 里面的持久化檔案目錄都設定成同一個即可,
這里默認使用的kahaDB ,你也可以用levelDB,不推薦AMQDB
-->
<persistenceAdapter>
<kahaDB directory=/opt/home/mq/apache-activemq-5.16.3/data/shared_kahadb" />
</persistenceAdapter>
2、JDBC Store Master Slave模式(適合多臺主機部署)
資料存盤用的是資料庫(MySQL/Oracle等),相對于日志檔案而言,JDBC Store通常認為是低效的,配置如下
<broker brokerName="broker-locahost-0">
...
<persistenceAdapter>
<!-- 這個是可以自動生成表結構的-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>
</broker>
<!-- 配置jdbc資料庫連接 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.0.3:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://123.57.80.91:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="Java_521"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
下載 mysql-connector-java-5.1.47.jar放到/opt/home/mq/apache-activemq-5.16.3/lib目錄下
新建activemq資料庫,
-- 如果存在資料庫洗掉
DROP DATABASE IF EXISTS `activemq`;
-- 創建資料庫
CREATE DATABASE `activemq` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';
-- 使用activemq資料庫
USE activemq;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for activemq_acks
-- ----------------------------
DROP TABLE IF EXISTS `activemq_acks`;
CREATE TABLE `activemq_acks` (
`CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`SUB_DEST` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
`CLIENT_ID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`SUB_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`SELECTOR` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
`LAST_ACKED_ID` bigint(20) NULL DEFAULT NULL,
`PRIORITY` bigint(20) NOT NULL DEFAULT 5,
`XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
PRIMARY KEY (`CONTAINER`, `CLIENT_ID`, `SUB_NAME`, `PRIORITY`) USING BTREE,
INDEX `ACTIVEMQ_ACKS_XIDX`(`XID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of activemq_acks
-- ----------------------------
-- ----------------------------
-- Table structure for activemq_lock
-- ----------------------------
DROP TABLE IF EXISTS `activemq_lock`;
CREATE TABLE `activemq_lock` (
`ID` bigint(20) NOT NULL,
`TIME` bigint(20) NULL DEFAULT NULL,
`BROKER_NAME` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of activemq_lock
-- ----------------------------
INSERT INTO `activemq_lock` VALUES (1, NULL, NULL);
-- ----------------------------
-- Table structure for activemq_msgs
-- ----------------------------
DROP TABLE IF EXISTS `activemq_msgs`;
CREATE TABLE `activemq_msgs` (
`ID` bigint(20) NOT NULL,
`CONTAINER` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
`MSGID_PROD` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
`MSGID_SEQ` bigint(20) NULL DEFAULT NULL,
`EXPIRATION` bigint(20) NULL DEFAULT NULL,
`MSG` blob NULL,
`PRIORITY` bigint(20) NULL DEFAULT NULL,
`XID` varchar(250) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_MIDX`(`MSGID_PROD`, `MSGID_SEQ`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_CIDX`(`CONTAINER`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_EIDX`(`EXPIRATION`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_PIDX`(`PRIORITY`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_XIDX`(`XID`) USING BTREE,
INDEX `ACTIVEMQ_MSGS_IIDX`(`ID`, `XID`, `CONTAINER`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of activemq_msgs
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
資料表說明:
ACTIVEMQ_MSGS
-- 訊息表,預設表名ACTIVEMQ_MSGS,Queue和Topic都存在里面
-- ID:自增的資料庫主鍵
-- CONTAINER:訊息的Destination
-- MSGID_PROD:訊息發送者的主鍵
-- MSG_SEQ:是發送訊息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID
-- EXPIRATION:訊息的過期時間,存盤的是從1970-01-01到現在的毫秒數
-- MSG:訊息本體的Java序列化物件的二進制資料
-- PRIORITY:優先級,從0-9,數值越大優先級越高
ACTIVEMQ_ACKS
-- 用于存盤訂閱關系,如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存
-- CONTAINER: 訊息的Destination
-- SUB_DEST: 如果是使用Static集群, 這個欄位會有集群其他系統的資訊
-- CLIENT_ID : 每個訂閱者都必須有一個唯一的客戶端ID用以區分
-- SUB_NAME: 訂閱者名稱
-- SELECTOR: 選擇器,可以選擇只消費滿足條件的訊息, 條件可以用自定義屬性實作, 可支持多屬性AND和OR操作
-- LAST_ACKED_ID: 記錄消費過的ID
-- ACTIVEMQ_ACKS表存盤訂閱的訊息和最后一個持久訂閱接收的訊息ID
ACTIVEMQ_LOCK
-- 在集群環境中才有用, 只有一個Broker可以獲得訊息, 稱為Master Broker, 其他的只能作為備份等待Master Broker不可用, 才可能成為下一個Master Broker, 這個表用于記錄哪個Broker是當前的Master Broker.
啟動activemq,如果啟動報錯,嘗試將資料庫編碼改為latin1 或者ASCII
./activemq start
測驗代碼
public class Producer {
private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage textMessage = session.createTextMessage("***發送訊息順序: " + i + "***發送訊息優先級:" + i);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);//設定延遲時間
// producer.send(textMessage);
producer.send(queue, textMessage);
}
System.out.println("訊息發送完成");
producer.close();
session.close();
connection.close();
}
}
public class Consumer {
private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.0.3:61616,tcp://192.168.0.4:61616,tcp://192.168.0.5:61616)?randomize=true";
private static final String ACTIVEMQ_QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
String messageText = textMessage.getText();
System.out.println("***消費者接收訊息: " + messageText);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//保證控制臺不滅
System.in.read();
}
}
4.2.2、Replicated LevelDB Store(使用ZooKeeper協調多個Broker)
基于復制的LevelDB Store模式是ActiveMQ 5.9以后新增的特性,這是ActiveMQ全力打造的HA存盤引擎, 一般都使用這種方式,由于利用zk 進行配置管理,可以方便監控,同時配置也相對簡單,
使用ZooKeeper(集群)注冊所有的ActiveMQ Broker,只有其中的一個Broker可以對外提供服務(也就是Master節點),其他的Broker處于待機狀態,被視為Slave,如果Master因故障而不能提供服務,則利用ZooKeeper的內部選舉機制會從Slave中選舉出一個Broker充當Master節點,繼續對外提供服務,
由于基于ZooKeeper(通常ZooKeeper集群至少需要3個實體,才能保證ZooKeeper本身的高可用性),所以Broker最低需要3個,
警告
Warning
The LevelDB store has been deprecated and is no longer supported or recommended for use. The recommended store is KahaDB
意思是:
LevelDB存盤已被棄用,不再支持或建議使用,推薦的使用KahaDB
參考:http://activemq.apache.org/replicated-leveldb-store.html
4.2.3、KahaDB
KahaDB 是一個基于檔案的持久性資料庫,它位于使用它的訊息代理本地,它已針對快速持久性進行了優化,它是自ActiveMQ 5.4以來的默認存盤機制,KahaDB 使用更少的檔案描述符并提供比其前身AMQ 訊息存盤更快的恢復,
配置
<broker brokerName="broker">
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
</broker>
KahaDB 屬性
| 屬性 | 默認 | 注釋 |
|---|---|---|
archiveCorruptedIndex | false | 如果true,啟動時發現的損壞索引將被存檔(不洗掉), |
archiveDataLogs | false | 如果true, 會將訊息資料日志移動到存檔目錄而不是將其洗掉, |
checkForCorruptJournalFiles | false | 如果true,將在啟動時檢查損壞的日志檔案并嘗試恢復它們, |
checkpointInterval | 5000 | 檢查點日志之前的時間(毫秒), |
checksumJournalFiles | true | 為日志檔案創建校驗和,需要存在校驗和才能使持久性配接器能夠檢測損壞的日志檔案,ActiveMQ 5.9.0之前:默認為false. |
cleanupInterval | 30000 | 確定哪些日志檔案(如果有)可以從訊息存盤中洗掉的連續檢查之間的間隔(以毫秒為單位),符合條件的期刊檔案是指沒有未完成的參考文獻, |
compactAcksAfterNoGC | 10 | 從ActiveMQ 5.14.0 開始:當啟用確認壓縮功能時,此值控制必須完成多少存盤 GC 回圈,并且在觸發壓縮邏輯以可能將分布在日志檔案中的舊確認壓縮到新日志之前沒有其他檔案被清理檔案,設定的值越低,壓縮可能發生得越快,如果它經常運行,這可能會影響性能, |
compactAcksIgnoresStoreGrowth | false | 從ActiveMQ 5.14.0 開始:當啟用確認壓縮功能時,此值控制壓縮是在存盤仍在增長時運行還是僅在存盤停止增長時才運行(由于空閑或達到存盤限制),如果啟用壓縮,無論商店是否仍有空間或處于活動狀態,都會運行壓縮,這會降低整體性能但會更快地回收空間, |
concurrentStoreAndDispatchQueues | true | 允許將 Queue 訊息分派給感興趣的客戶端,以與訊息存盤同時發生, |
concurrentStoreAndDispatchTopics | false | 允許將 Topic 訊息分派給感興趣的客戶端,與訊息存盤同時發生,不建議啟用此屬性, |
directory | activemq-data | 用于存盤訊息存盤資料和日志檔案的目錄的路徑, |
directoryArchive | null | 定義將資料日志移動到的目錄,當它們包含的所有訊息都已被使用時, |
enableAckCompaction | true | 從ActiveMQ 5.14.0 開始:此設定控制存盤是否對僅包含訊息確認的舊日志日志檔案執行定期壓縮,通過將這些較舊的確認壓縮到新的日志日志檔案中,可以洗掉較舊的檔案以釋放空間并允許訊息存盤繼續運行而不會達到存盤大小限制, |
enableIndexWriteAsync | false | 如果true,則索引異步更新, |
enableJournalDiskSyncs | true | 確保每個日志寫入后跟一個磁盤同步(JMS 持久性要求),自ActiveMQ 5.14.0 起,此屬性已棄用,從ActiveMQ 5.14.0 開始:請參閱journalDiskSyncStrategy, |
ignoreMissingJournalfiles | false | 如果true,則忽略丟失日志檔案的報告, |
indexCacheSize | 10000 | 記憶體中快取的索引頁數, |
indexDirectory | 從ActiveMQ 5.10.0 開始:如果設定,則配置 KahaDB 索引檔案(db.data和 db.redo)的存盤位置,如果未設定,則索引檔案存盤在directory屬性指定的目錄中 , | |
indexWriteBatchSize | 1000 | 批量寫入的索引數, |
journalDiskSyncInterval | 1000 | 何時執行磁盤同步的時間間隔(毫秒) journalDiskSyncStrategy=periodic,僅當自上次磁盤同步后日志發生寫入或日志滾動到新日志檔案時,才會執行同步, |
journalDiskSyncStrategy | always | 從ActiveMQ 5.14.0 開始:此設定配置磁盤同步策略,可用同步策略的串列是(按照降低安全性和提高性能的順序):always確保每個日志寫入后跟一個磁盤同步(JMS 持久性要求),這是最安全的選項,但也是最慢的,因為它需要在每次寫入訊息后進行同步,這相當于已棄用的屬性 enableJournalDiskSyncs=true,periodic磁盤將按設定的時間間隔(如果發生寫入)而不是在每次日志寫入后同步,這將減少磁盤上的負載并提高吞吐量,滾動到新的日志檔案時,磁盤也將同步,默認間隔為 1 秒,默認間隔提供了非常好的性能,同時比 never磁盤同步,因為資料丟失被限制為最多 1 秒的價值,請參閱journalDiskSyncInterval更改磁盤同步的頻率,never永遠不會顯式呼叫同步,由作業系統重繪到磁盤,這相當于設定了 deprecated 屬性enableJournalDiskSyncs=false,這是最快的選項,但最不安全,因為無法保證資料何時重繪到磁盤,因此,訊息丟失可能會在代理失敗時發生, |
journalMaxFileLength | 32mb | 設定訊息資料日志最大大小的提示, |
maxAsyncJobs | 10000 | 將排隊等待存盤的異步訊息的最大數量(應與并發 MessageProducers 的數量相同), |
preallocationScope | entire_journal | 從ActiveMQ 5.14.0 開始:此設定配置日志資料檔案的預分配方式,默認策略在第一次使用時使用 appender 執行緒預分配日志檔案, entire_journal_async將在單獨的執行緒中提前使用預分配,none禁用預分配,在 SSD 上,使用 entire_journal_async可避免在首次使用時延遲等待預分配的寫入,注意:在 HDD 上,額外的磁盤執行緒爭用會產生負面影響,因此使用默認值, |
preallocationStrategy | sparse_file | 從ActiveMQ 5.12.0 開始:此設定配置代理將如何在需要新日志檔案時嘗試預分配日志檔案,sparse_file- 設定檔案長度,但不填充任何資料,os_kernel_copy- 將預分配委托給作業系統,zeros - 每個預分配的日志檔案只包含0x00整個內容, |
storeOpenWireVersion | 11 | 確定編組到 KahaDB 日志的 OpenWire 命令的版本,ActiveMQ 5.12.0之前:默認值為6. 代理的某些功能取決于存盤在來自較新協議修訂版的 OpenWire 命令中的資訊,如果存盤版本設定為較低的值,這些功能可能無法正常作業,來自高于 5.9.0 的代理版本的 KahaDB 存盤在許多情況下仍可由代理讀取,但會導致代理繼續使用舊的存盤版本,這意味著新功能可能無法按預期作業,對于在ActiveMQ 5.9.0之前的版本中創建的 KahaDB 存盤,必須手動設定storeOpenWireVersion="6"以便無錯誤地啟動代理, |
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/350778.html
標籤:其他
