关于java:curator源码之分布式延迟队列

如果实现一个分布式提早队列须要思考哪些方面?至多有这些:数据存在何处、数据以何种格局存储、如何生产、怎么防止反复生产、生产是如何保障程序、数据在被生产过程中失败了是否会失落、重试策略……
前段时间想着本人实现个提早队列,发现curator曾经实现,这里就来剖析下。

public class DistributedDelayQueueDemo {
    private static CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));
    private static String path = "/queue/0001";//队列对应的zk门路
    private static DistributedDelayQueue<String> delayQueue;

    /**
     * 应用动态块启动
     */
    static {
        client.start();
    }

    public static void main(String[] args) throws Exception{
        QueueConsumer<String> consumer = new QueueConsumer<String>() {
            @Override
            public void consumeMessage(String s) throws Exception {
                System.out.println("consume data:"+s+",currentTime:"+System.currentTimeMillis());
            }

            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("state changed");
            }
        };
        delayQueue = QueueBuilder.builder(client, consumer, new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String t) {
                try {
                    return t.getBytes("utf-8");
                }catch (UnsupportedEncodingException e){
                    e.printStackTrace();
                }
                return null;
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        },path).buildDelayQueue();
        delayQueue.start();//start办法里新开了子线程去执行完之后会进行回调consumeMessage办法
        System.out.println("delay queue built");
        delayQueue.put("a",System.currentTimeMillis() + 4000);//提早4s
        delayQueue.put("b",System.currentTimeMillis() + 10000);
        delayQueue.put("c",System.currentTimeMillis() + 100000);
        System.out.println("put ended");
        TimeUnit.MINUTES.sleep(14);
        delayQueue.close();
        TimeUnit.SECONDS.sleep(5);
        client.close();
    }
}

这个是客户端应用的一个例子

private static CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));

这一行获取CuratorFramework对象,这里指定了重试策略,前期会专门写文章进行剖析。另外须要阐明的是这里会实例化ZooKeeper类里的ClientCnxn类,ClientCnxn是ZK客户端的外围类,负责客户端与服务端的通信,它有两个重要的成员(外部类)SendThread(IO读写、心跳检测、断连重试)与EventThread(事件处理:WatchEvent等)。
delayQueue.start()是在进行生产;上面的delayQueue.put()则是在进行数据的存储。
好了,重点看下delayQueue.start()办法,下面的例子应用的是curator-recipes 2.12.0版本,这里源码剖析应用的是5.0.0-SNAPSHOT.

public void start() throws Exception
    {
        //应用CAS更新状态,latent:潜在的,暗藏的。这里的state变量没应用static润饰
        if ( !state.compareAndSet(State.LATENT, State.STARTED) )
        {
            throw new IllegalStateException();
        }
        /**
         * 次要节点:
         * PERSISTENT长久节点,节点创立后就始终存在,直到有删除操作来被动革除这个节点,创立该节点的客户端生效也不会隐没
         * PERSISTENT_SEQUENTIAL长久程序节点,此节点个性与下面的统一,额定的是,每个父节点会记录每个子节点的程序
         * EPHEMERAL长期节点,长期节点的生命周期与客户端会话绑定,客户端生效那这个节点就会被主动革除
         *EPHEMERAL_SEQUENTIAL长期程序节点,带程序的长期节点,可用来实现分布式锁
         */
        try
        {
            //创立队列的zk节点,模式为PERSISTENT,应用父容器
            /**
             * 1,应用观察者模式(监听器)
             * 2,应用回调解决队列的子节点信息
             * 3,应用一个版本号对子节点列表时行本地缓存
             *    应用不可变list对子节点列表进行包装
             *    应用版本号跟踪,防止ABA问题
             *    子节点信息应用一个外部类进行封装
             *    最初的forPath指定要操作的ZNode
             *    https://www.jianshu.com/p/998cd2b471ef
             */
            //模式为PERSISTENT
            client.create().creatingParentContainersIfNeeded().forPath(queuePath);//创立一个空节点
        }
        catch ( KeeperException.NodeExistsException ignore )//节点创立失败
        {
            // this is OK
        }
        if ( lockPath != null )
        {
            //依据内部传入参数,决定是否创立对应分布式锁的zk节点
            try
            {
                client.create().creatingParentContainersIfNeeded().forPath(lockPath);
            }
            catch ( KeeperException.NodeExistsException ignore )
            {
                // this is OK
            }
        }

        //如果不是生产者角色,或者设定了有界队列,则启动子节点缓存,isProducerOnly:false
        if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
        {
            //底层callback里的path与下面的queuePath值一样
            childrenCache.start();// 这个中央很重要,runLoop里就是从childrenCache里了取的数据
        }

        //如果不是生产者模式,则异步执行runLoop办法,isProducerOnly:false
        if ( !isProducerOnly )
        {
            //这个线程池用来从队列里拉取音讯
            service.submit
                (
                    new Callable<Object>()
                    {
                        @Override
                        public Object call()
                        {
                            System.out.println("线程名:"+Thread.currentThread().getName());
                            runLoop();
                            return null;
                        }
                    }
                );
        }
    }

进入到start办法里,如果状态是LATENT且设置STARTED胜利,则创立一个长久有续的节点,紧接着就是childrenCache.start();这个办法特地重要,咱们能够往里面跟进一下,看代码:

/**
     * 1,应用观察者模式(监听器)
     * 2,应用回调解决队列的子节点信息
     * 3,应用一个版本号对子节点列表进行本地缓存
     * @param watched
     * @throws Exception
     */
    private synchronized void sync(boolean watched) throws Exception
    {
        if ( watched )//watched为true
        {
            //watch与callback别离为ChildCache对象里的两个属性
            //callback的回调里设置了child.setData()
            //forPath函数获取了所有的子节点
            client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
        }
        else
        {
            client.getChildren().inBackground(callback).forPath(path);
        }
    }

是的,应用了zk的监听器,当有节点或节点里的数据发生变化时,客户端会收到服务端的告诉,并进行callback回调,回调里的逻辑还是有必要看一下:

 /**
     * 应用一个版本号对子节点列表进行本地缓存
     * @param newChildren
     */
    private synchronized void setNewChildren(List<String> newChildren)
    {
        if ( newChildren != null )
        {
            Data currentData = children.get();
            //应用不可变list对子节点列表进行包装
            //应用版本号跟踪,防止ABA问题
            //应用AtomicReference进行原子化包装
            //子节点应用一个外部类进行封装
            //runLoop办法里会有children.get()取节点数据
            children.set(new Data(newChildren, currentData.version + 1));
            System.out.println(children.get().children+",:"+children.get().version);
            notifyFromCallback();//告诉期待的消费者
        }
    }

将新的数据与新的版本号关联后nofifyAll,这里告诉的应用前面马上说到。接下来是应用一个线程池获取元素,入口是runLoop:

private void runLoop()
{
    long currentVersion = -1;
 long maxWaitMs = -1;
 try {
        while ( state.get() == State.STARTED )
        {
            try
 {
                //依据版本号获取数据,ChildrenCache>data>child/version
 //在processChildren办法里有一个闭锁,如果currentVersion对应的数据没解决完,则会始终阻塞
 ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
 currentVersion = data.version;//1
 List<String>        children = Lists.newArrayList(data.children);
 //节点进行排序
 sortChildren(children); // makes sure items are processed in the correct order
 if ( children.size() > 0 )
                {
                    //get(0)只须要判断第一个元素的工夫就行,第一个元素未到执行工夫那其余的也不会到,只有有一个元素到期就进行解决
 //上面解决的时候还有一个判断工夫是否到期
 maxWaitMs = getDelay(children.get(0));//应用的是DistributedDelayQueue里的getDelay()办法,
 if ( maxWaitMs > 0 )//未到执行的工夫,那就期待maxWaitMs
 {
                        continue;
 }
                }
                else//还没有元素
 {
                    continue;
 }
                processChildren(children, currentVersion);//元素进行解决
 }
            catch ( InterruptedException e )
            {
                // swallow the interrupt as it's only possible from either a background
 // operation and, thus, doesn't apply to this loop or the instance // is being closed in which case the while test will get it //swallow the interrupt exception,因为只有两种情景才会呈现中断异样,一种是没在这里进行的后盾的操作;
 //另一种是测试的时候
 }
        }
    }
    catch ( Exception e )
    {
        log.error("Exception caught in background handler", e);
 }
}

在blockingNextGetData办法里从ChildrenCache里取元素,如果没有则会wait,如果有了元素会被唤醒,这也就是下面提到的notifyAll.
接着来看元素解决的办法:

private void processChildren(List<String> children, long currentVersion) throws Exception
    {
        //这里定义的信号量部分变更而非动态全局变量,那就能够判断这个信号量是用来管制以后办法体里的多个线程执行的先后顺序的
        final Semaphore processedLatch = new Semaphore(0);//定义一个信号量
        final boolean   isUsingLockSafety = (lockPath != null);//false
        int             min = minItemsBeforeRefresh;//0,管制队列调度音讯的最小数量
        System.out.println("+++++++++++++++++"+Thread.currentThread().getName()+",children: "+children.size()+",version:"+currentVersion);
        for ( final String itemNode : children ) {
            if ( Thread.currentThread().isInterrupted() )//判断以后线程中断状态,不扭转线程状态
            {
                processedLatch.release(children.size());//创立children.size个许可,release与acquire并不一定要成对呈现
                break;
            }

            if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
            {
                log.warn("Foreign node in queue path: " + itemNode);
                processedLatch.release();
                continue;
            }

            if ( min-- <= 0 )
            {
                //refreshOnWatch拉取音讯后是否异步调度生产,这里为true
                //如果版本号不匹配,则增加children.size()个许可,且中断循环,最初获取children.size个许可,返回到下层的while循环
                //所以这里的semaphore的作用是保障一个版本号对应的数据都能被解决完
                //
                //版本号不相等,阐明有新的数据被增加,就须要跳出循环去获取新的数据。那这里就会有这样一个问题:原本执行到这个办法体里阐明是有节点曾经到了可执行的工夫,但因为有
                //新的数据增加,会导致应该执行的节点会被推延
                if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
                {
                    processedLatch.release(children.size());//这个是为了不阻塞办法的最初一行
                    break;
                }
            }
            //工夫上不合乎执行条件
            if ( getDelay(itemNode) > 0 )
            {
                processedLatch.release();//再减少一个许可
                continue;
            }
            //这个线程池用来解决取出来的音讯
            //示例中应用的是Executors.newCachedThreadPool();它的特点是当工作数减少时,线程池会对应增加新的线程来解决,线程池不会有大小限度,依赖于jvm可能创立的大小
            //for遍历出一个节点就会创立一个线程进行解决(所以应用这种线程池时对应须要执行的工作不能太耗时)
            executor.execute
            (
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            if ( isUsingLockSafety )//false
                            {
                                processWithLockSafety(itemNode, ProcessType.NORMAL);
                            }
                            else//执行这个分支
                            {
                                processNormally(itemNode, ProcessType.NORMAL);
                            }
                        }
                        catch ( Exception e )
                        {
                            ThreadUtils.checkInterrupted(e);
                            log.error("Error processing message at " + itemNode, e);
                        }
                        finally
                        {
                            //解决完一个itemNode,增加一个许可,直到currentVersion下的节点全被解决完(children.size个),主线程才会获取相应的许可
                            processedLatch.release();
                        }
                    }
                }
            );
        }
        //主线程将工作提交给了线程池后就会执行到acquire这里阻塞,但只有线程池里的工作都执行完release latch之后,主线程才不再阻塞,
        //也就是主线程肯定要等到currentVersion里的节都解决完才行
        processedLatch.acquire(children.size());
    }

1,这里应用了Semaphore保障所一个version对应的数据都被解决完。2,同样这里也应用了线程池
最初是真正解决元素的中央:

private boolean processNormally(String itemNode, ProcessType type) throws Exception
{
    System.out.println("thread name========================"+Thread.currentThread().getName());
 try {
        //queuePath:/queue/0001
 //itemPath:queue-|171EA3DEECC|0000000019 String  itemPath = ZKPaths.makePath(queuePath, itemNode);//获取parent+child全门路(queue-|171EA3DEECC|0000000019)
 Stat    stat = new Stat();//数据节点的节点状态信息
 byte[]  bytes = null;
 if ( type == ProcessType.NORMAL )
        {
            bytes = client.getData().storingStatIn(stat).forPath(itemPath);//获取对应节点上的数据("a")
 }
        if ( client.getState() == CuratorFrameworkState.STARTED )
        {
            //删除节点上的数据,音讯投递之后就被移除,不会期待生产端调用实现。想要期待生产端生产胜利返回后再删除,能够设置lockPath
 //如果消费者异常中断能够再次被生产
 //先删除再进行数据处理>优:解决比较简单,进入下次循环的时候不必再思考须要删除的数据。劣:数据删除后进行解决的时候如果失败,数据就丢掉了。
 //先进行数据处理再删除>优:数据比拟平安。劣:进入下次循环时须要判断是否是已解决的数据,须要对数据进行打标
 client.delete().withVersion(stat.getVersion()).forPath(itemPath);
 }
        if ( type == ProcessType.NORMAL )
        {
            processMessageBytes(itemNode, bytes);
 }
        return true;
 }
    catch ( KeeperException.NodeExistsException ignore )
    {
        // another process got it
 }
    catch ( KeeperException.NoNodeException ignore )
    {
        // another process got it
 }
    catch ( KeeperException.BadVersionException ignore )
    {
        // another process got it
 }
    return false;
}

先删除元素再解决元素,下面的processMessageBytes办法就是解决元素的入口。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理