- 資料發布與訂閱:發布訂閱模型,就是發布者將資料發布到ZK節點上,供訂閱者動態獲取資料,實作資料的集中管理和動態更新,
- 配置中心:在應用中,將全域的配置資訊放到ZK上集中管理,在應用啟動的時候主動獲取一次配置,同時,在節點上注冊一個watcher,保證每次資料更新時會通知訂閱者更新配置資訊,
- 元資料維護:在分布式搜索服務中,索引的元資訊和服務器集群機器的節點狀態保存在ZK的指定節點上,供各個客戶端使用,
- 分布式日志收集系統:這個系統的核心作業是收集不同服務器的日志,收集器通常是按照應用來分配任務單元,因此在ZK上用應用名創建一個節點,將需要收集的服務器的IP注冊到這個節點的子結點上,
- 命名服務:客戶端應用能夠根據指定名字來獲取資源或服務的地址、提供者等資訊,
- 分布式通知/協調:ZK中特有的watcher注冊與異步通知機制,能夠很好的實作分布式環境下不同系統之間的通知與協調,實作對資料變更的實時處理,使用方法通常是:不同系統對ZK上的同一個節點進行注冊,監聽節點的變化,任意一個系統對節點進行了更新,其它系統都能接到通知,并進行相應處理,
- 任務匯報:類似于任務分發系統,子任務啟動后,在ZK上注冊一個臨時節點,并定時將自己的進度寫入這個節點,這樣任務管理者可以實時查看任務進度,
- 服務器串列維護:能都動態監聽服務器的上下線
- 服務端代碼
package zookeeper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; public class Server { private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182"; private static final int TIME_OUT = 15000; private ZooKeeper zooKeeper; public Server() throws IOException { this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> { }); } /** * 創建臨時序列節點(服務器關閉時節點會自動洗掉) * @param serverName * @return * @throws Exception */ public String regsterServer(String serverName) throws Exception { String result = zooKeeper.create("/clusterServer/server", serverName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(serverName+" server start...."); return result; } public static void main(String[] args) throws Exception { Server server = new Server(); server.regsterServer(args[0]); //保持程式運行,防止程式停止運行導致節點自動洗掉 while (true) { } } }View Code
-
-
- 客戶端代碼
-
package zookeeper; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * 獲取并監聽注冊的服務器串列 */ public class Client { private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182"; private static final int TIME_OUT = 15000; private ZooKeeper zooKeeper; public Client() throws IOException { this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> { System.out.println("watcher works."); try { getServers(); } catch (Exception e) { e.printStackTrace(); } }); } public void getServers() throws Exception { List<String> servers = new ArrayList<>(); List<String> children = zooKeeper.getChildren("/clusterServer", true, null); for (String child : children) { String server = getData("/clusterServer/"+child); servers.add(server); } System.out.println(servers); } public String getData(String path) throws Exception { byte[] data = https://www.cnblogs.com/BlueStarWei/p/zooKeeper.getData(path, true, null); return new String(data); } public static void main(String[] args) throws Exception { Client client = new Client(); client.getServers(); while (true) { } } }View Code
- 分布式鎖:
- 保持獨占:通常的做法是把ZK的節點看作一把鎖,通過create node的方式來實作,
-
- 方法一:所有的客戶端都去創建/lock節點,最終創建成功的持有這把鎖,(同一個節點只能創建一次,再次創建會回傳失敗資訊)
/** * 通過創建臨時節點,實作服務器之間的獨占鎖 */ @Test public void singleLock() { try { //引數:1,節點路徑; 2,要存盤的資料; 3,節點的權限; 4,節點的型別 String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //創建成功,則相當于擁有獨占鎖,可以進行以下邏輯 //TODO 業務邏輯 System.out.println(nodePath); //業務邏輯結束后,洗掉節點,即釋放鎖資源 zooKeeper.delete("/lock", -1); } catch (Exception e) { //創建節點失敗,重新呼叫,直至創建成功 if (e instanceof KeeperException && "NODEEXISTS".equals(((KeeperException)e).code().name())) { System.out.println("Node exists."); singleLock(); }else { e.printStackTrace(); } } }View Code
-
-
-
- 方法二:判斷/lock節點是否存在,如果存在,說明其它服務器已經持有鎖資源;如果不存在,則鎖資源處于空閑狀態,創建節點占有鎖資源,
-
-
/** * 通過創建臨時節點,實作服務器之間的獨占鎖 */ @Test public void singleLock2() throws KeeperException, InterruptedException { Stat stat = zooKeeper.exists("/lock", false); //如果節點已經存在,等待其它服務器洗掉節點,即:等待其它服務器釋放鎖資源 while(stat != null) { } //引數:1,節點路徑; 2,要存盤的資料; 3,節點的權限; 4,節點的型別 String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //創建成功,則相當于擁有獨占鎖,可以進行以下邏輯 //TODO 業務邏輯 System.out.println(nodePath); //業務邏輯結束后,洗掉節點,即釋放鎖資源 zooKeeper.delete("/lock", -1); }View Code
-
-
控制時序:/lock節點已存在,客戶端在它下面創建臨時有序節點/lock/{sessionId}-1 , /lock/{sessionId}-2 , /lock/{sessionId}-3 …..(通過節點屬性控制 CreateMode.EPHEMERAL_SEQUENTIAL控制),保證子節點創建的有序性,從而保證的客戶端的時序性,
- 方式一:獲取鎖資源時,程式處于block狀態,直到獲取鎖資源,
-
控制時序:/lock節點已存在,客戶端在它下面創建臨時有序節點/lock/{sessionId}-1 , /lock/{sessionId}-2 , /lock/{sessionId}-3 …..(通過節點屬性控制 CreateMode.EPHEMERAL_SEQUENTIAL控制),保證子節點創建的有序性,從而保證的客戶端的時序性,
/** * 通過創建臨時時序節點,實作服務器之間的時序鎖 */ @Test public void lock() throws KeeperException, InterruptedException { //創建臨時時序節點 String nodePath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); while(true) { List<String> children = zooKeeper.getChildren("/lock", false); //排序,并獲取序號最小的節點,(序號越小,表明請求時間越早,優先獲取鎖資源) children.sort(String::compareTo); if (nodePath.equals("/lock/"+children.get(0))){ //TODO 業務邏輯 System.out.println("TODO Logic."); break; } } //業務邏輯結束后,洗掉節點,即釋放鎖資源 zooKeeper.delete(nodePath, -1); }View Code
-
-
- 方式二:通過watcher監聽服務器串列變化,判斷當前服務器是否獲取鎖資源,程式不會block
-
package zookeeper; import org.apache.zookeeper.*; import java.io.IOException; import java.util.Collections; import java.util.List; /** * * 通過Zookeeper實作服務器之間的時序鎖 * */ public class SeqLock { private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182"; private static ZooKeeper zooKeeper; private String thispath; public SeqLock() throws IOException { //超時時間單位:毫秒 zooKeeper = new ZooKeeper(CONNECT_STRING, 15000, event -> { //監聽“/lock”的子節點變化,如果有服務器釋放鎖,判斷自己是否獲取鎖 if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged && event.getPath().startsWith("/lock")) { try { List<String> children = zooKeeper.getChildren("/lock", false); if (children.size() > 0) { Collections.sort(children); String fistNode = "/lock/"+children.get(0); if (fistNode.equals(thispath)){ doSomethingAndDelNode(); } } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }); } /** * 通過創建臨時時序節點注冊時序鎖,并監聽服務器串列 */ public void lock() throws KeeperException, InterruptedException { thispath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(thispath); List<String> children = zooKeeper.getChildren("/lock", false); //如果只有一個子節點,說明鎖資源被當前服務器持有,如果子節點不止一個,說明鎖資源已經被其它服務器持有 if(children.size() == 1) { doSomethingAndDelNode(); } } private void doSomethingAndDelNode() throws InterruptedException, KeeperException { //TODO 業務邏輯 System.out.println("TODO Logic."); //業務邏輯結束后,洗掉節點,即釋放鎖資源 zooKeeper.delete(thispath, -1); } public static void main(String[] args) { try { SeqLock lock = new SeqLock(); lock.lock(); } catch (Exception e) { e.printStackTrace(); } } }View Code
參考文獻:
- ZooKeeper典型應用場景
鏈接 :https://pan.baidu.com/s/1lohZ98K33z_Hf_8Y370Nuw 提取碼:kydw
更多內容,請訪問:http://www.cnblogs.com/BlueStarWei
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/295992.html
標籤:Java
上一篇:Mybatis-Spring原理分析 -- @MapperScacn(Spring Boot中mapper層是如何初始化并注冊到Spring容器的)
