flink HA 有两种实现, ZooKeeper 和 Kubernetes, flink 对两者进行了封装, 定义如下
// 选举服务
interface LeaderElectionService {
// 启动选举
start(LeaderContender)
// 停止选举
stop()
// 确认主权
confirmLeadership(..)
}
// 参选者
interface LeaderContender {
// 获取主权后回调
grantLeadership(..)
// 失去主权后回调
revokeLeadership()
// 选举异常回调
handleError(Exception)
}1. 中间层抽象
2. ZooKeeper 实现
LeaderElectionDriver 的 ZooKeeper 实现是 ZooKeeperLeaderElectionDriver, 内部使用 Curator 框架提供的 LeaderLatch 和 NodeCache 机制实现选举,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public ZooKeeperLeaderElectionDriver(..) {
// 选举成功, 回调 isLeader, 否则回调 notLeader
leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
// 监听变化, 回调 nodeChanged
cache = new NodeCache(client, leaderPath);
// 选举异常, 回调 unhandledError
client.getUnhandledErrorListenable().addListener(this);
leaderLatch.addListener(this);
leaderLatch.start();
cache.getListenable().addListener(this);
cache.start();
}
public void isLeader() {
leaderElectionEventHandler.onGrantLeadership();
}
public void notLeader() {
leaderElectionEventHandler.onRevokeLeadership();
}
public void unhandledError(String message, Throwable e) {
fatalErrorHandler.onFatalError(..);
}
References
[Flink之主从选举] https://www.modb.pro/db/107324
没有评论 :
发表评论