
    JDK Source Code Analysis: ConcurrentHashMap

    Posted by summer on 2016-06-01 16:12:54

    Basic Principle

    Hashtable uses synchronized to lock the entire hash table, and this coarse lock granularity is what makes Hashtable slow. ConcurrentHashMap allows multiple modification operations to proceed concurrently; the key is its use of lock striping. It uses several locks, each controlling modifications to a different part of the hash table. Internally, ConcurrentHashMap represents these parts as segments (Segment); each segment is in effect a small hash table with its own lock. As long as concurrent modifications land on different segments, they can proceed in parallel.
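    As a quick illustration (a minimal sketch; the class name, thread count, and key ranges are made up for the demo, not taken from the JDK source), several threads can write to one ConcurrentHashMap with no external synchronization:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class StripedPutDemo {
        public static void main(String[] args) throws InterruptedException {
            final Map<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>();
            Thread[] writers = new Thread[4];
            for (int t = 0; t < writers.length; t++) {
                final int base = t * 10000; // disjoint key range per thread
                writers[t] = new Thread(new Runnable() {
                    public void run() {
                        // Puts from different threads mostly hit different
                        // segments, so they rarely contend on the same lock.
                        for (int i = 0; i < 10000; i++)
                            map.put(base + i, i);
                    }
                });
                writers[t].start();
            }
            for (Thread w : writers)
                w.join();
            System.out.println(map.size()); // 40000
        }
    }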

    How a Segment Is Located

    ConcurrentHashMap uses the high-order bits of a key's hash to distribute key-value pairs across segments.
    Two fields and one method are used to locate a segment:

    /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     */
    final int segmentMask;

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;

    /**
     * Returns the segment that should be used for key with given hash
     * @param hash the hash code for the key
     * @return the segment
     */
    final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

    segmentFor takes the key's hash and returns the Segment that key belongs to.
    The segment array is declared as follows:

    /**
     * The segments, each of which is a specialized hash table
     */
    final Segment<K,V>[] segments;

    The large hash table is split into smaller ones, each small table being one Segment. Each Segment is roughly a synchronized Hashtable, but each segment carries its own lock; different segments use different locks, which reduces lock granularity and makes locking more efficient.
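    As a worked example (assuming the defaults the JDK 6 constructor derives from concurrencyLevel = 16, namely 16 segments, segmentShift = 28 and segmentMask = 15; the hash constant below is arbitrary), the segment index is simply the top four bits of the hash:

    public class SegmentIndexDemo {
        public static void main(String[] args) {
            int hash = 0xA3C41F07;                   // an already-spread hash, chosen arbitrarily
            int segmentShift = 28, segmentMask = 15; // defaults for concurrencyLevel = 16
            System.out.println((hash >>> segmentShift) & segmentMask); // 10 = 0xA, the top 4 bits
        }
    }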

    What a Segment Does

    Here is the original class comment on Segment, followed by a brief summary:

    /**
     * Segments are specialized versions of hash tables. This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        /*
         * Segments maintain a table of entry lists that are ALWAYS
         * kept in a consistent state, so can be read without locking.
         * Next fields of nodes are immutable (final). All list
         * additions are performed at the front of each bin. This
         * makes it easy to check changes, and also fast to traverse.
         * When nodes would otherwise be changed, new nodes are
         * created to replace them. This works well for hash tables
         * since the bin lists tend to be short. (The average length
         * is less than two for the default load factor threshold.)
         *
         * Read operations can thus proceed without locking, but rely
         * on selected uses of volatiles to ensure that completed
         * write operations performed by other threads are
         * noticed. For most purposes, the "count" field, tracking the
         * number of elements, serves as that volatile variable
         * ensuring visibility. This is convenient because this field
         * needs to be read in many read operations anyway:
         *
         * - All (unsynchronized) read operations must first read the
         *   "count" field, and should not look at table entries if
         *   it is 0.
         *
         * - All (synchronized) write operations should write to
         *   the "count" field after structurally changing any bin.
         *   The operations must not take any action that could even
         *   momentarily cause a concurrent read operation to see
         *   inconsistent data. This is made easier by the nature of
         *   the read operations in Map. For example, no operation
         *   can reveal that the table has grown but the threshold
         *   has not yet been updated, so there are no atomicity
         *   requirements for this with respect to reads.
         *
         * As a guide, all critical volatile reads and writes to the
         * count field are marked in code comments.
         */
        ...
    }

    Segment is a specialized hash table; it extends ReentrantLock simply to make locking convenient and to avoid constructing a separate lock.
    A segment maintains a table of entry lists that is always kept in a consistent state, so reads can proceed without locking.
    The next field points at the next entry in a bin. It is declared final, an application of the immutable pattern, and immutability guarantees thread safety. Because next can never be reassigned once set, new entries can only be added at the front of a bin: the new entry becomes the new head, with its next pointing at the old head (see the sketch after this list). For the same reason, deleting an entry requires copying every node that precedes it and linking the copies to the node that follows it.
    Reads take no lock; volatile guarantees visibility, so readers always see up-to-date data.
    The count field is the number of entries in the segment; it is volatile to guarantee visibility.
    All (unsynchronized) read operations must read count first, and must not look at the entry lists when it is 0.
    All (synchronized) write operations must write count last, after any structural change to a bin, so that concurrent reads stay consistent.
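    A minimal sketch of that head insertion, using a hypothetical stripped-down Node class (not the real HashEntry):

    public class HeadInsertDemo {
        // Hypothetical simplified node: next is final, so it can never be repointed.
        static final class Node {
            final String key;
            final Node next;
            Node(String key, Node next) { this.key = key; this.next = next; }
        }

        public static void main(String[] args) {
            // With an immutable next, insertion can only happen at the head of the bin.
            Node head = new Node("C", null); // C
            head = new Node("B", head);      // B -> C
            head = new Node("A", head);      // A -> B -> C
        }
    }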

    /**
     * The number of elements in this segment's region.
     */
    transient volatile int count;

    /**
     * Number of updates that alter the size of the table. This is
     * used during bulk-read methods to make sure they see a
     * consistent snapshot: If modCounts change during a traversal
     * of segments computing size or checking containsValue, then
     * we might have an inconsistent view of state so (usually)
     * must retry.
     */
    transient int modCount;

    count is the number of elements in the segment.
    modCount counts the updates that change the size of the table. It lets bulk read operations verify that they saw a consistent snapshot: if modCount changes while such an operation is traversing the segments (size, which sums the element counts of all segments, or containsValue, which checks whether the ConcurrentHashMap contains a given value), the operation may have seen inconsistent state and (usually) must retry.

    The HashEntry Data Structure

    /**
     * Because the value field is volatile, not final, it is legal wrt
     * the Java Memory Model for an unsynchronized reader to see null
     * instead of initial value when read via a data race. Although a
     * reordering leading to this is not likely to ever actually
     * occur, the Segment.readValueUnderLock method is used as a
     * backup in case a null (pre-initialized) value is ever seen in
     * an unsynchronized access method.
     */
    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
        ...
    }

    Except for value, the other three fields key, hash and next are all final: once set in the constructor they can never be modified.
    Because value is a non-final volatile field, the Java Memory Model permits a reader to see null instead of the initial value through a data race. Although a reordering that leads to this is unlikely to ever actually occur, if a null value is ever read, Segment.readValueUnderLock must be called to reread it under the lock.

    Adding Data: the put Operation

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

    First the value is checked: if it is null, a NullPointerException is thrown, so unlike HashMap, ConcurrentHashMap cannot store null values (see the quick contrast below).
    After that check, the key's hash locates the corresponding segment and the call is handed to that Segment's put method; ConcurrentHashMap.put is really just a delegation to Segment.put.
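    A quick contrast (a throwaway sketch, not from the original post; the class name is made up):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class NullValueDemo {
        public static void main(String[] args) {
            Map<String, String> hm = new HashMap<String, String>();
            hm.put("k", null);  // fine: HashMap accepts null values

            Map<String, String> chm = new ConcurrentHashMap<String, String>();
            chm.put("k", null); // throws NullPointerException
        }
    }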

    V put(K key, int hash, V value, boolean onlyIfAbsent) {
        lock();
        try {
            int c = count;
            if (c++ > threshold) // ensure capacity
                rehash();
            HashEntry<K,V>[] tab = table;
            int index = hash & (tab.length - 1);
            HashEntry<K,V> first = tab[index];
            HashEntry<K,V> e = first;
            while (e != null && (e.hash != hash || !key.equals(e.key)))
                e = e.next;

            V oldValue;
            if (e != null) {
                oldValue = e.value;
                if (!onlyIfAbsent)
                    e.value = value;
            }
            else {
                oldValue = null;
                ++modCount;
                tab[index] = new HashEntry<K,V>(key, hash, first, value);
                count = c; // write-volatile
            }
            return oldValue;
        } finally {
            unlock();
        }
    }

    Segment.put must take the lock. Note the canonical locking idiom: acquire the lock, do the guarded work inside the try block, and release the lock in the finally block.
    Adding an entry may push the element count past threshold, in which case a rehash is performed first.
    Then it computes which bin the key falls into and fetches that bin's head:

    int index = hash & (tab.length - 1);
    HashEntry<K,V> first = tab[index];

    Next the bin is traversed looking for an entry with the given key; there are two cases.
    If such an entry exists, its old value is simply replaced; the structure of the list does not change.
    If it does not exist, the new entry is placed at the head of the bin. The bin has gained a node and its structure has changed, so both count and modCount must be updated:

    ++modCount;
    tab[index] = new HashEntry<K,V>(key, hash, first, value);
    count = c; // write-volatile

    Retrieving Data: the get Operation

    public V get(Object key) {
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }

    get likewise delegates straight to Segment's get method:

    V get(Object key, int hash) {
        if (count != 0) { // read-volatile
            HashEntry<K,V> e = getFirst(hash);
            while (e != null) {
                if (e.hash == hash && key.equals(e.key)) {
                    V v = e.value;
                    if (v != null)
                        return v;
                    return readValueUnderLock(e); // recheck
                }
                e = e.next;
            }
        }
        return null;
    }

    Reads take no lock, but count must be read first; if it is 0 the method returns null immediately.
    Otherwise getFirst fetches the head of the corresponding bin:

    HashEntry<K,V> getFirst(int hash) {
        HashEntry<K,V>[] tab = table;
        return tab[hash & (tab.length - 1)];
    }

    From that head the bin is traversed. Once the entry with the matching key is found, its value is read; if the value is non-null it is returned directly. As analyzed earlier, though, value is a non-final volatile field, and the Java Memory Model allows a read to observe null rather than the initialized value (because of instruction reordering). A null value should be impossible here, since put rejects null up front, so observing null is a rare, erroneous state, and the value must be reread while holding the lock:

    /**
     * Reads value field of an entry under lock. Called if value
     * field ever appears to be null. This is possible only if a
     * compiler happens to reorder a HashEntry initialization with
     * its table assignment, which is legal under memory model
     * but is not known to ever occur.
     */
    V readValueUnderLock(HashEntry<K,V> e) {
        lock();
        try {
            return e.value;
        } finally {
            unlock();
        }
    }

    Removing Data: the remove Operation

    public V remove(Object key) {
        int hash = hash(key.hashCode());
        return segmentFor(hash).remove(key, hash, null);
    }

    remove also delegates straight to Segment's remove method:

    /**
     * Remove; match on key only if value null, else match both.
     */
    V remove(Object key, int hash, Object value) {
        lock();
        try {
            int c = count - 1;
            HashEntry<K,V>[] tab = table;
            int index = hash & (tab.length - 1);
            HashEntry<K,V> first = tab[index];
            HashEntry<K,V> e = first;
            while (e != null && (e.hash != hash || !key.equals(e.key)))
                e = e.next;

            V oldValue = null;
            if (e != null) {
                V v = e.value;
                if (value == null || value.equals(v)) {
                    oldValue = v;
                    // All entries following removed node can stay
                    // in list, but all preceding ones need to be
                    // cloned.
                    ++modCount;
                    HashEntry<K,V> newFirst = e.next;
                    for (HashEntry<K,V> p = first; p != e; p = p.next)
                        newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                      newFirst, p.value);
                    tab[index] = newFirst;
                    count = c; // write-volatile
                }
            }
            return oldValue;
        } finally {
            unlock();
        }
    }

    Removal requires the lock; the guarded work runs inside the try block and the lock is released in finally.
    It computes which bin holds the entry to delete and fetches that bin's head:

    HashEntry<K,V>[] tab = table;
    int index = hash & (tab.length - 1);
    HashEntry<K,V> first = tab[index];
    HashEntry<K,V> e = first;

    The bin is then traversed; if no entry with the matching key is found, null is returned.
    If one is found, every entry in front of the node to delete is copied, the copies are linked to the node that follows the deleted one, and the result becomes the new bin head:

    for (HashEntry<K,V> p = first; p != e; p = p.next)
        newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);
    tab[index] = newFirst;

    Notice the order in which the prefix is copied: the old first node is cloned first, with its next pointing at the surviving suffix; then its successor (call it the second node) is cloned with its next pointing at the cloned first node, and so on. The copied prefix therefore ends up in exactly the reverse of its original order (a worked trace follows below).
    Removal changes both the list structure and the node count, so count and modCount must be written back.
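    A worked trace of that reversal, reusing the same kind of hypothetical stripped-down Node class as before (no hash or value fields), removing C from the bin A -> B -> C -> D:

    public class RemoveTraceDemo {
        static final class Node {
            final String key;
            final Node next;
            Node(String key, Node next) { this.key = key; this.next = next; }
        }

        public static void main(String[] args) {
            Node d = new Node("D", null);
            Node c = new Node("C", d);
            Node b = new Node("B", c);
            Node first = new Node("A", b);   // bin: A -> B -> C -> D
            Node e = c;                      // the node being removed

            // Same loop shape as Segment.remove: reuse the suffix, clone the prefix.
            Node newFirst = e.next;          // start from the surviving suffix: D
            for (Node p = first; p != e; p = p.next)
                newFirst = new Node(p.key, newFirst);

            for (Node p = newFirst; p != null; p = p.next)
                System.out.print(p.key + " "); // prints: B A D
        }
    }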

    Cross-Segment Operations: the size Method

    public int size() {
        final Segment<K,V>[] segments = this.segments;
        long sum = 0;
        long check = 0;
        int[] mc = new int[segments.length];
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
            check = 0;
            sum = 0;
            int mcsum = 0;
            for (int i = 0; i < segments.length; ++i) {
                sum += segments[i].count;
                mcsum += mc[i] = segments[i].modCount;
            }
            if (mcsum != 0) {
                for (int i = 0; i < segments.length; ++i) {
                    check += segments[i].count;
                    if (mc[i] != segments[i].modCount) {
                        check = -1; // force retry
                        break;
                    }
                }
            }
            if (check == sum)
                break;
        }
        if (check != sum) { // Resort to locking all segments
            sum = 0;
            for (int i = 0; i < segments.length; ++i)
                segments[i].lock();
            for (int i = 0; i < segments.length; ++i)
                sum += segments[i].count;
            for (int i = 0; i < segments.length; ++i)
                segments[i].unlock();
        }
        if (sum > Integer.MAX_VALUE)
            return Integer.MAX_VALUE;
        else
            return (int)sum;
    }

    The idea behind size is to first sum the sizes of all segments without locking. That attempt can fail, because other threads may be making structural updates to segments that were already visited, so it is retried at most RETRIES_BEFORE_LOCK times; if it still fails, the segment sizes are summed again while holding every segment's lock. The lock-free pass relies on each Segment's modCount: the traversal records each segment's modCount, and when it finishes it checks whether any of them changed. A change means another thread made a concurrent structural update to some segment, so the sum must be recomputed.
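    For reference, the unlocked pass is bounded by a small constant; in the JDK 6 source the declaration is, to the best of my reading, the following:

    // Unsynchronized retries in size/containsValue before resorting
    // to locking (JDK 6 value).
    static final int RETRIES_BEFORE_LOCK = 2;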


