关于java:curator源码之分布式延迟队列

34次阅读

共计 8887 个字符,预计需要花费 23 分钟才能阅读完成。

如果实现一个分布式提早队列须要思考哪些方面?至多有这些:数据存在何处、数据以何种格局存储、如何生产、怎么防止反复生产、生产是如何保障程序、数据在被生产过程中失败了是否会失落、重试策略 ……
前段时间想着本人实现个提早队列,发现 curator 曾经实现,这里就来剖析下。

public class DistributedDelayQueueDemo {private static CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));
    private static String path = "/queue/0001";// 队列对应的 zk 门路
    private static DistributedDelayQueue<String> delayQueue;

    /**
     * 应用动态块启动
     */
    static {client.start();
    }

    public static void main(String[] args) throws Exception{QueueConsumer<String> consumer = new QueueConsumer<String>() {
            @Override
            public void consumeMessage(String s) throws Exception {System.out.println("consume data:"+s+",currentTime:"+System.currentTimeMillis());
            }

            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {System.out.println("state changed");
            }
        };
        delayQueue = QueueBuilder.builder(client, consumer, new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String t) {
                try {return t.getBytes("utf-8");
                }catch (UnsupportedEncodingException e){e.printStackTrace();
                }
                return null;
            }

            @Override
            public String deserialize(byte[] bytes) {return new String(bytes);
            }
        },path).buildDelayQueue();
        delayQueue.start();//start 办法里新开了子线程去执行完之后会进行回调 consumeMessage 办法
        System.out.println("delay queue built");
        delayQueue.put("a",System.currentTimeMillis() + 4000);// 提早 4s
        delayQueue.put("b",System.currentTimeMillis() + 10000);
        delayQueue.put("c",System.currentTimeMillis() + 100000);
        System.out.println("put ended");
        TimeUnit.MINUTES.sleep(14);
        delayQueue.close();
        TimeUnit.SECONDS.sleep(5);
        client.close();}
}

这个是客户端应用的一个例子

private static CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));

这一行获取 CuratorFramework 对象,这里指定了 重试策略 ,前期会专门写文章进行剖析。另外须要阐明的是这里会实例化 ZooKeeper 类里的ClientCnxn 类,ClientCnxn 是 ZK 客户端的外围类,负责客户端与服务端的通信,它有两个重要的成员 (外部类)SendThread(IO 读写、心跳检测、断连重试) 与 EventThread(事件处理:WatchEvent 等)。
delayQueue.start()是在进行生产;上面的 delayQueue.put()则是在进行数据的存储。
好了,重点看下 delayQueue.start()办法,下面的例子应用的是 curator-recipes 2.12.0 版本,这里源码剖析应用的是 5.0.0-SNAPSHOT.

public void start() throws Exception
    {
        // 应用 CAS 更新状态,latent: 潜在的,暗藏的。这里的 state 变量没应用 static 润饰
        if (!state.compareAndSet(State.LATENT, State.STARTED) )
        {throw new IllegalStateException();
        }
        /**
         * 次要节点:* PERSISTENT 长久节点,节点创立后就始终存在,直到有删除操作来被动革除这个节点,创立该节点的客户端生效也不会隐没
         * PERSISTENT_SEQUENTIAL 长久程序节点,此节点个性与下面的统一,额定的是,每个父节点会记录每个子节点的程序
         * EPHEMERAL 长期节点,长期节点的生命周期与客户端会话绑定,客户端生效那这个节点就会被主动革除
         *EPHEMERAL_SEQUENTIAL 长期程序节点,带程序的长期节点,可用来实现分布式锁
         */
        try
        {
            // 创立队列的 zk 节点,模式为 PERSISTENT, 应用父容器
            /**
             * 1,应用观察者模式(监听器)
             * 2,应用回调解决队列的子节点信息
             * 3,应用一个版本号对子节点列表时行本地缓存
             *    应用不可变 list 对子节点列表进行包装
             *    应用版本号跟踪,防止 ABA 问题
             *    子节点信息应用一个外部类进行封装
             *    最初的 forPath 指定要操作的 ZNode
             *    https://www.jianshu.com/p/998cd2b471ef
             */
            // 模式为 PERSISTENT
            client.create().creatingParentContainersIfNeeded().forPath(queuePath);// 创立一个空节点
        }
        catch (KeeperException.NodeExistsException ignore)// 节点创立失败
        {// this is OK}
        if (lockPath != null)
        {
            // 依据内部传入参数,决定是否创立对应分布式锁的 zk 节点
            try
            {client.create().creatingParentContainersIfNeeded().forPath(lockPath);
            }
            catch (KeeperException.NodeExistsException ignore)
            {// this is OK}
        }

        // 如果不是生产者角色,或者设定了有界队列,则启动子节点缓存,isProducerOnly:false
        if (!isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
        {
            // 底层 callback 里的 path 与下面的 queuePath 值一样
            childrenCache.start();// 这个中央很重要,runLoop 里就是从 childrenCache 里了取的数据}

        // 如果不是生产者模式,则异步执行 runLoop 办法,isProducerOnly:false
        if (!isProducerOnly)
        {
            // 这个线程池用来从队列里拉取音讯
            service.submit
                (new Callable<Object>()
                    {
                        @Override
                        public Object call()
                        {System.out.println("线程名:"+Thread.currentThread().getName());
                            runLoop();
                            return null;
                        }
                    }
                );
        }
    }

进入到 start 办法里,如果状态是 LATENT 且设置 STARTED 胜利,则创立一个长久有续的节点,紧接着就是 childrenCache.start(); 这个办法特地重要,咱们能够往里面跟进一下,看代码:

/**
     * 1, 应用观察者模式(监听器)
     * 2, 应用回调解决队列的子节点信息
     * 3, 应用一个版本号对子节点列表进行本地缓存
     * @param watched
     * @throws Exception
     */
    private synchronized void sync(boolean watched) throws Exception
    {if ( watched)//watched 为 true
        {
            //watch 与 callback 别离为 ChildCache 对象里的两个属性
            //callback 的回调里设置了 child.setData()
            //forPath 函数获取了所有的子节点
            client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
        }
        else
        {client.getChildren().inBackground(callback).forPath(path);
        }
    }

是的,应用了 zk 的监听器,当有节点或节点里的数据发生变化时,客户端会收到服务端的告诉,并进行 callback 回调,回调里的逻辑还是有必要看一下:

 /**
     * 应用一个版本号对子节点列表进行本地缓存
     * @param newChildren
     */
    private synchronized void setNewChildren(List<String> newChildren)
    {if ( newChildren != null)
        {Data currentData = children.get();
            // 应用不可变 list 对子节点列表进行包装
            // 应用版本号跟踪,防止 ABA 问题
            // 应用 AtomicReference 进行原子化包装
            // 子节点应用一个外部类进行封装
            //runLoop 办法里会有 children.get()取节点数据
            children.set(new Data(newChildren, currentData.version + 1));
            System.out.println(children.get().children+",:"+children.get().version);
            notifyFromCallback();// 告诉期待的消费者}
    }

将新的数据与新的版本号关联后 nofifyAll,这里告诉的应用前面马上说到。接下来是应用一个线程池获取元素,入口是 runLoop:

private void runLoop()
{
    long currentVersion = -1;
 long maxWaitMs = -1;
 try {while ( state.get() == State.STARTED )
        {
            try
 {
                // 依据版本号获取数据,ChildrenCache>data>child/version
 // 在 processChildren 办法里有一个闭锁,如果 currentVersion 对应的数据没解决完,则会始终阻塞
 ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
 currentVersion = data.version;//1
 List<String>        children = Lists.newArrayList(data.children);
 // 节点进行排序
 sortChildren(children); // makes sure items are processed in the correct order
 if (children.size() > 0 )
                {//get(0)只须要判断第一个元素的工夫就行,第一个元素未到执行工夫那其余的也不会到, 只有有一个元素到期就进行解决
 // 上面解决的时候还有一个判断工夫是否到期
 maxWaitMs = getDelay(children.get(0));// 应用的是 DistributedDelayQueue 里的 getDelay()办法,
 if (maxWaitMs > 0)// 未到执行的工夫, 那就期待 maxWaitMs
 {continue;}
                }
                else// 还没有元素
 {continue;}
                processChildren(children, currentVersion);// 元素进行解决
 }
            catch (InterruptedException e)
            {
                // swallow the interrupt as it's only possible from either a background
 // operation and, thus, doesn't apply to this loop or the instance // is being closed in which case the while test will get it //swallow the interrupt exception, 因为只有两种情景才会呈现中断异样,一种是没在这里进行的后盾的操作;// 另一种是测试的时候
 }
        }
    }
    catch (Exception e)
    {log.error("Exception caught in background handler", e);
 }
}

在 blockingNextGetData 办法里从 ChildrenCache 里取元素,如果没有则会 wait,如果有了元素会被唤醒,这也就是下面提到的 notifyAll.
接着来看元素解决的办法:

private void processChildren(List<String> children, long currentVersion) throws Exception
    {
        // 这里定义的信号量部分变更而非动态全局变量,那就能够判断这个信号量是用来管制以后办法体里的多个线程执行的先后顺序的
        final Semaphore processedLatch = new Semaphore(0);// 定义一个信号量
        final boolean   isUsingLockSafety = (lockPath != null);//false
        int             min = minItemsBeforeRefresh;//0,管制队列调度音讯的最小数量
        System.out.println("+++++++++++++++++"+Thread.currentThread().getName()+",children:"+children.size()+",version:"+currentVersion);
        for (final String itemNode : children) {if ( Thread.currentThread().isInterrupted())// 判断以后线程中断状态,不扭转线程状态
            {processedLatch.release(children.size());// 创立 children.size 个许可,release 与 acquire 并不一定要成对呈现
                break;
            }

            if (!itemNode.startsWith(QUEUE_ITEM_NAME) )
            {log.warn("Foreign node in queue path:" + itemNode);
                processedLatch.release();
                continue;
            }

            if (min-- <= 0)
            {
                //refreshOnWatch 拉取音讯后是否异步调度生产, 这里为 true
                // 如果版本号不匹配,则增加 children.size()个许可,且中断循环,最初获取 children.size 个许可,返回到下层的 while 循环
                // 所以这里的 semaphore 的作用是保障一个版本号对应的数据都能被解决完
                //
                // 版本号不相等,阐明有新的数据被增加,就须要跳出循环去获取新的数据。那这里就会有这样一个问题:原本执行到这个办法体里阐明是有节点曾经到了可执行的工夫,但因为有
                // 新的数据增加,会导致应该执行的节点会被推延
                if (refreshOnWatch && (currentVersion != childrenCache.getData().version) )
                {processedLatch.release(children.size());// 这个是为了不阻塞办法的最初一行
                    break;
                }
            }
            // 工夫上不合乎执行条件
            if (getDelay(itemNode) > 0 )
            {processedLatch.release();// 再减少一个许可
                continue;
            }
            // 这个线程池用来解决取出来的音讯
            // 示例中应用的是 Executors.newCachedThreadPool(); 它的特点是当工作数减少时,线程池会对应增加新的线程来解决,线程池不会有大小限度,依赖于 jvm 可能创立的大小
            //for 遍历出一个节点就会创立一个线程进行解决(所以应用这种线程池时对应须要执行的工作不能太耗时)
            executor.execute
            (new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {if ( isUsingLockSafety)//false
                            {processWithLockSafety(itemNode, ProcessType.NORMAL);
                            }
                            else// 执行这个分支
                            {processNormally(itemNode, ProcessType.NORMAL);
                            }
                        }
                        catch (Exception e)
                        {ThreadUtils.checkInterrupted(e);
                            log.error("Error processing message at" + itemNode, e);
                        }
                        finally
                        {// 解决完一个 itemNode,增加一个许可, 直到 currentVersion 下的节点全被解决完(children.size 个),主线程才会获取相应的许可
                            processedLatch.release();}
                    }
                }
            );
        }
        // 主线程将工作提交给了线程池后就会执行到 acquire 这里阻塞,但只有线程池里的工作都执行完 release latch 之后,主线程才不再阻塞,// 也就是主线程肯定要等到 currentVersion 里的节都解决完才行
        processedLatch.acquire(children.size());
    }

1,这里应用了 Semaphore 保障所一个 version 对应的数据都被解决完。2,同样这里也应用了线程池
最初是真正解决元素的中央:

private boolean processNormally(String itemNode, ProcessType type) throws Exception
{System.out.println("thread name========================"+Thread.currentThread().getName());
 try {
        //queuePath:/queue/0001
 //itemPath:queue-|171EA3DEECC|0000000019 String  itemPath = ZKPaths.makePath(queuePath, itemNode);// 获取 parent+child 全门路(queue-|171EA3DEECC|0000000019)
 Stat    stat = new Stat();// 数据节点的节点状态信息
 byte[]  bytes = null;
 if (type == ProcessType.NORMAL)
        {bytes = client.getData().storingStatIn(stat).forPath(itemPath);// 获取对应节点上的数据("a")
 }
        if (client.getState() == CuratorFrameworkState.STARTED )
        {
            // 删除节点上的数据, 音讯投递之后就被移除,不会期待生产端调用实现。想要期待生产端生产胜利返回后再删除,能够设置 lockPath
 // 如果消费者异常中断能够再次被生产
 // 先删除再进行数据处理 > 优: 解决比较简单,进入下次循环的时候不必再思考须要删除的数据。劣:数据删除后进行解决的时候如果失败,数据就丢掉了。// 先进行数据处理再删除 > 优: 数据比拟平安。劣: 进入下次循环时须要判断是否是已解决的数据,须要对数据进行打标
 client.delete().withVersion(stat.getVersion()).forPath(itemPath);
 }
        if (type == ProcessType.NORMAL)
        {processMessageBytes(itemNode, bytes);
 }
        return true;
 }
    catch (KeeperException.NodeExistsException ignore)
    {// another process got it}
    catch (KeeperException.NoNodeException ignore)
    {// another process got it}
    catch (KeeperException.BadVersionException ignore)
    {// another process got it}
    return false;
}

先删除元素再解决元素,下面的 processMessageBytes 办法就是解决元素的入口。

正文完
 0