Using the ZooKeeper API
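ZooKeeper session
A ZooKeeper client talks to the ZooKeeper servers through a session. If the connection to one server is lost, the client reconnects to another server and the session moves over to the new connection. A session is created by constructing an org.apache.zookeeper.ZooKeeper object: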
ZooKeeper(
    String connectString,  // server addresses, e.g. "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
    int sessionTimeout,    // session timeout in milliseconds
    Watcher watcher        // receives session events; Watcher is an interface, so pass an instance of a class that implements it
)
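To make the connectString format concrete, here is a minimal sketch that splits such a string into host/port pairs. ConnectStringSketch and its parse method are hypothetical helpers written for this post, not part of the ZooKeeper client, and the sketch only handles the plain "host:port,host:port" form shown above.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the ZooKeeper client API. */
class ConnectStringSketch {

    /** Splits "host1:port1,host2:port2" into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String connectString) {
        List<InetSocketAddress> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String s = entry.trim();
            int colon = s.lastIndexOf(':');
            // Reject entries with no host part or no port part
            if (colon <= 0 || colon == s.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got: " + s);
            }
            int port = Integer.parseInt(s.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; we only want the structure
            servers.add(InetSocketAddress.createUnresolved(s.substring(0, colon), port));
        }
        return servers;
    }
}
```

Calling ConnectStringSketch.parse with the three-server example string yields three addresses, one per server the client may connect to.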
To receive notifications from ZooKeeper, implement the Watcher interface, as in the following code:
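import java.io.IOException;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Master implements Watcher {
    // SLF4J is an assumption here, inferred from the "{}" placeholder in the log call below
    private static final Logger logger = LoggerFactory.getLogger(Master.class);

    private ZooKeeper zk;
    private final String hostPort;
    private volatile boolean connected = false;
    private volatile boolean expired = false;

    public Master(String hostPort) {
        this.hostPort = hostPort;
    }

    public void startZK() throws IOException {
        // "this" is the Watcher that receives the session events
        zk = new ZooKeeper(hostPort, 10000, this);
    }

    public void stopZK() throws InterruptedException {
        zk.close();
    }

    @Override
    public void process(WatchedEvent e) {
        logger.info("Master processing event: {}", e);
        // EventType.None means the event reports a session state change
        if (e.getType() == Event.EventType.None) {
            switch (e.getState()) {
                case SyncConnected:
                    connected = true;
                    break;
                case Disconnected:
                    connected = false;
                    break;
                case Expired:
                    expired = true;
                    connected = false;
                    logger.error("Session expiration");
                    break;
                default:
                    break;
            }
        }
    }

    public boolean isConnected() {
        return connected;
    }

    public boolean isExpired() {
        return expired;
    }
}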
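A WatchedEvent has three properties: 1) EventType, 2) KeeperState, 3) path
- EventType is one of NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged, or None; None means the WatchedEvent reports a change in the state of the zk session
- KeeperState is the state of the session when the event was generated; when EventType is None it is the new session state. Its values include Disconnected, SyncConnected, AuthFailed, ConnectedReadOnly, SaslAuthenticated, and Expired
- path is the path of the affected znode when EventType is not None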
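One way to read the KeeperState values is as a decision about what the application should do next. The sketch below is a plain-Java model: state names are passed as strings so that it compiles without the ZooKeeper jar, and the Action enum and the policy it encodes are assumptions made for illustration. The distinction it tries to capture is that Disconnected is something the client library recovers from by reconnecting, while Expired means the session is gone and a new ZooKeeper object has to be created.

```java
/** Illustrative policy only; Action and onSessionEvent are hypothetical names. */
class SessionStateSketch {

    enum Action { PROCEED, PROCEED_READ_ONLY, WAIT_FOR_RECONNECT, RECREATE_HANDLE, NONE }

    /** Maps a KeeperState name (from an EventType.None event) to a suggested reaction. */
    static Action onSessionEvent(String keeperState) {
        switch (keeperState) {
            case "SyncConnected":
                return Action.PROCEED;            // normal read/write session
            case "ConnectedReadOnly":
                return Action.PROCEED_READ_ONLY;  // connected to a read-only server
            case "Disconnected":
                return Action.WAIT_FOR_RECONNECT; // the client reconnects on its own
            case "Expired":
            case "AuthFailed":
                return Action.RECREATE_HANDLE;    // assumed policy: start over with a new handle
            default:
                return Action.NONE;               // e.g. SaslAuthenticated, informational
        }
    }
}
```

In the Master class above, the same idea shows up as the connected and expired flags: a caller that sees isExpired() return true would need to call startZK() again rather than wait.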
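ZooKeeper API
The ZooKeeper API consists of create, exists, getData, setData, getChildren, and delete. Each operation can be called synchronously or asynchronously. The asynchronous variants take an extra callback parameter; the caller implements the matching callback interface to handle the return codes the call can produce. The code below creates the /master znode as an example.
Synchronous create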
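private volatile boolean isLeader = false;

private boolean checkMaster() throws InterruptedException {
    // Logic for checking that /master was created correctly;
    // returns true once it is known who the master is
}

public void createMaster() throws InterruptedException {
    while (true) {
        try {
            zk.create("/master", "masterData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            isLeader = true;   // we created the znode, so we are the master
            break;
        } catch (KeeperException.NodeExistsException e) {
            isLeader = false;  // another process already holds /master
            break;
        } catch (KeeperException e) {
            // Typically a connection loss: the create may or may not have been applied,
            // so check /master below before retrying
            logger.warn("create /master failed, checking before retry", e);
        }
        if (checkMaster()) {
            break;
        }
    }
}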
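The reason for calling checkMaster() after a failed create is that a connection loss is ambiguous: the request may have reached the server before the reply was lost. The sketch below models that case without the ZooKeeper jar. FakeStore is a hypothetical in-memory stand-in for the server, serverId is an assumed per-process identifier (the code above writes a fixed "masterData" string instead), and the check simply reads back /master to see who owns it.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the create/check/retry loop; none of this is ZooKeeper API. */
class SyncElectionSketch {

    static class ConnectionLoss extends Exception {}
    static class NodeExists extends Exception {}

    /** Hypothetical in-memory server that can lose the reply to one create. */
    static class FakeStore {
        private final Map<String, String> nodes = new HashMap<>();
        private boolean dropNextReply;

        FakeStore(boolean dropNextReply) {
            this.dropNextReply = dropNextReply;
        }

        void create(String path, String data) throws ConnectionLoss, NodeExists {
            if (nodes.containsKey(path)) {
                throw new NodeExists();
            }
            nodes.put(path, data);           // the write is applied...
            if (dropNextReply) {
                dropNextReply = false;
                throw new ConnectionLoss();  // ...but the client never hears about it
            }
        }

        String getData(String path) {
            return nodes.get(path);          // null if the node does not exist
        }
    }

    /** Returns true if serverId ends up owning /master. */
    static boolean runForMaster(FakeStore store, String serverId) {
        while (true) {
            try {
                store.create("/master", serverId);
                return true;
            } catch (NodeExists e) {
                return false;
            } catch (ConnectionLoss e) {
                // Outcome unknown: fall through and look at /master
            }
            String owner = store.getData("/master");
            if (owner != null) {
                return owner.equals(serverId);  // our own create did go through, or someone else won
            }
            // No master yet: loop and try the create again
        }
    }
}
```

With a store that drops the first reply, runForMaster still reports success for the first caller, because the read-back finds its own serverId in /master.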
Asynchronous create
public void createMasterAsync() {
    // Returns immediately; the result is delivered to masterCreateCallback
    zk.create("/master", "masterData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, masterCreateCallback, null);
}

private AsyncCallback.StringCallback masterCreateCallback = new AsyncCallback.StringCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (KeeperException.Code.get(rc)) {
            case OK:
                // We created /master, so we are the leader
                status = Status.ELECTED;
                takeLeadership();
                logger.info("I'm the leader");
                break;
            case NODEEXISTS:
                // Someone else is the master; watch /master for its deletion
                status = Status.NOT_ELECTED;
                masterExistsAsync();
                break;
            case CONNECTIONLOSS:
                // The create may or may not have succeeded; check before retrying
                logger.warn("Connection Loss");
                checkMasterAsync();
                break;
            default:
                status = Status.NOT_ELECTED;
                logger.error("'create' failed", KeeperException.create(KeeperException.Code.get(rc), path));
        }
    }
};
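The asynchronous API is generally recommended: the calling thread does not block while waiting for the create to complete, and there is no InterruptedException to deal with.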
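To illustrate the non-blocking shape of a callback API, here is a small sketch that adapts a callback into a CompletableFuture. It does not use the ZooKeeper jar: the StringCallback interface below is a hypothetical copy of the shape of AsyncCallback.StringCallback, createAsync is a stand-in that always reports success, and 0 is used as the OK return code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Plain-Java model of a callback-style call; not the real ZooKeeper client. */
class AsyncCallbackSketch {

    /** Hypothetical callback with the same parameter list as AsyncCallback.StringCallback. */
    interface StringCallback {
        void processResult(int rc, String path, Object ctx, String name);
    }

    /** Stand-in async operation: returns at once and invokes cb later on the io executor. */
    static void createAsync(Executor io, String path, StringCallback cb, Object ctx) {
        io.execute(() -> cb.processResult(0, path, ctx, path));  // 0 stands in for OK
    }

    /** Wraps the callback in a future so the caller chooses when, or whether, to wait. */
    static CompletableFuture<String> create(Executor io, String path) {
        CompletableFuture<String> result = new CompletableFuture<>();
        createAsync(io, path, (rc, p, ctx, name) -> {
            if (rc == 0) {
                result.complete(name);
            } else {
                result.completeExceptionally(new IllegalStateException("rc=" + rc));
            }
        }, null);
        return result;
    }
}
```

The call to create returns before the callback has run; the caller can attach further steps with thenAccept, or call join() only at the point where the result is actually needed.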
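Handling znode state changes
Calls to the ZooKeeper API usually follow a fixed pattern:
- Call the asynchronous version of the API method
- Implement a callback object and pass it to the asynchronous method
- If the method takes a Watcher parameter, implement a Watcher and pass it to the asynchronous method as well
Example code: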
public void masterExistsAsync() {
    // Sets masterExistsWatcher on /master; the result goes to masterExistsCallback
    zk.exists("/master", masterExistsWatcher, masterExistsCallback, null);
}

private Watcher masterExistsWatcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted) {
            assert "/master".equals(event.getPath());
            // The previous master is gone, so run for master again
            createMasterAsync();
        }
    }
};

private AsyncCallback.StatCallback masterExistsCallback = new AsyncCallback.StatCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        switch (KeeperException.Code.get(rc)) {
            case OK:
                // /master exists and the watch is in place
                logger.info("...");
                break;
            case CONNECTIONLOSS:
                // The watch may not have been set; issue the exists call again
                logger.warn("Connection Loss");
                masterExistsAsync();
                break;
            case NONODE:
                // /master disappeared before the watch was set
                createMasterAsync();
                logger.warn("Previous master is gone, run for new master");
                break;
            default:
                checkMasterAsync();
                break;
        }
    }
};
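A detail worth keeping in mind with the pattern above is that a watch set through exists fires once: after the NodeDeleted event is delivered, the watcher is not called again unless a later call registers it again (here that happens when createMasterAsync leads back to masterExistsAsync). The sketch below models that one-shot behavior in plain Java. OneShotWatchSketch and its nested Watcher interface are hypothetical and only imitate the shape of the real API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of one-shot watches; not the real ZooKeeper client. */
class OneShotWatchSketch {

    /** Hypothetical watcher: receives an event type name and the znode path. */
    interface Watcher {
        void process(String eventType, String path);
    }

    private final Map<String, String> nodes = new HashMap<>();
    private final Map<String, List<Watcher>> watches = new HashMap<>();

    /** Reports whether the node exists and, if w is non-null, leaves a watch on the path. */
    boolean exists(String path, Watcher w) {
        if (w != null) {
            watches.computeIfAbsent(path, k -> new ArrayList<>()).add(w);
        }
        return nodes.containsKey(path);
    }

    void create(String path, String data) {
        if (nodes.putIfAbsent(path, data) == null) {
            fire("NodeCreated", path);
        }
    }

    void delete(String path) {
        if (nodes.remove(path) != null) {
            fire("NodeDeleted", path);
        }
    }

    private void fire(String eventType, String path) {
        // Remove the watches before notifying: each one is delivered at most once
        List<Watcher> fired = watches.remove(path);
        if (fired != null) {
            for (Watcher w : fired) {
                w.process(eventType, path);
            }
        }
    }
}
```

If a watcher is registered on /master, the node is deleted, and then created again, the watcher sees only the NodeDeleted event; the later creation goes unnoticed because nothing set the watch a second time.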