Zookeeper Curator
访问量:
Curator是一个Zookeeper的客户端库,是一个分布式协调服务,来帮助你管理zookeeper的连接,减少使用zookeeper的复杂性。 它包含了一套高级API和通用的utilities,提供了一些recipes(如lock,barriers和cache等)和操作(如create, delete, getData等), 这些都使得编写的应用可以更加鲁棒。
Curator最初是由Netfix实现和贡献的,后来被升级为Apache的顶级项目。
Curator Client
和上一篇提到的使用Zookeeper Client类似,使用Curator Client,我们需要先创建一个CuratorFramework。
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
connectorString, // servers address e.g.: "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
retryPolicy // retry policy for retrying operations in the event of disconnections
);
Fluent API
之所以称之为Fluent API是因为,Curator提供的API可以以链式调用的方式来调用。 如果我们使用标准Zookeeper API来同步创建一个znode的调用应该是这样的
zk.create(
"/mypath",
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT
);
而如果使用Curator的fluent API完成上面同样的事情应该像下面这样调用
zkClient.create()
.withMode(CreateMode.PERSISTENT)
.forPath("/mypath", new byte[0]);
这种模式的调用不仅适用create,也适用其他操作如delete,setData, getData, checkExists和getChildren
如果用异步的方式执行同样的操作,需要添加inBackground()调用:
zkClient.create()
.inBackground()
.withMode(CreateMode.PERSISTENT)
.forPath("/mypath", new byte[0]);
同时当zk创建好了znode,我们还需要创建一个或者多个listener来接收callback,来完成异步调用的过程。
同样我们也可以设置一个watcher,watcher触发的通知也是通过listener来完成的,通知将作为WATCHED event传递给listener
zkClient.getData().inBackground().watched().forPath("/mypath");
Listener
Curator使用listener来处理callbacks和watch notification。 实现一个处理callback和notification的listener,第一步我们需要按照下面的template
CuratorListener masterListener = new CuratorListener() {
public void eventReceived(CuratorFramework client, CuratorEvent event) {
try {
switch (event.getType()) {
case CHILDREN:
...
break;
case CREATE:
...
break;
case DELETE:
...
break;
case WATCHED:
...
break;
}
} catch (Exception e) {
LOG.error("Exception while processing event.", e);
try {
close();
} catch (IOException ioe) {
LOG.error("IOException while closing.", ioe);
}
}
}
};
下一步需要注册这个listener到Curator client
zkClient.getCuratorListenable().addListener(masterListener);
我们还需要定义一个特殊listener来处理background线程出现异常的时候报的错误
UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
public void unhandledError(String message, Throwable e) {
LOG.error("Unrecoverable error: " + message, e);
try {
close();
} catch (IOException ioe) {
LOG.warn( "Exception when closing.", ioe );
}
}
};
同样也需要注册到Curator client
client.getUnhandledErrorListenable().addListener(errorsListener);