Zookeeper(三)基本使用
- zk基本使用
- 資料模型
- ZooKeeper資料模型Znode
- 節點型別
- 事務id
- 節點狀態資訊
- Watcher(資料變更通知)
- ACL(權限管控)
- ZooKeeper命令?操作
- 創建節點
- 讀取節點
- 更新節點
- 洗掉節點
- zk API使用
- 建立會話
- 創建節點
- 洗掉節點
- 修改節點資料
- 獲取節點資料
- Zookeeper-開源客戶端(zkClient)
- 創建會話
- 創建節點
- 洗掉節點
- 修改節點資料
- 獲取節點資料
- 為客戶端添加監聽
- Curator客戶端
- 創建會話
- 創建節點
- 洗掉節點
- 修改節點
- 獲取節點資訊和資料
zk基本使用
資料模型
ZooKeeper資料模型Znode
在ZooKeeper中,資料資訊被保存在?個個資料節點上,這些節點被稱為znode,ZNode 是Zookeeper 中最?資料單位,在 ZNode 下??可以再掛 ZNode,這樣?層層下去就形成了?個層次化命名空間 ZNode 樹,我們稱為 ZNode Tree,它采?了類似?件系統的層級樹狀結構進?管理,

在 Zookeeper 中,每?個資料節點都是?個 ZNode,上圖根?錄下有兩個節點,分別是:app1 和app2,其中 app1 下??有三個?節點,所有ZNode按層次化進?組織,形成這么?顆樹,ZNode的節點路徑標識?式和Unix?件系統路徑?常相似,都是由?系列使?斜杠(/)進?分割的路徑表示,開發?員可以向這個節點寫?資料,也可以在這個節點下?創建?節點,
節點型別
Zookeeper 節點型別可以分為三?類:
- 持久性節點(Persistent)
- 臨時性節點(Ephemeral)
- 順序性節點(Sequential)
在開發中在創建節點的時候通過組合可以?成以下四種節點型別:持久節點、持久順序節點、臨時節 點、臨時順序節點,不同型別的節點則會有不同的?命周期
持久節點:節點被創建后會?直存在服務器,直到洗掉操作主動清除
持久順序節點:就是有順序的持久節點,節點特性和持久節點是?樣的,只是額外特性表現在順序上,順序特性實質是在創建節點的時候,會在節點名后?加上?個數字后綴,來表示其順序,
臨時節點:?命周期和客戶端會話綁在?起,客戶端會話結束,節點 會被洗掉掉,與持久性節點不同的是,臨時節點不能創建?節點,
臨時順序節點:有順序的臨時節點,和持久順序節點相同,在其創建的時候會在名字后?加上數字 后綴,
事務id
?先,先了解,事務是對物理和抽象的應?狀態上的操作集合,往往在現在的概念中,狹義上的事務通 常指的是資料庫事務,?般包含了?系列對資料庫有序的讀寫操作,這些資料庫事務具有所謂的ACID特 性,即原?性(Atomic)、?致性(Consistency)、隔離性(Isolation)和持久性(Durability),
?在ZooKeeper中,事務是指能夠改變ZooKeeper服務器狀態的操作,我們也稱之為事務操作或更新操 作,?般包括資料節點創建與洗掉、資料節點內容更新等操作,對于每?個事務請求,ZooKeeper都會為其分配?個全域唯?的事務ID,? ZXID 來表示,通常是?個 64 位的數字,每?個 ZXID 對應?次更新操作,從這些ZXID中可以間接地識別出ZooKeeper處理這些更新操作請求的全域順序,
節點狀態資訊
- cZxid 就是 Create ZXID,表示節點被創建時的事務ID,
- ctime 就是 Create Time,表示節點創建時間,
- mZxid 就是 Modified ZXID,表示節點最后?次被修改時的事務ID,
- mtime 就是 Modified Time,表示節點最后?次被修改的時間,
- pZxid 表示該節點的?節點串列最后?次被修改時的事務 ID,只有?節點串列變更才會更新 pZxid,
- ?節點內容變更不會更新,
- cversion 表示?節點的版本號,
- dataVersion 表示內容版本號,
- aclVersion 標識acl版本
- ephemeralOwner 表示創建該臨時節點時的會話 sessionID,如果是持久性節點那么值為 0
- dataLength 表示資料?度,
- numChildren 表示直系?節點數,
Watcher(資料變更通知)
Zookeeper使?Watcher機制實作分布式資料的發布/訂閱功能
?個典型的發布/訂閱模型系統定義了?種 ?對多的訂閱關系,能夠讓多個訂閱者同時監聽某?個主題物件,當這個主題物件?身狀態變化時,會通知所有訂閱者,使它們能夠做出相應的處理,
在 ZooKeeper 中,引?了 Watcher 機制來實作這種分布式的通知功能,ZooKeeper 允許客戶端向服務端注冊?個 Watcher 監聽,當服務端的?些指定事件觸發了這個 Watcher,那么就會向指定客戶端發送?個事件通知來實作分布式的通知功能,

Zookeeper的Watcher機制主要包括客戶端執行緒、客戶端WatcherManager、Zookeeper服務器三部 分,
具體?作流程為:客戶端在向Zookeeper服務器注冊的同時,會將Watcher物件存盤在客戶端的WatcherManager當中,當Zookeeper服務器觸發Watcher事件后,會向客戶端發送通知,客戶端執行緒 從WatcherManager中取出對應的Watcher物件來執?回呼邏輯,
ACL(權限管控)
Zookeeper作為?個分布式協調框架,其內部存盤了分布式系統運?時狀態的元資料,這些元資料會直接影響基于Zookeeper進?構造的分布式系統的運?狀態,因此,如何保障系統中資料的安全,從?避免因誤操作所帶來的資料隨意變更?導致的資料庫例外?分重要,在Zookeeper中,提供了?套完善的ACL(Access Control List)權限控制機制來保障資料的安全,
權限就是指那些通過權限檢查后可以被允許執?的操作,在ZooKeeper中,所有對資料的操作權限分為以下五?類:
·CREATE(C):資料節點的創建權限,允許授權物件在該資料節點下創建?節點, · DELETE(D):
?節點的洗掉權限,允許授權物件洗掉該資料節點的?節點, · READ(R):資料節點的讀取權限,允許授權物件訪問該資料節點并讀取其資料內容或?節點串列等, · WRITE(W):資料節點的更新權限,允許授權物件對該資料節點進?更新操作, · ADMIN(A):資料節點的管理權限,允許授權物件對該資料節點進? ACL 相關的設定操作,
ZooKeeper命令?操作
./zkcli.sh 連接本地的zookeeper服務器
./zkCli.sh -server ip:port 連接指定的服務器
連接成功后,可以通過help指令來查看幫助

創建節點
create [-s][-e] path data acl
其中,-s或-e分別指定節點特性,順序或臨時節點,若不指定,則創建持久節點;acl?來進?權限控制
讀取節點
ls path
其中,path表示的是指定資料節點的節點路徑
get path
可以獲取Zookeeper指定節點的資料內容和屬性資訊
更新節點
set path data [version]
洗掉節點
delete path [version]
zk API使用
匯入依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
建立會話
private static CountDownLatch countDownLatch=new CountDownLatch(1);
//建立會話
public static void main(String[] args) throws IOException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
//通過計數工具類CountDownLatch:不讓main方法結束,讓執行緒等待阻塞;
countDownLatch.await();
System.out.println("會話建立了");
}
//回呼方法,作用:處理來自服務器端的時間watcher通知
@Override
public void process(WatchedEvent watchedEvent) {
//SyncConnected 事件
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
//解除主程式在countDownLatch的等待阻塞
System.out.println("process執行了");
countDownLatch.countDown();
}
}
創建節點
在建立會話的基礎上在回呼方法中進行創建節點
//同步創建節點
private static void createNodeSync() throws KeeperException, InterruptedException {
/**
*path :節點創建的路徑
*data[] :節點創建要保存的資料,是個byte型別的
*acl :節點創建的權限資訊(4種型別)
*ANYONE_ID_UNSAFE : 表示任何?
*AUTH_IDS :此ID僅可?于設定ACL,它將被客戶機驗證的ID替
換,
*OPEN_ACL_UNSAFE :這是?個完全開放的ACL(常?)-->
world:anyone
*CREATOR_ALL_ACL :此ACL授予創建者身份驗證ID的所有權限
*createMode :創建節點的型別(4種型別)
*PERSISTENT:持久節點
*PERSISTENT_SEQUENTIAL:持久順序節點
*EPHEMERAL:臨時節點
*EPHEMERAL_SEQUENTIAL:臨時順序節點
String node = zookeeper.create(path,data,acl,createMode);
*/
//持久節點
String node_persistent = zooKeeper.create("/lg-server", "持久節點內容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// //臨時節點
// String node_ephemeral = zooKeeper.create("/lg-ephemeral", "臨時節點內容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// //持久順序節點
// String node_persistent_sequential = zooKeeper.create("/lg-persistent_sequential", "持久順序節點內容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("持久節點"+node_persistent);
// System.out.println("臨時節點"+node_ephemeral);
// System.out.println("持久順序節點"+node_persistent_sequential);
}
//創建節點的方法
//回呼方法,作用:處理來自服務器端的時間watcher通知
@Override
public void process(WatchedEvent watchedEvent) {
//SyncConnected 事件
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
//解除主程式在countDownLatch的等待阻塞
try {
createNodeSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
洗掉節點
@Override
public void process(WatchedEvent watchedEvent) {
//SyncConnected 事件
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
//解除主程式在countDownLatch的等待阻塞
try {
deleteNodeSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//洗掉節點
private void deleteNodeSync() throws KeeperException, InterruptedException {
/*
zooKeeper.exists(path,watch) :判斷節點是否存在
zookeeper.delete(path,version) : 洗掉節點
*/
Stat exists = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(exists==null?"該節點不存在":"該節點存在");
if(exists!=null){
zooKeeper.delete("/lg-persistent/c1",-1);
}
Stat exists2 = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(exists2==null?"該節點不存在":"該節點存在");
}
修改節點資料
//更新資料節點內容
private void updateNodeSync() throws KeeperException, InterruptedException {
/*
path:路徑
data:要修改的內容 byte[]
version:為-1,表示對最新版本的資料進?修改
zooKeeper.setData(path, data,version);
*/
System.out.println(new String(zooKeeper.getData("/lg-persistent",false,null)));
//修改資料,stat:狀態資訊物件
Stat stat = zooKeeper.setData("/lg-persistent", "修改資料".getBytes(StandardCharsets.UTF_8), -1);
System.out.println(new String(zooKeeper.getData("/lg-persistent",false,null)));
}
//創建節點的方法
//回呼方法,作用:處理來自服務器端的時間watcher通知
@Override
public void process(WatchedEvent watchedEvent) {
//SyncConnected 事件
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
//解除主程式在countDownLatch的等待阻塞
try {
updateNodeSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
獲取節點資料
public void process(WatchedEvent watchedEvent) {
//子節點串列發生變化時,服務器會發出noteChildrenChanged事件通知,重新獲取子節點串列,同時這個通知是一次性的,需要反復監聽
if(watchedEvent.getType()==Event.EventType.NodeChildrenChanged){
try {
getChildren();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//SyncConnected 事件
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
//解除主程式在countDownLatch的等待阻塞
try {
//獲取節點資料
getNodeData();
//獲取節點子節點串列
getChildren();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//獲取某個節點資料
private void getNodeData() throws KeeperException, InterruptedException {
/**
*path : 獲取資料的路徑
*watch : 是否開啟監聽
*stat : 節點狀態資訊
*null: 表示獲取最新版本的資料
*zk.getData(path, watch, stat);
*/
byte[] data = zooKeeper.getData("/lg-persistent", false, null);
System.out.println(new String(data));
}
//獲取某個節點的子節點串列方法
public static void getChildren() throws KeeperException, InterruptedException {
/*
path:路徑
watch:是否要啟動監聽,當?節點串列發?變化,會觸發監聽
zooKeeper.getChildren(path, watch);
*/
List<String> children = zooKeeper.getChildren("/lg-persistent", true);
System.out.println(children);
}
Zookeeper-開源客戶端(zkClient)
ZkClient是Github上?個開源的zookeeper客戶端,在Zookeeper原?API接?之上進?了包裝,是?個 更易?的Zookeeper客戶端,同時,zkClient在內部還實作了諸如Session超時重連、Watcher反復注冊 等功能,
添加依賴
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
創建會話
ZkClient zkClient=new ZkClient("localhost:2181");
System.out.println("會話創建");
創建節點
ZkClient zkClient=new ZkClient("localhost:2181");
System.out.println("會話創建");
//創建節點 第一個引數為節點名稱,第二個引數如果為true,則創建父節點再創建子節點
zkClient.createPersistent("/lg-zkClient/c1",true);
System.out.println("節點創建完成");
洗掉節點
ZkClient zkClient=new ZkClient("localhost:2181");
System.out.println("會話創建");
String path="/lg-zkClient/c1";
zkClient.createPersistent(path+"/c11");
//洗掉節點
zkClient.deleteRecursive(path);
System.out.println("遞回洗掉成功");
修改節點資料
zkClient.writeData(path,"456");
獲取節點資料
List<String> children = zkClient.getChildren("/lg-zkClient");
為客戶端添加監聽
zkClient.subscribeChildChanges("/lg-zkClient-get", new IZkChildListener() {
/**
*
* @param s parentPath 父節點路徑
* @param list 變化后的子節點串列
* @throws Exception
*/
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
System.out.println(s+"的子節點串列發生變化:"+list);
}
});
Curator客戶端
curator是Net?ix公司開源的?套Zookeeper客戶端框架,和ZKClient?樣,Curator解決了很多 Zookeeper 客 戶 端 ? 常 底 層 的 細 節 開 發 ? 作 , 包 括 連 接 重 連 , 反 復 注 冊 Watcher 和 NodeExistsException例外等,是最流?的Zookeeper客戶端之?,從編碼?格上來講,它提供了基于Fluent的編程?格?持,
Fluent編程風格:在set方法添加當前物件的回傳,那么再編碼程序中,就支持鏈式編程,簡化代碼,


添加依賴
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
創建會話
Curator的創建會話?式與原?的API和ZkClient的創建?式區別很?,Curator創建客戶端是通過CuratorFrameworkFactory??類來實作的,
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
//不使用fluent風格編程
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", exponentialBackoffRetry);
curatorFramework.start();
System.out.println("會話被建立");
//使用fluent風格編程 namespace設定了獨立的命名空間 /base
CuratorFramework base = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base")
.build();
base.start();
System.out.println("會話創建了");
創建節點
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
//使用fluent風格編程 namespace設定了獨立的命名空間 /base
CuratorFramework base = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("lg-server")
.build();
base.start();
System.out.println("會話創建了");
base.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "init".getBytes(StandardCharsets.UTF_8));
洗掉節點
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
//使用fluent風格編程 namespace設定了獨立的命名空間 /base
CuratorFramework base = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base")
.build();
base.start();
System.out.println("會話創建了");
String path="/lg-curator";
base.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
System.out.println("洗掉成功,洗掉的節點"+path);
修改節點
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
//使用fluent風格編程 namespace設定了獨立的命名空間 /base
CuratorFramework base = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base")
.build();
base.start();
System.out.println("會話創建了");
String path="/lg-curator";
//獲取節點資料內容及狀態資訊
byte[] bytes = base.getData().forPath(path);
System.out.println("獲取到的資料"+new String(bytes));
Stat stat = new Stat();
base.getData().storingStatIn(stat).forPath(path);
System.out.println("獲取到的節點狀態資訊:"+stat);
//更新節點資訊
int version = base.setData().withVersion(stat.getVersion()).forPath(path, "修改內容".getBytes(StandardCharsets.UTF_8)).getVersion();
System.out.println("當前最新版本為:"+version);
byte[] bytes1 = base.getData().forPath(path);
System.out.println("修改后的資料為:"+new String(bytes1));
//badVersionException
base.setData().withVersion(stat.getVersion()).forPath(path,"修改內容22222".getBytes(StandardCharsets.UTF_8));
獲取節點資訊和資料
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
//使用fluent風格編程 namespace設定了獨立的命名空間 /base
CuratorFramework base = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
// .namespace("base")
.build();
base.start();
System.out.println("會話創建了");
String path="/lg-curator/c1";
// String s = base.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes(StandardCharsets.UTF_8));
// System.out.println("節點遞回創建成功,路徑為"+s);
List<String> strings = base.getChildren().forPath("/lg-server");
System.out.println(strings);
String path1="/lg-server/localhost:8088/status";
Stat stat = new Stat();
base.getData().storingStatIn(stat).forPath(path1);
System.out.println("獲取到的節點狀態資訊:"+stat);
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/294888.html
標籤:其他
下一篇:Flink CDC 實戰
