zookeeper使用close命令关闭连接与直接关闭运行窗口对临时节点来说有什么区别

  通过源码我们可以知道zookeeper的客户端与服务端连接后服务端会为其生成session并与客户端协商session的timeout,而之所以能自动删除是通过sessionTracker线程【SessionTrackerImpl实现类】来做到的,该线程会实时监测session过期队列,若有session过期了就会将其加入此队列中,并将相应的过期session的isClosing状态设置为true,再通过expire方法里的close方法来发送closeSession请求让服务端本身去关闭session。如下图及代码所示。

public void expire(Session session) {
          
   
    long sessionId = session.getSessionId();
    LOG.info(
        "Expiring session 0x{}, timeout of {}ms exceeded",
        Long.toHexString(sessionId),
        session.getTimeout());

    close(sessionId); //
}
private void close(long sessionId) {
          
   
// 清空关于session的map,删除临时节点 prepReqeustProcessor sync final
    Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
    submitRequest(si);
}

  当我们通过命令行窗口运行了一个客户端,相应的服务端会产生一个session与其绑定,若我们通过将窗口关闭的方式来关闭客户端连接,则服务端上的临时节点不会立即被删除,而是要等到session过期后才会将该session上的临时节点全部删除。即使你关闭了窗口后重新使用命令行窗口运行也阻止不了临时节点被删的命运,因为重新打开窗口运行的时候服务端也会重新为其绑定不同的session。【其本质也是session过期后才提交一个closeSession事务请求】   若当我们通过close命令去关闭客户端连接,则客户端会发送closeSession事务请求给服务端,服务端会将其记录日志并将内存datatree上与其相关的临时节点都删除,随后再关闭连接。

接下来我们来看看session临时节点删除的流程

  1. closeSession事务请求会先经过PrepRequestProcessor请看下图,之后交给SyncRequestProcessor 我们接着看pRequest2Txn方法,我将其相关的代码截取出来
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
          
   
        // 这是日志头
        if (request.getHdr() == null) {
          
   
                // sessionID、cxid、zxid、当前时间、请求类型
                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                     Time.currentWallTime(), type));
        }
switch (type) {
          
   
            case OpCode.closeSession:
    // We dont want to do this check since the session expiration thread
    // queues up this operation without being the session owner.
    // this request is the last of the session so it should be ok
    //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
    long startTime = Time.currentElapsedTime();
    synchronized (zks.outstandingChanges) {
          
   
            Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
            for (ChangeRecord c : zks.outstandingChanges) {
          
   
                if (c.stat == null) {
          
   
                    // Doing a delete
                    es.remove(c.path);
                } else if (c.stat.getEphemeralOwner() == request.sessionId) {
          
   
                    es.add(c.path);
                }
            }
            for (String path2Delete : es) {
          
   
                if (digestEnabled) {
          
   
                    parentPath = getParentPathAndValidate(path2Delete);
                    parentRecord = getRecordForPath(parentPath);
                    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
                    parentRecord.stat.setPzxid(request.getHdr().getZxid());
                    parentRecord.precalculatedDigest = precalculateDigest(
                        DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
                    addChangeRecord(parentRecord);
                }
           标红   nodeRecord = new ChangeRecord(
                    request.getHdr().getZxid(), path2Delete, null, 0, null);
                nodeRecord.precalculatedDigest = precalculateDigest(
                    DigestOpCode.REMOVE, path2Delete);
            标红    addChangeRecord(nodeRecord);
            }
            if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
          
   
            // 持久化
                request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
            }
            标红 zks.sessionTracker.setSessionClosing(request.sessionId);
        }
        ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
        break;
}

  看代码标红的位置,其会将与该session相关的临时节点全都添加一个stat属性为null的ChangeRecord并将其加入到outstandingChanges队列以及outstandingChangesForPath队列里,随后调用sessionTracker线程的setSessionClosing方法将session的状态isClosing设置为true。

这里需要普及一下为什么需要用ChangeRecord,才能更加看得懂上面的代码

ChangeRecord表示修改记录,表示某个节点的修改记录,在处理Request时,需要依赖现有节点上的已有信息,比如cversion(某个节点的孩子节点版本),比如,在处理一个create请求时,需要修改父节点上的cversion(加1),那么这个信息从哪来呢?一开始肯定是从DataTree上来,但是不能每次都从DataTree上来获取父节点的信息,这样性能很慢,比如ZooKeeperServer连续收到两个create请求,当某个create请求在被处理时,都需要先从DataTree获取信息,然后持久化,然后更新DataTree,最后才能处理下一个create请求,是一个串行的过程,那么如果第二个create不合法呢?依照上面的思路,则还需要等待第一个create请求处理完了之后才能对第二个请求进行验证,所以Zookeeper为了解决这个问题,在PrepRequestProcessor中,没验证完一个请求,就把这个请求异步的交给持久化线程来处理,PrepRequestProcessor自己就去处理下一个请求了,打断了串行的链路,但是这时又出现了问题,因为在处理第二个create请求时需要依赖父节点的信息,并且应该处理过第一个create请求后的结果,所以这时就引入了ChangeRecord,PrepRequestProcessor在处理第一个create请求时,先生成一条ChangeRecord记录,然后再异步的去持久化和更新DataTree,然后立即去处理第二个create请求,此时就可以不需要去取DataTree中的信息了(就算取了,可能取到的信息也不对),就直接取ChangeRecord中的信息就可以了。【简单理解就是因为有队列所以需要异步,因为异步所以才需要ChangeRecord】。
  1. closeSession事务请求接着会经过SyncRequestProcessor进行日志记录以及持久化,之后交给FinalRequestProcessor
  2. closeSession事务请求最后会由FinalRequestProcessor来处理   先从FinalRequestProcessor的processRequest方法的124行ProcessTxnResult rc = zks.processTxn(request);开始   再到ZooKeeperServer的1812行processTxn方法 的ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());   再到ZooKeeperServer的1872行processTxnInDB方法的 return getZKDatabase().processTxn(hdr, txn, digest);   再到ZKDatabase的489行processTxn方法的dataTree.processTxn(hdr, txn, digest);   再到DataTree的895行processTxn方法的 ProcessTxnResult result = processTxn(header, txn);   再DataTree的904行processTxn方法的1001行,看下图 再点进去, DataTree的1184行,看下图

  这样内存DataTree的临时节点就都全部删除了。至此,closeSession记录会记录在日志记录里,但在内存DataTree上相关的临时节点却已经全部删除了,所以这就是为什么当客户端使用close命令来关闭连接后,即使很快重新连接也不能看到前一个session的临时节点的原因。

经验分享 程序员 微信小程序 职场和发展