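RocketMQ has a transactional message feature. Following the official example, a message whose local transaction state is unknown (LocalTransactionState.UNKNOW) never triggers the listener's checkLocalTransaction method.
I tried both version 4.4 and version 4.5, and neither worked.
I am using the example from the transaction package of the official examples: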
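import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        // The constructor argument is the producer group name, not a topic.
        TransactionMQProducer producer = new TransactionMQProducer("MyTopic");
        // Thread pool on which the client runs the broker's check-back requests.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        // String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        String[] tags = new String[] {"TagA"};
        // Send a single transactional (half) message.
        for (int i = 0; i < 1; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // Keep the producer alive so the broker has time to call checkLocalTransaction.
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}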
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    // Maps transaction id to a simulated local transaction status (0, 1 or 2).
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        // Deliberately return UNKNOW so that the broker has to check back later.
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("========================== check-back for msg " + msg.getMsgId() + "=====================");
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
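Starting this TransactionProducer sends a single message, and the listener's executeLocalTransaction() method deliberately ends with
return LocalTransactionState.UNKNOW;
I then waited for checkLocalTransaction() to be called to re-check the transaction state, but that method was never entered, so I never got to see the check-back mechanism in action.
Could someone who knows this well explain what is going on?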
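To make the expected behaviour easier to reason about, here is a minimal, dependency-free sketch of the listener's decision logic from the code above. The class name `ListenerSketch`, the nested enum `State` (a stand-in for RocketMQ's `LocalTransactionState`) and the transaction id `"tx-0"` are hypothetical names used only for illustration. It does not talk to a broker and does not explain why the check-back never arrives. It only shows what the listener would answer if it were called.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ListenerSketch {
    // Hypothetical stand-in for RocketMQ's LocalTransactionState.
    public enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private final AtomicInteger transactionIndex = new AtomicInteger(0);
    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    // Mirrors executeLocalTransaction: record a status, always answer UNKNOW.
    public State execute(String transactionId) {
        int status = transactionIndex.getAndIncrement() % 3;
        localTrans.put(transactionId, status);
        return State.UNKNOW;
    }

    // Mirrors checkLocalTransaction: map the recorded status to a final state.
    public State check(String transactionId) {
        Integer status = localTrans.get(transactionId);
        if (status == null) {
            return State.COMMIT_MESSAGE;
        }
        switch (status) {
            case 0:
                return State.UNKNOW;
            case 2:
                return State.ROLLBACK_MESSAGE;
            default:
                return State.COMMIT_MESSAGE;
        }
    }

    public static void main(String[] args) {
        ListenerSketch listener = new ListenerSketch();
        // Only one message is sent, so its recorded status is 0 % 3 = 0.
        System.out.println(listener.execute("tx-0")); // prints UNKNOW
        System.out.println(listener.check("tx-0"));   // prints UNKNOW
    }
}
```

Because the producer sends only one message, its recorded status is 0, so even a successful check-back would return UNKNOW again rather than commit or roll back. The println at the top of checkLocalTransaction should still appear on every check-back, so its absence means the method really is not being invoked.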
