java-并发-ConcurrentHashMap高并发机制-jdk1.8

2020年05月20日 阅读数:44
这篇文章主要向大家介绍java-并发-ConcurrentHashMap高并发机制-jdk1.8,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

JDK8的版本,与JDK6的版本有很大的差别。实现线程安全的思想也已经彻底变了,它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想,可是为了作到并发,又增长了不少辅助的类,例如TreeBin,Traverser等对象内部类。CAS算法实现无锁化的修改值的操做,他能够大大下降锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,若是相等,则接受你指定的修改的值,不然拒绝你的操做。由于当前线程中的值已经不是最新的值,你的修改极可能会覆盖掉其余线程修改的结果。这一点与乐观锁,SVN的思想是比较相似的。html

ConcurrentHashMap是conccurrent家族中的一个类,因为它能够高效地支持并发操做,以及被普遍使用,经典的开源框架Spring的底层数据结构就是使用ConcurrentHashMap实现的。与同是线程安全的老大哥HashTable相比,它已经更胜一筹,所以它的锁更加细化,而不是像HashTable同样为几乎每一个方法都添加了synchronized锁,这样的锁无疑会影响到性能。java

本文的分析的源码是JDK8的版本,与JDK6的版本有很大的差别。实现线程安全的思想也已经彻底变了,它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想,可是为了作到并发,又增长了不少辅助的类,例如TreeBin,Traverser等对象内部类。


1 重要的属性

首先来看几个重要的属性,与HashMap相同的就再也不介绍了,这里重点解释一下sizeCtl这个属性。能够说它是ConcurrentHashMap中出镜率很高的一个属性,由于它是一个控制标识符,在不一样的地方有不一样用途,并且它的取值不一样,也表明不一样的含义。node

  • 负数表明正在进行初始化或扩容操做
  • -1表明正在初始化
  • -N 表示有N-1个线程正在进行扩容操做
  • 正数或0表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点相似于扩容阈值的概念。还后面能够看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的
[java]  view plain  copy
  1. /** 
  2.    * 盛装Node元素的数组 它的大小是2的整数次幂 
  3.    * Size is always a power of two. Accessed directly by iterators. 
  4.    */  
  5.   transient volatile Node<K,V>[] table;  
  6.   
  7. /** 
  8.    * Table initialization and resizing control.  When negative, the 
  9.    * table is being initialized or resized: -1 for initialization, 
  10.    * else -(1 + the number of active resizing threads).  Otherwise, 
  11.    * when table is null, holds the initial table size to use upon 
  12.    * creation, or 0 for default. After initialization, holds the 
  13.    * next element count value upon which to resize the table. 
  14.    hash表初始化或扩容时的一个控制位标识量。 
  15.    负数表明正在进行初始化或扩容操做 
  16.    -1表明正在初始化 
  17.    -N 表示有N-1个线程正在进行扩容操做 
  18.    正数或0表明hash表尚未被初始化,这个数值表示初始化或下一次进行扩容的大小 
  19.     
  20.    */  
  21.   private transient volatile int sizeCtl;   
  22.   // 如下两个是用来控制扩容的时候 单线程进入的变量  
  23.    /** 
  24.    * The number of bits used for generation stamp in sizeCtl. 
  25.    * Must be at least 6 for 32bit arrays. 
  26.    */  
  27.   private static int RESIZE_STAMP_BITS = 16;  
  28. /** 
  29.    * The bit shift for recording size stamp in sizeCtl. 
  30.    */  
  31.   private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;  
  32.     
  33.     
  34.   /* 
  35.    * Encodings for Node hash fields. See above for explanation. 
  36.    */  
  37.   static final int MOVED     = -1// hash值是-1,表示这是一个forwardNode节点  
  38.   static final int TREEBIN   = -2// hash值是-2  表示这时一个TreeBin节点  

2 重要的内部类

2.1 Node

Node是最核心的内部类,它包装了key-value键值对,全部插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很类似,可是可是有一些差异它对value和next属性设置了volatile同步锁,它不容许调用setValue方法直接改变Node的value域,它增长了find方法辅助map.get()方法。nginx

[java]  view plain  copy
  1. static class Node<K,V> implements Map.Entry<K,V> {  
  2.        final int hash;  
  3.        final K key;  
  4.        volatile V val;//带有同步锁的value  
  5.        volatile Node<K,V> next;//带有同步锁的next指针  
  6.   
  7.        Node(int hash, K key, V val, Node<K,V> next) {  
  8.            this.hash = hash;  
  9.            this.key = key;  
  10.            this.val = val;  
  11.            this.next = next;  
  12.        }  
  13.   
  14.        public final K getKey()       { return key; }  
  15.        public final V getValue()     { return val; }  
  16.        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }  
  17.        public final String toString(){ return key + "=" + val; }  
  18.        //不容许直接改变value的值  
  19.        public final V setValue(V value) {  
  20.            throw new UnsupportedOperationException();  
  21.        }  
  22.   
  23.        public final boolean equals(Object o) {  
  24.            Object k, v, u; Map.Entry<?,?> e;  
  25.            return ((o instanceof Map.Entry) &&  
  26.                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&  
  27.                    (v = e.getValue()) != null &&  
  28.                    (k == key || k.equals(key)) &&  
  29.                    (v == (u = val) || v.equals(u)));  
  30.        }  
  31.   
  32.        /** 
  33.         * Virtualized support for map.get(); overridden in subclasses. 
  34.         */  
  35.        Node<K,V> find(int h, Object k) {  
  36.            Node<K,V> e = this;  
  37.            if (k != null) {  
  38.                do {  
  39.                    K ek;  
  40.                    if (e.hash == h &&  
  41.                        ((ek = e.key) == k || (ek != null && k.equals(ek))))  
  42.                        return e;  
  43.                } while ((e = e.next) != null);  
  44.            }  
  45.            return null;  
  46.        }  
  47.    }  
  48.      
  49.    这个Node内部类与HashMap中定义的Node类很类似,可是有一些差异  
  50.    它对value和next属性设置了volatile同步锁  
  51.    它不容许调用setValue方法直接改变Node的value域  
  52.    它增长了find方法辅助map.get()方法  

2.2 TreeNode

树节点类,另一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。可是与HashMap不相同的是,它并非直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。并且TreeNode在ConcurrentHashMap集成自Node类,而并不是HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样作的目的是方便基于TreeBin的访问。c++

2.3 TreeBin

这个类并不负责包装用户的key、value信息,而是包装的不少TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。git

这里仅贴出它的构造方法。能够看到在构造TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,这也就是个标识为。同时也看到咱们熟悉的红黑树构造方法

[java]  view plain  copy
  1. /** 
  2.          * Creates bin with initial set of nodes headed by b. 
  3.          */  
  4.         TreeBin(TreeNode<K,V> b) {  
  5.             super(TREEBIN, nullnullnull);  
  6.             this.first = b;  
  7.             TreeNode<K,V> r = null;  
  8.             for (TreeNode<K,V> x = b, next; x != null; x = next) {  
  9.                 next = (TreeNode<K,V>)x.next;  
  10.                 x.left = x.right = null;  
  11.                 if (r == null) {  
  12.                     x.parent = null;  
  13.                     x.red = false;  
  14.                     r = x;  
  15.                 }  
  16.                 else {  
  17.                     K k = x.key;  
  18.                     int h = x.hash;  
  19.                     Class<?> kc = null;  
  20.                     for (TreeNode<K,V> p = r;;) {  
  21.                         int dir, ph;  
  22.                         K pk = p.key;  
  23.                         if ((ph = p.hash) > h)  
  24.                             dir = -1;  
  25.                         else if (ph < h)  
  26.                             dir = 1;  
  27.                         else if ((kc == null &&  
  28.                                   (kc = comparableClassFor(k)) == null) ||  
  29.                                  (dir = compareComparables(kc, k, pk)) == 0)  
  30.                             dir = tieBreakOrder(k, pk);  
  31.                             TreeNode<K,V> xp = p;  
  32.                         if ((p = (dir <= 0) ? p.left : p.right) == null) {  
  33.                             x.parent = xp;  
  34.                             if (dir <= 0)  
  35.                                 xp.left = x;  
  36.                             else  
  37.                                 xp.right = x;  
  38.                             r = balanceInsertion(r, x);  
  39.                             break;  
  40.                         }  
  41.                     }  
  42.                 }  
  43.             }  
  44.             this.root = r;  
  45.             assert checkInvariants(root);  
  46.         }  

2.5 ForwardingNode

一个用于链接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。并且这个节点的key value next指针所有为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找github

[java]  view plain  copy
  1. /** 
  2.      * A node inserted at head of bins during transfer operations. 
  3.      */  
  4.     static final class ForwardingNode<K,V> extends Node<K,V> {  
  5.         final Node<K,V>[] nextTable;  
  6.         ForwardingNode(Node<K,V>[] tab) {  
  7.             super(MOVED, nullnullnull);  
  8.             this.nextTable = tab;  
  9.         }  
  10.   
  11.         Node<K,V> find(int h, Object k) {  
  12.             // loop to avoid arbitrarily deep recursion on forwarding nodes  
  13.             outer: for (Node<K,V>[] tab = nextTable;;) {  
  14.                 Node<K,V> e; int n;  
  15.                 if (k == null || tab == null || (n = tab.length) == 0 ||  
  16.                     (e = tabAt(tab, (n - 1) & h)) == null)  
  17.                     return null;  
  18.                 for (;;) {  
  19.                     int eh; K ek;  
  20.                     if ((eh = e.hash) == h &&  
  21.                         ((ek = e.key) == k || (ek != null && k.equals(ek))))  
  22.                         return e;  
  23.                     if (eh < 0) {  
  24.                         if (e instanceof ForwardingNode) {  
  25.                             tab = ((ForwardingNode<K,V>)e).nextTable;  
  26.                             continue outer;  
  27.                         }  
  28.                         else  
  29.                             return e.find(h, k);  
  30.                     }  
  31.                     if ((e = e.next) == null)  
  32.                         return null;  
  33.                 }  
  34.             }  
  35.         }  
  36.     }  


3 Unsafe与CAS

在ConcurrentHashMap中,随处能够看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操做,他能够大大下降锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,若是相等,则接受你指定的修改的值,不然拒绝你的操做。由于当前线程中的值已经不是最新的值,你的修改极可能会覆盖掉其余线程修改的结果。这一点与乐观锁,SVN的思想是比较相似的。算法

3.1 unsafe静态块

unsafe代码块控制了一些属性的修改工做,好比最经常使用的SIZECTL 。  在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工做。  利用CAS进行无锁操做,能够大大提升性能。数据库

[java]  view plain  copy
  1. private static final sun.misc.Unsafe U;  
  2.    private static final long SIZECTL;  
  3.    private static final long TRANSFERINDEX;  
  4.    private static final long BASECOUNT;  
  5.    private static final long CELLSBUSY;  
  6.    private static final long CELLVALUE;  
  7.    private static final long ABASE;  
  8.    private static final int ASHIFT;  
  9.   
  10.    static {  
  11.        try {  
  12.            U = sun.misc.Unsafe.getUnsafe();  
  13.            Class<?> k = ConcurrentHashMap.class;  
  14.            SIZECTL = U.objectFieldOffset  
  15.                (k.getDeclaredField("sizeCtl"));  
  16.            TRANSFERINDEX = U.objectFieldOffset  
  17.                (k.getDeclaredField("transferIndex"));  
  18.            BASECOUNT = U.objectFieldOffset  
  19.                (k.getDeclaredField("baseCount"));  
  20.            CELLSBUSY = U.objectFieldOffset  
  21.                (k.getDeclaredField("cellsBusy"));  
  22.            Class<?> ck = CounterCell.class;  
  23.            CELLVALUE = U.objectFieldOffset  
  24.                (ck.getDeclaredField("value"));  
  25.            Class<?> ak = Node[].class;  
  26.            ABASE = U.arrayBaseOffset(ak);  
  27.            int scale = U.arrayIndexScale(ak);  
  28.            if ((scale & (scale - 1)) != 0)  
  29.                throw new Error("data type scale not a power of two");  
  30.            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);  
  31.        } catch (Exception e) {  
  32.            throw new Error(e);  
  33.        }  
  34.    }  
  35.      

3.2 三个核心方法

ConcurrentHashMap定义了三个原子操做,用于对指定位置的节点进行操做。正是这些原子操做保证了ConcurrentHashMap的线程安全。编程

[java]  view plain  copy
  1. @SuppressWarnings("unchecked")  
  2.    //得到在i位置上的Node节点  
  3.    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {  
  4.        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  
  5.    }  
  6.     //利用CAS算法设置i位置上的Node节点。之因此能实现并发是由于他指定了原来这个节点的值是多少  
  7.     //在CAS算法中,会比较内存中的值与你指定的这个值是否相等,若是相等才接受你的修改,不然拒绝你的修改  
  8.     //所以当前线程中的值并非最新的值,这种修改可能会覆盖掉其余线程的修改结果  有点相似于SVN  
  9.    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,  
  10.                                        Node<K,V> c, Node<K,V> v) {  
  11.        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  
  12.    }  
  13.     //利用volatile方法设置节点位置的值  
  14.    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {  
  15.        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);  
  16.    }  

4 初始化方法initTable

对于ConcurrentHashMap来讲,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null

初始化方法主要应用了关键属性sizeCtl 若是这个值〈0,表示其余线程正在进行初始化,就放弃这个操做。在这也能够看出ConcurrentHashMap的初始化只能由一个线程完成。若是得到了初始化权限,就用CAS方法将sizeCtl置为-1,防止其余线程进入。初始化数组后,将sizeCtl的值改成0.75*n

[java]  view plain  copy
  1. /** 
  2.      * Initializes table, using the size recorded in sizeCtl. 
  3.      */  
  4.     private final Node<K,V>[] initTable() {  
  5.         Node<K,V>[] tab; int sc;  
  6.         while ((tab = table) == null || tab.length == 0) {  
  7.                 //sizeCtl表示有其余线程正在进行初始化操做,把线程挂起。对于table的初始化工做,只能有一个线程在进行。  
  8.             if ((sc = sizeCtl) < 0)  
  9.                 Thread.yield(); // lost initialization race; just spin  
  10.             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化  
  11.                 try {  
  12.                     if ((tab = table) == null || tab.length == 0) {  
  13.                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;  
  14.                         @SuppressWarnings("unchecked")  
  15.                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];  
  16.                         table = tab = nt;  
  17.                         sc = n - (n >>> 2);//至关于0.75*n 设置一个扩容的阈值  
  18.                     }  
  19.                 } finally {  
  20.                     sizeCtl = sc;  
  21.                 }  
  22.                 break;  
  23.             }  
  24.         }  
  25.         return tab;  
  26.     }  


5 扩容方法 transfer

当ConcurrentHashMap容量不足的时候,须要对table进行扩容。这个方法的基本思想跟HashMap是很像的,可是因为它是支持并发扩容的,因此要复杂的多。缘由是它支持多线程进行扩容操做,而并无加锁。我想这样作的目的不只仅是为了知足concurrent的要求,而是但愿利用并发处理去减小扩容带来的时间影响。由于在扩容的时候,老是会涉及到从一个“数组”到另外一个“数组”拷贝的操做,若是这个操做可以并发进行,那真真是极好的了。

整个扩容操做分为两个部分
  •  第一部分是构建一个nextTable,它的容量是原来的两倍,这个操做是单线程完成的。这个单线程的保证是经过RESIZE_STAMP_SHIFT这个常量通过一次运算来保证的,这个地方在后面会有提到;

  • 第二个部分就是将原来table中的元素复制到nextTable中,这里容许多线程进行操做。

先来看一下单线程是如何完成的:

它的大致思想就是遍历、复制的过程。首先根据运算获得须要遍历的次数i,而后利用tabAt方法得到i位置的元素:

  • 若是这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;

  • 若是这个位置是Node节点(fh>=0),若是它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上

  • 若是这个位置是TreeBin节点(fh<0),也作一个反序处理,而且判断是否须要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上

  • 遍历过全部的节点之后就完成了复制工做,这时让nextTable做为新的table,而且更新sizeCtl为新容量的0.75倍 ,完成扩容。
再看一下多线程是如何完成的:

在代码的69行有一个判断,若是遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另外一个线程看到forward,就向后遍历。这样交叉就完成了复制工做。并且还很好的解决了线程安全的问题。 这个方法的设计实在是让我膜拜。

[java]  view plain  copy
  1.  /** 
  2.     * 一个过渡的table表  只有在扩容的时候才会使用 
  3.     */  
  4.    private transient volatile Node<K,V>[] nextTable;  
  5.   
  6. /** 
  7.     * Moves and/or copies the nodes in each bin to new table. See 
  8.     * above for explanation. 
  9.     */  
  10.    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {  
  11.        int n = tab.length, stride;  
  12.        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  
  13.            stride = MIN_TRANSFER_STRIDE; // subdivide range  
  14.        if (nextTab == null) {            // initiating  
  15.            try {  
  16.                @SuppressWarnings("unchecked")  
  17.                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍  
  18.                nextTab = nt;  
  19.            } catch (Throwable ex) {      // try to cope with OOME  
  20.                sizeCtl = Integer.MAX_VALUE;  
  21.                return;  
  22.            }  
  23.            nextTable = nextTab;  
  24.            transferIndex = n;  
  25.        }  
  26.        int nextn = nextTab.length;  
  27.        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位  
  28.        boolean advance = true;//并发扩容的关键属性 若是等于true 说明这个节点已经处理过  
  29.        boolean finishing = false// to ensure sweep before committing nextTab  
  30.        for (int i = 0, bound = 0;;) {  
  31.            Node<K,V> f; int fh;  
  32.            //这个while循环体的做用就是在控制i--  经过i--能够依次遍历原hash表中的节点  
  33.            while (advance) {  
  34.                int nextIndex, nextBound;  
  35.                if (--i >= bound || finishing)  
  36.                    advance = false;  
  37.                else if ((nextIndex = transferIndex) <= 0) {  
  38.                    i = -1;  
  39.                    advance = false;  
  40.                }  
  41.                else if (U.compareAndSwapInt  
  42.                         (this, TRANSFERINDEX, nextIndex,  
  43.                          nextBound = (nextIndex > stride ?  
  44.                                       nextIndex - stride : 0))) {  
  45.                    bound = nextBound;  
  46.                    i = nextIndex - 1;  
  47.                    advance = false;  
  48.                }  
  49.            }  
  50.            if (i < 0 || i >= n || i + n >= nextn) {  
  51.                int sc;  
  52.                if (finishing) {  
  53.                 //若是全部的节点都已经完成复制工做  就把nextTable赋值给table 清空临时对象nextTable  
  54.                    nextTable = null;  
  55.                    table = nextTab;  
  56.                    sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍  依然至关于如今容量的0.75倍  
  57.                    return;  
  58.                }  
  59.                //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操做  
  60.                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  
  61.                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  
  62.                        return;  
  63.                    finishing = advance = true;  
  64.                    i = n; // recheck before commit  
  65.                }  
  66.            }  
  67.            //若是遍历到的节点为空 则放入ForwardingNode指针  
  68.            else if ((f = tabAt(tab, i)) == null)  
  69.                advance = casTabAt(tab, i, null, fwd);  
  70.            //若是遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过  这里是控制并发扩容的核心  
  71.            else if ((fh = f.hash) == MOVED)  
  72.                advance = true// already processed  
  73.            else {  
  74.                 //节点上锁  
  75.                synchronized (f) {  
  76.                    if (tabAt(tab, i) == f) {  
  77.                        Node<K,V> ln, hn;  
  78.                        //若是fh>=0 证实这是一个Node节点  
  79.                        if (fh >= 0) {  
  80.                            int runBit = fh & n;  
  81.                            //如下的部分在完成的工做是构造两个链表  一个是原链表  另外一个是原链表的反序排列  
  82.                            Node<K,V> lastRun = f;  
  83.                            for (Node<K,V> p = f.next; p != null; p = p.next) {  
  84.                                int b = p.hash & n;  
  85.                                if (b != runBit) {  
  86.                                    runBit = b;  
  87.                                    lastRun = p;  
  88.                                }  
  89.                            }  
  90.                            if (runBit == 0) {  
  91.                                ln = lastRun;  
  92.                                hn = null;  
  93.                            }  
  94.                            else {  
  95.                                hn = lastRun;  
  96.                                ln = null;  
  97.                            }  
  98.                            for (Node<K,V> p = f; p != lastRun; p = p.next) {  
  99.                                int ph = p.hash; K pk = p.key; V pv = p.val;  
  100.                                if ((ph & n) == 0)  
  101.                                    ln = new Node<K,V>(ph, pk, pv, ln);  
  102.                                else  
  103.                                    hn = new Node<K,V>(ph, pk, pv, hn);  
  104.                            }  
  105.                            //在nextTable的i位置上插入一个链表  
  106.                            setTabAt(nextTab, i, ln);  
  107.                            //在nextTable的i+n的位置上插入另外一个链表  
  108.                            setTabAt(nextTab, i + n, hn);  
  109.                            //在table的i位置上插入forwardNode节点  表示已经处理过该节点  
  110.                            setTabAt(tab, i, fwd);  
  111.                            //设置advance为true 返回到上面的while循环中 就能够执行i--操做  
  112.                            advance = true;  
  113.                        }  
  114.                        //对TreeBin对象进行处理  与上面的过程相似  
  115.                        else if (f instanceof TreeBin) {  
  116.                            TreeBin<K,V> t = (TreeBin<K,V>)f;  
  117.                            TreeNode<K,V> lo = null, loTail = null;  
  118.                            TreeNode<K,V> hi = null, hiTail = null;  
  119.                            int lc = 0, hc = 0;  
  120.                            //构造正序和反序两个链表  
  121.                            for (Node<K,V> e = t.first; e != null; e = e.next) {  
  122.                                int h = e.hash;  
  123.                                TreeNode<K,V> p = new TreeNode<K,V>  
  124.                                    (h, e.key, e.val, nullnull);  
  125.                                if ((h & n) == 0) {  
  126.                                    if ((p.prev = loTail) == null)  
  127.                                        lo = p;  
  128.                                    else  
  129.                                        loTail.next = p;  
  130.                                    loTail = p;  
  131.                                    ++lc;  
  132.                                }  
  133.                                else {  
  134.                                    if ((p.prev = hiTail) == null)  
  135.                                        hi = p;  
  136.                                    else  
  137.                                        hiTail.next = p;  
  138.                                    hiTail = p;  
  139.                                    ++hc;  
  140.                                }  
  141.                            }  
  142.                            //若是扩容后已经再也不须要tree的结构 反向转换为链表结构  
  143.                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :  
  144.                                (hc != 0) ? new TreeBin<K,V>(lo) : t;  
  145.                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :  
  146.                                (lc != 0) ? new TreeBin<K,V>(hi) : t;  
  147.                             //在nextTable的i位置上插入一个链表      
  148.                            setTabAt(nextTab, i, ln);  
  149.                            //在nextTable的i+n的位置上插入另外一个链表  
  150.                            setTabAt(nextTab, i + n, hn);  
  151.                             //在table的i位置上插入forwardNode节点  表示已经处理过该节点  
  152.                            setTabAt(tab, i, fwd);  
  153.                            //设置advance为true 返回到上面的while循环中 就能够执行i--操做  
  154.                            advance = true;  
  155.                        }  
  156.                    }  
  157.                }  
  158.            }  
  159.        }  
  160.    }  

6 Put方法

前面的全部的介绍其实都为这个方法作铺垫。ConcurrentHashMap最经常使用的就是put和get两个方法。如今来介绍put方法,这个put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,若是i位置是空的,直接放进去,不然进行判断,若是i位置是树节点,按照树的方式插入新的节点,不然把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不一样点就是ConcurrentHashMap不容许keyvaluenull。另外因为涉及到多线程,put方法就要复杂一点。在多线程中可能有如下两个状况

  1. 若是一个或多个线程正在对ConcurrentHashMap进行扩容操做,当前线程也要进入扩容的操做中。这个扩容的操做之因此能被检测到,是由于transfer方法中在空结点上插入forward节点,若是检测到须要插入的位置被forward节点占有,就帮助进行扩容;

  2. 若是检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,可是仍是会比hashTable的synchronized要好得多。

总体流程就是首先定义不容许key或value为null的状况放入  对于每个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来肯定这个值在table中的位置。

若是这个位置是空的,那么直接放入,并且不须要加锁操做。

    若是这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。若是是链表节点(fh>0),则获得的结点就是hash值相同的节点组成的链表的头节点。须要依次向后遍历肯定这个新加入的值所在位置。若是遇到hash值与key值都与新加入节点是一致的状况,则只须要更新value值便可。不然依次向后遍历,直到链表尾插入这个结点。  若是加入这个节点之后链表长度大于8,就把这个链表转换成红黑树。若是这个节点的类型已是树节点的话,直接调用树节点的插入方法进行插入新的值。

[java]  view plain  copy
  1. public V put(K key, V value) {  
  2.         return putVal(key, value, false);  
  3.     }  
  4.   
  5.     /** Implementation for put and putIfAbsent */  
  6.     final V putVal(K key, V value, boolean onlyIfAbsent) {  
  7.             //不容许 key或value为null  
  8.         if (key == null || value == nullthrow new NullPointerException();  
  9.         //计算hash值  
  10.         int hash = spread(key.hashCode());  
  11.         int binCount = 0;  
  12.         //死循环 什么时候插入成功 什么时候跳出  
  13.         for (Node<K,V>[] tab = table;;) {  
  14.             Node<K,V> f; int n, i, fh;  
  15.             //若是table为空的话,初始化table  
  16.             if (tab == null || (n = tab.length) == 0)  
  17.                 tab = initTable();  
  18.             //根据hash值计算出在table里面的位置   
  19.             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
  20.                 //若是这个位置没有值 ,直接放进去,不须要加锁  
  21.                 if (casTabAt(tab, i, null,  
  22.                              new Node<K,V>(hash, key, value, null)))  
  23.                     break;                   // no lock when adding to empty bin  
  24.             }  
  25.             //当遇到表链接点时,须要进行整合表的操做  
  26.             else if ((fh = f.hash) == MOVED)  
  27.                 tab = helpTransfer(tab, f);  
  28.             else {  
  29.                 V oldVal = null;  
  30.                 //结点上锁  这里的结点能够理解为hash值相同组成的链表的头结点  
  31.                 synchronized (f) {  
  32.                     if (tabAt(tab, i) == f) {  
  33.                         //fh〉0 说明这个节点是一个链表的节点 不是树的节点  
  34.                         if (fh >= 0) {  
  35.                             binCount = 1;  
  36.                             //在这里遍历链表全部的结点  
  37.                             for (Node<K,V> e = f;; ++binCount) {  
  38.                                 K ek;  
  39.                                 //若是hash值和key值相同  则修改对应结点的value值  
  40.                                 if (e.hash == hash &&  
  41.                                     ((ek = e.key) == key ||  
  42.                                      (ek != null && key.equals(ek)))) {  
  43.                                     oldVal = e.val;  
  44.                                     if (!onlyIfAbsent)  
  45.                                         e.val = value;  
  46.                                     break;  
  47.                                 }  
  48.                                 Node<K,V> pred = e;  
  49.                                 //若是遍历到了最后一个结点,那么就证实新的节点须要插入 就把它插入在链表尾部  
  50.                                 if ((e = e.next) == null) {  
  51.                                     pred.next = new Node<K,V>(hash, key,  
  52.                                                               value, null);  
  53.                                     break;  
  54.                                 }  
  55.                             }  
  56.                         }  
  57.                         //若是这个节点是树节点,就按照树的方式插入值  
  58.                         else if (f instanceof TreeBin) {  
  59.                             Node<K,V> p;  
  60.                             binCount = 2;  
  61.                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,  
  62.                                                            value)) != null) {  
  63.                                 oldVal = p.val;  
  64.                                 if (!onlyIfAbsent)  
  65.                                     p.val = value;  
  66.                             }  
  67.                         }  
  68.                     }  
  69.                 }  
  70.                 if (binCount != 0) {  
  71.                     //若是链表长度已经达到临界值8 就须要把链表转换为树结构  
  72.                     if (binCount >= TREEIFY_THRESHOLD)  
  73.                         treeifyBin(tab, i);  
  74.                     if (oldVal != null)  
  75.                         return oldVal;  
  76.                     break;  
  77.                 }  
  78.             }  
  79.         }  
  80.         //将当前ConcurrentHashMap的元素数量+1  
  81.         addCount(1L, binCount);  
  82.         return null;  
  83.     }  
  84.       
  85.       


6.1 helpTransfer方法

这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap必定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法能够看到,当本线程进入扩容方法的时候会直接进入复制阶段。

[java]  view plain  copy
  1. /** 
  2.     * Helps transfer if a resize is in progress. 
  3.     */  
  4.    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {  
  5.        Node<K,V>[] nextTab; int sc;  
  6.        if (tab != null && (f instanceof ForwardingNode) &&  
  7.            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {  
  8.            int rs = resizeStamp(tab.length);//计算一个操做校验码  
  9.            while (nextTab == nextTable && table == tab &&  
  10.                   (sc = sizeCtl) < 0) {  
  11.                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||  
  12.                    sc == rs + MAX_RESIZERS || transferIndex <= 0)  
  13.                    break;  
  14.                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {  
  15.                    transfer(tab, nextTab);  
  16.                    break;  
  17.                }  
  18.            }  
  19.            return nextTab;  
  20.        }  
  21.        return table;  
  22.    }  

6.2 treeifyBin方法

这个方法用于将过长的链表转换为TreeBin对象。可是他并非直接转换,而是进行一次容量判断,若是容量没有达到转换的要求,直接进行扩容操做并返回;若是知足条件才链表的结构抓换为TreeBin ,这与HashMap不一样的是,它并无把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装全部的TreeNode.

[java]  view plain  copy
  1. private final void treeifyBin(Node<K,V>[] tab, int index) {  
  2.         Node<K,V> b; int n, sc;  
  3.         if (tab != null) {  
  4.             if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//若是table.length<64 就扩大一倍 返回  
  5.                 tryPresize(n << 1);  
  6.             else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {  
  7.                 synchronized (b) {  
  8.                     if (tabAt(tab, index) == b) {  
  9.                         TreeNode<K,V> hd = null, tl = null;  
  10.                         //构造了一个TreeBin对象 把全部Node节点包装成TreeNode放进去  
  11.                         for (Node<K,V> e = b; e != null; e = e.next) {  
  12.                             TreeNode<K,V> p =  
  13.                                 new TreeNode<K,V>(e.hash, e.key, e.val,  
  14.                                                   nullnull);//这里只是利用了TreeNode封装 而没有利用TreeNode的next域和parent域  
  15.                             if ((p.prev = tl) == null)  
  16.                                 hd = p;  
  17.                             else  
  18.                                 tl.next = p;  
  19.                             tl = p;  
  20.                         }  
  21.                         //在原来index的位置 用TreeBin替换掉原来的Node对象  
  22.                         setTabAt(tab, index, new TreeBin<K,V>(hd));  
  23.                     }  
  24.                 }  
  25.             }  
  26.         }  
  27.     }  

7 get方法

get方法比较简单,给定一个key来肯定value的时候,必须知足两个条件  key相同  hash值相同,对于节点可能在链表或树上的状况,须要分别去查找.

[java]  view plain  copy
  1. public V get(Object key) {  
  2.         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  
  3.         //计算hash值  
  4.         int h = spread(key.hashCode());  
  5.         //根据hash值肯定节点位置  
  6.         if ((tab = table) != null && (n = tab.length) > 0 &&  
  7.             (e = tabAt(tab, (n - 1) & h)) != null) {  
  8.             //若是搜索到的节点key与传入的key相同且不为null,直接返回这个节点    
  9.             if ((eh = e.hash) == h) {  
  10.                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
  11.                     return e.val;  
  12.             }  
  13.             //若是eh<0 说明这个节点在树上 直接寻找  
  14.             else if (eh < 0)  
  15.                 return (p = e.find(h, key)) != null ? p.val : null;  
  16.              //不然遍历链表 找到对应的值并返回  
  17.             while ((e = e.next) != null) {  
  18.                 if (e.hash == h &&  
  19.                     ((ek = e.key) == key || (ek != null && key.equals(ek))))  
  20.                     return e.val;  
  21.             }  
  22.         }  
  23.         return null;  
  24.     }  

8 Size相关的方法

对于ConcurrentHashMap来讲,这个table里到底装了多少东西实际上是个不肯定的数量,由于不可能在调用size()方法的时候像GC的“stop the world”同样让其余线程都停下来让你去统计,所以只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。

8.1 辅助定义

为了统计元素个数,ConcurrentHashMap定义了一些变量和一个内部类

[java]  view plain  copy
  1. /** 
  2.      * A padded cell for distributing counts.  Adapted from LongAdder 
  3.      * and Striped64.  See their internal docs for explanation. 
  4.      */  
  5.     @sun.misc.Contended static final class CounterCell {  
  6.         volatile long value;  
  7.         CounterCell(long x) { value = x; }  
  8.     }  
  9.       
  10.   /******************************************/    
  11.       
  12.     /** 
  13.      * 实际上保存的是hashmap中的元素个数  利用CAS锁进行更新 
  14.      但它并不用返回当前hashmap的元素个数  
  15.       
  16.      */  
  17.     private transient volatile long baseCount;  
  18.     /** 
  19.      * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. 
  20.      */  
  21.     private transient volatile int cellsBusy;  
  22.   
  23.     /** 
  24.      * Table of counter cells. When non-null, size is a power of 2. 
  25.      */  
  26.     private transient volatile CounterCell[] counterCells;  

8.2 mappingCount与Size方法

mappingCount与size方法的相似  从Java工程师给出的注释来看,应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,所以可能在统计的时候有其余线程正在执行插入或删除操做。

[java]  view plain  copy
  1. public int size() {  
  2.         long n = sumCount();  
  3.         return ((n < 0L) ? 0 :  
  4.                 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :  
  5.                 (int)n);  
  6.     }  
  7.      /** 
  8.      * Returns the number of mappings. This method should be used 
  9.      * instead of {@link #size} because a ConcurrentHashMap may 
  10.      * contain more mappings than can be represented as an int. The 
  11.      * value returned is an estimate; the actual count may differ if 
  12.      * there are concurrent insertions or removals. 
  13.      * 
  14.      * @return the number of mappings 
  15.      * @since 1.8 
  16.      */  
  17.     public long mappingCount() {  
  18.         long n = sumCount();  
  19.         return (n < 0L) ? 0L : n; // ignore transient negative values  
  20.     }  
  21.       
  22.      final long sumCount() {  
  23.         CounterCell[] as = counterCells; CounterCell a;  
  24.         long sum = baseCount;  
  25.         if (as != null) {  
  26.             for (int i = 0; i < as.length; ++i) {  
  27.                 if ((a = as[i]) != null)  
  28.                     sum += a.value;//全部counter的值求和  
  29.             }  
  30.         }  
  31.         return sum;  
  32.     }  

8.3 addCount方法

在put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共作了两件事,更新baseCount的值,检测是否进行扩容。

[java]  view plain  copy
  1. private final void addCount(long x, int check) {  
  2.         CounterCell[] as; long b, s;  
  3.         //利用CAS方法更新baseCount的值   
  4.         if ((as = counterCells) != null ||  
  5.             !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {  
  6.             CounterCell a; long v; int m;  
  7.             boolean uncontended = true;  
  8.             if (as == null || (m = as.length - 1) < 0 ||  
  9.                 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||  
  10.                 !(uncontended =  
  11.                   U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {  
  12.                 fullAddCount(x, uncontended);  
  13.                 return;  
  14.             }  
  15.             if (check <= 1)  
  16.                 return;  
  17.             s = sumCount();  
  18.         }  
  19.         //若是check值大于等于0 则须要检验是否须要进行扩容操做  
  20.         if (check >= 0) {  
  21.             Node<K,V>[] tab, nt; int n, sc;  
  22.             while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&  
  23.                    (n = tab.length) < MAXIMUM_CAPACITY) {  
  24.                 int rs = resizeStamp(n);  
  25.                 //  
  26.                 if (sc < 0) {  
  27.                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||  
  28.                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||  
  29.                         transferIndex <= 0)  
  30.                         break;  
  31.                      //若是已经有其余线程在执行扩容操做  
  32.                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))  
  33.                         transfer(tab, nt);  
  34.                 }  
  35.                 //当前线程是惟一的或是第一个发起扩容的线程  此时nextTable=null  
  36.                 else if (U.compareAndSwapInt(this, SIZECTL, sc,  
  37.                                              (rs << RESIZE_STAMP_SHIFT) + 2))  
  38.                     transfer(tab, null);  
  39.                 s = sumCount();  
  40.             }  
  41.         }  
  42.     }  

CAS

CAS:Compare and Swap, 翻译成比较并交换。 

java.util.concurrent包中借助CAS实现了区别于synchronouse同步锁的一种乐观锁。

 

本文先从CAS的应用提及,再深刻原理解析。

 

CAS应用

CAS有3个操做数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改成B,不然什么都不作。

 

非阻塞算法 (nonblocking algorithms)

一个线程的失败或者挂起不该该影响其余线程的失败或挂起的算法。

现代的CPU提供了特殊的指令,能够自动更新共享数据,并且可以检测到其余线程的干扰,而 compareAndSet() 就用这些代替了锁定。

拿出AtomicInteger来研究在没有锁的状况下是如何作到数据正确性的。

private volatile int value;

首先毫无觉得,在没有锁的机制下可能须要借助volatile原语,保证线程间的数据是可见的(共享的)。

这样才获取变量的值的时候才能直接读取。

public final int get() {
        return value;
    }

而后来看看++i是怎么作到的。

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

在这里采用了CAS操做,每次从内存中读取数据而后将此数据和+1后的结果进行CAS操做,若是成功就返回结果,不然重试直到成功为止。

而compareAndSet利用JNI来完成CPU指令的操做。

public final boolean compareAndSet(int expect, int update) {   
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

总体的过程就是这样子的,利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。其它原子操做都是利用相似的特性完成的。

 

其中

unsafe.compareAndSwapInt(this, valueOffset, expect, update);

相似:

if (this == expect) {

  this = update

 return true;

} else {

return false;

}

 

那么问题就来了,成功过程当中须要2个步骤:比较this == expect,替换this = update,compareAndSwapInt如何这两个步骤的原子性呢? 参考CAS的原理。

 

CAS原理

 CAS经过调用JNI的代码实现的。JNI:Java Native Interface为JAVA本地调用,容许java调用其余语言。

而compareAndSwapInt就是借助C来调用CPU底层指令实现的。

下面从分析比较经常使用的CPU(intel x86)来解释CAS的实现原理。

 下面是sun.misc.Unsafe类的compareAndSwapInt()方法的源代码:

public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);

 

能够看到这是个本地方法调用。这个本地方法在openjdk中依次调用的c++代码为:unsafe.cpp,atomic.cpp和atomicwindowsx86.inline.hpp。这个本地方法的最终实如今openjdk的以下位置:openjdk-7-fcs-src-b147-27jun2011\openjdk\hotspot\src\oscpu\windowsx86\vm\ atomicwindowsx86.inline.hpp(对应于windows操做系统,X86处理器)。下面是对应于intel x86处理器的源代码的片断:

 

// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0  \
                       __asm je L0      \
                       __asm _emit 0xF0 \
                       __asm L0:

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

如上面源代码所示,程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。若是程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg)。反之,若是程序是在单处理器上运行,就省略lock前缀(单处理器自身会维护单处理器内的顺序一致性,不须要lock前缀提供的内存屏障效果)。

 

 intel的手册对lock前缀的说明以下:

  1. 确保对内存的读-改-写操做原子执行。在Pentium及Pentium以前的处理器中,带有lock前缀的指令在执行期间会锁住总线,使得其余处理器暂时没法经过总线访问内存。很显然,这会带来昂贵的开销。从Pentium 4,Intel Xeon及P6处理器开始,intel在原有总线锁的基础上作了一个颇有意义的优化:若是要访问的内存区域(area of memory)在lock前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),而且该内存区域被彻底包含在单个缓存行(cache line)中,那么处理器将直接执行该指令。因为在指令执行期间该缓存行会一直被锁定,其它处理器没法读/写该指令要访问的内存区域,所以能保证指令执行的原子性。这个操做过程叫作缓存锁定(cache locking),缓存锁定将大大下降lock前缀指令的执行开销,可是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。
  2. 禁止该指令与以前和以后的读和写指令重排序。
  3. 把写缓冲区中的全部数据刷新到内存中。

备注知识:

关于CPU的锁有以下3种:

  3.1 处理器自动保证基本内存操做的原子性

  首先处理器会自动保证基本的内存操做的原子性。处理器保证从系统内存当中读取或者写入一个字节是原子的,意思是当一个处理器读取一个字节时,其余处理器不能访问这个字节的内存地址。奔腾6和最新的处理器能自动保证单处理器对同一个缓存行里进行16/32/64位的操做是原子的,可是复杂的内存操做处理器不能自动保证其原子性,好比跨总线宽度,跨多个缓存行,跨页表的访问。可是处理器提供总线锁定和缓存锁定两个机制来保证复杂内存操做的原子性。 

  3.2 使用总线锁保证原子性

  第一个机制是经过总线锁保证原子性。若是多个处理器同时对共享变量进行读改写(i++就是经典的读改写操做)操做,那么共享变量就会被多个处理器同时进行操做,这样读改写操做就不是原子的,操做完以后共享变量的值会和指望的不一致,举个例子:若是i=1,咱们进行两次i++操做,咱们指望的结果是3,可是有可能结果是2。以下图

 

 

  缘由是有可能多个处理器同时从各自的缓存中读取变量i,分别进行加一操做,而后分别写入系统内存当中。那么想要保证读改写共享变量的操做是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能操做缓存了该共享变量内存地址的缓存。

  处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其余处理器的请求将被阻塞住,那么该处理器能够独占使用共享内存。

  3.3 使用缓存锁保证原子性

  第二个机制是经过缓存锁定保证原子性。在同一时刻咱们只需保证对某个内存地址的操做是原子性便可,但总线锁定把CPU和内存之间通讯锁住了,这使得锁按期间,其余处理器不能操做其余内存地址的数据,因此总线锁定的开销比较大,最近的处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。

  频繁使用的内存会缓存在处理器的L1,L2和L3高速缓存里,那么原子操做就能够直接在处理器内部缓存中进行,并不须要声明总线锁,在奔腾6和最近的处理器中可使用“缓存锁定”的方式来实现复杂的原子性。所谓“缓存锁定”就是若是缓存在处理器缓存行中内存区域在LOCK操做期间被锁定,当它执行锁操做回写内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并容许它的缓存一致性机制来保证操做的原子性,由于缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据,当其余处理器回写已被锁定的缓存行的数据时会起缓存行无效,在例1中,当CPU1修改缓存行中的i时使用缓存锁定,那么CPU2就不能同时缓存了i的缓存行。

  可是有两种状况下处理器不会使用缓存锁定。第一种状况是:当操做的数据不能被缓存在处理器内部,或操做的数据跨多个缓存行(cache line),则处理器会调用总线锁定。第二种状况是:有些处理器不支持缓存锁定。对于Inter486和奔腾处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。

  以上两个机制咱们能够经过Inter处理器提供了不少LOCK前缀的指令来实现。好比位测试和修改指令BTS,BTR,BTC,交换指令XADD,CMPXCHG和其余一些操做数和逻辑指令,好比ADD(加),OR(或)等,被这些指令操做的内存区域就会加锁,致使其余处理器不能同时访问它。

 

CAS缺点

 CAS虽然很高效的解决原子操做,可是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操做

1.  ABA问题。由于CAS须要在操做值的时候检查下值有没有发生变化,若是没有发生变化则更新,可是若是一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,可是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。

从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法做用是首先检查当前引用是否等于预期引用,而且当前标志是否等于预期标志,若是所有相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

关于ABA问题参考文档: http://blog.hesey.net/2011/09/resolve-aba-by-atomicstampedreference.html

2. 循环时间长开销大。自旋CAS若是长时间不成功,会给CPU带来很是大的执行开销。若是JVM能支持处理器提供的pause指令那么效率会有必定的提高,pause指令有两个做用,第一它能够延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它能够避免在退出循环的时候因内存顺序冲突(memory order violation)而引发CPU流水线被清空(CPU pipeline flush),从而提升CPU的执行效率。

 

3. 只能保证一个共享变量的原子操做。当对一个共享变量执行操做时,咱们可使用循环CAS的方式来保证原子操做,可是对多个共享变量操做时,循环CAS就没法保证操做的原子性,这个时候就能够用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操做。好比有两个共享变量i=2,j=a,合并一下ij=2a,而后用CAS来操做ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你能够把多个变量放在一个对象里来进行CAS操做。

 

 

concurrent包的实现

因为java的CAS同时具备 volatile 读和volatile写的内存语义,所以Java线程之间的通讯如今有了下面四种方式:

  1. A线程写volatile变量,随后B线程读这个volatile变量。
  2. A线程写volatile变量,随后B线程用CAS更新这个volatile变量。
  3. A线程用CAS更新一个volatile变量,随后B线程用CAS更新这个volatile变量。
  4. A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量。

Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操做,这是在多处理器中实现同步的关键(从本质上来讲,可以支持原子性读-改-写指令的计算机器,是顺序计算图灵机的异步等价机器,所以任何现代的多处理器都会去支持某种能对内存执行原子性读-改-写操做的原子指令)。同时,volatile变量的读/写和CAS能够实现线程之间的通讯。把这些特性整合在一块儿,就造成了整个concurrent包得以实现的基石。若是咱们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:

  1. 首先,声明共享变量为volatile;
  2. 而后,使用CAS的原子条件更新来实现线程之间的同步;
  3. 同时,配合以volatile的读/写和CAS所具备的volatile读和写的内存语义来实现线程之间的通讯。

AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的。从总体来看,concurrent包的实现示意图以下:


非阻塞同步算法与CAS(Compare and Swap)无锁算法

锁(lock)的代价

锁是用来作并发最简单的方式,固然其代价也是最高的。内核态的锁的时候须要操做系统进行一次上下文切换,加锁、释放锁会致使比较多的上下文切换和调度延时,等待锁的线程会被挂起直至锁释放。在上下文切换的时候,cpu以前缓存的指令和数据都将失效,对性能有很大的损失。操做系统对多线程的锁进行判断就像两姐妹在为一个玩具在争吵,而后操做系统就是能决定他们谁能拿到玩具的父母,这是很慢的。用户态的锁虽然避免了这些问题,可是其实它们只是在没有真实的竞争时才有效。

Java在JDK1.5以前都是靠synchronized关键字保证同步的,这种经过使用一致的锁定协议来协调对共享状态的访问,能够确保不管哪一个线程持有守护变量的锁,都采用独占的方式来访问这些变量,若是出现多个线程同时访问锁,那第一些线线程将被挂起,当线程恢复执行时,必须等待其它线程执行完他们的时间片之后才能被调度执行,在挂起和恢复执行过程当中存在着很大的开销。锁还存在着其它一些缺点,当一个线程正在等待锁时,它不能作任何事。若是一个线程在持有锁的状况下被延迟执行,那么全部须要这个锁的线程都没法执行下去。若是被阻塞的线程优先级高,而持有锁的线程优先级低,将会致使优先级反转(Priority Inversion)。

乐观锁与悲观锁

独占锁是一种悲观锁,synchronized就是一种独占锁,它假设最坏的状况,而且只有在确保其它线程不会形成干扰的状况下执行,会致使其它全部须要锁的线程挂起,等待持有锁的线程释放锁。而另外一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操做,若是由于冲突失败就重试,直到成功为止。

volatile的问题

与锁相比,volatile变量是一和更轻量级的同步机制,由于在使用这些变量时不会发生上下文切换和线程调度等操做,可是volatile变量也存在一些局限:不能用于构建原子的复合操做,所以当一个变量依赖旧值时就不能使用volatile变量。(参考:谈谈volatiile

volatile只能保证变量对各个线程的可见性,但不能保证原子性。为何?见个人另一篇文章:《为何volatile不能保证原子性而Atomic能够?

Java中的原子操做( atomic operations)

原子操做指的是在一步以内就完成并且不能被中断。原子操做在多线程环境中是线程安全的,无需考虑同步的问题。在java中,下列操做是原子操做:

  • all assignments of primitive types except for long and double
  • all assignments of references
  • all operations of java.concurrent.Atomic* classes
  • all assignments to volatile longs and doubles

问题来了,为何long型赋值不是原子操做呢?例如:

1
long  foo = 65465498L;

实时上java会分两步写入这个long变量,先写32位,再写后32位。这样就线程不安全了。若是改为下面的就线程安全了:

1
private  volatile  long  foo;

由于volatile内部已经作了synchronized.

CAS无锁算法

要实现无锁(lock-free)的非阻塞算法有多种实现方法,其中CAS(比较与交换,Compare and swap)是一种有名的无锁算法。CAS, CPU指令,在大多数处理器架构,包括IA3二、Space中采用的都是CAS指令,CAS的语义是“我认为V的值应该为A,若是是,那么将V的值更新为B,不然不修改并告诉V的值实际为多少”,CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知此次竞争中失败,并能够再次尝试。CAS有3个操做数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改成B,不然什么都不作。CAS无锁算法的C实现以下:

1
2
3
4
5
6
7
8
9
int  compare_and_swap ( int * reg, int  oldval, int  newval)
{
   ATOMIC();
   int  old_reg_val = *reg;
   if  (old_reg_val == oldval)
      *reg = newval;
   END_ATOMIC();
   return  old_reg_val;
}

CAS(乐观锁算法)的基本假设前提

CAS比较与交换的伪代码能够表示为:

do{   
       备份旧数据;  
       基于旧数据构造新数据;  
}while(!CAS( 内存地址,备份的旧数据,新数据 ))  

ConcurrencyCAS 

(上图的解释:CPU去更新一个值,但若是想改的值再也不是原来的值,操做就失败,由于很明显,有其它操做先改变了这个值。)

就是指当二者进行比较时,若是相等,则证实共享数据没有被修改,替换成新值,而后继续往下运行;若是不相等,说明共享数据已经被修改,放弃已经所作的操做,而后从新执行刚才的操做。容易看出 CAS 操做是基于共享数据不会被修改的假设,采用了相似于数据库的 commit-retry 的模式。当同步冲突出现的机会不多时,这种假设能带来较大的性能提高。

CAS的开销(CPU Cache Miss problem)

前面说过了,CAS(比较并交换)是CPU指令级的操做,只有一步原子操做,因此很是快。并且CAS避免了请求操做系统来裁定锁的问题,不用麻烦操做系统,直接在CPU内部就搞定了。但CAS就没有开销了吗?不!有cache miss的状况。这个问题比较复杂,首先须要了解CPU的硬件体系结构:

2014-02-19_11h35_45

上图能够看到一个8核CPU计算机系统,每一个CPU有cache(CPU内部的高速缓存,寄存器),管芯内还带有一个互联模块,使管芯内的两个核能够互相通讯。在图中央的系统互联模块可让四个管芯相互通讯,而且将管芯与主存链接起来。数据以“缓存线”为单位在系统中传输,“缓存线”对应于内存中一个 2 的幂大小的字节块,大小一般为 32 到 256 字节之间。当 CPU 从内存中读取一个变量到它的寄存器中时,必须首先将包含了该变量的缓存线读取到 CPU 高速缓存。一样地,CPU 将寄存器中的一个值存储到内存时,不只必须将包含了该值的缓存线读到 CPU 高速缓存,还必须确保没有其余 CPU 拥有该缓存线的拷贝。

好比,若是 CPU0 在对一个变量执行“比较并交换”(CAS)操做,而该变量所在的缓存线在 CPU7 的高速缓存中,就会发生如下通过简化的事件序列:

  • CPU0 检查本地高速缓存,没有找到缓存线。
  • 请求被转发到 CPU0 和 CPU1 的互联模块,检查 CPU1 的本地高速缓存,没有找到缓存线。
  • 请求被转发到系统互联模块,检查其余三个管芯,得知缓存线被 CPU6和 CPU7 所在的管芯持有。
  • 请求被转发到 CPU6 和 CPU7 的互联模块,检查这两个 CPU 的高速缓存,在 CPU7 的高速缓存中找到缓存线。
  • CPU7 将缓存线发送给所属的互联模块,而且刷新本身高速缓存中的缓存线。
  • CPU6 和 CPU7 的互联模块将缓存线发送给系统互联模块。
  • 系统互联模块将缓存线发送给 CPU0 和 CPU1 的互联模块。
  • CPU0 和 CPU1 的互联模块将缓存线发送给 CPU0 的高速缓存。
  • CPU0 如今能够对高速缓存中的变量执行 CAS 操做了

以上是刷新不一样CPU缓存的开销。最好状况下的 CAS 操做消耗大概 40 纳秒,超过 60 个时钟周期。这里的“最好状况”是指对某一个变量执行 CAS 操做的 CPU 正好是最后一个操做该变量的CPU,因此对应的缓存线已经在 CPU 的高速缓存中了,相似地,最好状况下的锁操做(一个“round trip 对”包括获取锁和随后的释放锁)消耗超过 60 纳秒,超过 100 个时钟周期。这里的“最好状况”意味着用于表示锁的数据结构已经在获取和释放锁的 CPU 所属的高速缓存中了。锁操做比 CAS 操做更加耗时,是因深刻理解并行编程 
为锁操做的数据结构中须要两个原子操做。缓存未命中消耗大概 140 纳秒,超过 200 个时钟周期。须要在存储新值时查询变量的旧值的 CAS 操做,消耗大概 300 纳秒,超过 500 个时钟周期。想一想这个,在执行一次 CAS 操做的时间里,CPU 能够执行 500 条普通指令。这代表了细粒度锁的局限性。

如下是cache miss cas 和lock的性能对比:

2014-02-19_11h43_23

JVM对CAS的支持:AtomicInt, AtomicLong.incrementAndGet()

在JDK1.5以前,若是不编写明确的代码就没法执行CAS操做,在JDK1.5中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS的操做,而且JVM把它们编译为底层硬件提供的最有效的方法,在运行CAS的平台上,运行时把它们编译为相应的机器指令,若是处理器/CPU不支持CAS指令,那么JVM将使用自旋锁。所以,值得注意的是,CAS解决方案与平台/编译器紧密相关(好比x86架构下其对应的汇编指令是lock cmpxchg,若是想要64Bit的交换,则应使用lock cmpxchg8b。在.NET中咱们可使用Interlocked.CompareExchange函数)

在原子类变量中,如java.util.concurrent.atomic中的AtomicXXX,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操做,而在java.util.concurrent中的大多数类在实现时都直接或间接的使用了这些原子变量类。

Java 1.6中AtomicLong.incrementAndGet()的实现源码为:

因而可知,AtomicLong.incrementAndGet的实现用了乐观锁技术,调用了sun.misc.Unsafe类库里面的 CAS算法,用CPU指令来实现无锁自增。因此,AtomicLong.incrementAndGet的自增比用synchronized的锁效率倍增。

1
2
3
4
5
6
7
8
9
10
11
12
public  final  int  getAndIncrement() { 
         for  (;;) { 
             int  current = get(); 
             int  next = current + 1
             if  (compareAndSet(current, next)) 
                 return  current; 
        
   
public  final  boolean  compareAndSet( int  expect, int  update) { 
     return  unsafe.compareAndSwapInt( this , valueOffset, expect, update); 
}

下面是测试代码:能够看到用AtomicLong.incrementAndGet的性能比用synchronized高出几倍。

2014-02-12_14h56_39

CAS的例子:非阻塞堆栈

下面是比非阻塞自增稍微复杂一点的CAS的例子:非阻塞堆栈/ConcurrentStack 。ConcurrentStack 中的 push() 和pop() 操做在结构上与NonblockingCounter 上类似,只是作的工做有些冒险,但愿在 “提交” 工做的时候,底层假设没有失效。push() 方法观察当前最顶的节点,构建一个新节点放在堆栈上,而后,若是最顶端的节点在初始观察以后没有变化,那么就安装新节点。若是 CAS 失败,意味着另外一个线程已经修改了堆栈,那么过程就会从新开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public  class  ConcurrentStack<E> {
     AtomicReference<Node<E>> head = new  AtomicReference<Node<E>>();
     public  void  push(E item) {
         Node<E> newHead = new  Node<E>(item);
         Node<E> oldHead;
         do  {
             oldHead = head.get();
             newHead.next = oldHead;
         } while  (!head.compareAndSet(oldHead, newHead));
     }
     public  E pop() {
         Node<E> oldHead;
         Node<E> newHead;
         do  {
             oldHead = head.get();
             if  (oldHead == null )
                 return  null ;
             newHead = oldHead.next;
         } while  (!head.compareAndSet(oldHead,newHead));
         return  oldHead.item;
     }
     static  class  Node<E> {
         final  E item;
         Node<E> next;
         public  Node(E item) { this .item = item; }
     }
}

在轻度到中度的争用状况下,非阻塞算法的性能会超越阻塞算法,由于 CAS 的多数时间都在第一次尝试时就成功,而发生争用时的开销也不涉及线程挂起和上下文切换,只多了几个循环迭代。没有争用的 CAS 要比没有争用的锁便宜得多(这句话确定是真的,由于没有争用的锁涉及 CAS 加上额外的处理),而争用的 CAS 比争用的锁获取涉及更短的延迟。

在高度争用的状况下(即有多个线程不断争用一个内存位置的时候),基于锁的算法开始提供比非阻塞算法更好的吞吐率,由于当线程阻塞时,它就会中止争用,耐心地等候轮到本身,从而避免了进一步争用。可是,这么高的争用程度并不常见,由于多数时候,线程会把线程本地的计算与争用共享数据的操做分开,从而给其余线程使用共享数据的机会。

CAS的例子3:非阻塞链表

以上的示例(自增计数器和堆栈)都是很是简单的非阻塞算法,一旦掌握了在循环中使用 CAS,就能够容易地模仿它们。对于更复杂的数据结构,非阻塞算法要比这些简单示例复杂得多,由于修改链表、树或哈希表可能涉及对多个指针的更新。CAS 支持对单一指针的原子性条件更新,可是不支持两个以上的指针。因此,要构建一个非阻塞的链表、树或哈希表,须要找到一种方式,能够用 CAS 更新多个指针,同时不会让数据结构处于不一致的状态。

在链表的尾部插入元素,一般涉及对两个指针的更新:“尾” 指针老是指向列表中的最后一个元素,“下一个” 指针从过去的最后一个元素指向新插入的元素。由于须要更新两个指针,因此须要两个 CAS。在独立的 CAS 中更新两个指针带来了两个须要考虑的潜在问题:若是第一个 CAS 成功,而第二个 CAS 失败,会发生什么?若是其余线程在第一个和第二个 CAS 之间企图访问链表,会发生什么?

对于非复杂数据结构,构建非阻塞算法的 “技巧” 是确保数据结构总处于一致的状态(甚至包括在线程开始修改数据结构和它完成修改之间),还要确保其余线程不只可以判断出第一个线程已经完成了更新仍是处在更新的中途,还可以判断出若是第一个线程走向 AWOL,完成更新还须要什么操做。若是线程发现了处在更新中途的数据结构,它就能够 “帮助” 正在执行更新的线程完成更新,而后再进行本身的操做。当第一个线程回来试图完成本身的更新时,会发现再也不须要了,返回便可,由于 CAS 会检测到帮助线程的干预(在这种状况下,是建设性的干预)。

这种 “帮助邻居” 的要求,对于让数据结构免受单个线程失败的影响,是必需的。若是线程发现数据结构正处在被其余线程更新的中途,而后就等候其余线程完成更新,那么若是其余线程在操做中途失败,这个线程就可能永远等候下去。即便不出现故障,这种方式也会提供糟糕的性能,由于新到达的线程必须放弃处理器,致使上下文切换,或者等到本身的时间片过时(而这更糟)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public  class  LinkedQueue <E> {
     private  static  class  Node <E> {
         final  E item;
         final  AtomicReference<Node<E>> next;
         Node(E item, Node<E> next) {
             this .item = item;
             this .next = new  AtomicReference<Node<E>>(next);
         }
     }
     private  AtomicReference<Node<E>> head
         = new  AtomicReference<Node<E>>( new  Node<E>( null , null ));
     private  AtomicReference<Node<E>> tail = head;
     public  boolean  put(E item) {
         Node<E> newNode = new  Node<E>(item, null );
         while  ( true ) {
             Node<E> curTail = tail.get();
             Node<E> residue = curTail.next.get();
             if  (curTail == tail.get()) {
                 if  (residue == null ) /* A */  {
                     if  (curTail.next.compareAndSet( null , newNode)) /* C */  {
                         tail.compareAndSet(curTail, newNode) /* D */  ;
                         return  true ;
                     }
                 } else  {
                     tail.compareAndSet(curTail, residue) /* B */ ;
                 }
             }
         }
     }
}

具体算法相见IBM Developerworks

Java的ConcurrentHashMap的实现原理

Java5中的ConcurrentHashMap,线程安全,设计巧妙,用桶粒度的锁,避免了put和get中对整个map的锁定,尤为在get中,只对一个HashEntry作锁定操做,性能提高是显而易见的。

8aea11a8-4184-3f1f-aba7-169aa5e0797a

具体实现中使用了锁分离机制,在这个帖子中有很是详细的讨论。这里有关于Java内存模型结合ConcurrentHashMap的分析。如下是JDK6的ConcurrentHashMap的源码:

Java的ConcurrentLinkedQueue实现方法

ConcurrentLinkedQueue也是一样使用了CAS指令,但其性能并不高由于太多CAS操做。其源码以下:

高并发环境下优化锁或无锁(lock-free)的设计思路

服务端编程的3大性能杀手:一、大量线程致使的线程切换开销。二、锁。三、非必要的内存拷贝。在高并发下,对于纯内存操做来讲,单线程是要比多线程快的, 能够比较一下多线程程序在压力测试下cpu的sy和ni百分比。高并发环境下要实现高吞吐量和线程安全,两个思路:一个是用优化的锁实现,一个是lock-free的无锁结构。但非阻塞算法要比基于锁的算法复杂得多。开发非阻塞算法是至关专业的训练,并且要证实算法的正确也极为困难,不只和具体的目标机器平台和编译器相关,并且须要复杂的技巧和严格的测试。虽然Lock-Free编程很是困难,可是它一般能够带来比基于锁编程更高的吞吐量。因此Lock-Free编程是大有前途的技术。它在线程停止、优先级倒置以及信号安全等方面都有着良好的表现。

  • 优化锁实现的例子:Java中的ConcurrentHashMap,设计巧妙,用桶粒度的锁和锁分离机制,避免了put和get中对整个map的锁定,尤为在get中,只对一个HashEntry作锁定操做,性能提高是显而易见的(详细分析见《探索 ConcurrentHashMap 高并发性的实现机制》)。
  • Lock-free无锁的例子:CAS(CPU的Compare-And-Swap指令)的利用和LMAX的disruptor无锁消息队列数据结构等。有兴趣了解LMAX的disruptor无锁消息队列数据结构的能够移步slideshare

disruptor无锁消息队列数据结构的类图和技术文档下载

2014-02-12_16h55_36

另外,在设计思路上除了尽可能减小资源争用之外,还能够借鉴nginx/node.js等单线程大循环的机制,用单线程或CPU数相同的线程开辟大的队列,并发的时候任务压入队列,线程轮询而后一个个顺序执行。因为每一个都采用异步I/O,没有阻塞线程。这个大队列可使用RabbitMQueue,或是JDK的同步队列(性能稍差),或是使用Disruptor无锁队列(Java)。任务处理能够所有放在内存(多级缓存、读写分离、ConcurrentHashMap、甚至分布式缓存Redis)中进行增删改查。最后用Quarz维护定时把缓存数据同步到DB中。固然,这只是中小型系统的思路,若是是大型分布式系统会很是复杂,须要分而治理,用SOA的思路,参考这篇文章的图。(注:Redis是单线程的纯内存数据库,单线程无需锁,而Memcache是多线程的带CAS算法,二者都使用epoll,no-blocking io)

png;base643f17317a5d7e7fe9

深刻JVM的OS的无锁非阻塞算法

若是深刻 JVM 和操做系统,会发现非阻塞算法无处不在。垃圾收集器使用非阻塞算法加快并发和平行的垃圾搜集;调度器使用非阻塞算法有效地调度线程和进程,实现内在锁。在 Mustang(Java 6.0)中,基于锁的SynchronousQueue 算法被新的非阻塞版本代替。不多有开发人员会直接使用 SynchronousQueue,可是经过Executors.newCachedThreadPool() 工厂构建的线程池用它做为工做队列。比较缓存线程池性能的对比测试显示,新的非阻塞同步队列实现提供了几乎是当前实现 3 倍的速度。在 Mustang 的后续版本(代码名称为 Dolphin)中,已经规划了进一步的改进。