目錄
介紹
使用
1.建立連接
2.節點創建
3.查詢節點
4.修改節點
5.洗掉節點
Watch事件監聽
1.概述
2.三種Watcher的實作
1. NodeCache
2.PathChildrenCache
3.TreeCache
介紹
Curator是Apache ZooKeeper的Java客戶端庫,常見的ZooKeeper Java API有原生的Java API、ZkClient、Curator,Curator框架在zookeeper原生API介面上進行了包裝,解決了很多ZooKeeper客戶端非常底層的細節開發,提供ZooKeeper各種應用場景如:分布式鎖服務、集群領導選舉、分布式佇列等的抽象封裝,是最好用、最流行的zookeeper的客戶端,
官網:Apache Curator –
Curator提供的常見操作有建立連接、節點創建、節點查詢、節點洗掉、節點修改、Watch事件監聽、分布式鎖的實作等,
使用
1.建立連接
首先在IDEA里新建一個空的maven專案,引入相關依賴和插件
<dependencies>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<!-- curator recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
<!-- curator framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<!-- 日志相關 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
連接有兩種建立方法如下,個人比較推薦使用第二種
//重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//方法一
/*
connectString 連接字串--> ip:埠號,ip:埠號...
sessionTimeoutMs 會話超時時間(ms) 默認值為60*1000
connectionTimeoutMs 連接超時時間(ms) 默認值為45*1000
retryPolicy 重試策略
*/
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.5.156:2181", 60 * 1000, 45 * 1000, retryPolicy);
client.start(); //開啟連接
//方法二
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.5.156:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(45 * 1000)
.retryPolicy(retryPolicy)
.namespace("jason") //加上namespace可以使每次創建或修改的節點都是在/jason/下進行的
.build();
client.start();
2.節點創建
在已建立連接的基礎上就可以開始對節點進行一些操作了,此處的client是建立連接時的CuratorFramework物件,通過呼叫create().forPath()創建節點,創建節點默認存盤的值是當前客戶端的ip地址,若要自定義值需要再加一個值引數(byte陣列型別),與此同時,節點默認也是持久化的,若有需要則可以通過withMode來指定存盤型別,
ps:注意forPath例外的拋出
@Test
public void createTest1() throws Exception {
//默認節點的值是當前客戶端的ip地址,自定義時要注意值的型別為byte陣列
client.create().forPath("/app2","hello".getBytes());
}
@Test
public void createTest2() throws Exception {
//創建節點時默認為持久化節點
//若要設定其他型別需要使用到withMode方法
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
}
3.查詢節點
/*
測驗查詢
查詢資料:get
查詢子節點:ls
查詢子節點狀態資訊:ls -s
*/
@Test
public void getTest() throws Exception {
//查詢節點資料:get
byte[] data1 = client.getData().forPath("/app1");
//查詢子節點:ls
List<String> nodes = client.getChildren().forPath("/app1");
//查詢子節點狀態資訊:ls -s
Stat status = new Stat();
client.getData.storingStatIn(status).forPath("/app1");
}
4.修改節點
普通修改節點資料
@Test
public void setTest1() throws Exception {
client.setData().forPath("/app1","Jason".getBytes());
}
通過版本號修改節點資料
首先要查詢當前節點的狀態資訊(參考3.查詢節點),然后通過getVersion獲取版本號
@Test
public void setTest2() throws Exception {
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app1");
int version = status.getVersion();
client.setData().withVersion(version).forPath("/app1","這是修改的資料".getBytes());
}
5.洗掉節點
前兩行的實作對應了delete和deleteall
由于連接可能出現超時或者網路的波動,通過guaranteed可以保證洗掉的成功性(盡量加上這個)
要想洗掉實作回呼,則需要呼叫inBackground,通過匿名內部類或Lambda運算式實作BackgroundCallback介面,重寫processResult,執行一些回呼任務,
@Test
public void deleteTest() throws Exception {
//洗掉空節點
client.delete().forPath("/app1");
//洗掉非空節點(包含子節點)
client.delete().deletingChildrenIfNeeded().forPath("/app1");
//若連接超時或出現例外,以上兩種方式可能出現洗掉失敗的情況,于是可以通過以下方式保證洗掉的成功
client.delete().guaranteed().forPath("/app1");
//回呼
client.delete().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(curatorEvent);
}
}).forPath("/app1");
}
Watch事件監聽
1.概述
ZooKeeper允許用戶在指定節點上注冊一些Watcher,并在一些特定事件觸發的時候,ZooKeeper服務端會將事件通知到感興趣的客戶端上去,該機制是實作分布式協調服務的重要特性,
ZooKeeper中引入了Watcher機制來實作了發布/訂閱功能,可以讓多個訂閱者同時監聽某一個物件,當一個物件自身狀態變化時,會通知所有訂閱者,
Curator引入了Cache來實作對ZooKeeper服務端事件的監聽,
ZooKeeper提供了三種Watcher:
1.NodeCache:只是監聽某一個特定節點
2.PathChildrenCache:監控一個ZNode的子節點
3.TreeCache:可以監控整個樹上的所有節點,類似于NodeCache和PathChildrenCache的組合
2.三種Watcher的實作
1. NodeCache
@Test
public void testNodeCache() throws Exception {
//1.創建NodeCache物件
NodeCache nodeCache = new NodeCache(client,"/app1");
//2.注冊監聽 在重寫的方法中撰寫觸發事件后要執行的任務
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("節點變化了~");
//獲取當前修改后的值
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3.開啟監聽
nodeCache.start(true);
while(true){
//保證當前執行緒一直在跑,因為要測驗相關操作
}
}
2.PathChildrenCache
@Test
public void testPathChildrenCache() throws Exception {
//1.創建PathChildrenCache物件 cacheData表示是否快取變化的事件
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app1",true);
//2.注冊監聽 在重寫的方法中撰寫觸發事件后要執行的任務
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
System.out.println("子節點發生變化了~");
//這里列印一下event
System.out.println(event);
//監聽子節點的資料變更,并拿到變更后的資料
//獲取子節點的事件型別
PathChildrenCacheEvent.Type type = event.getType();
//若事件為update,那么列印當前變化的資料
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3.開啟監聽
pathChildrenCache.start();
while (true){
//保證當前執行緒一直在跑,因為要測驗相關操作
}
}
3.TreeCache
@Test
public void testTreeCache() throws Exception {
//1.創建TreeCache物件
TreeCache treeCache = new TreeCache(client,"/app1");
//2.注冊監聽
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
System.out.println("節點或子節點變化了~");
//看一下事件
System.out.println(event);
//具體可以參照前兩個
}
});
//3.開啟監聽
treeCache.start();
while (true){
//保證當前執行緒一直在跑,因為要測驗相關操作
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/333626.html
標籤:其他
