環境準備
安裝RabbitMq: 個人是先在Win10便攜機上安裝VMWare Workstation, 再安裝Cent OS 作業系統,在此基礎上安裝RabbitMQ,
安裝程序可以參考這篇博客: https://blog.csdn.net/hsxy123123/article/details/104006744
需要注意RabbitMQ官網提供的erlang與RabbitMQ的配套版本,按版本安裝,

訊息生產者Demo代碼
private final static String EXCHANGE_NAME = "elon_exchange";
/**
* 生產訊息發送到交換器.
*
* @param messageBody 訊息體
* @author elon
*/
public void produceMessage2Exchange(String messageBody) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.*.*.*");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("*******");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, messageBody.getBytes(Charsets.UTF_8));
LOGGER.info("Sent {}", messageBody);
} catch (Exception e) {
LOGGER.info("Produce message fail.", e);
}
}
訊息消費者Demo代碼
private final static String EXCHANGE_NAME = "elon_exchange";
/**
* 消費交換器轉發的訊息
*/
public void consumeMessageFromExchange(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.*.*.*");
factory.setPort(5672);
factory.setUsername("***");
factory.setPassword("*******");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
LOGGER.info("Get queue name:{}", queueName);
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
LOGGER.info("Receive message from binding exchange:{}", message);
};
channel.basicConsume(queueName, true, deliverCallback, tag->{});
} catch (Exception e) {
LOGGER.error("Consume message exception.", e);
}
}
核心邏輯是宣告一個隨機的佇列,系結到交換器,再通過佇列接收交換器轉發的訊息,多個佇列可系結到同一個交換器,從而啟到廣播訊息的效果,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423363.html
標籤:其他
上一篇:軟考高項筆記 | 大資料
