分布式鎖是在分布式環境下(多個JVM行程)控制多個客戶端對某一資源的同步訪問的一種實作,與之相對應的是執行緒鎖,執行緒鎖控制的是同一個JVM行程內多個執行緒之間的同步,分布式鎖的一般實作方法是在應用服務器之外通過一個共享的存盤服務器存盤鎖資源,同一時刻只有一個客戶端能占有鎖資源來完成,通常有基于Zookeeper,Redis,或資料庫三種實作形式,本文介紹基于Redis的實作方案,
要求
基于Redis實作分布式鎖需要滿足如下幾點要求:
- 在分布式集群中,被分布式鎖控制的方法或代碼段同一時刻只能被一個客戶端上面的一個執行緒執行,也就是互斥
- 鎖資訊需要設定過期時間,避免一個執行緒長期占有(比如在做解鎖操作前例外退出)而導致死鎖
- 加鎖與解鎖必須一致,誰加的鎖,就由誰來解(或過期超時),一個客戶端不能解開另一個客戶端加的鎖
- 加鎖與解鎖的程序必須保證原子性
實作
1. 加鎖實作
基于Redis的分布式鎖加鎖操作一般使用 SETNX 命令,其含義是“將 key 的值設為 value ,當且僅當 key 不存在,若給定的 key 已經存在,則 SETNX 不做任何動作”,
在 Spring Boot 中,可以使用 StringRedisTemplate 來實作,如下,一行代碼即可實作加鎖程序,(下列代碼給出兩種呼叫形式——立即回傳加鎖結果與給定超時時間獲取加鎖結果)
/**
* 嘗試獲取鎖(立即回傳)
* @param key 鎖的redis key
* @param value 鎖的value
* @param expire 過期時間/秒
* @return 是否獲取成功
*/
public boolean lock(String key, String value, long expire) {
return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
}
/**
* 嘗試獲取鎖,并至多等待timeout時長
*
* @param key 鎖的redis key
* @param value 鎖的value
* @param expire 過期時間/秒
* @param timeout 超時時長
* @param unit 時間單位
* @return 是否獲取成功
*/
public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) {
long waitMillis = unit.toMillis(timeout);
long waitAlready = 0;
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready < waitMillis) {
try {
Thread.sleep(waitMillisPer);
} catch (InterruptedException e) {
log.error("Interrupted when trying to get a lock. key: {}", key, e);
}
waitAlready += waitMillisPer;
}
if (waitAlready < waitMillis) {
return true;
}
log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready);
return false;
}
上述實作如何滿足前面提到的幾點要求:
- 客戶端互斥: 可以將expire過期時間設定為大于同步代碼的執行時間,比如同步代碼塊執行時間為1s,則可將expire設定為3s或5s,避免同步代碼執行程序中expire時間到,其它客戶端又可以獲取鎖執行同步代碼塊,
- 通過設定過期時間expire來避免某個客戶端長期占有鎖,
- 通過value來控制誰加的鎖,由誰解的邏輯,比如可以使用requestId作為value,requestId唯一標記一次請求,
- setIfAbsent方法 底層通過呼叫 Redis 的
SETNX命令,操作具備原子性,
錯誤示例:
網上有如下實作,
public boolean lock(String key, String value, long expire) {
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
if(result) {
stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
}
return result;
}
該實作的問題是如果在result為true,但還沒成功設定expire時,程式例外退出了,將導致該鎖一直被占用而導致死鎖,不滿足第二點要求,
2. 解鎖實作
解鎖也需要滿足前面所述的四個要求,實作代碼如下:
private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;
/**
* 釋放鎖
* @param key 鎖的redis key
* @param value 鎖的value
*/
public boolean unLock(String key, String value) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
}
這段實作使用一個Lua腳本來實作解鎖操作,保證操作的原子性,傳入的value值需與該執行緒加鎖時的value一致,可以使用requestId(具體實作下面給出),
錯誤示例:
public boolean unLock(String key, String value) {
String oldValue = https://www.cnblogs.com/spec-dog/p/stringRedisTemplate.opsForValue().get(key);
if(value.equals(oldValue)) {
stringRedisTemplate.delete(key);
}
}
該實作先獲取鎖的當前值,判斷兩值相等則洗掉,考慮一種極端情況,如果在判斷為true時,剛好該鎖過期時間到,另一個客戶端加鎖成功,則接下來的delete將不管三七二十一將別人加的鎖直接刪掉了,不滿足第三點要求,該示例主要是因為沒有保證解鎖操作的原子性導致,
3. 注解支持
為了方便使用,添加一個注解,可以放于方法上控制方法在分布式環境中的同步執行,
/**
* 標注在方法上的分布式鎖注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {
String key();
String prefix() default "disLock:";
long expire() default 10L; // 默認10s過期
}
添加一個切面來決議注解的處理,
/**
* 分布式鎖注解處理切面
*/
@Aspect
@Slf4j
public class DistributedLockAspect {
private DistributedLock lock;
public DistributedLockAspect(DistributedLock lock) {
this.lock = lock;
}
/**
* 在方法上執行同步鎖
*/
@Around(value = "https://www.cnblogs.com/spec-dog/p/@annotation(lockable)")
public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable {
boolean locked = false;
String key = lockable.prefix() + lockable.key();
try {
locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire());
if(locked) {
return point.proceed();
} else {
log.info("Did not get a lock for key {}", key);
return null;
}
} catch (Exception e) {
throw e;
} finally {
if(locked) {
if(!lock.unLock(key, WebUtil.getRequestId())){
log.warn("Unlock {} failed, maybe locked by another client already. ", lockable.key());
}
}
}
}
}
RequestId 的實作如下,通過注冊一個Filter,在請求開始時生成一個uuid存于ThreadLocal中,在請求回傳時清除,
public class WebUtil {
public static final String REQ_ID_HEADER = "Req-Id";
private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>();
public static void setRequestId(String requestId) {
reqIdThreadLocal.set(requestId);
}
public static String getRequestId(){
String requestId = reqIdThreadLocal.get();
if(requestId == null) {
requestId = ObjectId.next();
reqIdThreadLocal.set(requestId);
}
return requestId;
}
public static void removeRequestId() {
reqIdThreadLocal.remove();
}
}
public class RequestIdFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);
//沒有則生成一個
if (StringUtils.isEmpty(reqId)) {
reqId = ObjectId.next();
}
WebUtil.setRequestId(reqId);
try {
filterChain.doFilter(servletRequest, servletResponse);
} finally {
WebUtil.removeRequestId();
}
}
}
//在配置類中注冊Filter
/**
* 添加RequestId
* @return
*/
@Bean
public FilterRegistrationBean requestIdFilter() {
RequestIdFilter reqestIdFilter = new RequestIdFilter();
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(reqestIdFilter);
List<String> urlPatterns = Collections.singletonList("/*");
registrationBean.setUrlPatterns(urlPatterns);
registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1);
return registrationBean;
}
4. 使用注解
@DistributedLockable(key = "test", expire = 10)
public void test(){
System.out.println("執行緒-"+Thread.currentThread().getName()+"開始執行..." + LocalDateTime.now());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒-"+Thread.currentThread().getName()+"結束執行..." + LocalDateTime.now());
}
總結
本文給出了基于Redis的分布式鎖的實作方案與常見的錯誤示例,要保障分布式鎖的正確運行,需滿足本文所提的四個要求,尤其注意保證加鎖解鎖操作的原子性,設定過期時間,及對同一個鎖的加鎖解鎖執行緒一致,原文地址: http://blog.jboost.cn/distributedlock.html
[轉載請注明出處]
作者:雨歌
歡迎關注作者公眾號:半路雨歌,查看更多技術干貨文章

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/141640.html
標籤:Java
上一篇:Java8——Stream流
