RabbitMQ是由erlang語言開發,基于AMQP(Advanced Message Queue 高級訊息佇列協議)協議實作的訊息佇列,在分布式系統開發中應用的非常廣泛
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了5中作業模式:簡單模式,work模式,Publish/Subscribe發布與訂閱模式(也可稱作廣播模式),Routing路由模式,Topics主題模式;
簡單模式實作:
1.匯入依賴:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
2.創建生產者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 創建鏈接工廠物件
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
// 設定虛擬主機名字和登錄,默認/
connectionFactory.setVirtualHost("/xxx");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
// 創建訊息通道
Channel channel = connection.createChannel();
// arg0:佇列名稱 arg1:是否持久化 arg2:是否排外 arg3:關閉連接時佇列是否自動洗掉 arg4:佇列其他引數
channel.queueDeclare("simple_queue", true, false, false, null);
String message = "你好,世界,";
// 訊息發送
// arg0:交換機名稱,沒有指定使用默認的Default Exchange
// arg1:路由key,點對點模式可以使用佇列名稱 arg2:指定訊息其他屬性 arg3:訊息的位元組碼
channel.basicPublish("", "simple_queue", null, message.getBytes());
channel.close();
connection.close();
}
}
在執行完上述方法后,就可以在控制臺查看創建的佇列了
3.創建消費者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 創建鏈接工廠物件
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
// 設定虛擬主機名字,默認/
connectionFactory.setVirtualHost("/xxx");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
// 創建一個新鏈接
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("simple_queue", true, false, false, null);
// 創建消費者,并消費訊息
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* @param consumerTag 消費者標簽,在channel.basicConsume時候可以指定
* @param envelope 訊息包的內容,可從中獲取訊息id,訊息routing key,交換機,訊息和重發標志(收到訊息失敗后是否需要重新發送)
* @param properties 訊息屬性資訊
* @param body 訊息體
**/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "UTF-8");
System.out.println("路由:" + routingKey + ",交換機:" + exchange + ",訊息id:" + deliveryTag + ",訊息體:" + message);
}
};
// 訊息監聽 arg0:監聽的佇列名稱
// arg1:是否自動應答,設定為true為表示訊息接收到自動向mq回復接收到了,mq接收到回復會洗掉訊息,設定為false則需要手動確認
// arg2:消費者接收訊息到后回呼(消費訊息)
channel.basicConsume("simple_queue", true, consumer);
// 關閉資源(不建議關閉,建議一直監聽訊息)
}
}
到此一個簡單模式的RabbitMQ就搭建好啦,其他模式的后面再更,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/249687.html
標籤:Java
上一篇:java8流式操作
