Curator Framework 深入了解

本文受到 colobu 前辈文章的指引,深入了解 Curator Framework 的工作流程,十分感谢 colobu 前辈的博文给予的启发和指导。

选举功能实现 (Leader Election)

Curator 提供了 Leader 选举的功能,用于在分布式计算中选举出一个节点作为一组节点的 Leader。Curator 提供了两种 Leader Election 的 Recipe:

LeaderLatch

构造方法:

// LeaderLatch.class

public LeaderLatch(CuratorFramework client, String latchPath)

public LeaderLatch(CuratorFramework client, String latchPath, String id/*zk的 path:value 中的 value*/)

同之前几章的使用风格,需要 start() 方法调用了才会开启选举。 start() 方法之后会调用真正的工作开始方法:

// LeaderLatch.class
private synchronized void internalStart() {
        if ( state.get() == State.STARTED ) { // 状态标记为开始 start()会完成
            // 很重要的一条实践,客户端需要注册一个 lisenter 用来监听和 zk 连接的状态,比如中断、重连等
            client.getConnectionStateListenable().addListener(listener);
            //...
            // 开始选举相关的工作
            reset();
            //...
        }
}

reset() 是一个会重复执行的方法,用来争抢当前的 leader:

// LeaderLatch.class
void reset() throws Exception {
        setLeadership(false); // 当前不是leader,先置为 false;如果是leader不会进行这个操作
        setNode(null); // 成为leader后会创建他的节点,存储起来方便下次删除旧节点
        // Curator 方法非常通用的一种设计,专门用来做回调
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                // 不知道这个 debugResetWaitLatch 这个什么用... 一开始就被赋值 null,没有修改过。看起来开发开发另一个新特性的 hook。
                // volatile CountDownLatch debugResetWaitLatch = null;
                if ( debugResetWaitLatch != null ) {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }
                // 节点创建成功
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                    setNode(event.getName()); // 将当前 path 的名称记录下来,方便后续删除
                    if ( state.get() == State.CLOSED ) {
                        setNode(null); // 这应该是一个安全检测,如果这时候leaderLatch被 close() 了,这里的 node 也就不存了。下面创建的也是临时节点。
                    } else {
                        getChildren(); // 获取latchPath(构造方法中传入的)下所有的节点,用来关键的判断谁拿到了 leader 权限
                    }
                } else {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        //这里可以看到创建的是一个临时节点,value的值就是 id
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }

checkLeadership() 是关键的终结方法了,他用来判断是谁拿到了 leader 权限:

// LeaderLatch.class
private void getChildren() throws Exception {
        BackgroundCallback callback = new BackgroundCallback() {
            public void processResult(... )throws Exception {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                    // 终结方法,找到对应的 leader
                    checkLeadership(event.getChildren());
                }
            }
        };
    // 获取 latchPath 所有的节点
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

篇幅有限,checkLeadership() 只介绍获得 leader 身份的情况了:

// LeaderLatch.class
private void checkLeadership(List<String> children) throws Exception {
        final String localOurPath = ourPath.get(); // 当前 LeaderLacth 获取的节点
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); // 排序 latchPath 下所有的节点
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; // 很明白的代码,查询当前 LeaderLacth 类的节点是否出现在排序数组中
        if ( ourIndex < 0 ) {// 没有出现,就 reset() 方法重新来
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 ) {// 这里就是关键了, == 0,排在第一位,获得 leader 权限
            setLeadership(true);
        } else { /*...*/}
}

至此一个 Leader 选举的过程就完成了,Curator 利用了 ZooKeeper 的各种特性可谓是玩出了花儿…

这里还介绍一个阻塞的方法等待当前对象获取到 Leader 身份:

// LeaderLatch.class
public void await() throws InterruptedException, EOFException {
        synchronized(this) { // 锁住当前对象
            while ((state.get() == State.STARTED) && !hasLeadership.get()){
                wait(); // 等待成为 Leader,这里 setLeadership(true) 的方法里会 notifyAll()来唤醒的
            }
        }
        if ( state.get() != State.STARTED ) {
            throw new EOFException();
        }
}
// 超时版本
public boolean await(long timeout, TimeUnit unit) throws InterruptedException

LeaderSelector

Curator还提供了另外一种选举方法,注意涉及以下四个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException
// LeaderSelector.class
// 构造函数
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)

public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)

需要分析 LeaderSelector 依旧需要从 start() 方法开始,但在开始之前还有一个重要的方法 autoRequeue() 。如果需要该实例不停的去尝试获取 leader 身份,就需要调用此方法一次,在构造好该对象之后先调用 autoRequeue()start()

start() 的逻辑是 :

start() -> requeue() -> internalRequeue() ----
                               ↑             ↓ autoRequeue == true
                               --------------

internalRequeue() 中配置了一个 Future 任务执行 doWorkLoop() 方法,每次调用 internalRequeue() 是同步的,并且 Future 任务执行也是同步的,也就是必须一次一次同步的去尝试获取 leader 身份。

// LeaderSelector.class
void doWork() throws Exception {
        hasLeadership = false;
        try {
            // 这里就是关键了,这是一个分布式锁
            // InterProcessMutex mutex
            // 一旦这个拿到了就是持有锁了
            // 下面只需要 takeLeadership 方法阻塞住方法,不让这边执行到 finally 代码块就好了
            mutex.acquire();

            hasLeadership = true;
            try {/*...*/}
            catch(/**/){/**/}
            finally {
                clearIsQueued();
            }
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
            throw e;
        }
        finally {
            if ( hasLeadership ) {
                hasLeadership = false;
                try {
                    mutex.release(); // 释放了锁,其他的可以去竞争 leader 了
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("The leader threw an exception", e);
                    // ignore errors - this is just a safety
                }
            }
        }
    }

异常处理
LeaderSelectorListener 类继承了 ConnectionStateListenerLeaderSelector 必须小心连接状态的改变。如果实例成为 leader, 当 SUSPENDED 状态出现时, 实例必须假定在重新连接成功之前它可能不再是 leader了。 如果 LOST 状态出现, 实例不再是 leader, takeLeadership() 方法返回。

重要:推荐处理方式是当收到 SUSPENDEDLOST 时抛出 CancelLeadershipException 异常。 这会导致 LeaderSelector 实例中断并取消执行 takeLeadership()方法的异常。Curator 提供了 LeaderSelectorListenerAdapter 以供继承,此 Adapter 提供了推荐的处理逻辑。

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState ){
        if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ){
            throw new CancelLeadershipException();
        }
    }
}

这里跑出异常以中断 takeLeadership()方法只能抛出CancelLeadershipException 异常:

// LeaderSelector.WrappedListener.class
public void stateChanged(CuratorFramework client, ConnectionState newState) {
       try{
                listener.stateChanged(client, newState);
       } catch ( CancelLeadershipException dummy ) {
                // 中断逻辑 
                leaderSelector.interruptLeadership();
       }
}

LeaderLatch 相比, 通过 LeaderSelectorListener 可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 [email protected]