Skip to content

Latest commit

 

History

History
182 lines (122 loc) · 5.2 KB

08.事务请求以及Watcher源码分析.md

File metadata and controls

182 lines (122 loc) · 5.2 KB

image-20220425230855984

Watcher 的基本流程

ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、 服务器处理 Watcher 和客户端回调 Watcher

客户端注册 watcher 有 3 种方式,getData、exists、getChildren;以如下代码为例 来分析整个触发机制的原理

基于 zkclient 客户端发起一个数据操作

public class Test implements Watcher{

    static ZooKeeper zooKeeper;

    static {
        try {
            zooKeeper = new ZooKeeper("localhost:2181", 4000,new Test());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("eventType:"+event.getType());
        if(event.getType()==Event.EventType.NodeDataChanged){
            try {
                zooKeeper.exists(event.getPath(),true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    //getData()/  exists  /getChildren
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //Curator
        String path="/watcher";
        if(zooKeeper.exists(path,false)==null) {
            zooKeeper.create("/watcher", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        Thread.sleep(1000);
        System.out.println("-----------");
        Stat stat=zooKeeper.exists(path,true); //true表示使用zookeeper实例中配置的watcher

        System.in.read();
    }
}

ZooKeeper API 的初始化过程

在创建一个 ZooKeeper 客户端对象实例时,我们通过 new Watcher()向构造方法中 传入一个默认的 Watcher, 这个 Watcher 将作为整个 ZooKeeper 会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中;代码如 下

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

ClientCnxn:是 Zookeeper 客户端和 Zookeeper 服务器端进行通信和事件通知处理 的主要类,它内部包含两个类,

1.SendThread :负责客户端和服务器端的数据通信, 也包括事件信息的传输

2.EventThread : 主要在客户端回调注册的 Watchers 进行通知处理

ClientCnxn 初始化

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();

}

public void start() { --启动两个线程
 	sendThread.start();
 	eventThread.start();
}

服务端接收请求处理流程

NIOServerCnxn

ZookeeperServer-zks.processPacket(this, bb);

submitRequest

firstProcessor 的请求链组成

PredRequestProcessor.processRequest(si);

pRequest

SyncRequestProcessor. processRequest

FinalRequestProcessor. processRequest

客户端接收服务端处理完成的响应

ClientCnxnSocketNIO.doIO

SendThread. readResponse

finishPacket 方法

watchRegistration

EventThread.queuePacket()

事件触发

服务端的事件响应 DataTree.setData()

WatcherManager. triggerWatch

w.process(e);

客户端处理事件响应

SendThread.readResponse

eventThread.queueEvent

Meterialize 方法

waitingEvents.add

ProcessEvent

服务端接收数据请求

NIOServerCnxnFactory.run

NIOServerCnxn.doIO

NIOServerCnxn.readRequest

ZookeeperServer.processPacket

集群模式下的处理流程

image-20220425233658967

image-20220425234711678

image-20220426221335698