【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式竞选

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版中,我们通过利用ZooKeeper的临时节点和Watcher特性,实现了一个分布式锁。
本文我们结合实际场景,完成一个分布式竞争选举。

设计

这里我们实现一个公平的选举方式,即先参加选举的优先被选为leader。
具体的实现思路参考了ZooKeeper提供的官方示例:zookeeper-recipes-election

  • START:服务器开始竞选
  • OFFER:创建临时顺序节点
  • DETERMINE:开始决策,将临时节点按末尾序号从小到大排序。如果当前节点的序号最小,则竞选成功;否则Watch前一个节点,当前一个节点被删除时,再次进行决策
  • ELECTED:当前节点是序号最小的节点,竞选成功
  • READY:当前节点不是序号最小的节点,竞选不成功,Watch前一个节点,进入READY态
  • FAILED:当出现异常情况时,为失败状态
  • STOP:结束竞选

LeaderElectionSupport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class LeaderElectionSupport implements LeaderElection{
private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class);

//ZooKeeper客户端,进行ZooKeeper操作
private ZooKeeper zooKeeper;

//根节点名称
private String dir;

//节点前缀
private String node;

//ZooKeeper鉴权信息
private List<ACL> acls;

//要加锁节点
private String fullPath;

//选举状态
private State state;

//监听器
private Set<LeaderElectionListener> listeners;

//存当前节点的信息
private volatile LeaderNode leaderNode;

//监察器
private Watcher watcher;


/**
* Constructor.
*
* @param zooKeeper the zoo keeper
* @param dir the dir
* @param node the node
* @param acls the acls
*/
public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
this.zooKeeper = zooKeeper;
this.dir = dir;
this.node = node;
this.acls = acls;
this.fullPath = dir.concat("/").concat(this.node);
init();

state = State.STOP;
listeners = Collections.synchronizedSet(new HashSet<>());
}

/**
* 初始化根节点、检查器等
* */
private void init() {
try {
watcher = new LeaderWatcher();
Stat stat = zooKeeper.exists(dir, false);
if (stat == null) {
zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e);
}
}
}

start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Start.
 * Begins participating in the election: creates this participant's node, then determines
 * the outcome.
 *
 * <p>If no node could be created, offerElection has already moved the instance to FAILED,
 * and no determination is attempted, because there is nothing to compare against. Previously
 * this path threw a NullPointerException from determineElection.
 */
@Override
public void start() {
    synchronized (this) {
        state = State.START;
        dispatchEvent(EventType.START);
        offerElection();
        if (leaderNode == null) {
            // offerElection failed and reported it via becomeFailed.
            return;
        }
        determineElection();
    }
}

offerElection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Joins the election by creating an ephemeral sequential node at fullPath. The local host
 * name is stored as the node's data.
 *
 * <p>If this instance already holds a node, it is reused and nothing is created. On failure
 * the instance moves to FAILED and OFFER_COMPLETE is not dispatched. This matches
 * determineElection, which skips DETERMINE_COMPLETE when it fails.
 */
private void offerElection() {
    dispatchEvent(EventType.OFFER_START);
    state = State.OFFER;
    if (leaderNode == null) {
        synchronized (this) {
            if (leaderNode == null) {
                try {
                    // Resolve the host name once so the stored data and LeaderNode agree.
                    String hostName = InetAddress.getLocalHost().getHostName();
                    String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(hostName), acls, CreateMode.EPHEMERAL_SEQUENTIAL);

                    LeaderNode created = new LeaderNode();
                    created.setHostName(hostName);
                    created.setNodePath(path);
                    created.setId(NodeUtil.getNodeId(path));

                    // Publish only a fully populated node through the volatile field.
                    leaderNode = created;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    becomeFailed(e);
                    return;
                } catch (Exception e) {
                    becomeFailed(e);
                    return;
                }
            }
        }
    }
    dispatchEvent(EventType.OFFER_COMPLETE);
}

determineElection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Decides the election outcome.
 * 1. The participant whose node is first in getNodePathSet's ordering wins.
 * 2. Every other participant watches the node immediately before its own, and the decision
 *    is re-run when that node is deleted.
 *
 * <p>The instance moves to FAILED, without dispatching DETERMINE_COMPLETE, in three cases:
 * this participant holds no node, the election root has no children, or this participant's
 * own node is no longer among them.
 */
private void determineElection() {
    dispatchEvent(EventType.DETERMINE_START);
    state = State.DETERMINE;
    synchronized (this) {
        // Read the volatile field once so the checks below all see the same node.
        LeaderNode self = leaderNode;
        if (self == null) {
            becomeFailed(new IllegalStateException("no local election node"));
            return;
        }
        TreeSet<String> nodePathSet = getNodePathSet();
        if (nodePathSet.isEmpty()) {
            becomeFailed(new Exception("no node"));
            return;
        }
        String ownPath = self.getNodePath();
        if (!nodePathSet.contains(ownPath)) {
            // Without our own node, headSet(ownPath) may be empty and last() would throw.
            becomeFailed(new IllegalStateException("election node no longer exists: " + ownPath));
            return;
        }
        // ZooKeeper paths are case-sensitive, so compare exactly.
        if (ownPath.equals(nodePathSet.first())) {
            becomeLeader();
        } else {
            // ownPath is present and not first, so at least one node precedes it.
            becomeReady(nodePathSet.headSet(ownPath).last());
        }
    }
    dispatchEvent(EventType.DETERMINE_COMPLETE);
}

becomeLeader

1
2
3
4
5
6
7
8
/**
 * Records that this participant won the election.
 * The ELECTED state is set between the ELECTED_START and ELECTED_COMPLETE events.
 */
private void becomeLeader() {
    dispatchEvent(EventType.ELECTED_START);
    this.state = State.ELECTED;
    dispatchEvent(EventType.ELECTED_COMPLETE);
}

becomeReady

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Handles losing the election. Places a watch on the preceding node and enters READY.
 * If that node has already disappeared, the decision is re-run immediately instead.
 *
 * <p>Note: ZooKeeper's exists() registers a watch even when the node is absent. That watch
 * fires on NodeCreated, which LeaderWatcher ignores.
 *
 * @param path full path of the node immediately preceding this participant's node
 */
private void becomeReady(String path) {
    try {
        Stat stat = zooKeeper.exists(path, watcher);

        if (stat == null) {
            determineElection();
        } else {
            dispatchEvent(EventType.READY_START);
            state = State.READY;
            dispatchEvent(EventType.READY_COMPLETE);
        }
    } catch (InterruptedException e) {
        // Keep the thread's interrupt status for callers further up the stack.
        Thread.currentThread().interrupt();
        becomeFailed(e);
    } catch (KeeperException e) {
        becomeFailed(e);
    }
}

becomeFailed

1
2
3
4
5
6
7
8
/**
 * Switches to the FAILED state, dispatches the FAILED event, and then logs the cause.
 *
 * @param e the exception that caused the failure
 */
private void becomeFailed(Exception e) {
    this.state = State.FAILED;
    dispatchEvent(EventType.FAILED);
    final String message = "[LeaderElectionSupport#becomeFailed] error : " + e.toString();
    logger.error(message, e);
}

getNodePathSet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Returns the full paths of all participant nodes under the election root, ordered by the
 * sequence number ZooKeeper appended to each child name, ascending. The first element is
 * therefore the earliest participant.
 *
 * <p>Ordering by sequence number rather than by the raw string keeps the election fair even
 * when participants use different name prefixes. Paths with no trailing digits sort after
 * all sequenced ones, and ties are broken by the full path.
 *
 * <p>On a ZooKeeper error, the failure is reported via becomeFailed and an empty set is
 * returned.
 *
 * @return ordered set of participant paths; never null
 */
private TreeSet<String> getNodePathSet() {
    TreeSet<String> nodeSet = new TreeSet<String>(new java.util.Comparator<String>() {
        @Override
        public int compare(String left, String right) {
            int bySequence = Long.compare(sequenceOf(left), sequenceOf(right));
            return bySequence != 0 ? bySequence : left.compareTo(right);
        }
    });
    try {
        for (String child : zooKeeper.getChildren(dir, false)) {
            nodeSet.add(dir.concat("/").concat(child));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        becomeFailed(e);
    } catch (KeeperException e) {
        becomeFailed(e);
    }

    return nodeSet;
}

/**
 * Parses the trailing decimal digits of a path as its sequence number.
 * Returns Long.MAX_VALUE when there are no trailing digits or too many to fit in a long.
 */
private static long sequenceOf(String path) {
    int start = path.length();
    while (start > 0 && path.charAt(start - 1) >= '0' && path.charAt(start - 1) <= '9') {
        start--;
    }
    int digitCount = path.length() - start;
    if (digitCount == 0 || digitCount > 18) {
        return Long.MAX_VALUE;
    }
    return Long.parseLong(path.substring(start));
}

stop

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Stop.
 * Leaves the election. Deletes this participant's node, then records the STOP state;
 * STOP_START and STOP_COMPLETE are dispatched before and after.
 */
@Override
public void stop() {
    synchronized (this) {
        dispatchEvent(EventType.STOP_START);
        // Withdraw first, then mark the election as stopped.
        deleteNode();
        this.state = State.STOP;
        dispatchEvent(EventType.STOP_COMPLETE);
    }
}

deleteNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Withdraws from the election by deleting this participant's node, regardless of version.
 *
 * <p>If the node is already gone, the local reference is still cleared. A later start() then
 * creates a fresh node instead of reusing a stale path. Any other ZooKeeper error is reported
 * via becomeFailed and leaves the reference in place.
 */
private void deleteNode() {
    synchronized (this) {
        // Check under the lock so the null test and the delete see the same node.
        LeaderNode current = leaderNode;
        if (current == null) {
            return;
        }
        try {
            zooKeeper.delete(current.getNodePath(), -1);
            leaderNode = null;
        } catch (KeeperException.NoNodeException e) {
            // Nothing left to delete; forget the stale path.
            leaderNode = null;
        } catch (KeeperException e) {
            becomeFailed(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            becomeFailed(e);
        }
    }
}

getLeaderHostName

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Returns the host name stored in the current leader's node, i.e. the first path in
 * getNodePathSet.
 *
 * @return the leader's host name, or null when there are no participants or the node data
 *         could not be read (errors are logged)
 */
@Override
public String getLeaderHostName() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();
        if (nodePathSet.isEmpty()) {
            return null;
        }
        String leaderPath = nodePathSet.first();
        try {
            // NOTE(review): if ConversionUtil.bytesToObject is Java native deserialization,
            // anyone able to write this node can feed it arbitrary bytes; confirm, and
            // consider storing the host name as plain UTF-8 bytes instead.
            return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("[LeaderElectionSupport#getLeaderHostName] error : " + e.toString(), e);
        } catch (KeeperException | IOException | ClassNotFoundException e) {
            logger.error("[LeaderElectionSupport#getLeaderHostName] error : " + e.toString(), e);
        }
        return null;
    }
}

getLeaderNodePath

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Returns the path of the current leader's node.
 *
 * @return the first path reported by getNodePathSet, or null when there are no participants
 */
@Override
public String getLeaderNodePath() {
    synchronized (this) {
        TreeSet<String> participants = getNodePathSet();
        if (participants.isEmpty()) {
            return null;
        }
        return participants.first();
    }
}

LeaderWatcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Inner watcher passed to exists() on the node preceding this participant's node. When that
 * node is deleted, the outcome is determined again, unless the election has been stopped.
 *
 * <p>The STOP check and determineElection run under the outer instance's monitor, the same
 * one stop() synchronizes on. A deletion event racing with stop() therefore cannot start a
 * determination after the participant has left the election.
 */
private class LeaderWatcher implements Watcher {
    /**
     * Handles a ZooKeeper watch event. Only NodeDeleted triggers a new determination.
     *
     * @param watchedEvent the watched event
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (!Event.EventType.NodeDeleted.equals(watchedEvent.getType())) {
            return;
        }
        try {
            synchronized (LeaderElectionSupport.this) {
                if (!State.STOP.equals(state)) {
                    determineElection();
                }
            }
        } catch (Exception e) {
            // Never let an exception escape into the ZooKeeper event thread.
            logger.error("[LeaderWatcher#process] error : " + e.toString(), e);
        }
    }
}

总结

以上就是我们利用ZooKeeper的临时节点和Watcher特性实现的公平模式分布式竞选。

它可以完成简单的选主操作,适用于执行单机定时任务、心跳检测等场景,实际上实现的是Master-Slave模型。

源代码可见:aloofJr

而对于高可用要求较高的复杂选举场景,如分布式存储、同步等,需要考虑集群一致性、脑裂等各种情况,此时就需要实现Paxos、Raft、Zab等一致性算法协议。例如ZooKeeper集群的选举就使用了Zab算法。
我们后续会进行深入的探讨。