開篇碎碎念:不要試圖用斷點,或者你斷點位置要放好,不然你就會識訓許多連接例外,這絕對是我目前翻譯過的最流暢的,咳,不是官網流暢,是我筆記流暢,也許是我成長了,(屏障就是人齊開飯,都吃完散場,然后佇列是作業系統的生產者-消費者模型)
ZooKeeper的屏障(Barrier)和佇列(Queue)教程
- 引言
- SyncPrimitive
- 屏障(Barrier)
- 建構式
- enter()
- leave()
- 生產者-消費者佇列(Queue)
- 建構式
- produce()
- consume()
- 測驗
- 代碼
- 呼叫方法
- 除錯
- 佇列除錯
- 屏障除錯
- 完整原始碼
引言
這個教程展示屏障和生產者-消費者佇列的實作,下文主要涉及Barrier和Queue類,你需要啟動至少一個zookeeper服務器,
這兩個類都繼承了SyncPrimitive,
注釋前的序號是連起來的,若前文沒有,可以看后面
SyncPrimitive
package barrierexample;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class SyncPrimitive implements Watcher {
static ZooKeeper zk = null;
static Integer mutex;
String root;
SyncPrimitive(String address) {
//3 zookeeper實體不存在,則父類自己創造
if (zk == null) {
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address,3000,this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: "+zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
}
@Override
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
}
我們在第一次實體化barrier物件或queue物件時創建了zookeeper物件,并宣告一個靜態變數作為該物件的參考,Barrier 和 Queue 的后續實體檢查 ZooKeeper 物件是否存在,或者,我們可以讓應用程式創建一個 ZooKeeper 物件,并將其傳遞給 Barrier 和 Queue 的建構式,
我們使用process()方法去處理watch觸發的通知,watch作為內部結構,可以讓zookeeper通知客戶端節點的變更,若一個客戶端在等待其他客戶端離開barrier,那么他可以設定一個watch監視節點修改,
屏障(Barrier)
Barrier是一個原語,他的作用是同步 計算的開始和結束,
有多個行程呼叫了,等他們都投進去的時候,開始,等他們都結束的時候結束,宏觀上看是同步計算的?
實作方法是使用一個屏障節點作為單個流程節點的父節點“/b1”,每個行程"p"創建節點"/b1/p",只要有足夠的行程創建了相應的節點,就可以開始計算了,
想象一個二叉樹,b1是父節點,p是子節點
建構式
public class SyncPrimitive implements Watcher {
......
/**
* Barrier
*/
static public class Barrier extends SyncPrimitive {
int size;
String name;
/**
* 實體化Barrier物件的行程的引數如下:
* @param address zookeeper服務器地址(如"zoo1.foo.com:2181")
* @param root zookeeper上屏障節點的路徑(如"/b1")
* @param size 一組行程的大小
*/
Barrier(String address, String root, int size) {
//1 Barrier的建構式將zookeeper服務器的地址傳遞給父類的建構式
super(address);
this.root = root;
this.size = size;
//1.5 如果zk物件存在
if (zk != null) {
try {
//2 驗證根節點是否存在
//(此root非zookeeper的root哦)
Stat s = zk.exists(root, false);
//2.5 不存在則,Barrier的建構式在zookeeper上創建一個Barrier節點,作為上述行程節點的父節點,被稱作root
if (s == null) {
zk.create(root,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
//獲取本地主機地址的完全限定域名
try {
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
}
}
......
}
}
進入屏障的方法是enter()
enter()
public class SyncPrimitive implements Watcher {
......
static public class Barrier extends SyncPrimitive {
.....
/**
* 進入屏障
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean enter() throws KeeperException,InterruptedException {
//4 行程用root下面創建的節點代表自己,用主機名來作為節點名,
//這里如過寫臨時順序節點(EPHEMERAL_SEQUENTIAL)的話,創建節點后就無法根據名字(名字會變為一串0的)洗掉它,所以,我們把這里設為臨時節點即可
zk.create(root + "/"+name,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
while (true) {
//5 它會一直等待直到足夠多的行程進入屏障
synchronized (mutex) {
//6行程通過getChidren()檢查root子節點的數量是否滿足要求,
//getChildren兩個引數,第一個表示要讀取的節點,第二個表示是否設定watch(根節點發生變化時通知),true為設定
List<String> list = zk.getChildren(root,true);
if (list.size() < size) {
mutex.wait();
} else {
//7 滿足要求就結束等待
return true;
}
}
}
}
}
}
leave()
計算完成后,行程呼叫leave()離開屏障,
public class SyncPrimitive implements Watcher {
......
static public class Barrier extends SyncPrimitive {
.....
/**
* 在所有子節點被洗掉后來開屏障
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean leave() throws KeeperException,InterruptedException {
zk.delete(root + "/"+name,0);
while (true) {
synchronized (mutex) {
//8 洗掉對應子節點并設定watch
List<String> list = zk.getChildren(root,true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}
}
生產者-消費者佇列(Queue)
這是我作業系統中的那個生產者-消費者嗎?往下看

生產者-消費者佇列是一種分布式的資料結構,一組行程用它來生產和消費專案,
生產者行程創建新元素并將他們添加到佇列,消費者行程從佇列中移除元素并使用他們,在這個實作中,元素是相當于原子的存在,佇列由根節點來代表,生產者行程是創建根節點的子節點
呀,和前面的enter()方法有點類似呦
建構式
下面是Queue的建構式
public class SyncPrimitive implements Watcher {
......
/**
* 生產者-消費者佇列
*/
static public class Queue extends SyncPrimitive {
Queue(String address,String name) {
//q1 呼叫父類建構式,如果zookeeper物件不存在就自己創建一個
super(address);
this.root = name;
//驗證根節點是否存在,不存在自己創建
if (zk != null) {
try {
Stat s = zk.exists(root,false);
if (s == null) {
//幾個引數:節點路徑、節點資料、acl權限、節點型別
// OPEN_ACL_UNSAFE:完全開放、PERSISTENT:持久化節點
zk.create(root,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
}
}
produce()
生產者行程呼叫produce()向佇列中添加元素,
public class SyncPrimitive implements Watcher {
.....
/**
* 向佇列添加元素
* @param i
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean produce(int i) throws KeeperException,InterruptedException {
//分配堆位元組緩沖區,容量為4
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
//PERSISTENT_SEQUENTIAL:持久化有序節點,可以讓佇列按照先進先出(作業系統+1,就是先排隊的先吃飯啦)的方法使用元素
zk.create(root + "/element",value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
}
}
consume()
消費者使用元素的方法:消費者行程consume()獲取根節點的子節點,然后依據value去獲取排列在最前面的元素,如果同時有兩個行程去要獲取這個元素,就兩個都無法獲取,并移除節點
死鎖啦?
public class SyncPrimitive implements Watcher {
......
/**
* 生產者-消費者佇列
*/
static public class Queue extends SyncPrimitive {
......
/**
* 移除佇列的第一個元素
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException,InterruptedException {
int retValue = -1;
Stat stat = null;
while(true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root,true);
//如果list為空,也就是沒有子節點,消費者無法消費,所以行程陷入等待
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
//移除前綴,前綴7位:element 節點長啥樣,為啥后面是int?
Integer min = new Integer(list.get(0).substring(7));
String minNode = list.get(0);
//尋找最小值(因為children的順序可能和佇列不一致)
for (String s: list) {
//移除前綴,
Integer tempValue = new Integer(s.substring(7));
//System.out.println("Temporary value: " + tempValue);
if (tempValue < min) {
min = tempValue;
minNode = s;
}
}
System.out.println("Temporary value: " + root + "/" +minNode);
//獲取節點值
byte[] b = zk.getData(root + "/"+minNode,
false,stat);
//移除節點
zk.delete(root +"/"+minNode,0);
//緩沖區的資料會存放在byte陣列中
ByteBuffer buffer = ByteBuffer.wrap(b);
retValue = buffer.getInt();
//回傳值
return retValue;
}
}
}
}
}
}
測驗
代碼
public class SyncPrimitive implements Watcher {
.....
//args接收引數
public static void main(String args[]) {
//如果是qTest那么代表是佇列的測驗
if (args[0].equals("qTest")) {
queueTest(args);
} else {
//屏障測驗
barrierTest(args);
}
}
public static void queueTest(String args[]) {
//在地址為args[1]的zookeeper服務器上,建立/app1節點
Queue q = new Queue(args[1],"/app1");
System.out.println("Input:"+args[1]);
//創建元素的數量
int i;
Integer max = new Integer(args[2]);
//創建元素
if (args[3].equals("p")) {
System.out.println("Producer");
for (i = 0; i < max; i++) {
try {
q.produce(10 + i);
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
}
} else {
//消費元素
System.out.println("Consumer");
for (i = 0; i < max; i++) {
try {
int r = q.consume();
System.out.println("Item:"+r);
} catch (KeeperException e) {
//剩余元素的數量
i--;
} catch (InterruptedException e) {
}
}
}
}
public static void barrierTest(String args[]) {
//創建一個屏障,可以容納兩個參與者
Barrier b = new Barrier(args[1],"/b1",new Integer(args[2]));
try{
boolean flag = b.enter();
System.out.println("Entered barrier: "+args[2]);
if (!flag) {
System.out.println("Error when entering the barrier");
}
} catch (KeeperException e) {
System.out.println("barrierTest enter KeeperException");
} catch (InterruptedException e) {
System.out.println("barrierTest enter InterruptedException");
}
Random rand = new Random();
//int r = rand.nextInt(100);
int r = 200;
for (int i = 0; i < r; i++) {
try {
Thread.sleep(100);
}catch (InterruptedException e) {
System.out.println("barrierTest sleep InterruptedException");
}
}
try {
b.leave();
} catch (KeeperException e) {
System.out.println("barrierTest leave KeeperException"+e.toString());
} catch (InterruptedException e) {
System.out.println("barrierTest leave InterruptedException");
}
System.out.println("Left barrier");
}
}
呼叫方法
- 1 啟動一個zookeeper的服務器
(1)如果你是直接在記事本里寫的代碼
可以點擊zookeeper的zkServer.cmd
(2)如果你和我一樣用的idea
可以這樣(詳細配置參考):

然后啟動ZooKeeperServerMain - 2 編譯
SyncPrimitive
(1) 如果你是直接在記事本里寫的代碼
然后javac的話,他可能會報一些換行符什么的問題,或者無法找到主類(也許和最上方有package有關),自己修正一下啦~,因為idea沒有呢
(2)如果你是用的idea
先編譯一下SyncPrimitive

確保這個目錄下有(代表編譯成功)

- 3 傳參
(1)第一種 我沒有嘗試,你自己試試啦~
(2)idea
qTest 127.0.1:2181 100 p

除錯
佇列除錯
掌握了呼叫方法,啟動了zookeeper服務器就正式開始除錯嘍

- 1 先生產100個元素
qTest 127.0.1:2181 100 p

生產成功與否,可以用zkCli.cmd看一下

- 2 再消費100個元素
qTest localhost 100 c
127.0.1:2181和localhost都可以呀


好的,成功,真棒吶

下一個~
屏障除錯
設定一個障礙與2個參與者
bTest localhost 2
啟動之后~~~~
(1) 這里是idea的console

(2)這個是zServer.cmd的
因為他需要兩個參與者,然后我們程式只創建了一個參與者,b.enter()生效,但是這樣他只建立了一個參與者,這個時候,我們的程式就陷入了無限的等待中

(3)創建節點
create -e /b1/a 1
在右圖創建節點后,作圖觸發watch,輸出第一個Process:NodeChildrenChanged
然后輸出Entered…
等進入后,呼叫sleep,睡眠之后,洗掉一個節點,也就是第二個Process,但是洗掉節點后,因為還有個節點在里面,所以會陷入等待

(4)然后 我們用
rmr /b1/a
洗掉另一個節點,這是觸發下圖的第二個Process
然后離開屏障

到此為止,恭喜你完成啦~

完整原始碼
package barrierexample;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.KeyPair;
import java.util.List;
import java.util.Random;
public class SyncPrimitive implements Watcher {
static ZooKeeper zk = null;
static Integer mutex;
String root;
SyncPrimitive(String address) {
//3 zookeeper實體不存在,則父類自己創造
if (zk == null) {
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address,3000,this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: "+zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
}
@Override
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
System.out.println("Process: " + event.getType());
mutex.notify();
}
}
/**
* Barrier
*/
static public class Barrier extends SyncPrimitive {
int size;
String name;
/**
* 實體化Barrier物件的行程的引數如下:
* @param address zookeeper服務器地址(如"zoo1.foo.com:2181")
* @param root zookeeper上屏障節點的路徑(如"/b1")
* @param size 一組行程的大小
*/
Barrier(String address, String root, int size) {
//1 Barrier的建構式將zookeeper服務器的地址傳遞給父類的建構式
super(address);
this.root = root;
this.size = size;
//1.5 如果zk物件存在
if (zk != null) {
try {
//2 驗證根節點是否存在
//(此root非zookeeper的root哦)
Stat s = zk.exists(root, false);
//2.5 不存在則,Barrier的建構式在zookeeper上創建一個Barrier節點,作為上述行程節點的父節點,被稱作root
if (s == null) {
zk.create(root,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
//獲取本地主機地址的完全限定域名
try {
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
}
}
/**
* 進入屏障
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean enter() throws KeeperException,InterruptedException {
//4 行程用root下面創建的節點代表自己,用主機名來作為節點名,
//這里如過寫臨時順序節點(EPHEMERAL_SEQUENTIAL)的話,創建節點后就無法根據名字(名字會變為一串0的)洗掉它,所以,我們把這里設為臨時節點即可
zk.create(root + "/"+name,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
while (true) {
//5 它會一直等待直到足夠多的行程進入屏障
synchronized (mutex) {
//6行程通過getChidren()檢查root子節點的數量是否滿足要求,
//getChildren兩個引數,第一個表示要讀取的節點,第二個表示是否設定watch(根節點發生變化時通知),true為設定
List<String> list = zk.getChildren(root,true);
if (list.size() < size) {
mutex.wait();
} else {
//7 滿足要求就結束等待
return true;
}
}
}
}
/**
* 在所有子節點被洗掉后來開屏障
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean leave() throws KeeperException,InterruptedException {
zk.delete(root + "/"+name,0);
while (true) {
synchronized (mutex) {
//8 洗掉對應子節點并設定watch
List<String> list = zk.getChildren(root,true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}
/**
* 生產者-消費者佇列
*/
static public class Queue extends SyncPrimitive {
Queue(String address,String name) {
//q1 呼叫父類建構式,如果zookeeper物件不存在就自己創建一個
super(address);
this.root = name;
//驗證根節點是否存在,不存在自己創建
if (zk != null) {
try {
Stat s = zk.exists(root,false);
if (s == null) {
//幾個引數:節點路徑、節點資料、acl權限、節點型別
// OPEN_ACL_UNSAFE:完全開放、PERSISTENT:持久化節點
zk.create(root,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}
/**
* 向佇列添加元素
* @param i
* @return
* @throws KeeperException
* @throws InterruptedException
*/
boolean produce(int i) throws KeeperException,InterruptedException {
//分配堆位元組緩沖區,容量為4
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
//PERSISTENT_SEQUENTIAL:持久化有序節點,可以讓佇列按照先進先出(作業系統+1)的方法使用元素
zk.create(root + "/element",value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
/**
* 移除佇列的第一個元素
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException,InterruptedException {
int retValue = -1;
Stat stat = null;
while(true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root,true);
//如果list為空,也就是沒有子節點,消費者無法消費,所以行程陷入等待
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
//移除前綴,前綴7位:element 節點長啥樣,為啥后面是int?
Integer min = new Integer(list.get(0).substring(7));
String minNode = list.get(0);
//尋找最小值(因為children的順序可能和佇列不一致)
for (String s: list) {
//移除前綴,
Integer tempValue = new Integer(s.substring(7));
//System.out.println("Temporary value: " + tempValue);
if (tempValue < min) {
min = tempValue;
minNode = s;
}
}
System.out.println("Temporary value: " + root + "/" +minNode);
//獲取節點值
byte[] b = zk.getData(root + "/"+minNode,
false,stat);
//移除節點
zk.delete(root +"/"+minNode,0);
//緩沖區的資料會存放在byte陣列中
ByteBuffer buffer = ByteBuffer.wrap(b);
retValue = buffer.getInt();
//回傳值
return retValue;
}
}
}
}
}
//args接收引數
public static void main(String args[]) {
//如果是qTest那么代表是佇列的測驗
if (args[0].equals("qTest")) {
queueTest(args);
} else {
//屏障測驗
barrierTest(args);
}
}
public static void queueTest(String args[]) {
//在地址為args[1]的zookeeper服務器上,建立/app1節點
Queue q = new Queue(args[1],"/app1");
System.out.println("Input:"+args[1]);
//創建元素的數量
int i;
Integer max = new Integer(args[2]);
//創建元素
if (args[3].equals("p")) {
System.out.println("Producer");
for (i = 0; i < max; i++) {
try {
q.produce(10 + i);
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
}
} else {
//消費元素
System.out.println("Consumer");
for (i = 0; i < max; i++) {
try {
int r = q.consume();
System.out.println("Item:"+r);
} catch (KeeperException e) {
//剩余元素的數量
i--;
} catch (InterruptedException e) {
}
}
}
}
public static void barrierTest(String args[]) {
//創建一個屏障,可以容納兩個參與者
Barrier b = new Barrier(args[1],"/b1",new Integer(args[2]));
try{
boolean flag = b.enter();
System.out.println("Entered barrier: "+args[2]);
if (!flag) {
System.out.println("Error when entering the barrier");
}
} catch (KeeperException e) {
System.out.println("barrierTest enter KeeperException");
} catch (InterruptedException e) {
System.out.println("barrierTest enter InterruptedException");
}
Random rand = new Random();
//int r = rand.nextInt(100);
int r = 200;
for (int i = 0; i < r; i++) {
try {
Thread.sleep(100);
}catch (InterruptedException e) {
System.out.println("barrierTest sleep InterruptedException");
}
}
try {
b.leave();
} catch (KeeperException e) {
System.out.println("barrierTest leave KeeperException"+e.toString());
} catch (InterruptedException e) {
System.out.println("barrierTest leave InterruptedException");
}
System.out.println("Left barrier");
}
}
都看到這里了,點個贊再走唄

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295124.html
標籤:其他
下一篇:elk + logback搭建

