當多個客戶端對zk集群中的資源進行訪問時,為了保持資源訪問的有序性和穩健性,在每個客戶端進行訪問時需要在訪問期間保持其對該份資源的獨占性,用分布式鎖來實作此步驟,當某行程訪問資源結束后將會釋放掉鎖以供下一個節點對資料的訪問,
分布式鎖的實作思路:
- 接收到客戶端的請求后在
/locks節點下創建一個臨時帶序號的節點, - 判斷當前創建的節點是否為序號最小的節點,是則獲取到鎖,否則監聽其上一個節點,因為默認是當前
/locks節點中序號最小的節點優先獲取到鎖, - 獲取到鎖并處理完業務后該節點釋放掉鎖(該序號最小的節點被洗掉)然后后面的鎖升為序號最小的節點,遞回前面的步驟,
目前有成熟的分布式鎖框架,但是為了鞏固基礎手寫一個,涉及到多執行緒,撰寫步驟較為復雜,
package com.tommy.case2;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributeLock {
private final String connectString = "192.168.20.151:2181,192.168.20.152:2181,192.168.20.153:2181";
private final int sessionTimeout = 2000000;
private final ZooKeeper zk;
private int count = 0;
private String waitPath;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String currentMode;
public DistributeLock() throws IOException, InterruptedException, KeeperException {
// 獲取連接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// connectLatch 如果連接上zk,釋放掉
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// waitLatch 需要釋放
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
// 等待zk正常連接后,才往下執行,增強代碼的健壯性,
connectLatch.await();
// 判斷根節點是否存在
try {
byte[] data = zk.getData("/locks", false, null);
} catch (KeeperException.NoNodeException e) {
System.out.println("directory /locks is not exists, creating....");
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("directory /locks creating success!!!!");
}
}
public void zkLock() throws InterruptedException, KeeperException {
// 創建節點(臨時帶序號的節點)
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判斷是否為序號最小的節點,如果是則獲取到鎖,否則對上一個節點進行監聽
List<String> children = zk.getChildren("/locks", false);
count = children.size();
if (count == 1) {
return;
} else {
Collections.sort(children);
String thisNode = currentMode.substring("/locks/".length());
// 獲取當前節點到在集合children中的位置
int index = children.indexOf(thisNode);
if (index == -1) {
System.out.println("error Data");
} else if (index == 0) {
// only one node in children
return;
} else {
// listen last node's change
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, null);
waitLatch.await();
return;
}
}
}
public void unZkLock() {
// 洗掉節點以釋放掉鎖
try {
zk.delete(currentMode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
package com.tommy.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributeLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("執行緒1啟動,獲取到鎖");
Thread.sleep(5 * 1000);
lock1.unZkLock();
System.out.println("執行緒1釋放鎖");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("執行緒2啟動,獲取到鎖");
Thread.sleep(5 * 1000);
lock2.unZkLock();
System.out.println("執行緒2釋放鎖");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}).start();
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/350849.html
標籤:其他
