訊息中間件簡介
MQ全稱(Message Queue)`又名訊息佇列,是一種異步通訊的中間件,可以將它理解成郵局,發送者將訊息傳遞到郵局,然后由郵局幫我們發送給具體的訊息接收者(消費者),具體發送程序與時間我們無需關心,它也不會干擾我進行其它事情,常見的MQ有kafka、activemq、rocketMQ、rabbitmq等等**
訊息中間件的應用場景
跨系統資料傳遞、高并發流量削峰、資料異步處理,,,,
訊息中間件對比

綜上,各種對比之后,有如下建議:
一般的業務系統要引入 MQ,最早大家都用 ActiveMQ,但是現在確實大家用的不多了,沒經過大規模吞吐量場景的驗證,社區也不是很活躍,所以大家還是算了吧,我個人不推薦用這個了;
后來大家開始用 RabbitMQ,但是確實 erlang 語言阻止了大量的 Java 工程師去深入研究和掌控它,對公司而言,幾乎處于不可控的狀態,但是確實人家是開源的,比較穩定的支持,活躍度也高;
不過現在確實越來越多的公司會去用 RocketMQ,確實很不錯,畢竟是阿里出品,但社區可能有突然黃掉的風險(目前 RocketMQ 已捐給 Apache,但 GitHub 上的活躍度其實不算高)對自己公司技術實力有絕對自信的,推薦用 RocketMQ,否則回去老老實實用 RabbitMQ 吧,人家有活躍的開源社區,絕對不會黃,
所以中小型公司,技術實力較為一般,技術挑戰不是特別高,用 RabbitMQ 是不錯的選擇;大型公司,基礎架構研發實力較強,用 RocketMQ 是很好的選擇,
如果是大資料領域的實時計算、日志采集等場景,用 Kafka 是業內標準的,絕對沒問題,社區活躍度很高,絕對不會黃,何況幾乎是全世界這個領域的事實性規范,
安裝使用
rabbitmq的windows版本安裝,可以參考這個地址: https://blog.csdn.net/hzw19920329/article/details/53156015
Rabbitmq基礎概念
Broker:簡單來說就是訊息佇列服務器物體 Exchange:訊息交換機,它指定訊息按什么規則,路由到哪個佇列 Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列 Binding:系結,它的作用就是把exchange和queue按照路由規則系結起來 Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞 vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離 producer:訊息生產者,就是投遞訊息的程式 consumer:訊息消費者,就是接受訊息的程式 channel:訊息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務
rabbitmq運作流程

上圖所示,在整個訊息中間件的使用程序中,我們主要配置的是exchange,queue,rooting key三個,下面的代碼主要也是根據著三個進行訊息的生產和消費,
常用的交換機型別有:fanout、direct、topic、headers,他們特性如下:
fanout:系結的都發送,忽略路由,不用傳routekey
direct:全文匹配--常用
topic:模糊匹配--常用
headers:條件判斷,性能很差,不建議使用,我們代碼的demo,使用的是direct和topic,后續通過插件的形式,我們還添加了x-delayed-message這個延遲訊息佇列,
springboot代碼實作:
步驟1:pom檔案中引入相應的組件
<!--rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步驟2:部署好mq之后,在application.properties檔案中添加rabbitmq相應的配置
spring.rabbitmq.host=129.204.x.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.virtual-host=xx
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
步驟3:進行交換機,佇列,routingkey的定義和系結,具體的代碼在RabbitConfig檔案中,
@Configuration
public class RabbitConfig {
?
//系結鍵
public final static String man = "kms.topic.man";
public final static String woman = "kms.topic.woman";
?
/**
* 佇列 起名:TestDirectQueue
* @return
*/
@Bean
public Queue DirectQueue() {
// durable:是否持久化,默認是false,持久化佇列:會被存盤在磁盤上,當訊息代理重啟時仍然存在,暫存佇列:當前連接有效
// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后佇列即被洗掉,此參考優先級高于durable
// autoDelete:是否自動洗掉,當沒有生產者或者消費者使用此佇列,該佇列會自動洗掉,
// return new Queue("TestDirectQueue",true,true,false);
//一般設定一下佇列的持久化就好,其余兩個就是默認false
return new Queue("kms.direct.queue",true);
}
?
/**
* Direct交換機 起名:TestDirectExchange
* @return
*/
@Bean
DirectExchange DirectExchange() {
return new DirectExchange("kms.direct.exchange",true,false);
}
?
/**
* 系結 將佇列和交換機系結, 并設定用于匹配鍵:TestDirectRouting
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("kms.direct.routingKey");
}
?
@Bean
public Queue firstTopicQueue() {
return new Queue(RabbitConfig.man);
}
?
@Bean
public Queue secondTopicQueue() {
return new Queue(RabbitConfig.woman);
}
?
/**
* 主題型交換機1
* @return
*/
@Bean
TopicExchange topicExchange() {
return new TopicExchange("kms.topic.exchange");
}
?
/**
* 將firstQueue和topicExchange系結,而且系結的鍵值為topic.man
* 這樣只要是訊息攜帶的路由鍵是topic.man,才會分發到該佇列
* @return
*/
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstTopicQueue()).to(topicExchange()).with(man);
}
?
/**
* 將secondQueue和topicExchange系結,而且系結的鍵值為用上通配路由鍵規則topic.#
* 這樣只要是訊息攜帶的路由鍵是以topic.開頭,都會分發到該佇列
* @return
*/
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondTopicQueue()).to(topicExchange()).with("kms.topic.#");
}
?
/**
* TODO:RabbitMQ延遲佇列
* @return
*/
@Bean
public Queue delayQueue(){
return QueueBuilder.durable("kms.delay.queue").build();
}
?
@Bean
public CustomExchange delayExchange(){
Map<String,Object> map= Maps.newHashMap();
map.put("x-delayed-type","direct");
return new CustomExchange("kms.delay.exchange","x-delayed-message",true,false,map);
}
?
@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("kms.delay.routingKey").noargs();
}
?
?
}
步驟4:生產者創建訊息的發布,代碼在MqProviderService中,其中留意setup()方法,該方法的作用是為了判斷訊息是否成功發送到中間件, 訊息發送完畢后,則回呼此方法 ack代表發送是否成功,
/**
* 發送資料到mq
* @Author: yechongbai
* @Date: 2020/5/11 16:04
* @Copyright: www.zektech.cn
* @since 1.0
*/
@Service
public class MqProviderService extends GlobalResponseHandler {
private final Logger logger = LoggerFactory.getLogger(MqProviderService.class);
?
@Autowired
private RabbitTemplate rabbitTemplate;
?
@PostConstruct
public void setup() {
// 訊息發送完畢后,則回呼此方法 ack代表發送是否成功
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("進入回呼,,,");
// ack為true,代表MQ已經準確收到訊息
if (!ack) {
System.out.println("發送訊息到MQ失敗");
System.out.println("ConfirmCallback: "+"原因:"+cause);
return;
}
try {
System.out.println(correlationData.getId()+":成功發送訊息到MQ");
// 修改本地訊息表的狀態為“已發送”,
//TODO:修改本地表
} catch (Exception e) {
logger.error("警告:修改本地訊息表的狀態時出現例外", e);
}
?
}
});
}
?
/**
* 發送測驗資訊--直連交換機
* @return
*/
public String sendDirectMessage(){
String messageId = String.valueOf(UUID.randomUUID());
UserInfoVO userInfoVO=new UserInfoVO();
userInfoVO.setPersonNo(messageId);
userInfoVO.setPersonName("張三");
userInfoVO.setPhoto("111");
//將訊息攜帶系結鍵值:TestDirectRouting 發送到交換機TestDirectExchange
System.out.println("推送訊息:"+ JSON.toJSONString(userInfoVO));
//步驟1.添加訂單記錄
//前提:訊息不能丟失情況需要插入,如果類似微信推送這些不一定需要送到的可以不執行步驟2
// 步驟2.插入推送訊息到資料庫 步驟1和步驟2需要事務,才能保證后續該訊息是否被消費
// CorrelationData 當收到訊息回執時,會附帶上這個引數
rabbitTemplate.convertAndSend("kms.direct.exchange", "kms.direct.routingKey", userInfoVO,new CorrelationData(messageId));
return buildSuccessResult();
}
?
/**
* 發送測驗資訊--主題交換機1
* @return
*/
public String sendTopicMessage(){
String messageId = String.valueOf(UUID.randomUUID());
UserInfoVO userInfoVO=new UserInfoVO();
userInfoVO.setPersonNo(messageId);
userInfoVO.setPersonName("主題交換機1");
userInfoVO.setPhoto("111");
//將訊息攜帶系結鍵值:TestDirectRouting 發送到交換機TestDirectExchange
System.out.println("推送訊息:"+ JSON.toJSONString(userInfoVO));
//步驟1.添加訂單記錄
//前提:訊息不能丟失情況需要插入,如果類似微信推送這些不一定需要送到的可以不執行步驟2
// 步驟2.插入推送訊息到資料庫 步驟1和步驟2需要事務,才能保證后續該訊息是否被消費
// CorrelationData 當收到訊息回執時,會附帶上這個引數
rabbitTemplate.convertAndSend("kms.topic.exchange", "kms.topic.man", JSON.toJSONString(userInfoVO),new CorrelationData(messageId));
return buildSuccessResult();
}
?
/**
* 發送測驗資訊--主題交換機2
* @return
*/
public String sendTopicMessage2(){
String messageId = String.valueOf(UUID.randomUUID());
UserInfoVO userInfoVO=new UserInfoVO();
userInfoVO.setPersonNo(messageId);
userInfoVO.setPersonName("主題交換機2");
userInfoVO.setPhoto("222");
//將訊息攜帶系結鍵值:TestDirectRouting 發送到交換機TestDirectExchange
System.out.println("推送訊息:"+ JSON.toJSONString(userInfoVO));
//步驟1.添加訂單記錄
//前提:訊息不能丟失情況需要插入,如果類似微信推送這些不一定需要送到的可以不執行步驟2
// 步驟2.插入推送訊息到資料庫 步驟1和步驟2需要事務,才能保證后續該訊息是否被消費
// CorrelationData 當收到訊息回執時,會附帶上這個引數
rabbitTemplate.convertAndSend("kms.topic.exchange", "kms.topic.woman", JSON.toJSONString(userInfoVO),new CorrelationData(messageId));
return buildSuccessResult();
}
?
/**
* 發送測驗資訊--延遲佇列
* @param ttl
* @return
*/
public String sendDelayMessage(Long ttl){
String messageId = String.valueOf(UUID.randomUUID());
UserInfoVO userInfoVO=new UserInfoVO();
userInfoVO.setPersonNo(messageId);
userInfoVO.setPersonName("張三");
userInfoVO.setPhoto("111");
System.out.println(LocalDateTime.now()+":推送訊息:"+ JSON.toJSONString(userInfoVO));
rabbitTemplate.convertAndSend("kms.delay.exchange", "kms.delay.routingKey", userInfoVO, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader("x-delay",ttl);
System.out.println("延遲佇列生產者-發出訊息:"+JSON.toJSONString(userInfoVO)+",TTL:"+ttl);
return message;
}
}, new CorrelationData(messageId));
return buildSuccessResult();
}
}
步驟5:訊息接收者---消費訊息,新建MqConsumerService,進行訊息的監聽并且消費, 默認情況下spring-boot-data-amqp是自動ACK機制,就意味著 MQ 會在訊息消費完畢后自動幫我們去ACK,這樣依賴就存在這樣一個問題:如果報錯了,訊息不會丟失,會無限回圈消費,很容易就吧磁盤空間耗完,雖然可以配置消費的次數但這種做法也有失優雅,目前比較推薦的就是我們手動ACK然后將消費錯誤的訊息轉移到其它的訊息佇列中,做補償處理,由于我們需要手動控制ACK,因此下面監聽完訊息后需要呼叫basicAck通知rabbitmq訊息已被正確消費,可以將遠程佇列中的訊息洗掉 ,所以需要手動執行:channel.basicAck(tag,true);
@Component
public class MqConsumerService {
?
/**
* 監聽的佇列名稱 TestDirectQueue
* @param testMessage
* @param channel
* @param tag
* @throws IOException
*/
@RabbitListener(queues = "kms.direct.queue")
public void processDirect(UserInfoVO testMessage,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try{
System.out.println("DirectReceiver消費者收到訊息 : "+ JSON.toJSONString(testMessage));
//TODO:接收到訊息進行業務操作,操作成功,告訴mq該訊息已經消費
//執行業務操作,同一個資料不能處理兩次,根據業務情況去重,保證冪等性, (拓展:redis記錄處理情況)
// 開啟了手工確認機制,如果不加這個,專案重新啟動,則改訊息會被重新消費
// 例外的話,可以選擇讓它重新入列,或者丟棄
channel.basicAck(tag,true);
}catch (Exception e){
// 例外情況 :根據需要去: 重發/ 丟棄
// 重發一定次數后, 丟棄, 日志告警,b1:true表示重新入列,false表示不入列
channel.basicNack(tag, false, false);
// 系統 關鍵資料,永遠是有人工干預
}
}
?
/**
* 監聽的佇列名稱 TestDirectQueue
* @param testMessage
* @param channel
* @param tag
* @throws IOException
*/
@RabbitListener(queues = "kms.topic.man")
public void processTopic(String testMessage,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try{
JSONObject orderInfo = JSONObject.parseObject(testMessage);
System.out.println("TopicReceiver消費者收到訊息 : "+ testMessage);
System.out.println("收到的personNO:"+orderInfo.getString("personNo"));
//TODO:接收到訊息進行業務操作
channel.basicAck(tag,true);
}catch (Exception e){
// 例外情況 :根據需要去: 重發/ 丟棄
// 重發一定次數后, 丟棄, 日志告警,b1:true表示重新入列,false表示不入列
channel.basicNack(tag, false, false);
// 系統 關鍵資料,永遠是有人工干預
}
}
?
/**
* 監聽的佇列名稱 TestDirectQueue
* @param testMessage
* @param channel
* @param tag
* @throws IOException
*/
@RabbitListener(queues = "kms.topic.woman")
public void processTopic2(String testMessage,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try{
JSONObject orderInfo = JSONObject.parseObject(testMessage);
System.out.println("TopicWomenReceiver消費者收到訊息 : "+ testMessage);
System.out.println("收到的personNO:"+orderInfo.getString("personNo"));
channel.basicAck(tag,true);
}catch (Exception e){
// 例外情況 :根據需要去: 重發/ 丟棄
// 重發一定次數后, 丟棄, 日志告警,b1:true表示重新入列,false表示不入列
channel.basicNack(tag, false, false);
// 系統 關鍵資料,永遠是有人工干預
}
}
?
/**
* 監聽的佇列名稱 DelayQueue
* @param testMessage
* @param channel
* @param tag
* @throws IOException
*/
@RabbitListener(queues = "kms.delay.queue")
public void processDelay(UserInfoVO testMessage,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try{
System.out.println(LocalDateTime.now()+":DelayQueue消費者收到訊息 : "+ JSON.toJSONString(testMessage));
channel.basicAck(tag,true);
}catch (Exception e){
// 例外情況 :根據需要去: 重發/ 丟棄
// 重發一定次數后, 丟棄, 日志告警,b1:true表示重新入列,false表示不入列
channel.basicNack(tag, false, false);
// 系統 關鍵資料,永遠是有人工干預
}
}
?
}
步驟6:進行測驗,新增controller,呼叫訊息生產的方法,進行訊息消費的查看,
@RestController
@RequestMapping("/remote/mq")
public class TestMqController {
@Autowired
private MqProviderService mqProviderService;
?
@PostMapping("sendDirectMessage")
public String sendDirectMessage(){
mqProviderService.sendDirectMessage();
return "ok";
}
?
@PostMapping("sendTopicMessage")
public String sendTopicMessage(){
mqProviderService.sendTopicMessage();
return "ok";
}
?
@PostMapping("sendTopicMessage2")
public String sendTopicMessage2(){
mqProviderService.sendTopicMessage2();
return "ok";
}
?
@PostMapping("sendDelayMessage")
public String sendDelayMessage(){
mqProviderService.sendDelayMessage(10*1000L);
return "ok";
}
}
執行sendDelayMessage輸出結果如下:
2020-05-15T15:16:29.437:推送訊息:{"personName":"張三","personNo":"5cd80fa8-42c0-4b70-932f-422ca4167a5a","photo":"111"}
延遲佇列生產者-發出訊息:{"personName":"張三","personNo":"5cd80fa8-42c0-4b70-932f-422ca4167a5a","photo":"111"},TTL:10000
進入回呼,,,
5cd80fa8-42c0-4b70-932f-422ca4167a5a:成功發送訊息到MQ
2020-05-15T15:16:39.933:DelayQueue消費者收到訊息 : {"personName":"張三","personNo":"5cd80fa8-42c0-4b70-932f-422ca4167a5a","photo":"111"}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/194139.html
標籤:Java
上一篇:多執行緒-執行緒間的通信
下一篇:多執行緒-生產者和消費者問題
