Curator是一个Zookeeper的客户端库,是一个分布式协调服务,来帮助你管理zookeeper的连接,减少使用zookeeper的复杂性。 它包含了一套高级API和通用的utilities,提供了一些recipes(如lock,barriers和cache等)和操作(如create, delete, getData等), 这些都使得编写的应用可以更加鲁棒。

Curator最初是由Netfix实现和贡献的,后来被升级为Apache的顶级项目。

Curator Client

上一篇提到的使用Zookeeper Client类似,使用Curator Client,我们需要先创建一个CuratorFramework。

CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
    connectorString,    // servers address e.g.: "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
    retryPolicy         // retry policy for retrying operations in the event of disconnections
);

Fluent API

之所以称之为Fluent API是因为,Curator提供的API可以以链式调用的方式来调用。 如果我们使用标准Zookeeper API来同步创建一个znode的调用应该是这样的

zk.create(
    "/mypath",
    new byte[0],
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT
);

而如果使用Curator的fluent API完成上面同样的事情应该像下面这样调用

zkClient.create()
    .withMode(CreateMode.PERSISTENT)
    .forPath("/mypath", new byte[0]);

这种模式的调用不仅适用create,也适用其他操作如delete,setData, getData, checkExists和getChildren

如果用异步的方式执行同样的操作,需要添加inBackground()调用:

zkClient.create()
    .inBackground()
    .withMode(CreateMode.PERSISTENT)
    .forPath("/mypath", new byte[0]);

同时当zk创建好了znode,我们还需要创建一个或者多个listener来接收callback,来完成异步调用的过程。

同样我们也可以设置一个watcher,watcher触发的通知也是通过listener来完成的,通知将作为WATCHED event传递给listener

zkClient.getData().inBackground().watched().forPath("/mypath");

Listener

Curator使用listener来处理callbacks和watch notification。 实现一个处理callback和notification的listener,第一步我们需要按照下面的template

CuratorListener masterListener = new CuratorListener() {
    public void eventReceived(CuratorFramework client, CuratorEvent event) {
        try {
            switch (event.getType()) {
                case CHILDREN:
                    ... 
                    break;
                case CREATE:
                    ...
                    break;
                case DELETE:
                    ...
                    break;
                case WATCHED:
                    ...
                    break;
            }
        } catch (Exception e) {
            LOG.error("Exception while processing event.", e);
            try {
                close();
            } catch (IOException ioe) {
                LOG.error("IOException while closing.", ioe);
            }
        }
    }
};

下一步需要注册这个listener到Curator client

zkClient.getCuratorListenable().addListener(masterListener);

我们还需要定义一个特殊listener来处理background线程出现异常的时候报的错误

UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
    public void unhandledError(String message, Throwable e) {
        LOG.error("Unrecoverable error: " + message, e);
        try {
            close();
        } catch (IOException ioe) {
            LOG.warn( "Exception when closing.", ioe );
        }
    }
};

同样也需要注册到Curator client

client.getUnhandledErrorListenable().addListener(errorsListener);