先看下圖

以上圖例展示了使用MQ事務訊息解決分布式事務時的producer環節;consumer端正常消費即可。
show your code
根據以上流程我們可以用rocketmq很簡單的實作如下代碼,為了減少部分業務代碼入侵做了一點點封裝;
以下專案基于Spring Boot 2.1.3,此處引入spring-jdbc。需要注意:JdbcTemplate可以和MyBatis、MyBatis-Plus共用事務管理器(想了解JDBC與MyBatis如何共用事務管理器,可自行百度);假如你使用的是JPA或者Hibernate,就不能這樣封裝。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
需在你的業務庫中建立如下資料表,用于記錄事務日志:
-- Transaction log table: one row per trx_id written by TransactionLogService.insert.
-- TransactionLogService.query counts rows by trx_id during the broker's transaction check.
CREATE TABLE `transaction_log` (
`id` int(20) NOT NULL AUTO_INCREMENT, -- surrogate primary key
`trx_id` varchar(128) NOT NULL, -- transaction id of the message
PRIMARY KEY (`id`),
UNIQUE KEY `idx_transaction_id` (`trx_id`) USING BTREE -- at most one log row per trx_id
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
由于我們使用rocketmq-spring-boot-starter這個自動配置,所以我們可以直接使用RocketMQTemplate來發送訊息,非常方便
demo中配置
# Address (host:port) of the RocketMQ name server
rocketmq.name-server=127.0.0.1:9876
# Producer group of the auto-configured producer
rocketmq.producer.group=tx_group
# Send timeout; presumably milliseconds - confirm against the starter's documentation
rocketmq.producer.send-message-timeout=3000
package com.xxx.fw.rocketmq.trx.core;
import com.xxx.fw.rocketmq.trx.config.TrxContextHolder;
import javax.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
/**
 * Base class for RocketMQ local-transaction listeners.
 *
 * <p>Business code extends this class and implements {@link #doBusiness(Object)}. The base
 * class publishes the message id as the current trx_id before running the business logic and
 * answers broker check-backs by looking the trx_id up through {@link TransactionLogService}.
 *
 * <p>NOTE(review): both callbacks key on {@code message.getHeaders().getId()}. This only works
 * if that id is identical in {@code executeLocalTransaction} and {@code checkLocalTransaction};
 * confirm that the rocketmq-spring version in use preserves it, otherwise carry the trx_id in
 * an explicit message header.
 *
 * @Author 姚仲杰
 * @Date 2020/12/21 20:51
 */
public abstract class AbstractTransactionListener implements RocketMQLocalTransactionListener {

    private final static Logger LOGGER = LoggerFactory.getLogger(AbstractTransactionListener.class);

    @Resource
    TransactionLogService transactionLogService;

    /**
     * Runs the business logic that belongs to the local transaction.
     *
     * @param o the argument object passed along when the transactional message was sent
     */
    public abstract void doBusiness(Object o);

    /**
     * Executes the local transaction for a transactional message.
     *
     * @param message the transactional message handed to this listener
     * @param o the argument object forwarded to {@link #doBusiness(Object)}
     * @return {@code COMMIT} when {@code doBusiness} returns normally, {@code ROLLBACK} when it
     *     throws an {@link Exception}
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        final String trxId = message.getHeaders().getId().toString();
        TrxContextHolder.setTrxId(trxId);
        try {
            LOGGER.info("執行業務邏輯,trx_id:[{}]", trxId);
            doBusiness(o);
            LOGGER.info("執行業務邏輯---[COMMIT]");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception ex) {
            // Pass the exception as the trailing argument so the stack trace is logged.
            LOGGER.error("訊息事務回滾[ROLLBACK] trx_id:[{}]", trxId, ex);
            return RocketMQLocalTransactionState.ROLLBACK;
        } finally {
            // Always clear the thread-bound trx_id: if doBusiness never reached the aspect
            // that normally removes it, the value would otherwise stay on this thread.
            TrxContextHolder.remove();
        }
    }

    /**
     * Answers a transaction check-back by looking up the transaction log.
     *
     * @param message the message being checked
     * @return {@code COMMIT} when a log row exists for the message id, otherwise
     *     {@code UNKNOWN} so that the check can be repeated later
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        final String trxId = message.getHeaders().getId().toString();
        if (transactionLogService.query(trxId) > 0) {
            LOGGER.info("commit msg trx_id:{}", trxId);
            return RocketMQLocalTransactionState.COMMIT;
        }
        LOGGER.info("LocalTransaction review [UNKNOWN] will try again");
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
自定義注解
package com.xxx.fw.rocketmq.trx.config;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a method whose execution should be followed by a transaction-log write.
 *
 * <p>Methods carrying this annotation are matched by the pointcut declared in
 * {@code TrxAspect}.
 *
 * <p>{@code @Inherited} is deliberately not declared: it only applies to annotations placed on
 * classes and has no effect on a method-targeted annotation.
 *
 * @Author 姚仲杰
 * @Date 2020/12/28 11:28
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqTrx {
}
切面處理
package com.xxx.fw.rocketmq.trx.aspect;
import com.xxx.fw.rocketmq.trx.config.MqTrx;
import com.xxx.fw.rocketmq.trx.config.TrxContextHolder;
import com.xxx.fw.rocketmq.trx.core.TransactionLogService;
import javax.annotation.Resource;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
 * Aspect that writes the thread-bound trx_id to the transaction log after a method annotated
 * with {@code @MqTrx} has run, and then clears the thread-bound value.
 *
 * <p>NOTE(review): {@code @After} advice runs on both normal and exceptional exit of the
 * advised method, so the insert is also attempted when the business method throws - confirm
 * this is intended.
 *
 * @Author 姚仲杰
 * @Date 2020/12/28 11:29
 */
// Ordering: the transaction advice is given order 0 and this aspect order 1, so the transaction
// advice is entered first and wraps this aspect (first in, last out).
@EnableTransactionManagement(order = 0)
@Order(1)
@Component
@Aspect
public class TrxAspect {

    public static final Logger LOGGER = LoggerFactory.getLogger(TrxAspect.class);

    @Resource
    TransactionLogService transactionLogService;

    /** Matches every method annotated with {@code @MqTrx}. */
    @Pointcut("@annotation(com.xxx.fw.rocketmq.trx.config.MqTrx)")
    public void pointcut() {
    }

    /**
     * Persists the current trx_id and always removes it from the thread context afterwards.
     *
     * @param joinPoint the advised join point (unused)
     * @param mqTrx the annotation instance found on the advised method (unused)
     */
    @After("pointcut() && @annotation(mqTrx)")
    public void after(JoinPoint joinPoint, MqTrx mqTrx) {
        try {
            writeLog(TrxContextHolder.getTrxId());
        } finally {
            TrxContextHolder.remove();
        }
    }

    /** Inserts the given trx_id into the transaction log and reports success. */
    private void writeLog(String trxId) {
        transactionLogService.insert(trxId);
        LOGGER.info("事務訊息日志入庫成功,trx_id:[{}]", trxId);
    }
}
MQ事務上下文管理
package com.xxx.fw.rocketmq.trx.config;
import java.util.HashMap;
import java.util.Map;
/**
 * Per-thread string key/value store backed by a {@link ThreadLocal} map.
 *
 * <p>Each thread lazily receives its own empty {@link HashMap} on first access.
 *
 * @Author 姚仲杰
 * @Date 2020/12/28 11:42
 */
public class TrxContext {

    private final ThreadLocal<Map<String, String>> threadLocal = ThreadLocal.withInitial(HashMap::new);

    /** Returns the calling thread's map, creating it on first use. */
    private Map<String, String> current() {
        return threadLocal.get();
    }

    /**
     * Stores a value for the calling thread.
     *
     * @return the value previously stored under {@code key}, or {@code null} if there was none
     */
    public String put(String key, String value) {
        return current().put(key, value);
    }

    /**
     * @return the value stored under {@code key} for the calling thread, or {@code null}
     */
    public String get(String key) {
        return current().get(key);
    }

    /**
     * Removes a key for the calling thread.
     *
     * @return the removed value, or {@code null} if the key was not present
     */
    public String remove(String key) {
        return current().remove(key);
    }

    /**
     * @return the calling thread's live map (not a copy)
     */
    public Map<String, String> entries() {
        return current();
    }
}
package com.xxx.fw.rocketmq.trx.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Static access point for the thread-bound trx_id.
 *
 * <p>The id is stored before the local transaction runs so that the aspect can read it and
 * write it to the transaction log table.
 *
 * @Author 姚仲杰
 * @Date 2020/12/28 11:46
 */
public class TrxContextHolder {

    private static final Logger LOGGER = LoggerFactory.getLogger(TrxContextHolder.class);

    public static final TrxContext TRX_CONTEXT_HOLDER = new TrxContext();

    public static final String TRX_ID = "TRX_ID";

    /**
     * @return the trx_id bound to the calling thread, or {@code null} if none is set
     */
    public static String getTrxId() {
        return TRX_CONTEXT_HOLDER.get(TRX_ID);
    }

    /**
     * Binds a trx_id to the calling thread, replacing any previous value.
     *
     * @param trxId the transaction id to bind
     */
    public static void setTrxId(String trxId) {
        // Parameterized logging already skips formatting when DEBUG is disabled.
        LOGGER.debug("set trx_id:[{}]", trxId);
        TRX_CONTEXT_HOLDER.put(TRX_ID, trxId);
    }

    /**
     * Unbinds the trx_id from the calling thread.
     *
     * @return the id that was bound, or {@code null} if none was set
     */
    public static String remove() {
        final String removed = TRX_CONTEXT_HOLDER.remove(TRX_ID);
        LOGGER.debug("remove trx_id:[{}] ", removed);
        return removed;
    }
}
package com.xxx.fw.rocketmq.trx.core;
import javax.annotation.Resource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
 * Writes and queries the {@code transaction_log} table through {@link JdbcTemplate}, so that
 * business code only needs to add an annotation instead of handling the log itself.
 *
 * <p>Both statements bind the trx_id as a parameter rather than formatting it into the SQL
 * text, which avoids SQL injection and quoting problems.
 *
 * @Author 姚仲杰
 * @Date 2020/12/28 19:32
 */
@Component
public class TransactionLogService {

    private final static String INSERT_TRX_LOG = "insert into transaction_log (trx_id) values (?)";

    private final static String CHECK_TRX_LOG = "select count(*) from transaction_log where trx_id = ?";

    @Resource
    JdbcTemplate jdbcTemplate;

    /**
     * Inserts a log row for the given transaction id.
     *
     * @param trxId the transaction id; must not be null or empty
     * @return the number of rows affected
     * @throws IllegalArgumentException if {@code trxId} is null or empty
     */
    public int insert(String trxId) {
        requireTrxId(trxId);
        return jdbcTemplate.update(INSERT_TRX_LOG, trxId);
    }

    /**
     * Counts the log rows stored for the given transaction id.
     *
     * @param trxId the transaction id; must not be null or empty
     * @return the number of matching rows
     * @throws IllegalArgumentException if {@code trxId} is null or empty
     */
    public int query(String trxId) {
        requireTrxId(trxId);
        Integer count = jdbcTemplate.queryForObject(CHECK_TRX_LOG, Integer.class, trxId);
        // Guard against auto-unboxing a null result.
        return count == null ? 0 : count;
    }

    /** Rejects a null or empty transaction id. */
    private static void requireTrxId(String trxId) {
        if (StringUtils.isEmpty(trxId)) {
            throw new IllegalArgumentException("trxId must not be null");
        }
    }
}
提供封裝的發送服務以及工具類
package com.xxx.fw.rocketmq.trx.core;
import com.xxx.fw.rocketmq.trx.util.TagsUtil;
import javax.annotation.Resource;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * Sends transactional messages, combining the fixed topic with a caller-supplied tag so that
 * different businesses can be told apart.
 *
 * @Author 姚仲杰
 * @Date 2020/12/28 20:21
 */
@Component
public class TrxService {

    @Resource
    RocketMQTemplate rocketMQTemplate;

    /**
     * Sends {@code msg} as a transactional message.
     *
     * @param msg the payload sent to the message queue
     * @param tag the tag; combined by {@code TagsUtil.bindTag} into the destination
     *     {@code trx_topic:tag}
     * @param o the argument object passed along with the transactional send
     * @return the result returned by {@code RocketMQTemplate.sendMessageInTransaction}
     */
    public TransactionSendResult send(String msg, String tag, Object o) {
        // Build the message before resolving the destination, as the original did, so the
        // order in which argument errors surface stays the same.
        final Message<String> payload = MessageBuilder.withPayload(msg).build();
        final String destination = TagsUtil.bindTag(tag);
        return rocketMQTemplate.sendMessageInTransaction(destination, payload, o);
    }
}
工具類
package com.xxx.fw.rocketmq.trx.util;
import org.springframework.util.StringUtils;
/**
 * Builds the destination string by prefixing a tag with the fixed transaction topic.
 *
 * @Author 姚仲杰
 * @Date 2020/12/28 19:59
 */
public class TagsUtil {

    public static final String TRX_TOPIC = "trx_topic:";

    /**
     * Prepends {@link #TRX_TOPIC} to the given tag.
     *
     * @param tag the tag; must not be null or empty
     * @return {@code "trx_topic:" + tag}
     * @throws IllegalArgumentException if {@code tag} is null or empty
     */
    public static String bindTag(String tag) {
        // Plain null/empty check: same result as StringUtils.isEmpty for a String argument,
        // without depending on a framework helper.
        if (tag == null || tag.isEmpty()) {
            throw new IllegalArgumentException("trx_tag must not be empty");
        }
        return TRX_TOPIC + tag;
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/242371.html
標籤:其他
上一篇:溪源的Java筆記—微服務
