Zookeeper 分布式鎖
什么是分布式鎖?
在進行分布式鎖操作之前,我們得知道什么是分布式鎖,在單體應用中,使用 Java API 自帶的 Lock 或者是 synchronize 就可以解決多執行緒帶來的并發問題,但是在集群環境中,上述的方法并不能解決服務與服務之間的并發問題,
分布式鎖一般用在分布式系統或者多個應用中,用來控制同一任務是否執行或者任務的執行順序,在專案中,部署了多個tomcat應用,在執行定時任務時就會遇到同一任務可能執行多次的情況,我們可以借助分布式鎖,保證在同一時間只有一個tomcat應用執行了定時任務
具體可以看這位大佬的解釋,說的通俗易懂,
實作分布式鎖的方式
- 資料庫實作(效率低,不推薦)
- redis 實作(使用 redission 實作,當需要考慮死鎖和釋放問題,比較繁瑣)
- Zookeeper 實作(使用臨時節點,效率高)
- Spring Cloud 實作全域鎖(內置的)
Zookeeper 實作分布式鎖
實作原理
使用 Zookeeper 創建臨時順序節點,判斷自己是不是當前節點下的最小節點,是的話就是獲取到了鎖,直接執行業務代碼,不是的話,便對前一個節點進行監聽,獲取到鎖,執行完業務代碼后,delete 節點釋放當前鎖,然后下面的節點接收到通知,
案例實戰
下面的代碼基于 Zookeeper(1)-安裝與基礎使用
原生 Zookeeper 案例
撰寫分布式鎖的代碼
public class DistributedLock {
private final String connectString = "192.168.3.33:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zooKeeper;
private final String rootNode = "locks";
private final String subNode = "seq-";
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
public DistributedLock() throws IOException, KeeperException, InterruptedException {
// 獲取連接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 連接建立時, 打開 latch, 喚醒 wait 在該 latch 上的執行緒
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 發生了 waitPath 的洗掉事件
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
connectLatch.await();
// 判斷節點/locks 是否存在
Stat stat = zooKeeper.exists("/" + rootNode, false);
// 如果根節點不存在則創建永久根節點
if (stat == null) {
System.out.println("根節點不存在!");
zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 獲取鎖
public void zkLock() throws KeeperException, InterruptedException {
// 在根節點下創建臨時順序節點,回傳值為創建的節點路徑
currentNode = zooKeeper.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 獲取所有的節點
List <String> children = zooKeeper.getChildren("/" + rootNode, false);
// 串列中只有一個節點,就直接獲取到鎖
if (children.size() == 0) {
return;
} else {
// 對節點進行排序
Collections.sort(children);
//當前節點名稱
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
// 獲取當前節點在陣列中的位置
int indexOf = children.indexOf(thisNode);
if (indexOf == -1) {
System.out.println("資料例外");
} else if (indexOf == 0) {
// index == 0 說明 thisNode 在串列中最小,當前 client 獲取鎖
return;
} else {
// 獲得排名比 currentNode 前 1 位的節點
this.waitPath = "/" + rootNode + "/" + children.get(indexOf - 1);
// 在 waitPath 上注冊監聽器, 當 waitPath 被洗掉時,zookeeper 會回呼監聽器的 process 方法
zooKeeper.getData(waitPath, true, new Stat());
waitLatch.await();
return;
}
}
}
// 釋放鎖
public void unZkLock() {
try {
zooKeeper.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
測驗代碼
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
// 創建分布式鎖 1
final DistributedLock lock1 = new DistributedLock();
// 創建分布式鎖 2
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("執行緒 1 獲取鎖");
Thread.sleep(5 * 1000);
lock1.unZkLock();
System.out.println("執行緒 1 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("執行緒 2 獲取鎖");
Thread.sleep(5 * 1000);
lock2.unZkLock();
System.out.println("執行緒 2 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
輸出資訊
執行緒 1 獲取到鎖了!
執行緒 1 再次獲取到鎖了!
休息一下!
執行緒 1 釋放鎖了!
執行緒 1 釋放鎖了!
執行緒 2 獲取到鎖了!
執行緒 2 再次獲取到鎖了!
休息一下!
執行緒 2 釋放鎖了!
執行緒 2 釋放鎖了!
可能在測驗中會報:Will not attempt to authenticate using SASL (unknown error) 這個錯誤資訊,
解決方案
在上面獲取 Zookeeper 連接的代碼中自定義 ZKClientConfig 配置資訊,將 ENABLE_CLIENT_SASL_KEY 改成 false,
ZKClientConfig config = new ZKClientConfig();
config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
// 獲取連接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 連接建立時, 打開 latch, 喚醒 wait 在該 latch 上的執行緒
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 發生了 waitPath 的洗掉事件
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
}, config);
Curator 案例
匯入 POM 檔案
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
實戰代碼
public class CuratorLockTest {
// 測驗代碼
public static void main(String[] args) throws Exception {
// 創建分布式鎖1
InterProcessMutex locks1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 創建分布式鎖2
InterProcessMutex locks2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
// 獲取到鎖
locks1.acquire();
System.out.println("執行緒 1 獲取到鎖了!");
locks1.acquire();
System.out.println("執行緒 1 再次獲取到鎖了!");
System.out.println("休息一下!");
Thread.sleep(5 * 1000);
locks1.release();
System.out.println("執行緒 1 釋放鎖了!");
locks1.release();
System.out.println("執行緒 1 釋放鎖了!");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
// 獲取到鎖
locks2.acquire();
System.out.println("執行緒 2 獲取到鎖了!");
locks2.acquire();
System.out.println("執行緒 2 再次獲取到鎖了!");
System.out.println("休息一下!");
Thread.sleep(5 * 1000);
locks2.release();
System.out.println("執行緒 2 釋放鎖了!");
locks2.release();
System.out.println("執行緒 2 釋放鎖了!");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 創建連接
private static CuratorFramework getCuratorFramework() throws Exception {
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);
DefaultZookeeperFactory zookeeperFactory = new DefaultZookeeperFactory();
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
.sessionTimeoutMs(2000).retryPolicy(backoffRetry)..build();
client.start();
System.out.println("客戶端啟動成功!");
return client;
}
}
輸出資訊
執行緒 2 獲取到鎖了!
執行緒 2 再次獲取到鎖了!
休息一下!
執行緒 2 釋放鎖了!
執行緒 2 釋放鎖了!
執行緒 1 獲取到鎖了!
執行緒 1 再次獲取到鎖了!
休息一下!
執行緒 1 釋放鎖了!
執行緒 1 釋放鎖了!
可能在測驗中會報:Will not attempt to authenticate using SASL (unknown error) 這個錯誤資訊,
解決方案
使用 Curator 出現這個問題的方案還是和上面原生的是一樣,因為其本質還是通過 Zookeeper 的客戶端代碼去進行一個連接,
創建自定義 ZookeeperFactory
public class DefaultZookeeperFactory implements ZookeeperFactory {
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean b) throws Exception {
// 自定義 ZKClientConfig 配置
ZKClientConfig config = new ZKClientConfig();
config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
return new ZooKeeper(connectString, sessionTimeout, watcher, b, config);
}
}
使用 CuratorFrameworkFactory 創建連接的時候匯入自定義 ZookeeperFactory
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
.sessionTimeoutMs(2000).retryPolicy(backoffRetry).
zookeeperFactory(zookeeperFactory).build();
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/442764.html
標籤:Java
