1.在網上看了一些解決這個問題的辦法,大部分朋友都說是要在實體化 DefaultMQProducer 的時候指定惟一的 instanceName 來解決,竊以為這樣雖然解決了問題,但卻是不應該用的解決辦法,為什么這樣說?因為官網介紹客戶端公共引數的時候對這個instanceName有明確的說明
| instanceName | DEFAULT | 客戶端實體名稱,客戶端創建的多個Producer、Consumer實際是共用一個內部實體(這個實體包含網路連接、執行緒資源等) |
所以,這個 instanceName 所標識的實體會同時創建自己的網路連接,執行緒資源,如果每次創建一個 Producer 都指定不同的 instanceName 這樣就會 浪費 更多資源 比如記憶體和執行緒,網路IO,還會降低訊息處理的效率,按照說明,應該是盡可能多個Producer共用一個instanceName 才合理,
2.另外,題目上的報錯,是因為 group 已被創建,為什么要用指定不同且唯一的 instanceName 來解決呢?不能因為這樣能解決就這樣解決,實際上,如果用 DefaultMQProducer 來實體 producer 則會把創建好的producer先放到一個 producerTable
ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
中,代碼中的方法是
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
這個方法里 關鍵地方是
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
在添加的時候如果發以group為鍵的producer已存在,則注冊失敗,這里的鍵是group所以,當我們已經創建了同group的producer時,如果這個 producer沒有shutdown,則再次以同樣的group創建producer的時候就會報題目中的錯誤,
而shutdown之后之所以不報錯是因為,shutdown這個方法本身呼叫 的是 unregisterProducer(String group) 在類 MQClientInstance 中,這個方法是包含從 producerTable 中把已添加的producer先移除,然后再shutdown的,具體代碼是下面這樣的
this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
this.defaultAsyncSenderExecutor.shutdown();
所以先 呼叫 DefaultMQProducer shutdown 之后再創建新的同group的producer是不會報錯的,
3.我們再看為什么每次用 DefaultMQProducer 來創建 producer的時候如果 都設定不同的instanceName為什么也不會報錯呢?這是因為如果設定的instanceName是唯一的,則在注冊producer之前,如果設定的group 不是默認的,則每次 獲取的mQClientFactory 都是不同的,而 producerTable 是 mQClientFactory類里的一個屬性,這樣當然producerTable也是不同的,這樣注冊producer當然是注冊到不同的producerTable中去了,所以不會報錯,
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
但是,這種解決辦法是不可取的,因為instanceName是一個比較重(隔離資料多,創建耗時長,消費資源多)的引數,
4.那么我們怎么更好的解決這個問題呢?我們可以參考原始碼中 logappender 模塊中 的 ProducerInstance 類來實作,這個類在原始碼中位于 org.apache.rocketmq.logappender.common 下面
下面是這個類的原始碼
public class ProducerInstance {
public static final String APPENDER_TYPE = "APPENDER_TYPE";
public static final String LOG4J_APPENDER = "LOG4J_APPENDER";
public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";
public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";
public static final String DEFAULT_GROUP = "rocketmq_appender";
private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
private static ProducerInstance instance = new ProducerInstance();
public static ProducerInstance getProducerInstance() {
return instance;
}
/**
根據 nameServerAddress 和 group 生成 producer 在 producerMap 中的鍵
**/
private String genKey(String nameServerAddress, String group) {
return nameServerAddress + "_" + group;
}
/**
根據 nameServerAddress 和 group 獲取已注冊到producerMap中的producer,如果不存在,則呼叫 DefaultMQProducer 生成新的producer注冊并回傳
**/
public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {
if (StringUtils.isBlank(group)) {
group = DEFAULT_GROUP;
}
String genKey = genKey(nameServerAddress, group);
MQProducer p = getProducerInstance().producerMap.get(genKey);
if (p != null) {
return p;
}
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
defaultMQProducer.setNamesrvAddr(nameServerAddress);
MQProducer beforeProducer = null;
beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
if (beforeProducer != null) {
return beforeProducer;
}
defaultMQProducer.start();
return defaultMQProducer;
}
/**
根據 nameServerAddress 和 group 移除已注冊到producerMap中的producer,同時shutdown
**/
public void removeAndClose(String nameServerAddress, String group) {
if (group == null) {
group = DEFAULT_GROUP;
}
String genKey = genKey(nameServerAddress, group);
MQProducer producer = getProducerInstance().producerMap.remove(genKey);
if (producer != null) {
producer.shutdown();
}
}
/**
移除 producerMap 中所有的 producer 并全部關閉,
**/
public void closeAll() {
Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();
for (Map.Entry<String, MQProducer> entry : entries) {
getProducerInstance().producerMap.remove(entry.getKey());
entry.getValue().shutdown();
}
}
}
可以把這個類直接復制到要使用的專案中,然后在要使用指定 nameServerAddress 和 group 的 producer 時,直接用下面的方法獲取一個,
MQProducer producer = ProducerInstance.getProducerInstance().getInstance("localhost:9876", "test-group");
/*
自己生成message訊息,然后下面發送
*/
producer.send(message);
/*
如果是比較頻繁使用的producer,發送完訊息后不用關閉和移除下次再用的時候可以直接再獲取拿來就可以發送訊息,
對于確定要隔比較長時間不用的producer,可以用下面的方法 移除并關閉
*/
ProducerInstance.getProducerInstance().removeAndClose("localhost:9876", "test-group");
我們會發現,這個類獲取 producer 實體的時候只用了 nameServerAddress 和 group 這兩個引數,如果我們確實需要操作不同的 instanceName 下的 producer 時,該怎么辦呢?直接改造 這個類里的方法,添加上
instanceName 引數 即可,
加引數后的類如下,使用方式沒什么差別只是多了個引數而已,
public class ProducerInstance {
public static final String APPENDER_TYPE = "APPENDER_TYPE";
public static final String LOG4J_APPENDER = "LOG4J_APPENDER";
public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";
public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";
public static final String DEFAULT_GROUP = "rocketmq_appender";
private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
private static ProducerInstance instance = new ProducerInstance();
public static ProducerInstance getProducerInstance() {
return instance;
}
private String genKey(String nameServerAddress, String group,String instanceName) {
return nameServerAddress + "_" + group + "_" + instanceName;
}
public MQProducer getInstance(String nameServerAddress, String group,String instanceName) throws MQClientException {
if (StringUtils.isBlank(group)) {
group = DEFAULT_GROUP;
}
if (StringUtils.isBlank(instanceName)) {
instanceName = "DEFAULT";
}
String genKey = genKey(nameServerAddress, group, instanceName);
MQProducer p = getProducerInstance().producerMap.get(genKey);
if (p != null) {
return p;
}
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
defaultMQProducer.setNamesrvAddr(nameServerAddress);
defaultMQProducer.setInstanceName(instanceName);
MQProducer beforeProducer = null;
beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
if (beforeProducer != null) {
return beforeProducer;
}
defaultMQProducer.start();
return defaultMQProducer;
}
public void removeAndClose(String nameServerAddress, String group, String instanceName) {
if (group == null) {
group = DEFAULT_GROUP;
}
if (StringUtils.isBlank(instanceName)) {
instanceName = "DEFAULT";
}
String genKey = genKey(nameServerAddress, group,instanceName);
MQProducer producer = getProducerInstance().producerMap.remove(genKey);
if (producer != null) {
producer.shutdown();
}
}
public void closeAll() {
Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();
for (Map.Entry<String, MQProducer> entry : entries) {
getProducerInstance().producerMap.remove(entry.getKey());
entry.getValue().shutdown();
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/232665.html
標籤:其他
上一篇:溪源的Java筆記—訊息佇列
