【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

设计

我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。

  • offer:入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
  • poll:出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
  • peek:获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
  • size:获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

DistributedQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//继承AbstractQueue类并实现Queue接口
public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {
private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);

//ZooKeeper客户端,进行ZooKeeper操作
private ZooKeeper zooKeeper;

//根节点名称
private String dir;

//数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式
private String node;

//ZooKeeper鉴权信息
private List<ACL> acls;

/**
* Constructor.
*
* @param zooKeeper the zoo keeper
* @param dir the dir
* @param node the node
* @param acls the acls
*/
public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
this.zooKeeper = zooKeeper;
this.dir = dir;
this.node = node;
this.acls = acls;
init();
}


private void init() {
//需要先判断根节点是否存在,不存在的话,创建子节点时会出错。
try {
Stat stat = zooKeeper.exists(dir, false);
if (stat == null) {
zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("[DistributedQueue#init] error : " + e.toString(), e);
}
}
}

offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Offer boolean.
*
* @param o the o
* @return the boolean
*/
@Override
public boolean offer(E o) {
//构建要插入的节点名称
String fullPath = dir.concat("/").concat(node);
try {
//创建子节点成功则返回入队成功
zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);
return true;
} catch (Exception e) {
logger.error("[DistributedQueue#offer] error : " + e.toString(), e);
}
return false;
}

poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Poll e.
*
* @return the e
*/
@Override
public E poll() {
try {
//获取根节点所有子节点信息。
List<String> children = zooKeeper.getChildren(dir, null);
//如果队列是空的则返回null
if (children == null || children.isEmpty()) {
return null;
}

//将子节点名称排序
Collections.sort(children);
for (String child : children) {
//拼接子节点的具体名称
String fullPath = dir.concat("/").concat(child);
try {
//如果获取数据成功,则类型转换后,返回,并删除改队列中该节点
byte[] bytes = zooKeeper.getData(fullPath, false, null);
E data = (E) bytesToObject(bytes);
zooKeeper.delete(fullPath, -1);
return data;
} catch (Exception e) {
logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);
}
}

} catch (Exception e) {
logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);
}

return null;
}

peek

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Peek e.
*
* @return the e
*/
@Override
public E peek() {

try {
//获取根节点所有子节点信息。
List<String> children = zooKeeper.getChildren(dir, null);
//如果队列是空的则返回null
if (children == null || children.isEmpty()) {
return null;
}

//将子节点名称排序
Collections.sort(children);

for (String child : children) {
//拼接子节点的具体名称
String fullPath = dir.concat("/").concat(child);
try {
//如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点
byte[] bytes = zooKeeper.getData(fullPath, false, null);
E data = (E) bytesToObject(bytes);
return data;
} catch (Exception e) {
logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);
}
}

} catch (Exception e) {
logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);
}

return null;
}

size

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Size int.
*
* @return the int
*/
@Override
public int size() {
try {
//获取根节点的子节点名称
List<String> children = zooKeeper.getChildren(dir, null);
//返回子结点信息数量
return children.size();
} catch (Exception e) {
logger.error("[DistributedQueue#offer] size : " + e.toString(), e);
}

return 0;
}

总结

上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr
如果有好的优化建议,欢迎一起讨论。