【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooKeeper的持久性顺序节点实现一个分布式队列。
本文我们来一起写一个ZooKeeper的实现的分布式锁。

设计

参考之前学习的【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock,实现java.util.concurrent.locks.Lock接口。
我们通过重写接口中的方法实现一个可重入锁。

  • lock:请求锁,如果成功则直接返回,不成功则阻塞 直到获取锁。
  • lockInterruptibly:请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断
  • tryLock:1、尝试获取锁,获取失败的话 直接返回false,不会再等待。2、尝试获取锁,获取成功返回true,否则一直请求,直到超时返回false
  • unlock:释放锁

我们使用ZooKeeper的EPHEMERAL临时节点机制,如果能创建成功的话,则获取锁成功,释放锁或客户端断开连接后,临时节点自动删除,这样可以避免误删除或漏删除的情况。

获取锁失败后,这里我们使用轮询的方式来不断尝试创建。其实应该使用Watcher机制来实现,这样能避免大量的无用请求。在下一节更优雅的分布式锁实现机制中我们会用到。

DistributedLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class DistributedLock implements Lock {
private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);

//ZooKeeper客户端,进行ZooKeeper操作
private ZooKeeper zooKeeper;

//根节点名称
private String dir;

//加锁节点
private String node;

//ZooKeeper鉴权信息
private List<ACL> acls;

//要加锁节点
private String fullPath;

//加锁标识,为0时表示未获取到锁,每获取一次锁则加一,释放锁时减一。减到0时断开连接,删除临时节点。
private volatile int state;

/**
* Constructor.
*
* @param zooKeeper the zoo keeper
* @param dir the dir
* @param node the node
* @param acls the acls
*/
public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
this.zooKeeper = zooKeeper;
this.dir = dir;
this.node = node;
this.acls = acls;
this.fullPath = dir.concat("/").concat(node);
init();
}

private void init() {
try {
Stat stat = zooKeeper.exists(dir, false);
if (stat == null) {
zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("[DistributedLock#init] error : " + e.toString(), e);
}
}
}

lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void lock() {
//通过state实现重入机制,如果已经获取锁,则将state++即可。
if (addLockCount()) {
return;
}
//一直尝试获取锁,知道获取成功
for (;;) {
try {
//创建临时节点
zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
//第一次获取锁,state++,这里不需要使用加锁机制保证原子性,因为同一时间,最多只有一个线程能create节点成功。
state++;
break;
} catch (InterruptedException ie) {
//如果捕获中断异常,则设置当前线程为中断状态
logger.error("[DistributedLock#lock] error : " + ie.toString(), ie);
Thread.currentThread().interrupt();
} catch (KeeperException ke) {
//如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态
logger.error("[DistributedLock#lock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
Thread.currentThread().interrupt();
}
}
}
}

lockInterruptibly

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void lockInterruptibly() throws InterruptedException {
//通过state实现重入机制,如果已经获取锁,则将state++即可。
if (addLockCount()) {
return;
}
for (;;) {
//如果当前线程为中断状态,则抛出中断异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
try {
zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
state++;
break;
} catch (InterruptedException ie) {
//如果捕获中断异常,则设置当前线程为中断状态
logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie);
Thread.currentThread().interrupt();
} catch (KeeperException ke) {
//如果捕获到的异常是 节点已存在 外的其他异常,则设置当前线程为中断状态
logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
Thread.currentThread().interrupt();
}
}
}
}

tryLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public boolean tryLock() {
//通过state实现重入机制,如果已经获取锁,则将state++即可。
if (addLockCount()) {
return true;
}
//如果获取成功则返回true,失败则返回false
try {
zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
state++;
return true;
} catch (Exception e) {
logger.error("[DistributedLock#tryLock] error : " + e.toString(), e);
}

return false;
}


public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//通过state实现重入机制,如果已经获取锁,则将state++即可。
if (addLockCount()) {
return true;
}

//如果尝试获取超时,则返回false
long nanosTimeout = unit.toNanos(time);
if (nanosTimeout <= 0L) {
return false;
}

final long deadline = System.nanoTime() + nanosTimeout;
for (;;) {
//如果当前线程为中断状态,则抛出中断异常
if (Thread.interrupted()) {
throw new InterruptedException();
}

//如果尝试获取超时,则返回false
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
return false;
}
try {
zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL);
state++;
return true;
} catch (InterruptedException ie) {
//如果捕获中断异常,则返回false
logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie);
return false;
} catch (KeeperException ke) {
//如果捕获到的异常是 节点已存在 外的其他异常,则返回false
logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
return false;
}
}
}
}

unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
//通过state实现重入机制,如果已经获取锁,释放锁时,需要将state--。
delLockCount();

//如果state为0时,说明不再持有锁,需要将连接关闭,自动删除临时节点
if (state == 0 && zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
logger.error("[DistributedLock#unlock] error : " + e.toString(), e);
}
}
}

addLockCount

1
2
3
4
5
6
7
8
9
10
11
12
private boolean addLockCount() {
//如果state大于0,即已持有锁,将state数量加一
if (state > 0) {
synchronized (this) {
if (state > 0) {
state++;
return true;
}
}
}
return false;
}

delLockCount

1
2
3
4
5
6
7
8
9
10
11
12
private boolean delLockCount() {
//如果state大于0,即还持有锁,将state数量减一
if (state > 0) {
synchronized (this) {
if (state > 0) {
state--;
return true;
}
}
}
return false;
}

总结

上面就是一个通过ZooKeeper实现的分布式可重入锁,利用了临时节点的特性。源代码可见:aloofJr
其中有几个可以优化的点。

  • 轮询的方式换成Watcher机制
  • 可重入锁实现方式的优化
  • 所有线程竞争一个节点的创建,容易出现羊群效应,且是一种不公平的锁竞争模式

下节我们使用新的方式实现分布式锁来解决上面的几个问题,如果大家好的优化建议,欢迎一起讨论。