使用java代碼操作rabbitmq時,首先需要一個有創建用戶等權限的管理員賬號,需要在rabbitmq的后臺管理頁面手動創建這個賬號,系統推薦的這幾個tag可以讓賬號有rabbitmq后臺管理頁面的訪問權限

圖一
管理賬號創建完成后就可以在代碼中操作新增編輯mq賬號及vhost等等了,點擊rabbitmq后臺管理頁面左下角的HTTP API(見上文圖一左下角)可以查看所有API介面
以下代碼中,rabbitmqUsername 為管理員賬號的用戶名,rabbitmqPassword為管理員賬號的密碼,rabbitmqUrl為rabbitmq服務器介面地址(例:http://127.0.0.1:15672/api/)
1. 新增用戶或修改用戶密碼
1.1 API

1.2 代碼示例
以下代碼中,yourUsername為新增賬號的用戶名,yourPassword為新增賬號的密碼,guest為新增賬號的tag,可以自定義,也可以使用rabbitmq提供的tag(見上文圖一),該介面也可以用來修改已有賬號的密碼
//add user String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) ); HttpPut putCriaUsuario = new HttpPut( rabbitmqUrl+"users/"+yourUserName ); // RabbitMQ requires a user with create permission, create it mannually first putCriaUsuario.addHeader( "Authorization", "Basic " + enc ); putCriaUsuario.addHeader( "content-type", "application/json" ); putCriaUsuario.setEntity( new StringEntity( "{\"password\":\""+yourPassword+"\",\"tags\":\"guest\"}" ) ); CloseableHttpClient client = HttpClients.createDefault(); client.execute( putCriaUsuario );
2. 新增vhost
2.1 API
2.2 代碼示例
以下代碼中 yourVhost 為新增vhost的名稱,guest為自定義的tag
// 管理員賬號用戶名密碼 String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) ); //add vhost HttpPut putVhost = new HttpPut( rabbitmqUrl+"vhosts/"+yourVhost ); putVhost.addHeader( "Authorization", "Basic " + enc ); putVhost.addHeader( "content-type", "application/json" ); putVhost.setEntity( new StringEntity( "{\"tags\":\"guest\"}" ) ); CloseableHttpClient putVhostClient = HttpClients.createDefault(); putVhostClient.execute( putVhost );
3. mq用戶系結vhost并設定權限
3.1 API

3.2 代碼示例
以下代碼中,yourVhost 與yourUsername為系結的mq用戶與vhost的名稱,代碼示例中該用戶對該vhost只開啟了read權限,如果需要開啟全部的configure(配置),write(寫入),read(讀取)權限,引數需要寫成:
"{\"configure\":\".*\",\"write\":\".*\",\"read\":\".*\"}"
//管理員賬號用戶名密碼 String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) ); //add permissions and bind user&vhost HttpPut putPermissions = new HttpPut( rabbitmqUrl+"permissions/"+yourVhost+"/"+yourUsername); putPermissions.addHeader( "Authorization", "Basic " + enc ); putPermissions.addHeader( "content-type", "application/json" ); putPermissions.setEntity( new StringEntity( "{\"configure\":\"\",\"write\":\"\",\"read\":\".*\"}" ) ); CloseableHttpClient putPermissionsClient = HttpClients.createDefault(); putPermissionsClient.execute( putPermissions );
4. 動態創建exchange交換機和queue佇列,并系結指定vhost虛擬機
//add exchange, queue and bind vhost RabbitModuleInfo rabbitModuleInfo = new RabbitModuleInfo(); rabbitModuleInfo.setVhost(vhost); RabbitModuleInfo.Queue queue = new RabbitModuleInfo.Queue(); Map<String, Object> arguments = new HashMap<>(); //訊息過期時間 arguments.put("x-message-ttl",3600000); queue.setName(queueName); queue.setArguments(arguments); rabbitModuleInfo.setQueue(queue); RabbitModuleInfo.Exchange exchange = new RabbitModuleInfo.Exchange(); exchange.setName(exchangeName); rabbitModuleInfo.setExchange(exchange); rabbitModuleInfo.setRoutingKey(queueName); rabbitModuleInitializer.declareRabbitModule(rabbitModuleInfo);
/** * RabbitMQ佇列初始化器 */ public class RabbitModuleInitializer{ private AmqpAdmin amqpAdmin; private RealtimePushProducer realtimePushProducer; public RabbitModuleInitializer(AmqpAdmin amqpAdmin,RealtimePushProducer realtimePushProducer) { this.amqpAdmin = amqpAdmin; this.realtimePushProducer = realtimePushProducer; } /** * RabbitMQ 根據配置動態創建和系結佇列、交換機 */ public void declareRabbitModule(RabbitModuleInfo rabbitModuleInfo) { configParamValidate(rabbitModuleInfo); // 佇列 Queue queue = convertQueue(rabbitModuleInfo.getQueue()); // 交換機 Exchange exchange = convertExchange(rabbitModuleInfo.getExchange()); // 系結關系 String routingKey = rabbitModuleInfo.getRoutingKey(); String queueName = rabbitModuleInfo.getQueue().getName(); String exchangeName = rabbitModuleInfo.getExchange().getName(); Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); realtimePushProducer.bindVhostExchangeQueue(rabbitModuleInfo.getVhost(),exchange,queue,binding); } /** * RabbitMQ動態配置引數校驗 * * @param rabbitModuleInfo */ public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) { String routingKey = rabbitModuleInfo.getRoutingKey(); Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置"); Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey); Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name屬性", routingKey); Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey); Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name屬性", routingKey); } /** * 轉換生成RabbitMQ佇列 * * @param queue * @return */ public Queue convertQueue(RabbitModuleInfo.Queue queue) { Map<String, Object> arguments = queue.getArguments(); // 轉換ttl的型別為long if (arguments != null && arguments.containsKey("x-message-ttl")) { arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl"))); } // 是否需要系結死信佇列 String deadLetterExchange = queue.getDeadLetterExchange(); String deadLetterRoutingKey = queue.getDeadLetterRoutingKey(); if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) { if (arguments == null) { arguments = new HashMap<>(4); } arguments.put("x-dead-letter-exchange", deadLetterExchange); arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey); } return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments); } /** * 轉換生成RabbitMQ交換機 * * @param exchangeInfo * @return */ public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) { AbstractExchange exchange = null; RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType(); String exchangeName = exchangeInfo.getName(); boolean isDurable = exchangeInfo.isDurable(); boolean isAutoDelete = exchangeInfo.isAutoDelete(); Map<String, Object> arguments = exchangeInfo.getArguments(); switch (exchangeType) { case DIRECT:// 直連交換機 exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case TOPIC: // 主題交換機 exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case FANOUT: //扇形交換機 exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments); break; case HEADERS: // 頭交換機 exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments); break; } return exchange; } }
/** * 根據配置動態創建和系結佇列、交換機 * @param vhost * @param exchange * @param queue * @param binding */ @Override public void bindVhostExchangeQueue(String vhost, Exchange exchange, Queue queue, Binding binding) { ConnectionFactory factory = queueConfig.pushConnectionFactory(rabbitProperties,vhost); RabbitAdmin rabbitAdmin = new RabbitAdmin(factory); log.debug("bind vhost={},exchange={},queue={}",vhost,exchange.getName(),queue.getName()); // 創建佇列 rabbitAdmin.declareQueue(queue); // 創建交換機 rabbitAdmin.declareExchange(exchange); // 佇列 系結 交換機 rabbitAdmin.declareBinding(binding); }
/** * 生成指定vhost的ConnectionFactory * @param rabbitProperties * @param vhost * @return */ public ConnectionFactory pushConnectionFactory(RabbitProperties rabbitProperties, String vhost) { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setHost(rabbitProperties.getHost()); cachingConnectionFactory.setPort(rabbitProperties.getPort()); cachingConnectionFactory.setUsername(rabbitProperties.getUsername()); cachingConnectionFactory.setPassword(rabbitProperties.getPassword()); cachingConnectionFactory.setVirtualHost(vhost); return cachingConnectionFactory; }
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/541255.html
標籤:其他
上一篇:Dubbo 3 埠協議復用
下一篇:各個作業系統的除錯方法
