IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Java并发编程-非阻塞同步方式原子类(Atomic)的使用

    summer发表于 2016-06-20 08:46:21
    love 0

    非阻塞同步

    在大多数情况下,我们为了实现线程安全都会使用Synchronized或lock来加锁进行线程的互斥同步,但互斥同步的最主要的问题就是进行线程的阻塞和唤醒所带来的性能问题,因此这种阻塞也称作阻塞同步。从处理问题的方式上说,互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题,无论共享数据是否真的会出现竞争,它都会进行加锁、用户态核心态转换、维护锁的计数器和检查是否有被阻塞的线程需要被唤醒等操作。

    随着硬件指令集的发展,我们有了另一个选择:基于冲突检测的乐观并发策略,通俗的说,就是先进行操作,如果没有其他线程争用共享数据,那操作就成功;如果共享数据有争用,产生了冲突,那就再采用其他的补偿措施(最常见的就是不断的重试,直到成功),这种乐观的并发策略的许多实现都不需要把线程挂起,因此这种同步措施称为非阻塞同步。

    原子类Atomic

    在JDK 1.5以后,Java程序提供了CAS(比较和交换),其实我们在前面文章已经见过CAS操作了,并发包下的并发队列的原理就是CAS操作,可以查看这篇文章:Java并发编程-并发队列(ConcurrentLinkedQueue)的原理分析

    当CAS指令执行时,会先将内存的值进行保存,当操作完成时再判断保存的值和当前内存的值是否相同,如果不同则说明其他线程操作了该数据,所以需要再次进行操作直到成功。在java.util.concurrent.atomic包中提供的原子类Atomic实现CAS操作。

    原子操作类共有13个类,在java.util.concurrent.atomic包下,可以分为四种类型的原子更新类:原子更新基本类型、原子更新数组类型、原子更新引用和原子更新属性。下面将分别介绍这四种原子操作类。

    原子更新基本类型

    使用原子方式更新基本类型,共包括3个类:

    • AtomicBoolean:原子更新布尔变量
    • AtomicInteger:原子更新整型变量
    • AtomicLong:原子更新长整型变量

    常用的方法如下,以AtomicInteger源码为例进行说明(JDK 1.7):

    public class AtomicInteger extends Number implements java.io.Serializable {

    private volatile int value;

    /**
    * Creates a new AtomicInteger with the given initial value.
    *
    * @param initialValue the initial value
    */

    public AtomicInteger(int initialValue) {
    value = initialValue;
    }

    /**
    * Creates a new AtomicInteger with initial value {@code 0}.
    */

    public AtomicInteger() {
    }

    //返回当前值
    public final int get() {
    return value;
    }

    //设置新的值,非原子操作
    public final void set(int newValue) {
    value = newValue;
    }

    //最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值
    public final void lazySet(int newValue) {
    unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    //原子操作,取当前的值,并设置新的值,返回旧值
    public final int getAndSet(int newValue) {
    for (;;) {
    int current = get();
    if (compareAndSet(current, newValue))
    return current;
    }
    }

    //如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
    public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    //如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
    public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    //以原子方式将当前值加 1,返回旧值
    public final int getAndIncrement() {
    for (;;) {
    int current = get();
    int next = current + 1;
    if (compareAndSet(current, next))
    return current;
    }
    }

    //以原子方式将当前值减 1,返回旧值
    public final int getAndDecrement() {
    for (;;) {
    int current = get();
    int next = current - 1;
    if (compareAndSet(current, next))
    return current;
    }
    }

    //以原子方式将给定值与当前值相加,返回旧值
    public final int getAndAdd(int delta) {
    for (;;) {
    int current = get();
    int next = current + delta;
    if (compareAndSet(current, next))
    return current;
    }
    }

    //以原子方式将当前值加 1。
    public final int incrementAndGet() {
    for (;;) {
    int current = get();
    int next = current + 1;
    if (compareAndSet(current, next))
    return next;
    }
    }

    //以原子方式将当前值减 1,返回新值
    public final int decrementAndGet() {
    for (;;) {
    int current = get();
    int next = current - 1;
    if (compareAndSet(current, next))
    return next;
    }
    }

    //以原子方式将给定值与当前值相加,返回新值
    public final int addAndGet(int delta) {
    for (;;) {
    int current = get();
    int next = current + delta;
    if (compareAndSet(current, next))
    return next;
    }
    }
    }

    其中LazySet()方法比较特殊,关于该方法的更多信息可以参考并发网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》

    使用例子
    多线程情况下,对AtomicInteger的原子自增操作。

    import java.util.concurrent.atomic.AtomicInteger;

    public class AtomicIntegerTest {

    private static AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void increase(){
    atomicInteger.incrementAndGet();
    }
    public static void main(String[] args){
    for (int i = 0; i < 5; i++){
    new Thread(new Runnable() {
    public void run() {
    for(int j=0;j<1000;j++)
    increase();
    }
    }).start();
    }
    while(Thread.activeCount()>1)
    Thread.yield();
    System.out.println(atomicInteger.get());
    }
    }

    在多线程的情况下,得到的结果是正确的,但是如果仅仅使用int类型的成员变量则可能得到不同的结果。这里的关键在于getAndIncrement是原子操作compareAndSet(current, next)。

    原子更新数组

    通过原子更新数组里的某个元素,共有3个类:

    • AtomicIntegerArray:原子更新整型数组的某个元素
    • AtomicLongArray:原子更新长整型数组的某个元素
    • AtomicReferenceArray:原子更新引用类型数组的某个元素

    AtomicIntegerArray常用的方法与AtomicInteger类似,只不过参数需要带索引,部分源码如下(JDK 1.7):

    public class AtomicIntegerArray implements java.io.Serializable {

    private final int[] array;

    /**
    * Creates a new AtomicIntegerArray of the given length, with all
    * elements initially zero.
    *
    * @param length the length of the array
    */

    public AtomicIntegerArray(int length) {
    array = new int[length];
    }

    /**
    * Creates a new AtomicIntegerArray with the same length as, and
    * all elements copied from, the given array.
    *
    * @param array the array to copy elements from
    * @throws NullPointerException if array is null
    */

    public AtomicIntegerArray(int[] array) {
    // Visibility guaranteed by final field guarantees
    this.array = array.clone();
    }

    /**
    * Returns the length of the array.
    *
    * @return the length of the array
    */

    public final int length() {
    return array.length;
    }

    /**
    * Gets the current value at position {@code i}.
    *
    * @param i the index
    * @return the current value
    */

    public final int get(int i) {
    return getRaw(checkedByteOffset(i));
    }

    private int getRaw(long offset) {
    return unsafe.getIntVolatile(array, offset);
    }

    /**
    * Sets the element at position {@code i} to the given value.
    *
    * @param i the index
    * @param newValue the new value
    */

    public final void set(int i, int newValue) {
    unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
    }

    /**
    * Eventually sets the element at position {@code i} to the given value.
    *
    * @param i the index
    * @param newValue the new value
    * @since 1.6
    */

    public final void lazySet(int i, int newValue) {
    unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
    }

    /**
    * Atomically sets the element at position {@code i} to the given
    * value and returns the old value.
    *
    * @param i the index
    * @param newValue the new value
    * @return the previous value
    */

    public final int getAndSet(int i, int newValue) {
    long offset = checkedByteOffset(i);
    while (true) {
    int current = getRaw(offset);
    if (compareAndSetRaw(offset, current, newValue))
    return current;
    }
    }

    /**
    * Atomically sets the element at position {@code i} to the given
    * updated value if the current value {@code ==} the expected value.
    *
    * @param i the index
    * @param expect the expected value
    * @param update the new value
    * @return true if successful. False return indicates that
    * the actual value was not equal to the expected value.
    */

    public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }

    private boolean compareAndSetRaw(long offset, int expect, int update) {
    return unsafe.compareAndSwapInt(array, offset, expect, update);
    }
    }

    使用例子:
    多线程情况下,对数组的每个值都自增,这里使用Executor框架管理线程,代码如下:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicIntegerArray;

    public class AtomicIntegerArrayDemo {

    static AtomicIntegerArray array = new AtomicIntegerArray(5);

    public static void main(String[] args){
    ThreadPoolExecutor e= (ThreadPoolExecutor) Executors.newCachedThreadPool();
    for(int i=0;i<5;i++){
    e.execute(new Task(array));
    }

    e.shutdown();
    try {
    e.awaitTermination(5000,TimeUnit.SECONDS);
    } catch (InterruptedException e1) {
    // TODO Auto-generated catch block
    e1.printStackTrace();
    }
    System.out.println(array.toString());
    }
    static class Task implements Runnable {
    private AtomicIntegerArray array;
    Task(AtomicIntegerArray array) {
    this.array=array;
    }
    @Override
    public void run() {
    for(int i=0;i<array.length();i++){
    array.incrementAndGet(i);
    }

    }
    }
    }

    结果:
    [5, 5, 5, 5, 5]

    原子更新引用类型

    需要更新引用类型往往涉及多个变量,早atomic包有三个类:

    • AtomicReference:原子更新引用类型
    • AtomicReferenceFieldUpdater:原子更新引用类型里的字段
    • AtomicMarkableReference:原子更新带有标记位的引用类型。

    下面以AtomicReference为例进行说明:

    import java.util.concurrent.atomic.AtomicReference;

    public class AtomicReferenceDemo {

    static class User{
    private String name;
    private int id;

    public User(String name, int id) {
    this.name = name;
    this.id = id;
    }

    public String getName() {
    return name;
    }

    public void setName(String name) {
    this.name = name;
    }

    public int getId() {
    return id;
    }

    public void setId(int id) {
    this.id = id;
    }
    }

    public static AtomicReference<User> ar = new AtomicReference<User>();

    public static void main(String[] args){
    User user = new User("aa",11);
    ar.set(user);
    User newUser = new User("bb",22);
    ar.compareAndSet(user,newUser);
    System.out.println(ar.get().getName());
    System.out.println(ar.get().getId());
    }

    原子更新字段类

    如果需要原子更新某个类的某个字段,就需要用到原子更新字段类,可以使用以下几个类:

    • AtomicIntegerFieldUpdater:原子更新整型字段
    • AtomicLongFieldUpdater:原子更新长整型字段
    • AtomicStampedReference:原子更新带有版本号的引用类型。

    要想原子更新字段,需要两个步骤:

    • 每次必须使用newUpdater创建一个更新器,并且需要设置想要更新的类的字段
    • 更新类的字段(属性)必须为public volatile

    下面的代码演示如何使用原子更新字段类更新字段:

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

    public class AtomicIntegerFieldUpdaterDemo {

    //创建一个原子更新器
    private static AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdater =
    AtomicIntegerFieldUpdater.newUpdater(User.class,"old");

    public static void main(String[] args){
    User user = new User("Tom",15);
    //原来的年龄
    System.out.println(atomicIntegerFieldUpdater.getAndIncrement(user));
    //现在的年龄
    System.out.println(atomicIntegerFieldUpdater.get(user));
    }

    static class User{
    private String name;
    public volatile int old;

    public User(String name, int old) {
    this.name = name;
    this.old = old;
    }

    public String getName() {
    return name;
    }

    public void setName(String name) {
    this.name = name;
    }

    public int getOld() {
    return old;
    }

    public void setOld(int old) {
    this.old = old;
    }
    }
    }



沪ICP备19023445号-2号
友情链接