目錄
- 一、背景
- 二、maven依賴
- 三、配置
- 3.1、application.yml配置
- 3.2、屬性配置類
- 3.3、ZookeeperConfig配置件
- 四、實戰
- 4.1、介面
- 4.2、介面核心實作
- 4.3、測驗類
- 4.4、結果
- 4.5、關于CountDownLatch
- 結語
一、背景
??我在之前的文章SpringBoot基于Zookeeper和Curator實作分布式鎖并分析其原理詳細介紹了它的使用及其原理,現在我們也根據這個思路,用zookeeper原生的方式來實作一個分布式鎖,加深對分布式鎖的理解,本文中Spring Boot的版本是2.5.2,zookeeper的版本是3.6.3,
??我們大致的大致的流程圖如下圖,可作為我們查看代碼的一個思路,不然看的頭大,(當然本圖是沒有包含可重入鎖的流程判斷在里面的)

二、maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.alian</groupId>
<artifactId>zklock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zklock</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!--主要用于Maps.newConcurrentMap()-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、配置
3.1、application.yml配置
application.yml
server:
port: 8082
servlet:
context-path: /zklock
app:
zookeeper:
server: 10.130.3.16:2181
session-timeout: 15000
#這里配置的路徑沒有用"/"結尾
root-lock-path: /root/alian
3.2、屬性配置類
??此配置類不懂的可以參考我另一篇文章:Spring Boot讀取組態檔常用方式
AppProperties.java
package com.alian.zklock.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "app.zookeeper")
public class AppProperties {
/**
* zookeeper服務地址
*/
private String server;
/**
* session超時時間
*/
private int sessionTimeout;
/**
* 分布式鎖路徑
*/
private String rootLockPath;
}
3.3、ZookeeperConfig配置件
ZookeeperConfig.java
package com.alian.zklock.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Configuration
public class ZookeeperConfig {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private AppProperties appProperties;
@Bean
public ZooKeeper zookeeper() throws Exception {
ZooKeeper zookeeper = new ZooKeeper(appProperties.getServer(), appProperties.getSessionTimeout(), event -> {
log.info("Receive watched event: {}", event.getState());
//獲取事件的狀態
KeeperState keeperState = event.getState();
//獲取時間型別
EventType eventType = event.getType();
//如果是建立連接
if (KeeperState.SyncConnected == keeperState) {
if (EventType.None == eventType) {
//如果建立連接成功,則發送信號量,讓后續阻塞程式向下執行
countDownLatch.countDown();
log.info("zookeeper建立連接");
}
}
});
//進行阻塞,當執行countDownLatch.countDown();后續代碼才會進行
countDownLatch.await();
return zookeeper;
}
}
??這里主要是對ZooKeeper 進行連接配置,關于CountDownLatch的使用,本文最后有相關的介紹,
四、實戰
??定義了兩個方法:加鎖和釋放鎖,
4.1、介面
ILockService.java
package com.alian.zklock.service;
import java.util.concurrent.TimeUnit;
public interface ILockService {
/**
* 加鎖
*
* @param lockPath
* @param time
* @param unit
* @return
*/
boolean lock(String lockPath, long time, TimeUnit unit);
/**
* 釋放鎖
*
* @return
*/
void release();
}
4.2、介面核心實作
??這個實作類的注釋,我想已經很詳細了,可以細細閱讀,可以加深你對zookeeper分布式鎖實作原理的理解,
ZookeeperLockService.java
package com.alian.zklock.service.impl;
import com.alian.zklock.service.ILockService;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
public class ZookeeperLockService implements ILockService {
//依賴需要匯入:<groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1-jre</version>
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
@Autowired
private ZooKeeper zooKeeper;
//好的思想直接拿來用
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
//構造方法
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
/**
* 加鎖
*
* @param lockPath
* @return
* @throws Exception
*/
public boolean lock(String lockPath, long time, TimeUnit unit) {
//可重入,確保同一執行緒,可以重復加鎖
Thread currentThread = Thread.currentThread();
//根據執行緒號獲取執行緒鎖資料
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
// 說明該執行緒已加鎖過,直接放行
lockData.lockCount.incrementAndGet();
return true;
}
String currentLockPath = attemptLock(lockPath, time, unit);
//如果不為空則表示獲取到了鎖
if (StringUtils.isNotBlank(currentLockPath)) {
//把資料快取起來
LockData newLockData = new LockData(currentThread, currentLockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
/**
* 嘗試獲取鎖,獲取成功回傳鎖路徑
*
* @param lockPath
* @param time
* @param unit
* @return
*/
public String attemptLock(String lockPath, long time, TimeUnit unit) {
//創建臨時有序節點,傳入的lockPath沒有"/"
try {
String currentLockPath = zooKeeper.create(lockPath + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("執行緒:【{}】->【{}】嘗試競爭鎖", Thread.currentThread().getName(), currentLockPath);
//創建臨時節點失敗
if (StringUtils.isBlank(currentLockPath)) {
throw new Exception("生成臨時節點例外");
}
//檢查當前節點是否獲取到了鎖
boolean hasLock = checkLocked(lockPath, currentLockPath, time, unit);
//獲取到了鎖則回傳鎖節點路徑
return hasLock ? currentLockPath : null;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 檢查是否獲取到鎖
*
* @param lockPath
* @param currentLockPath
* @param time
* @param unit
* @return
* @throws Exception
*/
public boolean checkLocked(String lockPath, String currentLockPath, long time, TimeUnit unit) {
boolean hasLock = false;
boolean toDelete = false;
try {
while (!hasLock) {
//檢查是否獲取到了鎖,沒有獲取到則回傳前一個節點
Pair<Boolean, String> pair = getsTheLock(lockPath, currentLockPath);
//當前節點是否獲取到了鎖
boolean currentLock = pair.getLeft();
//獲取前一個節點
String preSequencePath = pair.getRight();
if (currentLock) {
//獲取到了鎖
hasLock = true;
} else {
//等待
final CountDownLatch latch = new CountDownLatch(1);
//訂閱比自己次小順序節點的洗掉事件
Watcher watcher = watchedEvent -> {
log.info("監聽到的變化【】 watchedEvent = {}", watchedEvent);
latch.countDown();
};
Stat stat = zooKeeper.exists(preSequencePath, watcher);
if (stat != null) {
log.info("執行緒:【{}】等待鎖【{}】釋放", Thread.currentThread().getName(), preSequencePath);
boolean await = latch.await(time, unit);
if (!await) {
//說明超時了
log.info("獲取鎖超時");
toDelete = true;
break;
}
}
//檢查鎖
Pair<Boolean, String> checkPair = getsTheLock(lockPath, currentLockPath);
if (checkPair.getLeft()) {
hasLock = true;
}
}
}
} catch (Exception e) {
log.error("檢查是否獲取到鎖例外", e);
if (e instanceof InterruptedException) {
toDelete = true;
}
} finally {
if (toDelete) {
deleteCurrentPath(currentLockPath);
}
}
return hasLock;
}
/**
* 檢測是否已經獲取到了鎖,沒有獲取到則回傳前一個節點
*
* @param lockPath
* @param currentLock
* @return
* @throws Exception
*/
private Pair<Boolean, String> getsTheLock(String lockPath, String currentLock) throws Exception {
//獲取根節點下所有子節點,不能用/結尾
List<String> childrenList = zooKeeper.getChildren(lockPath, false);
//節點按照編號,升序排列
Collections.sort(childrenList);
//如果是第一個,代表自己已經獲得了鎖
String currentLockNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
if (currentLockNode.equals(childrenList.get(0))) {
log.info("節點【{}】成功的獲取分布式鎖", currentLock);
return Pair.of(true, "");
}
//判斷自己排第幾個,回傳的是物件所在串列的序號
int index = Collections.binarySearch(childrenList, currentLockNode);
if (index < 0) { // 網路抖動,獲取到的子節點串列里可能已經沒有自己了
throw new Exception("節點沒有找到: " + currentLockNode);
}
//如果沒有獲得鎖,則要監聽前一個節點
String preSequencePath = lockPath + "/" + childrenList.get(index - 1);
//回傳監聽的前一個節點
return Pair.of(false, preSequencePath);
}
/**
* 洗掉當前獲取鎖的節點
*
* @param currentLockPath
*/
private void deleteCurrentPath(String currentLockPath) {
try {
//判斷路徑是否存在
Stat stat = zooKeeper.exists(currentLockPath, false);
if (stat != null) {
//存在則洗掉
zooKeeper.delete(currentLockPath, -1);
}
} catch (InterruptedException | KeeperException e) {
log.error("洗掉節點例外");
}
}
@Override
public void release() {
//獲取當前執行緒
Thread currentThread = Thread.currentThread();
//獲取當前執行緒的資料
LockData lockData = threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: ");
}
//鎖計數器減1
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
//可重入鎖,暫時不擅長節點
return;
}
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: ");
}
try {
//洗掉節點
zooKeeper.delete(lockData.lockPath, -1);
log.info("執行緒:【{}】釋放鎖【{}】", Thread.currentThread().getName(), lockData.lockPath);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
} finally {
threadData.remove(currentThread);
}
}
}
4.3、測驗類
??我們為了方便檢驗我們的分布式鎖,初始化庫存為100,就使用3個執行緒進行并發,每個執行緒減55個庫存,我這里也不使用測驗工具jmeter了,就相當于單機測驗了,(如果是要進行分布式部署測驗,那么庫存值不能像我這樣直接在程式寫死 ,可以放redis或者資料庫,然后通過負載均衡、壓力測驗工具jmeter去完成,具體使用可以參考:windows下Nginx配置及負載均衡使用),我們主要目的是:為了驗證我們寫的分布式鎖,加深對分布式鎖的理解,
TestLockService.java
package com.alian.zklock.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
public class TestLockService {
@Autowired
private ILockService lockService;
AtomicInteger stock = new AtomicInteger(100);
@PostConstruct
public void testLock() {
final CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
//使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間,如果當前計數為零,則此方法立刻回傳true值
countDownLatch.await();
//獲得鎖
boolean lock = lockService.lock("/root/alian", 10, TimeUnit.SECONDS);
if (lock) {
//業務處理
Thread.sleep(100);
//庫存減1
decrement();
//釋放鎖
lockService.release();
log.info("執行緒【{}】扣減完,剩余庫存:{}", Thread.currentThread().getName(), stock.get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}, "Thread" + i).start();
//遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒,如果當前計數大于零,則將計數減少.
countDownLatch.countDown();
}
}
private void decrement() {
for (int i = 0; i < 5; i++) {
stock.decrementAndGet();
}
}
}
4.4、結果
運行結果圖:

從我們的結果圖可以看出來(為了方便,節點前面的變化文章里就省略了,實際是存在的):
- 同時三個執行緒(Thread0、Thread1、Thread2)創建了節點(180、179,181)去搶占資源
- Thread1創建的179號節點是最小的,獲取到了鎖,這時候,Thread0監聽179節點,Thread2監聽180節點
- Thread1扣減庫存5次,然后釋放鎖,也就是洗掉了節點179,觸發監聽
- 因為Thread0監聽179節點,所以Thread0繼續執行搶占到了鎖,同樣扣減庫存后,洗掉180節點
- 然后Thread2監聽的是180節點,同樣的Thread2搶占到了鎖,扣減庫存,洗掉181節點
- 最后得到庫存85
超時的驗證則可以在業務執行的時候設定一個休眠時間,可重入鎖也是支持的,直接使用curator里面的,優秀的東西就直接拿來用了
4.5、關于CountDownLatch
也許有很多小伙伴,不知道CountDownLatch是怎么用的,我這里就簡單介紹下,主要有兩個方法:
- public void countDown()
遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒,如果當前計數大于零,則將計數減少,
- public boolean await(long timeout,TimeUnit unit) throws InterruptedException
??使當前執行緒在鎖存器倒計數至0之前一直等待,除非執行緒被中斷或超出了指定的等待時間,如果計數到達零,則回傳true;如果在計數到達零之前超過了等待時間,則回傳false,以下三種情況之一前,該執行緒將一直出于休眠狀態:
- 如果計數到達零,則該方法回傳true值
- 如果超出了指定的等待時間,則回傳值為false,如果該時間小于等于零,則該方法根本不會等待
- 如果當前執行緒,在進入此方法時已經設定了該執行緒的中斷狀態;或者在等待時被中斷,則拋出InterruptedException,并且清除當前執行緒的已中斷狀態
??類似本文中的測驗方法,就相當于并發,當三個執行緒都創建完,都走到countDownLatch.await()這里就不執行了,直到執行countDownLatch.countDown()后面才會走,
public void race() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
countDownLatch.await();
Thread.sleep(100);
log.info(Thread.currentThread().getName()+"開始跑步");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread" + i).start();
}
countDownLatch.countDown();
log.info("主執行緒執行完");
}
結果:
2021-10-26 20:43:06 458 [main] INFO:主執行緒執行完
2021-10-26 20:43:06 561 [Thread2] INFO:Thread2開始跑步
2021-10-26 20:43:06 561 [Thread0] INFO:Thread0開始跑步
2021-10-26 20:43:06 561 [Thread1] INFO:Thread1開始跑步
??我們也可以反過來,使主執行緒阻塞,這個時候就是執行緒執行到countDownLatch.await()后,主執行緒后面的不執行,直到前面的子執行緒都執行完,主執行緒才往后執行,
public void multitasking() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
log.info(Thread.currentThread().getName()+"執行完");
countDownLatch.countDown();
}, "Thread" + i).start();
}
countDownLatch.await();
log.info("主執行緒執行完");
}
結果:
2021-10-26 20:45:21 053 [Thread0] INFO:Thread0執行完
2021-10-26 20:45:21 053 [Thread1] INFO:Thread1執行完
2021-10-26 20:45:21 053 [Thread2] INFO:Thread2執行完
2021-10-26 20:45:21 053 [main] INFO:主執行緒執行完
結語
??也許本文的寫的分布式還有些許的瑕疵,但我們主要目的是:為了加深對zookeeper分布式鎖實作原理的理解,實際使用中我們還是使用curator是比較方便和穩定,具體可以參考我另外一篇文章:SpringBoot基于Zookeeper和Curator實作分布式鎖并分析其原理,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/339073.html
標籤:其他
