ZooKeeper session

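A ZooKeeper client communicates with a ZooKeeper server by creating a session. If the connection to one server is lost, the session moves to a connection established with another server. A session is created by constructing an instance of the org.apache.zookeeper.ZooKeeper class: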

ZooKeeper(
    String connectString,   // server addresses, e.g. "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
    int sessionTimeout,     // session timeout in milliseconds
    Watcher watcher         // receives session events; Watcher is an interface, so pass an instance of a class that implements it
)
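
To make the connectString format concrete, the sketch below splits such a string into host/port pairs. ConnectStringSketch and its parse method are hypothetical helpers written for this note, not part of the ZooKeeper client, and falling back to port 2181 when an entry has no port is an assumption of the sketch.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the ZooKeeper API.
final class ConnectStringSketch {
    // Assumption of this sketch: use 2181 when an entry omits ":port".
    static final int DEFAULT_PORT = 2181;

    // Splits "host1:port1,host2:port2,..." into unresolved socket addresses.
    static List<InetSocketAddress> parse(String connectString) {
        List<InetSocketAddress> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = hostPort.lastIndexOf(':');
            String host = colon < 0 ? hostPort : hostPort.substring(0, colon);
            int port = colon < 0
                    ? DEFAULT_PORT
                    : Integer.parseInt(hostPort.substring(colon + 1));
            servers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return servers;
    }
}
```

Each entry in the resulting list is one server the client could connect to when the session has to move.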

To receive notifications from ZooKeeper, implement the Watcher interface, as in the following code:

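import java.io.IOException;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Master implements Watcher {
    // SLF4J is assumed here because the log calls use "{}" placeholders.
    private static final Logger logger = LoggerFactory.getLogger(Master.class);

    private ZooKeeper zk;
    private final String hostPort;
    private volatile boolean connected = false;
    private volatile boolean expired = false;

    public Master(String hostPort) {
        this.hostPort = hostPort;
    }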
    
    public void startZK() throws IOException {
        zk = new ZooKeeper(hostPort, 10000, this);
    }
    
    public void stopZK() throws InterruptedException {
        zk.close();
    }
    
    @Override
    public void process(WatchedEvent e) {
        logger.info("Master processing event: {}", e.toString());
        if (e.getType() == Event.EventType.None) {
            switch (e.getState()) {
                case SyncConnected:
                    connected = true;
                    break;
                case Disconnected:
                    connected = false;
                    break;
                case Expired:
                    expired = true;
                    connected = false;
                    logger.error("Session expiration");
                    break;
                default:
                    break;
            }
        }
    }

    public boolean isConnected() {
        return connected;
    }

    public boolean isExpired() {
        return expired;
    }
}

A WatchedEvent has three properties: 1) EventType, 2) KeeperState, 3) path

  • EventType is one of NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged, or None; None means the WatchedEvent reports a change in the state of the zk session
  • KeeperState, when EventType is None, describes the state of the session; its values include Disconnected, SyncConnected, AuthFailed, ConnectedReadOnly, SaslAuthenticated, and Expired
  • path is the znode path when EventType is not None
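
The session states differ in whether the client can recover from them: after Disconnected the client library may reconnect, whereas an expired session is gone for good and the application has to create a new ZooKeeper object. The following is a minimal sketch of that distinction. SessionState and SessionTracker are hypothetical stand-ins that model only three of the KeeperState values and do not use the ZooKeeper classes.

```java
// Simplified stand-in for Watcher.Event.KeeperState; only three values are modelled.
enum SessionState { SYNC_CONNECTED, DISCONNECTED, EXPIRED }

// Hypothetical tracker for illustration; not part of the ZooKeeper API.
final class SessionTracker {
    private boolean connected = false;
    private boolean expired = false;

    void onSessionEvent(SessionState state) {
        if (expired) {
            return; // an expired session never becomes usable again
        }
        switch (state) {
            case SYNC_CONNECTED:
                connected = true;
                break;
            case DISCONNECTED:
                connected = false; // recoverable: the client may reconnect
                break;
            case EXPIRED:
                expired = true;    // terminal: a new session is required
                connected = false;
                break;
        }
    }

    boolean isConnected() { return connected; }

    boolean needsNewSession() { return expired; }
}
```

An application would typically check something like needsNewSession() to decide when to construct a fresh client instead of waiting for a reconnect.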

The ZooKeeper API

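The ZooKeeper API includes create, exists, getData, setData, getChildren, and delete. Each operation is available in a synchronous and an asynchronous form. The asynchronous form takes an additional callback parameter; the caller implements the corresponding callback interface to handle the result codes the call can produce. The code below creates the /master znode as an example.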

Synchronous create

    private boolean checkMaster() throws InterruptedException {
        // Logic for checking /master created correctly
    }

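    public void createMaster() throws InterruptedException {
        while (true) {
            try {
                // An ephemeral znode is removed automatically when this session ends.
                zk.create("/master", "masterData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                isLeader = true;
                break;
            } catch (KeeperException.NodeExistsException e) {
                // Another process already holds /master.
                isLeader = false;
                break;
            } catch (KeeperException e) {
                // Other failures (e.g. connection loss) leave the outcome unknown.
                e.printStackTrace();
            }
            // Check whether /master was created after all before retrying.
            if (checkMaster()) {
                break;
            }
        }
    }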

Asynchronous create

    public void createMasterAsync() {
        zk.create("/master", "masterData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, masterCreateCallback, null);
    }

    private AsyncCallback.StringCallback masterCreateCallback = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (KeeperException.Code.get(rc)) {
                case OK:
                    status = Status.ELECTED;
                    takeLeadership();
                    logger.info("I'm the leader");
                    break;
                case NODEEXISTS:
                    status = Status.NOT_ELECTED;
                    masterExistsAsync();
                    break;
                case CONNECTIONLOSS:
                    logger.warn("Connection Loss");
                    checkMasterAsync();
                    break;
                default:
                    status = Status.NOT_ELECTED;
                    logger.error("'create' failed", KeeperException.create(KeeperException.Code.get(rc), path));
            }
        }
    };

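The asynchronous API is generally recommended: the caller does not wait for the create operation to complete, so the thread is not blocked and there is no InterruptedException to handle.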
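
The difference between the two call styles can be sketched without a running server. AsyncCreateSketch below is a hypothetical in-memory stand-in, not the ZooKeeper client: its Code enum, CreateCallback interface, and method names are assumptions made for illustration. The asynchronous method returns immediately and delivers the result code to the callback on a single background thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical in-memory stand-in for a client; names are illustrative only.
final class AsyncCreateSketch {
    enum Code { OK, NODEEXISTS }

    interface CreateCallback {
        void processResult(Code rc, String path);
    }

    private final Map<String, byte[]> znodes = new ConcurrentHashMap<>();

    // One daemon thread delivers callbacks in submission order.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-thread");
        t.setDaemon(true);
        return t;
    });

    // Synchronous form: the caller waits for the result code.
    Code create(String path, byte[] data) {
        return znodes.putIfAbsent(path, data) == null ? Code.OK : Code.NODEEXISTS;
    }

    // Asynchronous form: returns at once; the callback receives the result code later.
    void createAsync(String path, byte[] data, CreateCallback cb) {
        eventThread.execute(() -> cb.processResult(create(path, data), path));
    }

    void shutdown() {
        eventThread.shutdown();
    }
}
```

A caller passes a callback such as (rc, path) -> ... and continues with other work; the outcome is handled inside the callback rather than in a try/catch around the call.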

Handling znode state changes

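Callers of the ZooKeeper API generally follow a fixed pattern:

  1. Call the asynchronous version of the API method
  2. Implement a callback object and pass it to the asynchronous method as an argument
  3. If the method takes a Watcher parameter, implement a Watcher and pass it to the asynchronous method

Example: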

    public void masterExistsAsync() {
        zk.exists("/master", masterExistsWatcher, masterExistsCallback, null);
    }

    private Watcher masterExistsWatcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                assert "/master".equals(event.getPath());
                createMasterAsync();
            }
        }
    };

    private AsyncCallback.StatCallback masterExistsCallback = new AsyncCallback.StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            switch (KeeperException.Code.get(rc)) {
                case OK:
                    logger.info("...");
                    break;
                case CONNECTIONLOSS:
                    logger.warn("Connection Loss");
                    masterExistsAsync();
                    break;
                case NONODE:
                    createMasterAsync();
                    logger.warn("Previous master is gone, run for new master");
                    break;
                default:
                    checkMasterAsync();
                    break;
            }
        }
    };
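
A standard ZooKeeper watch fires at most once, which is why code following this pattern has to call exists again to set a new watch after each notification. The sketch below models that one-shot behavior in memory, under simplifying assumptions. OneShotWatchSketch and DeletionWatcher are hypothetical names, only deletion events are modelled, and none of this is the real client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of one-shot watches; not the ZooKeeper API.
final class OneShotWatchSketch {
    interface DeletionWatcher {
        void nodeDeleted(String path);
    }

    private final Set<String> znodes = new HashSet<>();
    private final Map<String, List<DeletionWatcher>> watches = new HashMap<>();

    void create(String path) {
        znodes.add(path);
    }

    // Reports whether the node exists and registers a watch on it.
    boolean exists(String path, DeletionWatcher watcher) {
        watches.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
        return znodes.contains(path);
    }

    void delete(String path) {
        if (!znodes.remove(path)) {
            return; // nothing to delete, so nothing to notify
        }
        // Remove the watches before firing them: each one triggers at most once.
        List<DeletionWatcher> fired = watches.remove(path);
        if (fired != null) {
            fired.forEach(w -> w.nodeDeleted(path));
        }
    }
}
```

In this model a second deletion of a re-created node notifies nobody unless exists is called again in between.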