很長時間沒有分享過學習心得了,看了下發布記錄,最后一篇文章的時間都在2020-12-10年了,今天抽時間整理下一個很早就想整理的技術分享,順便說句題外話,因為我一直沒時間整理,再加上開發的小伙伴對Mq的理解不夠,我開掉了好幾個處理這個事情的開發小伙伴,所以我希望這篇文章能對大家帶來一點幫助,
背景說明
Mq(訊息佇列)做為一個消峰工具而常被使用,我們常用的Mq主要分為以下四種:
- ActiveMQ
- RabbitMq
- Kafka
- RocketMq
今天主要是聊聊RabbitMq,業務場景上選擇RabbitMq的原因有很多,今天就不細說了,今天主要是說下如何動態創建佇列,并實作動態監聽的的方法,
需求背景
做為一個CRM-SAAS平臺,每天平臺會進入大量的客戶資訊,那么我們需要用高效的方式把這些資料及時的發給銷售,那么這里需要考慮以下幾個問題:
- 下發資料的及時性
- 資料分組
- 接收人員屬于不同的分組和不同的級別
- 資料不滿足下發條件(這里舉個例子:接收人員都在忙的情況,可能需要過段時間重發)重發的問題
技術方案
- 為保證資料及時性,資料進入系統之后及時推進消費佇列
- 針對資料分組和接收人員不同的分組和不同的級別,并要人為可控的話,那么就設定不同的佇列來進行監聽消費,我們還可以讓佇列名稱變得有意義,從佇列名稱上獲取我們所需要某些必要資訊,例如資料屬于那個分組,資料應該下發的群體等,
基于上述考慮,我們選擇RabbitMq來實作這個方案,既然是不同的佇列消費不同的資料,那么第一步就是考慮如何動態創建佇列,因為這里還要設定一個人為可控,也就是人員可以管理,所以比然后伴隨著佇列的洗掉和重建,
佇列的創建方式
基于注解的使用
@Bean
public Queue syncCdrQueue(){
return new Queue(CrmMqConstant.SYNC_CDR_TO_CRM_Q,true,false,false);
}
非注解配置
Channel channelForm = connectionFactory().createConnection().createChannel(false);
channelForm.queueDeclare(nameForm, true, false, false, null);
基于RabbitAdmin
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
從創建佇列的靈活度來說,肯定是依次減弱的:
- 注解方式:提前定義佇列名稱,一般以常量來定義,當然也支持變數的方式,但是對于加載先后的要求就高了,例如:這里用一個動態IP作為佇列名稱舉例
private final String QUEUE_NAME="crm.websocket."+ IPUtils.getLocalhostIp(); @Bean public Queue queue(){ return new Queue(QUEUE_NAME,true,false,false); } //監聽 @RabbitListener(queues = "#{queue.name}") - 非注解方式:這個其實就是通過ConnectionFactory來獲取通道創建佇列的,這個比較適合在建立鏈接的時候使用,所以一般在批量初始化佇列時候比較合適
@Bean
public List<String> mqMsgQueues() throws IOException {
List<String> queueNames = new ArrayList<String>();
List<Map<String,Object>> engineList = autoAssignEngineService.getAllAutoAssignEngine(-1,-1);
logger.info("engineList:{}", JsonUtils.toJson(engineList));
if(engineList != null && engineList.size() > 0) {
for(Map<String,Object> engine : engineList) {
String groupId = String.valueOf(engine.get("orgId"));
String semAdType = String.valueOf(engine.get("semAdType"));
logger.info("groupId:{},semAdType:{}", groupId,semAdType);
createQueue(queueNames, groupId,semAdType,"1");
createQueue(queueNames, groupId,semAdType,"2");
}
}
return queueNames;
}
private void createQueue(List<String> queueNames, String groupId, String semType, String level) throws IOException {
String nameForm = queue +"."+ groupId+"."+semType + "." + level;
logger.info("nameForm:{}",nameForm);
Channel channelForm = connectionFactory().createConnection().createChannel(false);
channelForm.queueDeclare(nameForm, true, false, false, null);
channelForm.exchangeDeclare(topicExchange, BuiltinExchangeType.TOPIC,true);
channelForm.queueBind(nameForm,topicExchange,routingKey + "."+groupId+"."+semType+"."+level);
queueNames.add(nameForm);
}
- 基于RabbitAdmin的方式:那么這個就相對來說比較靈活,支持隨時創建佇列了,那么簡單封裝下:
public void createMqQueue(String queueName,String exName,String rk,String type){ Properties properties = rabbitAdmin.getQueueProperties(queueName); if(properties==null) { Queue queue = new Queue(queueName, true, false, false, null); if(BuiltinExchangeType.DIRECT.getType().equals(type)) { DirectExchange directExchange = new DirectExchange(exName); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(directExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk)); }else if(BuiltinExchangeType.FANOUT.getType().equals(type)){ FanoutExchange fanoutExchange = new FanoutExchange(exName); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(fanoutExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange)); }else{ TopicExchange topicExchange = new TopicExchange(exName); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk)); } } }我們知道如何動態創建佇列之后,接下來我們想辦法解決動態消費監聽得事情就行:
動態消費監聽
RabbitMq得抽象監聽類是:AbstractMessageListenerContainer,他下面有三個實作類,這里就使用SimpleMessageListenerContainer類來進行簡單得說明,
方式一(我一個前輩得方式):
初始化佇列,存盤在靜態快取,用不同得bean來加載監聽:
private List<Map<String,String>> groupOrgIds = new ArrayList<Map<String,String>>()
@PostConstruct
public void init() {
if (logger.isDebugEnabled()) {
logger.debug("initbean...");
}
List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();
if (engineList != null && engineList.size() > 0) {
for(AutoAssignEngine engine : engineList) {
createQueueList(engine.getOrgId(),engine.getSemAdType(),"1");
createQueueList(engine.getOrgId(),engine.getSemAdType(),"2");
}
}
}
private void createQueueList(String orgId,String semType,String userLevel) {
Map<String,String> feed = new HashMap<String, String>();
feed.put("orgId", orgId);
feed.put("type", semType);
feed.put("userLevel", userLevel);
groupOrgIds.add(feed);
}
public SimpleMessageListenerContainer setContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConfig.connectionFactory());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMaxConcurrentConsumers(8);
container.setConcurrentConsumers(5);
container.setPrefetchCount(10);
return container;
}
public SimpleMessageListenerContainer queueMethod(SimpleMessageListenerContainer container) {
Map<String,String> orgIdMap = groupOrgIds.get(0);
String orgId = orgIdMap.get("orgId");
String sourceType = orgIdMap.get("type");
String userLevel = orgIdMap.get("userLevel");
String queueNames=queueName + "." + orgId+"."+sourceType+"."+userLevel;
container.addQueueNames(queueNames);
excute(orgId,sourceType, container,queueNames);
groupOrgIds.remove(0);
return container;
}
public SimpleMessageListenerContainer excute(String orgId,String semAdType, SimpleMessageListenerContainer container,String queneName) {
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
});
return container;
}
/**
* 創建多個佇列監聽,利用Bean的初始化順序,去消費groupOrgIds
*/
@Bean
public SimpleMessageListenerContainer container1() {
SimpleMessageListenerContainer container = setContainer();
queueMethod(container);
return container;
}
@Bean
public SimpleMessageListenerContainer container2() {
SimpleMessageListenerContainer container = setContainer();
queueMethod(container);
return container;
}
.....
那么這種方式呢確實能動態監聽不同得佇列和消費,但是因為利用得是Bean得初始化得方式,所以每次變更需要加載得佇列內容就得重新加載Bean,也就是需要重啟服務,
方式二:真正得動態監聽
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConfig.connectionFactory());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMaxConcurrentConsumers(8);
container.setConcurrentConsumers(5);
container.setPrefetchCount(10);
// 查詢有多少分配引擎,每個分配引擎一個佇列
List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();
if (engineList != null && engineList.size() > 0) {
for(AutoAssignEngine engine : engineList) {
mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"1",container);
mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"2",container);
}
}
return container;
}
public Boolean addNewListener(String orgId,String semType,String userLevel,SimpleMessageListenerContainer container ){
String queueNames=queueName + "." + orgId+"."+semType+"."+userLevel;
container.addQueueNames(queueNames);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
});
return true;
}
問題1:這里再接收訊息(onMessage方法內)得時候不要用方法傳參,會出現并發問題,
解決方式1:
String receiveQueueName = message.getMessageProperties().getConsumerQueue();
佇列名稱決議獲取,本人使用,
解決方式2:
使用final變數重新接收傳參,不過這個有待測驗,不一定又用,
問題2:這不是還是在Bean初始化得時候加載得嘛,如果想要在服務啟動之后再增加監聽如何處理
完整得動態創建佇列和監聽(業務程序種實作)
我們知道如何創建佇列和監聽之后就開始解決問題2,
需求:變更現有佇列,
轉化需求為:洗掉現有佇列和監聽,新建新得佇列并增加監聽
問題:推送和消費不再統一服務,
解決方式:暴露介面,利用http請求實作同步,
代碼實作:
消費端
public Boolean updateListener(String orgId,String semType,String oldOrg){
logger.info("================================消費端開始處理");
String newFirstQueueName = queueName+"."+orgId+"."+semType+"."+1;
String newFirstRk = routingKey+"."+orgId+"."+semType+"."+1;
String newSecondQueueName = queueName+"."+orgId+"."+semType+"."+2;
String newSecondRk = routingKey+"."+orgId+"."+semType+"."+2;
createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());
createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());
logger.info("================================創建佇列");
SimpleMessageListenerContainer container = SpringCtxUtils.getBean(SimpleMessageListenerContainer.class);
String oneQueueNames=queueName + "." + orgId+"."+semType+"."+1;
String twoQueueNames=queueName + "." + orgId+"."+semType+"."+2;
if(!"NO".equals(oldOrg)) {
String oneOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 1;
String twoOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 2;
container.removeQueueNames(oneOldQueueNames);
container.removeQueueNames(twoOldQueueNames);
logger.info("================================洗掉監聽成功");
}
container.addQueueNames(oneQueueNames);
container.addQueueNames(twoQueueNames);
logger.info("================================添加監聽成功");
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
});
return true;
}
public void createMqQueue(String queueName,String exName,String rk,String type){
Properties properties = rabbitAdmin.getQueueProperties(queueName);
if(properties==null) {
Queue queue = new Queue(queueName, true, false, false, null);
if(BuiltinExchangeType.DIRECT.getType().equals(type)) {
DirectExchange directExchange = new DirectExchange(exName);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(directExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk));
}else if(BuiltinExchangeType.FANOUT.getType().equals(type)){
FanoutExchange fanoutExchange = new FanoutExchange(exName);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
}else{
TopicExchange topicExchange = new TopicExchange(exName);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(topicExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk));
}
}
}
暴露介面:
@GetMapping("/add-listener/{orgId}/{semType}/{oldOrg}")
public ComResponse addListener(@PathVariable("orgId") String orgId,@PathVariable("semType") String semType,@PathVariable("oldOrg") String oldOrg){
mqService.updateListener(orgId,semType,oldOrg);
return ComResponse.successResponse();
}
注意:執行順序,創建新佇列,洗掉監聽,添加監聽
推送端
//添加新得監聽
String requestUrl = consumerUrl+"/"+newOrg+"/"+semType+"/"+oldOrgId;
String result = restTemplateService.getWithNoParams(requestUrl,String.class);
log.info("請求結束:{}",result);
if(!"NO".equals(oldOrgId)) {
String firstQueueName = queue + "." + oldOrgId + "." + semType + "." + 1;
String secondQueueName = queue + "." + oldOrgId + "." + semType + "." + 2;
mqService.deleteMqQueue(firstQueueName);
mqService.deleteMqQueue(secondQueueName);
log.info("洗掉佇列結束");
}
//新增新的的佇列
String newFirstQueueName = queue+"."+newOrg+"."+semType+"."+1;
String newFirstRk = routingKey+"."+newOrg+"."+semType+"."+1;
String newSecondQueueName = queue+"."+newOrg+"."+semType+"."+2;
String newSecondRk = routingKey+"."+newOrg+"."+semType+"."+2;
mqService.createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());
mqService.createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());
log.info("添加佇列結束");
注意:執行順序:變更監聽,洗掉佇列,添加新得佇列
到這里基本上就實作了動態創建佇列和動態監聽,大家如果有什么不太明白得可以留言,抽時間整理得,所以寫得比較草,大家湊合著看把,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/431072.html
標籤:其他
上一篇:物體門店如何轉型?
