从入门到放弃ZooKeeperZooKeeper实战分布式队列

39次阅读

共计 3319 个字符,预计需要花费 9 分钟才能阅读完成。

前言

上文【从入门到放弃 -ZooKeeper】ZooKeeper 入门中,我们学习了 ZooKeeper 的简单安装和 cli 使用。
接下来我们开始基于 java API 的实战编程。本文先来写一个分布式队列的代码实现。

设计

我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃 -Java】并发编程 -JUC-ConcurrentLinkedQueue 和【从入门到放弃 -Java】并发编程 -JUC-LinkedBlockingQueue。我们直接继承 AbstractQueue 类,并实现 Queue 接口。
主要重写 offer、poll、peek、size 方法。
我们使用 ZooKeeper 的持久化顺序节点来实现分布式队列。
offer 是入队,入队时新创建一个持久化顺序节点,节点后缀会根据 ZooKeeper 的特性自动累加。
poll 的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和 poll 的区别是,peek 只获取数据,不出队,不删除已经消费的节点。
size 获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

DistributedQueue

// 继承 AbstractQueue 类并实现 Queue 接口
public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);

    //ZooKeeper 客户端,进行 ZooKeeper 操作
    private ZooKeeper zooKeeper;

    // 根节点名称
    private String dir;

    // 数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式
    private String node;

    //ZooKeeper 鉴权信息
    private List<ACL> acls;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
   public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        init();}

    private void init() {
        // 需要先判断根节点是否存在,不存在的话,创建子节点时会出错。try {Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {logger.error("[DistributedQueue#init] error :" + e.toString(), e);
        }
    }
}

offer

/**
 * Offer boolean.
 *
 * @param o the o
 * @return the boolean
 */
@Override
public boolean offer(E o) {
    // 构建要插入的节点名称
    String fullPath = dir.concat("/").concat(node);
    try {
        // 创建子节点成功则返回入队成功
      zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    } catch (Exception e) {logger.error("[DistributedQueue#offer] error :" + e.toString(), e);
    }
    return false;
}

poll

/**
 * Poll e.
 *
 * @return the e
 */
@Override
public E poll() {
    try {
        // 获取根节点所有子节点信息。List<String> children = zooKeeper.getChildren(dir, null);
        // 如果队列是空的则返回 null
        if (children == null || children.isEmpty()) {return null;}

        // 将子节点名称排序
        Collections.sort(children);
        for (String child : children) {
            // 拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                // 如果获取数据成功,则类型转换后,返回,并删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                zooKeeper.delete(fullPath, -1);
                return data;
            } catch (Exception e) {logger.warn("[DistributedQueue#poll] warn :" + e.toString(), e);
            }
        }

    } catch (Exception e) {logger.error("[DistributedQueue#peek] poll :" + e.toString(), e);
    }

    return null;
}

peek

/**
 * Peek e.
 *
 * @return the e
 */
@Override
public E peek() {

    try {
        // 获取根节点所有子节点信息。List<String> children = zooKeeper.getChildren(dir, null);
        // 如果队列是空的则返回 null
        if (children == null || children.isEmpty()) {return null;}

        // 将子节点名称排序
        Collections.sort(children);

        for (String child : children) {
            // 拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                // 如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                return data;
            } catch (Exception e) {logger.warn("[DistributedQueue#peek] warn :" + e.toString(), e);
            }
        }

    } catch (Exception e) {logger.error("[DistributedQueue#peek] warn :" + e.toString(), e);
    }

    return null;
}

size

/**
 * Size int.
 *
 * @return the int
 */
@Override
public int size() {
    try {
        // 获取根节点的子节点名称
        List<String> children = zooKeeper.getChildren(dir, null);
        // 返回子结点信息数量
        return children.size();} catch (Exception e) {logger.error("[DistributedQueue#offer] size :" + e.toString(), e);
    }

    return 0;
}

总结

上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr。如果有好的优化建议,欢迎一起讨论。


本文作者:aloof_

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

正文完
 0