乐趣区

Java并发指南14Java并发容器ConcurrentSkipListMap与CopyOnWriteArrayList

原文出处 http://cmsblogs.com/『chenssy

到目前为止,我们在 Java 世界里看到了两种实现 key-value 的数据结构:Hash、TreeMap,这两种数据结构各自都有着优缺点。

  1. Hash 表:插入、查找最快,为 O(1);如使用链表实现则可实现无锁;数据有序化需要显式的排序操作。
  2. 红黑树:插入、查找为 O(logn),但常数项较小;无锁实现的复杂性很高,一般需要加锁;数据天然有序。

然而,这次介绍第三种实现 key-value 的数据结构:SkipList。SkipList 有着不低于红黑树的效率,但是其原理和实现的复杂度要比红黑树简单多了。

SkipList

什么是 SkipList?Skip List,称之为跳表,它是一种可以替代平衡树的数据结构,其数据元素默认按照 key 值升序,天然有序。Skip list 让已排序的数据分布在多层链表中,以 0 - 1 随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。

我们先看一个简单的链表,如下:

如果我们需要查询 9、21、30,则需要比较次数为 3 + 6 + 8 = 17 次,那么有没有优化方案呢?有!我们将该链表中的某些元素提炼出来作为一个比较“索引”,如下:

我们先与这些索引进行比较来决定下一个元素是往右还是下走,由于存在“索引”的缘故,导致在检索的时候会大大减少比较的次数。当然元素不是很多,很难体现出优势,当元素足够多的时候,这种索引结构就会大显身手。

SkipList 的特性

SkipList 具备如下特性:

  1. 由很多层结构组成,level 是通过一定的概率随机产生的
  2. 每一层都是一个有序的链表,默认是升序,也可以根据创建映射时所提供的 Comparator 进行排序,具体取决于使用的构造方法
  3. 最底层 (Level 1) 的链表包含所有元素
  4. 如果一个元素出现在 Level i 的链表中,则它在 Level i 之下的链表也都会出现
  5. 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素

我们将上图再做一些扩展就可以变成一个典型的 SkipList 结构了

SkipList 的查找

SkipListd 的查找算法较为简单,对于上面我们我们要查找元素 21,其过程如下:

  1. 比较 3,大于,往后找(9),
  2. 比 9 大,继续往后找(25),但是比 25 小,则从 9 的下一层开始找(16)
  3. 16 的后面节点依然为 25,则继续从 16 的下一层找
  4. 找到 21

如图

红色虚线代表路径。

SkipList 的插入

SkipList 的插入操作主要包括:

  1. 查找合适的位置。这里需要明确一点就是在确认新节点要占据的层次 K 时,采用丢硬币的方式,完全随机。如果占据的层次 K 大于链表的层次,则重新申请新的层,否则插入指定层次
  2. 申请新的节点
  3. 调整指针

假定我们要插入的元素为 23,经过查找可以确认她是位于 25 后,9、16、21 前。当然需要考虑申请的层次 K。

如果层次 K > 3

需要申请新层次(Level 4)

如果层次 K = 2

直接在 Level 2 层插入即可

这里会涉及到以个算法:通过丢硬币决定层次 K,该算法我们通过后面 ConcurrentSkipListMap 源码来分析。还有一个需要注意的地方就是,在 K 层插入元素后,需要确保所有小于 K 层的层次都应该出现新节点。

SkipList 的删除

删除节点和插入节点思路基本一致:找到节点,删除节点,调整指针。

比如删除节点 9,如下:

ConcurrentSkipListMap

通过上面我们知道 SkipList 采用空间换时间的算法,其插入和查找的效率 O(logn),其效率不低于红黑树,但是其原理和实现的复杂度要比红黑树简单多了。一般来说会操作链表 List,就会对 SkipList 毫无压力。

ConcurrentSkipListMap 其内部采用 SkipLis 数据结构实现。为了实现 SkipList,ConcurrentSkipListMap 提供了三个内部类来构建这样的链表结构:Node、Index、HeadIndex。其中 Node 表示最底层的单链表有序节点、Index 表示为基于 Node 的索引层,HeadIndex 用来维护索引层次。到这里我们可以这样说 ConcurrentSkipListMap 是通过 HeadIndex 维护索引层次,通过 Index 从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层 Node 时,就只需要比较很小一部分数据了。在 JDK 中的关系如下图:

Node

    static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile ConcurrentSkipListMap.Node<K, V> next;

        /** 省略些许代码 */
    }

Node 的结构和一般的单链表毫无区别,key-value 和一个指向下一个节点的 next。

Index

    static class Index<K,V> {
        final ConcurrentSkipListMap.Node<K,V> node;
        final ConcurrentSkipListMap.Index<K,V> down;
        volatile ConcurrentSkipListMap.Index<K,V> right;

        /** 省略些许代码 */
    }

Index 提供了一个基于 Node 节点的索引 Node,一个指向下一个 Index 的 right,一个指向下层的 down 节点。

HeadIndex

    static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;  // 索引层,从 1 开始,Node 单链表层为 0
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {super(node, down, right);
            this.level = level;
        }
    }

HeadIndex 内部就一个 level 来定义层级。

ConcurrentSkipListMap 提供了四个构造函数,每个构造函数都会调用 initialize()方法进行初始化工作。

    final void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
        head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
                null, null, 1);
    }

注意,initialize()方法不仅仅只在构造函数中被调用,如 clone,clear、readObject 时都会调用该方法进行初始化步骤。这里需要注意 randomSeed 的初始化。

 private transient int randomSeed;
 randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero

randomSeed 一个简单的随机数生成器(在后面介绍)。

put 操作

CoucurrentSkipListMap 提供了 put()方法用于将指定值与此映射中的指定键关联。源码如下:

    public V put(K key, V value) {if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }

首先判断 value 如果为 null,则抛出 NullPointerException,否则调用 doPut 方法,其实如果各位看过 JDK 的源码的话,应该对这样的操作很熟悉了,JDK 源码里面很多方法都是先做一些必要性的验证后,然后通过调用 do**()方法进行真正的操作。

doPut()方法内容较多,我们分步分析。

    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        // 比较器
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {/** 省略代码 */

doPut()方法有三个参数,除了 key,value 外还有一个 boolean 类型的 onlyIfAbsent,该参数作用与如果存在当前 key 时,该做何动作。当 onlyIfAbsent 为 false 时,替换 value,为 true 时,则返回该 value。用代码解释为:

   if (!map.containsKey(key))
      return map.put(key, value);
  else
       return map.get(key);

首先判断 key 是否为 null,如果为 null,则抛出 NullPointerException,从这里我们可以确认 ConcurrentSkipList 是不支持 key 或者 value 为 null 的。然后调用 findPredecessor()方法,传入 key 来确认位置。findPredecessor()方法其实就是确认 key 要插入的位置。

   private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            // 从 head 节点开始,head 是 level 最高级别的 headIndex
            for (Index<K,V> q = head, r = q.right, d;;) {

                // r != null,表示该节点右边还有节点,需要比较
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    // value == null,表示该节点已经被删除了
                    // 通过 unlink()方法过滤掉该节点
                    if (n.value == null) {
                        // 删掉 r 节点
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }

                    // value != null,节点存在
                    // 如果 key 大于 r 节点的 key 则往前进一步
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                // 到达最右边,如果 dowm == null,表示指针已经达到最下层了,直接返回该节点
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }

findPredecessor()方法意思非常明确:寻找前辈。从最高层的 headIndex 开始向右一步一步比较,直到 right 为 null 或者右边节点的 Node 的 key 大于当前 key 为止,然后再向下寻找,依次重复该过程,直到 down 为 null 为止,即找到了前辈,看返回的结果注意是 Node,不是 Item,所以插入的位置应该是最底层的 Node 链表。

在这个过程中 ConcurrentSkipListMap 赋予了该方法一个其他的功能,就是通过判断节点的 value 是否为 null,如果为 null,表示该节点已经被删除了,通过调用 unlink()方法删除该节点。

        final boolean unlink(Index<K,V> succ) {return node.value != null && casRight(succ, succ.right);
        }

删除节点过程非常简单,更改下 right 指针即可。

通过 findPredecessor()找到前辈节点后,做什么呢?看下面:

 for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
        // 前辈节点的 next != null
        if (n != null) {
            Object v; int c;
            Node<K,V> f = n.next;

            // 不一致读,主要原因是并发,有节点捷足先登
            if (n != b.next)               // inconsistent read
                break;

            // n.value == null,该节点已经被删除了
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }

            // 前辈节点 b 已经被删除
            if (b.value == null || v == n) // b is deleted
                break;

            // 节点大于,往前移
            if ((c = cpr(cmp, key, n.key)) > 0) {
                b = n;
                n = f;
                continue;
            }

            // c == 0 表示,找到一个 key 相等的节点,根据 onlyIfAbsent 参数来做判断
            // onlyIfAbsent ==false,则通过 casValue,替换 value
            // onlyIfAbsent == true,返回该 value
            if (c == 0) {if (onlyIfAbsent || n.casValue(v, value)) {@SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                break; // restart if lost race to replace value
            }
            // else c < 0; fall through
        }

        // 将 key-value 包装成一个 node,插入
        z = new Node<K,V>(key, value, n);
        if (!b.casNext(n, z))
            break;         // restart if lost race to append to b
        break outer;
    }

找到合适的位置后,就是在该位置插入节点咯。插入节点的过程比较简单,就是将 key-value 包装成一个 Node,然后通过 casNext()方法加入到链表当中。当然是插入之前需要进行一系列的校验工作。

在最下层插入节点后,下一步工作是什么?新建索引。前面博主提过,在插入节点的时候,会根据采用抛硬币的方式来决定新节点所插入的层次,由于存在并发的可能,ConcurrentSkipListMap 采用 ThreadLocalRandom 来生成随机数。如下:

int rnd = ThreadLocalRandom.nextSecondarySeed();

抛硬币决定层次的思想很简单,就是通过抛硬币如果硬币为正面则层次 level + 1,否则停止,如下:

            // 抛硬币决定层次
            while (((rnd >>>= 1) & 1) != 0)
                ++level;

在阐述 SkipList 插入节点的时候说明了,决定的层次 level 会分为两种情况进行处理,一是如果层次 level 大于最大的层次话则需要新增一层,否则就在相应层次以及小于该 level 的层次进行节点新增处理。

level <= headIndex.level

            // 如果决定的层次 level 比最高层次 head.level 小,直接生成最高层次的 index
            // 由于需要确认每一层次的 down,所以需要从最下层依次往上生成
            if (level <= (max = h.level)) {for (int i = 1; i <= level; ++i)
                    idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
            }

从底层开始,小于 level 的每一层都初始化一个 index,每次的 node 都指向新加入的 node,down 指向下一层的 item,右侧 next 全部为 null。整个处理过程非常简单:为小于 level 的每一层初始化一个 index,然后加入到原来的 index 链条中去。

level > headIndex.level

           // leve > head.level 则新增一层
            else { // try to grow by one level
                // 新增一层
                level = max + 1;

                // 初始化 level 个 item 节点
                @SuppressWarnings("unchecked")
                ConcurrentSkipListMap.Index<K,V>[] idxs =
                        (ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);

                //
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    // 层次扩大了,需要重新开始(有新线程节点加入)if (level <= oldLevel) // lost race to add level
                        break;
                    // 新的头结点 HeadIndex
                    ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
                    ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
                    // 生成新的 HeadIndex 节点,该 HeadIndex 指向新增层次
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);

                    // HeadIndex CAS 替换
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }

当抛硬币决定的 level 大于最大层次 level 时,需要新增一层进行处理。处理逻辑如下:

  1. 初始化一个对应的 index 数组,大小为 level + 1,然后为每个单位都创建一个 index,个中参数为:Node 为新增的 Z,down 为下一层 index,right 为 null
  2. 通过 for 循环来进行扩容操作。从最高层进行处理,新增一个 HeadIndex,个中参数:节点 Node,down 都为最高层的 Node 和 HeadIndex,right 为刚刚创建的对应层次的 index,level 为相对应的层次 level。最后通过 CAS 把当前的 head 与新加入层的 head 进行替换。

通过上面步骤我们发现,尽管已经找到了前辈节点,也将 node 插入了,也确定确定了层次并生成了相应的 Index,但是并没有将这些 Index 插入到相应的层次当中,所以下面的代码就是将 index 插入到相对应的层当中。

  // 从插入的层次 level 开始
    splice: for (int insertionLevel = level;;) {
        int j = h.level;
        //  从 headIndex 开始
        for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {if (q == null || t == null)
                break splice;

            // r != null;这里是找到相应层次的插入节点位置,注意这里只横向找
            if (r != null) {
                ConcurrentSkipListMap.Node<K,V> n = r.node;

                int c = cpr(cmp, key, n.key);

                // n.value == null,解除关系,r 右移
                if (n.value == null) {if (!q.unlink(r))
                        break;
                    r = q.right;
                    continue;
                }

                // key > n.key 右移
                if (c > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            // 上面找到节点要插入的位置,这里就插入
            // 当前层是最顶层
            if (j == insertionLevel) {
                // 建立联系
                if (!q.link(r, t))
                    break; // restart
                if (t.node.value == null) {findNode(key);
                    break splice;
                }
                // 标志的插入层 --,如果 == 0,表示已经到底了,插入完毕,退出循环
                if (--insertionLevel == 0)
                    break splice;
            }

            // 上面节点已经插入完毕了,插入下一个节点
            if (--j >= insertionLevel && j < level)
                t = t.down;
            q = q.down;
            r = q.right;
        }
    }

这段代码分为两部分看,一部分是找到相应层次的该节点插入的位置,第二部分在该位置插入,然后下移。

至此,ConcurrentSkipListMap 的 put 操作到此就结束了。代码量有点儿多,这里总结下:

  1. 首先通过 findPredecessor()方法找到前辈节点 Node
  2. 根据返回的前辈节点以及 key-value,新建 Node 节点,同时通过 CAS 设置 next
  3. 设置节点 Node,再设置索引节点。采取抛硬币方式决定层次,如果所决定的层次大于现存的最大层次,则新增一层,然后新建一个 Item 链表。
  4. 最后,将新建的 Item 链表插入到 SkipList 结构中。

get 操作

相比于 put 操作,get 操作会简单很多,其过程其实就只相当于 put 操作的第一步:

  private V doGet(Object key) {if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {@SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }

与 put 操作第一步相似,首先调用 findPredecessor()方法找到前辈节点,然后顺着 right 一直往右找即可,同时在这个过程中同样承担了一个删除 value 为 null 的节点的职责。

remove 操作

remove 操作为删除指定 key 节点,如下:

    public V remove(Object key) {return doRemove(key, null);
    }

直接调用 doRemove()方法,这里 remove 有两个参数,一个是 key,另外一个是 value,所以 doRemove 方法即提供 remove key,也提供同时满足 key-value。

 final V doRemove(Object key, Object value) {if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                ConcurrentSkipListMap.Node<K,V> f = n.next;

                // 不一致读,重新开始
                if (n != b.next)                    // inconsistent read
                    break;

                // n 节点已删除
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }

                // b 节点已删除
                if (b.value == null || v == n)      // b is deleted
                    break;

                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;

                // 右移
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }

                /*
                 * 找到节点
                 */

                // value != null 表示需要同时校验 key-value 值
                if (value != null && !value.equals(v))
                    break outer;

                // CAS 替换 value
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    // 清理节点
                    findPredecessor(key, cmp);      // clean index

                    // head.right == null 表示该层已经没有节点,删掉该层
                    if (head.right == null)
                        tryReduceLevel();}
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

调用 findPredecessor()方法找到前辈节点,然后通过右移,然后比较,找到后利用 CAS 把 value 替换为 null,然后判断该节点是不是这层唯一的 index,如果是的话,调用 tryReduceLevel()方法把这层干掉,完成删除。

其实从这里可以看出,remove 方法仅仅是把 Node 的 value 设置 null,并没有真正删除该节点 Node,其实从上面的 put 操作、get 操作我们可以看出,他们在寻找节点的时候都会判断节点的 value 是否为 null,如果为 null,则调用 unLink()方法取消关联关系,如下:

                    if (n.value == null) {if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }

size 操作

ConcurrentSkipListMap 的 size()操作和 ConcurrentHashMap 不同,它并没有维护一个全局变量来统计元素的个数,所以每次调用该方法的时候都需要去遍历。

    public int size() {
        long count = 0;
        for (Node<K,V> n = findFirst(); n != null; n = n.next) {if (n.getValidValue() != null)
                ++count;
        }
        return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
    }

调用 findFirst()方法找到第一个 Node,然后利用 node 的 next 去统计。最后返回统计数据,最多能返回 Integer.MAX_VALUE。注意这里在线程并发下是安全的。

ConcurrentSkipListMap 过程其实不复杂,相比于 ConcurrentHashMap 而言,是简单的不能再简单了。对跳表 SkipList 熟悉的话,ConcurrentSkipListMap 应该是盘中餐了。

Java 并发编程:并发容器之 CopyOnWriteArrayList(转载)

原文链接:

http://ifeve.com/java-copy-on-write/

Copy-On-Write 简称 COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容 Copy 出去形成一个新的内容然后再改,这是一种延时懒惰策略。从 JDK1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器, 它们是 CopyOnWriteArrayList 和 CopyOnWriteArraySet。CopyOnWrite 容器非常有用,可以在非常多的并发场景中使用到。

什么是 CopyOnWrite 容器

CopyOnWrite 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。

CopyOnWriteArrayList 的实现原理

在使用 CopyOnWriteArrayList 之前,我们先阅读其源码了解下它是如何实现的。以下代码是向 CopyOnWriteArrayList 中 add 方法的实现(向 CopyOnWriteArrayList 里添加元素),可以发现在添加的时候是需要加锁的,否则多线程写的时候会 Copy 出 N 个副本出来。

1234567891011121314151617181920 /**` Appends the specified element to the end of this list.* @param e element to be appended to this list* @return <tt>true</tt> (as specified by {@link Collection#add})*/public` `boolean` `add(E e) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;finally` `{lock.unlock();}}`

  读的时候不需要加锁,如果读的时候有多个线程正在向 CopyOnWriteArrayList 添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的 CopyOnWriteArrayList。

123 public E get(`int index) {return` `get(getArray(), index);}`

  JDK 中并没有提供 CopyOnWriteMap,我们可以参考 CopyOnWriteArrayList 来实现一个,基本代码如下:

123456789101112131415161718192021222324252627282930313233 import java.util.Collection;`import java.util.Map;import` `java.util.Set;public class CopyOnWriteMap<K, V> implements` `Map<K, V>, Cloneable {private volatile Map<K, V> internalMap;public` `CopyOnWriteMap() {internalMap = new` `HashMap<K, V>();}public` `V put(K key, V value) {synchronized (this) {Map<K, V> newMap = new HashMap<K, V>(internalMap);V val = newMap.put(key, value);internalMap = newMap;return` `val;}}public V get(Object key) {return` `internalMap.get(key);}public` `void` `putAll(Map<? extends K, ? extends` `V> newData) {synchronized (this) {Map<K, V> newMap = new HashMap<K, V>(internalMap);newMap.putAll(newData);internalMap = newMap;}}`}

  实现很简单,只要了解了 CopyOnWrite 机制,我们可以实现各种 CopyOnWrite 容器,并且在不同的应用场景中使用。

CopyOnWrite 的应用场景

CopyOnWrite 并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下:

1234567891011121314151617181920212223242526272829303132333435 package com.ifeve.book;`import java.util.Map;import` `com.ifeve.book.forkjoin.CopyOnWriteMap;/** 黑名单服务* @author fangtengfei*/public class BlackListServiceImpl {private` `static` `CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>(1000);public` `static` `boolean` `isBlackList(String id) {return blackListMap.get(id) == null` `? false true;}public static void addBlackList(String id) {blackListMap.put(id, Boolean.TRUE);}/** 批量添加黑名单** @param ids*/public static void addBlackList(Map<String,Boolean> ids) {blackListMap.putAll(ids);}`}

  代码很简单,但是使用 CopyOnWriteMap 需要注意两件事情:

1. 减少扩容开销。根据实际需要,初始化 CopyOnWriteMap 的大小,避免写时 CopyOnWriteMap 扩容的开销。

2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的 addBlackList 方法。

CopyOnWrite 的缺点

CopyOnWrite 容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。

内存占用问题。因为 CopyOnWrite 的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意: 在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说 200M 左右,那么再写入 100M 数据进去,内存就会占用 300M,那么这个时候很有可能造成频繁的 Yong GC 和 Full GC。之前我们系统中使用了一个服务由于每晚使用 CopyOnWrite 机制更新大对象,造成了每晚 15 秒的 Full GC,应用响应时间也随之变长。

针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是 10 进制的数字,可以考虑把它压缩成 36 进制或 64 进制。或者不使用 CopyOnWrite 容器,而使用其他的并发容器,如 ConcurrentHashMap。

数据一致性问题。CopyOnWrite 容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用 CopyOnWrite 容器。

下面这篇文章验证了 CopyOnWriteArrayList 和同步容器的性能:

http://blog.csdn.net/wind5shy/article/details/5396887

下面这篇文章简单描述了 CopyOnWriteArrayList 的使用:

http://blog.csdn.net/imzoer/article/details/9751591

 

退出移动版