深入浅出Zookeeper六客户端的请求在服务器中经历了什么

43次阅读

共计 19761 个字符,预计需要花费 50 分钟才能阅读完成。

本文首发于泊浮目的简书:https://www.jianshu.com/u/204…

版本 日期 备注
1.0 2020.5.23 文章首发

1. 前言

当我们向 zk 发出一个数据更新请求时,这个请求会经过怎样的处理流程呢?zk 又是使用了什么共识算法来保证一致性呢?带着这些问题,我们进入今天的正文。

2. 设计模式:责任链模式(Chain of Responsibility)

在分析源码之前,必须先和大家简单的科普一下责任链模式,因为这和本文的内容密切相关。简单的说:责任链模式将多个对象组成一条指责链,然后按照它们在职责链的顺序一个个地找出到底谁来负责处理。

那它的好处是什么呢?仔细想想,该设计模式像极面向对象版本的 if...else if...else(我们都知道if...else if...else 属于面向过程),但由于面向对象的特性,会比面向过程更容易复用。

3. 请求逻辑追踪

我们先从 ZooKeeperServer 这个类入手,查看其实现类。我们需要关心的有(常见的 zk 服务器角色如下):

  • LeaderZooKeeperServer
  • FollowerZooKeeperServer
  • ObserverZooKeeperServer

3.1 LeaderZooKeeperServer

   @Override
    protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();}

整理一下顺序:

  1. LeaderRequestProcessor
  2. PrepRequestProcessor
  3. ProposalRequestProcessor

接下来分两个分支:

  • 事务型请求会额外走这条链:SyncRequestProcessor -> AckRequestProcessor
  • CommitProcessor -> ToBeAppliedRequestProcessor ->FinalRequestProcessor

3.1.1 LeaderRequestProcessor

    @Override
    public void processRequest(Request request)
            throws RequestProcessorException {
        // Check if this is a local session and we are trying to create
        // an ephemeral node, in which case we upgrade the session
        Request upgradeRequest = null;
        try {upgradeRequest = lzks.checkUpgradeSession(request);
        } catch (KeeperException ke) {if (request.getHdr() != null) {LOG.debug("Updating header");
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(ke.code().intValue()));
            }
            request.setException(ke);
            LOG.info("Error creating upgrade request" + ke.getMessage());
        } catch (IOException ie) {LOG.error("Unexpected error in upgrade", ie);
        }
        if (upgradeRequest != null) {nextProcessor.processRequest(upgradeRequest);
        }

        nextProcessor.processRequest(request);
    }

这段逻辑很清楚。因需要检查会话是否过期,去创建一个临时节点。如果失败那么就抛出异常。

3.1.2 PrepRequestProcessor

该类有 1000 多行代码,故此会挑出较为典型的代码进行剖析。在此之前,我们先看注释:

This request processor is generally at the start of a RequestProcessor
change. It sets up any transactions associated with requests that change the
state of the system. It counts on ZooKeeperServer to update
outstandingRequests, so that it can take into account transactions that are
in the queue to be applied when generating a transaction.

简单来说,它一般位于请求处理链的头部,它会设置事务型请求(改变系统状态的请求)。

OpCode.create2

对于创建型请求逻辑大致为:

          case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;

跳往pRequest2Txn

    protected void pRequest2Txn(int type, long zxid, Request request,
                                Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type));

        switch (type) {
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer: {pRequest2TxnCreate(type, request, record, deserialize);
                break;
            }
//.... 多余代码不再展示

跳往pRequest2TxnCreate

    private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {if (deserialize) {ByteBufferInputStream.byteBuffer2Record(request.request, record);
        }

        int flags;
        String path;
        List<ACL> acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
            flags = createTtlRequest.getFlags();
            path = createTtlRequest.getPath();
            acl = createTtlRequest.getAcl();
            data = createTtlRequest.getData();
            ttl = createTtlRequest.getTtl();} else {CreateRequest createRequest = (CreateRequest)record;
            flags = createRequest.getFlags();
            path = createRequest.getPath();
            acl = createRequest.getAcl();
            data = createRequest.getData();
            ttl = -1;
        }
        CreateMode createMode = CreateMode.fromFlag(flags);
        validateCreateRequest(path, createMode, request, ttl);
        String parentPath = validatePathForCreate(path, request.sessionId);

        List<ACL> listACL = fixupACL(path, request.authInfo, acl);
        ChangeRecord parentRecord = getRecordForPath(parentPath);

        checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
        int parentCVersion = parentRecord.stat.getCversion();
        if (createMode.isSequential()) {path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        validatePath(path, request.sessionId);
        try {if (getRecordForPath(path) != null) {throw new KeeperException.NodeExistsException(path);
            }
        } catch (KeeperException.NoNodeException e) {// ignore this one}
        boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        if (ephemeralParent) {throw new KeeperException.NoChildrenForEphemeralsException(path);
        }
        int newCversion = parentRecord.stat.getCversion()+1;
        if (type == OpCode.createContainer) {request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                    newCversion));
        }
        StatPersisted s = new StatPersisted();
        if (createMode.isEphemeral()) {s.setEphemeralOwner(request.sessionId);
        }
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount++;
        parentRecord.stat.setCversion(newCversion);
        addChangeRecord(parentRecord);
        addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
    }

大致可以总结下逻辑:

  1. 组装请求
  2. 校验请求是否合理:未定义的请求、参数不合理
  3. 检查上级路径是否存在
  4. 检查 ACL
  5. 检查路径是否合法
  6. 将请求装入 outstandingChanges 队列
  7. 发送至下一个 Processor

OpCode.multi

事务型请求:

          case OpCode.multi:
                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                try {ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch(IOException e) {request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                            Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

                for(Op op: multiRequest) {Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    }

                    /* Prep the request and convert to a Txn */
                    else {
                        try {pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = request.getHdr().getType();
                            txn = request.getTxn();} catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            if (e.code().intValue() > Code.APIERROR.intValue()) {LOG.info("Got user-level KeeperException when processing {} aborting" +
                                        "remaining multi ops. Error Path:{} Error:{}",
                                        request.toString(), e.getPath(), e.getMessage());
                            }

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    //FIXME: I don't want to have to serialize it here and then
                    //       immediately deserialize in next processor. But I'm
                    //       not sure how else to get the txn stored into our list.
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    txn.serialize(boa, "request") ;
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                    txns.add(new Txn(type, bb.array()));
                }

                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));
                request.setTxn(new MultiTxn(txns));

                break;

代码虽然看起来很恶心,但是逻辑倒是挺简单的:

  • 遍历所有请求,一个个组装成起来(要通过一系列的校验:请求合理、上级路径存在、ACL、路径合法),如果中间一直没有异常,则组装成一个请求,里面封装了事务的记录。不然则变成一个标记为错误的请求,并回滚掉当前作用域里的记录(一个 Map)。无论如何,请求都会被发送至下一个 Processor。

OpCode.sync

//All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;

非事务型请求,校验一下 session 就可以发送至下一个 Processor 了。

3.1.3 PrepRequestProcessor 小结

其本质为事务性请求进行一些预处理,如:创建事务头、事务体、会话检查、ACL 检查和版本检查等等。

3.1.4 ProposalRequestProcessor

对于事务请求会发起 Proposal,并发送给 CommitProcessor。而且 ProposalRequestProcessor 还会将事务请求交付给 SyncRequestProcessor。

  public void processRequest(Request request) throws RequestProcessorException {
        // LOG.warn("Ack>>> cxid =" + request.cxid + "type =" +
        // request.type + "id =" + request.sessionId);
        // request.addRQRec(">prop");


        /* In the following IF-THEN-ELSE block, we process syncs on the leader.
         * If the sync is coming from a follower, then the follower
         * handler adds it to syncHandler. Otherwise, if it is a client of
         * the leader that issued the sync command, then syncHandler won't
         * contain the handler. In this case, we add it to syncHandler, and
         * call processRequest on the next processor.
         */

        if (request instanceof LearnerSyncRequest){zks.getLeader().processSync((LearnerSyncRequest)request);
        } else {nextProcessor.processRequest(request);
            if (request.getHdr() != null) {
                // We need to sync and get consensus on any transactions
                try {zks.getLeader().propose(request);
                } catch (XidRolloverException e) {throw new RequestProcessorException(e.getMessage(), e);
                }
                syncProcessor.processRequest(request);
            }
        }
    }

接着看 propose:

  /**
     * create a proposal and send it out to all the members
     *
     * @param request
     * @return the proposal that is queued to send to all the members
     */
    public Proposal propose(Request request) throws XidRolloverException {
        /**
         * Address the rollover issue. All lower 32bits set indicate a new leader
         * election. Force a re-election instead. See ZOOKEEPER-1277
         */
        if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg =
                    "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
            shutdown(msg);
            throw new XidRolloverException(msg);
        }

        byte[] data = SerializeUtils.serializeRequest(request);
        proposalStats.setLastBufferSize(data.length);
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;                
        
        synchronized(this) {p.addQuorumVerifier(self.getQuorumVerifier());
                   
           if (request.getHdr().getType() == OpCode.reconfig){self.setLastSeenQuorumVerifier(request.qv, true);                       
           }
           
           if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
           }
                   
            if (LOG.isDebugEnabled()) {LOG.debug("Proposing::" + request);
            }

            lastProposed = p.packet.getZxid();
            outstandingProposals.put(lastProposed, p);
            sendPacket(pp);
        }
        return p;
    }

这次提交的记录是一个 QuorumPacket,其实现了Record 接口。指定了 type 为 PROPOSAL。我们看一下注释:

    /**
     * This message type is sent by a leader to propose a mutation.
     */
    public final static int PROPOSAL = 2;

很显然,这个只有 Leader 才可以发起的一种变化型请求。再简单描述下逻辑:

  1. 放到 outstandingProposals 的 Map 里
  2. 组装成发送的 Packet
  3. 将 Proposal 传递给下一个 Processor

3.1.5 CommitProcessor

顾名思义,事务提交器。只关心事务请求——等待集群内 Proposal 投票直到可被提交。有了 CommitProcessor,每个服务器都可以很好的对事务进行顺序处理。

该部分的代码实在简陋,故不占篇幅来分析。读者朋友知道上述信息后,也可以理解整个请求链是怎样的。

3.1.6 SyncRequestProcessor

逻辑非常的简单,将请求记录到事务日志中,并尝试触发快照。

   public void processRequest(Request request) {// request.addRQRec(">sync");
        queuedRequests.add(request);
    }

   // 线程的核心方法,会对 queuedRequests 这个队列进行操作
    @Override
    public void run() {
        try {
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) {si = queuedRequests.take();
                } else {si = queuedRequests.poll();
                    if (si == null) {flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {break;}
                if (si != null) {
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {LOG.warn("Too busy to snap, skipping");
                            } else {snapInProcess = new ZooKeeperThread("Snapshot Thread") {public void run() {
                                            try {zks.takeSnapshot();
                                            } catch(Exception e) {LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();}
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {handleException(this.getName(), t);
        } finally{running = false;}
        LOG.info("SyncRequestProcessor exited!");
    }

3.1.7 ToBeAppliedRequestProcessor

该处理器的核心为一个 toBeApplied 队列,专门用来存储那些已经被 CommitProcessor 处理过的可提交的 Proposal——直到 FinalRequestProcessor 处理完后,才会将其移除。

        /*
         * (non-Javadoc)
         *
         * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
         */
        public void processRequest(Request request) throws RequestProcessorException {next.processRequest(request);

            // The only requests that should be on toBeApplied are write
            // requests, for which we will have a hdr. We can't simply use
            // request.zxid here because that is set on read requests to equal
            // the zxid of the last write op.
            if (request.getHdr() != null) {long zxid = request.getHdr().getZxid();
                Iterator<Proposal> iter = leader.toBeApplied.iterator();
                if (iter.hasNext()) {Proposal p = iter.next();
                    if (p.request != null && p.request.zxid == zxid) {iter.remove();
                        return;
                    }
                }
                LOG.error("Committed request not found on toBeApplied:"
                          + request);
            }
        }

3.1.8 FinalRequestProcessor

该类核心逻辑约有 500 多行,为节约篇幅,在这里仅描述下逻辑——既然是最后一个处理器,那么总得回复请求吧。并负责事务请求的生效——改变内存数据库的状态。

3.2 FollowerZooKeeperServer

先看一下其组装 Processors 的代码:

    @Override
    protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();}

可以看到,这里又两对儿请求链:

  1. FollowerRequestProcessor -> CommitProcessor -> FinalProcessor
  2. SyncRequestProcessor -> SendAckRequestProcessor

那么请求来的时候,是哪个 Processor 来 handle 呢?这边可以大致跟踪一下:

  • firstProcessor(即 FollowerRequestProcessor),是主要 handle 流程,由父类 ZooKeeperServer 来调度,handle 请求
  • syncProcessor(即 SyncRequestProcessor)从 logRequest 的入口进来。该类的由 Learner 调度进来,handle leader 的请求。

看到这里有人就要问了,这明明是个 Observer,怎么从 Learner 进来的呢?这就得看签名了:

/**
 * This class is the superclass of two of the three main actors in a ZK
 * ensemble: Followers and Observers. Both Followers and Observers share 
 * a good deal of code which is moved into Peer to avoid duplication. 
 */
public class Learner {

为了避免重复代码,就把一些共同的代码抽取上来了。

3.2.1 FollowerRequestProcessor

Follower 的正常处理器,会判断是不是事务,是事务就发送给 Leader,不然自己处理。

FollowerRequestProcessor.run

    @Override
    public void run() {
        try {while (!finished) {Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {break;}
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);

                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this follower has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getFollower().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    // Don't forward local sessions to the leader.
                    if (!request.isLocalSession()) {zks.getFollower().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {handleException(this.getName(), e);
        }
        LOG.info("FollowerRequestProcessor exited loop!");
    }

而提交到 CommitProcessor 说的很清楚,这样就可以收到回复。该 Processor 在前文说过,用于等待集群内 Proposal 投票直到可被提交。也是这个时候,该 Follower 的 CommitProcessor 已经在等待 Proposal 投票被提交了。

3.2.1 SendAckRequestProcessor

    public void processRequest(Request si) {if(si.type != OpCode.sync){QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
                null);
            try {learner.writePacket(qp, false);
            } catch (IOException e) {LOG.warn("Closing connection to leader, exception during packet send", e);
                try {if (!learner.sock.isClosed()) {learner.sock.close();
                    }
                } catch (IOException e1) {
                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
                    LOG.debug("Ignoring error closing the connection", e1);
                }
            }
        }
    }

逻辑非常的简单,用于反馈 ACK 成功,表示自身完成了事务日志的记录。

3.3 ObserverZooKeeperServer

    /**
     * Set up the request processors for an Observer:
     * firstProcesor->commitProcessor->finalProcessor
     */
    @Override
    protected void setupRequestProcessors() {      
        // We might consider changing the processor behaviour of 
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();

        /*
         * Observer should write to disk, so that the it won't request
         * too old txn from the leader which may lead to getting an entire
         * snapshot.
         *
         * However, this may degrade performance as it has to write to disk
         * and do periodic snapshot which may double the memory requirements
         */
        if (syncRequestProcessorEnabled) {syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();}
    }

逻辑很清晰(大概是因为 3.3.0 后加入的代码吧),正常的请求链为:

  1. ObserverRequestProcessor
  2. CommitProcessor
  3. FinalProcessor

如果 syncRequestProcessorEnabled 开启的情况下(缺省为开),这意味着 Observer 也会去记录事务日志以及做快照,这会给下降一定的性能,以及更多的内存要求。

然后再看下 ObserverRequestProcessor,简直和FollowerRequestProcessor 如出一辙,难道他们不知道复用一下代码吗???

    @Override
    public void run() {
        try {while (!finished) {Request request = queuedRequests.take();
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
                }
                if (request == Request.requestOfDeath) {break;}
                // We want to queue the request to be processed before we submit
                // the request to the leader so that we are ready to receive
                // the response
                nextProcessor.processRequest(request);

                // We now ship the request to the leader. As with all
                // other quorum operations, sync also follows this code
                // path, but different from others, we need to keep track
                // of the sync operations this Observer has pending, so we
                // add it to pendingSyncs.
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getObserver().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getObserver().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    // Don't forward local sessions to the leader.
                    if (!request.isLocalSession()) {zks.getObserver().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {handleException(this.getName(), e);
        }
        LOG.info("ObserverRequestProcessor exited loop!");
    }

以上,就是源码分析部分,基于 3.5.7 版本。

4. 分布式事务:ZK 如何进行事务处理

上面和大家过了一下源码,相信各位对 ZK 请求处理流程有了一定的了解。接下来,让我们理一理事务请求的过程。从 Leader 的 ProposalRequestProcessor 开始,大致会分为三个阶段,即:

  1. Sync
  2. Proposal
  3. Commit

4.1 Sync

主要由 ProposalRequestProcessor 来做,参与 Proposql 的机器(Leader 和 Follower)都要记录事务日志。

4.2 Proposal

每个事务请求都要超过半数的投票认可(Leader + Follower)。

  1. Leader 检查服务端的 ZXID 可用,可用的话发起 Proposal。不可用则抛出 XidRolloverException。(见 org.apache.zookeeper.server.quorum.Leader.propose)
  2. 根据请求头、事务体以及 ZXID 生成 Proposal(见 org.apache.zookeeper.server.quorum.Leader.propose)
  3. 广播给所有 Follower 服务器(见 org.apache.zookeeper.server.quorum.Leader.sendPacket)
  4. Follower 记录日志,并 ACK 给 Leader 服务器。直到超过半数,进入 Commit。或者到超时。
  5. 将请求丢入 toBeApplied 队列中。(见 org.apache.zookeeper.server.quorum.Leader.tryToCommit)
  6. 广播 Commit,发给 Follower 的为COMMIT,而 Observer 的为Inform。这使它们提交该 Proposal。(见 org.apache.zookeeper.server.quorum.Leader.commit && inform)

直到这里,算是完成了 SyncRequestProcessor -> AckRequestProcessor

4.3 Commit

接下来讲 CommitProcessor -> ToBeAppliedRequestProcessor ->FinalRequestProcessor 的过程。

  1. 请求到 CommitPrcocessor 后是放入一个队列里,由线程一个个取出来。当取出来是事务请求时,那么就会设置一个 pending 对象到投票结束。这样保证了事务的顺序性,也可以让 CommitPrcocessor 方便的直到集群中是否有进行中的事务。
  2. 投票通过,唤醒 commit 流程。提交请求至 committedRequests 这个队列中,然后一个个发送至 ToBeAppliedRequestProcessor 里去。
  3. ToBeAppliedRequestProcessor 会等待 FinalRequestProcessor 处理完成后,从 toBeApplied 队列中移除这个 Proposal。
  4. FinalRequestProcessor 会先去校验最新的一个元素是否 zxid 小于等于当前的请求,是的话则移除该元素。等于是正常现象,因为这个对列是在 PrepRequestProcessor 时添加元素的。如果是小于的话说明当前事务比之前的事务早到达了这里,故移除掉(中途可能已经失败了)。接着就是应用到内存数据库了,该内存数据库实例会维护一个默认上限为 500 的 committedLog——存放最近成功的 Proposal,便于快速同步。

正文完
 0