flink HA 有两种实现, ZooKeeper 和 Kubernetes, flink 对两者进行了封装, 定义如下
// 选举服务
interface LeaderElectionService {
    // 启动选举
    start(LeaderContender)
    // 停止选举
    stop()
    // 确认主权
    confirmLeadership(..)
}
// 参选者
interface LeaderContender {
    // 获取主权后回调
    grantLeadership(..)
    // 失去主权后回调
    revokeLeadership()
    // 选举异常回调
    handleError(Exception)
}1. 中间层抽象
2. ZooKeeper 实现
LeaderElectionDriver 的 ZooKeeper 实现是 ZooKeeperLeaderElectionDriver, 内部使用 Curator 框架提供的 LeaderLatch 和 NodeCache 机制实现选举,
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public ZooKeeperLeaderElectionDriver(..) {
    // 选举成功, 回调 isLeader, 否则回调 notLeader
    leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
    // 监听变化, 回调 nodeChanged
    cache = new NodeCache(client, leaderPath);
    // 选举异常, 回调 unhandledError
    client.getUnhandledErrorListenable().addListener(this);
    leaderLatch.addListener(this);
    leaderLatch.start();
    cache.getListenable().addListener(this);
    cache.start();
}
public void isLeader() {
    leaderElectionEventHandler.onGrantLeadership();
}
public void notLeader() {
    leaderElectionEventHandler.onRevokeLeadership();
}
public void unhandledError(String message, Throwable e) {
    fatalErrorHandler.onFatalError(..);
}
References
[Flink之主从选举] https://www.modb.pro/db/107324
没有评论 :
发表评论