IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Java并发

    a8167270发表于 2017-03-27 14:19:51
    love 0

    1、死锁

    产生死锁的四个必要条件:
    (1) 互斥条件:一个资源每次只能被一个进程使用。
    (2) 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
    (3) 不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
    (4) 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

    类似于下图:
    image_1bbq8dukd16jl1vs91bf6pkf6cl9.png-28.2kB

    甚至会有更复杂的,环状死锁:

    Thread 1 locks A, waits for B
    Thread 2 locks B, waits for C
    Thread 3 locks C, waits for D
    Thread 4 locks D, waits for A

    java代码示例

    public class DeadLock implements Runnable {  
        public int flag = 1;  
        //静态对象是类的所有对象共享的  
        private static Object o1 = new Object(), o2 = new Object();  
        @Override  
        public void run() {  
            if (flag == 1) {  
                synchronized (o1) {  
                    synchronized (o2) {  
                        System.out.println("1");  
                    }  
                }  
            }  
            if (flag == 0) {  
                synchronized (o2) {  
                    synchronized (o1) {  
                        System.out.println("0");  
                    }  
                }  
            }  
        }  
      
        public static void main(String[] args) {  
              
            DeadLock td1 = new DeadLock();  
            DeadLock td2 = new DeadLock();  
            
            td1.flag = 1;  
            td2.flag = 0;  
            
            //td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。  
            //td2的run()可能在td1的run()之前运行  
            new Thread(td1).start();  
            new Thread(td2).start();  
      
        }  
    }  

    加锁顺序

    确保所有的线程都是按照相同的顺序获得锁,那么死锁就不会发生。

    Thread 1:
    lock A
    lock B

    Thread 2:
    wait for A
    lock C (when A locked)

    Thread 3:
    wait for A
    wait for B
    wait for C

    加锁时限

    获取锁的时候加一个超时时间,若一个线程没有在给定的时限内成功获得所有需要的锁,则会进行回退并释放所有已经获得的锁,然后等待一段随机的时间再重试。

    以下是一个例子,展示了两个线程以不同的顺序尝试获取相同的两个锁,在发生超时后回退并重试的场景:

    Thread 1 locks A
    Thread 2 locks B

    Thread 1 attempts to lock B but is blocked
    Thread 2 attempts to lock A but is blocked

    Thread 1's lock attempt on B times out
    Thread 1 backs up and releases A as well
    Thread 1 waits randomly (e.g. 257 millis) before retrying.

    Thread 2's lock attempt on A times out
    Thread 2 backs up and releases B as well
    Thread 2 waits randomly (e.g. 43 millis) before retrying.

    当然,如果有非常多的线程同一时间去竞争同一批资源,就算有超时和回退机制,还是可能会导致这些线程重复地尝试但却始终得不到锁。

    在Java中不能对synchronized同步块设置超时时间,需要创建一个自定义锁!

    死锁检测

    每当一个线程请求锁,或者获得了锁,可以在线程和锁相关的数据结构中(map、graph等等)将其记下。当一个线程请求锁失败时,这个线程可以遍历锁的关系图看看是否有死锁发生。

    死锁一般要比两个线程互相持有对方的锁这种情况要复杂的多,下面是一幅关于四个线程(A,B,C和D)之间锁占有和请求的关系图。像这样的数据结构就可以被用来检测死锁。
    image_1bbt7jhkssup1veuhm8q29hm29.png-19.7kB

    那么当检测出死锁时,可以按下面方式来处理:

    • 释放所有锁,回退,并且等待一段随机的时间后重试。

    • 给这些线程设置优先级,让一个(或几个)线程回退,剩下的线程就像没发生死锁一样继续保持着它们需要的锁。

    2、饥饿和公平

    一个线程因为CPU时间全部被其他线程抢走而得不到CPU运行时间,这种状态被称之为饥饿。

    解决饥饿的方案,所有线程均能公平地获得运行机会被称之为公平性

    饥饿原因

    在Java中,下面三个常见的原因会导致线程饥饿:

    1. 高优先级线程吞噬所有的低优先级线程的CPU时间。

    2. 线程被永久堵塞在一个等待进入同步块的状态
      Java的同步代码区对哪个线程允许进入的次序没有任何保障。理论上存在一个试图进入该同步区的线程处于被永久堵塞的风险,因为其他线程总是能持续地先于它获得访问

    3. 线程在等待一个本身(在其上调用wait())也处于永久等待完成的对象
      如果多个线程处在wait()方法执行上,而对其调用notify()不会保证哪一个线程会获得唤醒,任何线程都有可能处于继续等待的状态。因此存在这样一个风险:一个等待线程从来得不到唤醒,因为其他等待线程总是能被获得唤醒。

    公平性

    在java中不可能实现100%的公平性,为了提高等待线程的公平性,我们使用锁方式来替代同步块。

    public class Synchronizer{
        Lock lock = new Lock();
        
        //使用lock,而不是synchronized实现同步块
        public void doSynchronized() throws InterruptedException{
            this.lock.lock();
            //critical section, do a lot of work which takes a long time
            this.lock.unlock();
        }
    }

    Lock的简单实现原理:

    public class Lock{
        private boolean isLocked      = false;
        private Thread lockingThread = null;
    
        public synchronized void lock() throws InterruptedException{
    
        while(isLocked){
            wait();
        }
    
        isLocked = true;
        lockingThread = Thread.currentThread();
    
    }
    
        public synchronized void unlock(){
            if(this.lockingThread != Thread.currentThread()){
                throw new IllegalMonitorStateException(
                    "Calling thread has not locked this lock");
                }
    
            isLocked = false;
            lockingThread = null;
            notify();
        }
    }

    上面的例子可以看到两点:

    1. 如果多个线程同时调用lock.lock方法的话,线程将阻塞在lock方法处,因为lock方法是一个同步方法。

    2. 如果lock对象的锁被一个线程持有,那么其他线程都将调用在while循环中的wait方法而阻塞。

    现在在把目光集中在doSynchronized方法中,在lock和unlock之间有一段注释,写明了这一段代码将执行很长一段时间。我们假设这段时间比线程进入lock方法内部并且由于lock已被锁定而调用wait方法等待的时间长。这意味着线程大部分时间都消耗在了wait等待上而不是阻塞在lock方法上。

    之前曾提到同步块无法保证当多个线程等待进入同步块中时哪个线程先进入,同样notify方法也无法保证在多个线程调用wait的情况下哪个线程先被唤醒。当前这个版本的Lock类在公平性上和之前加了synchronized关键字的doSynchronized方法没什么区别,但是我们可以修改它。

    我们注意到,当前版本的Lock方法是调用自己的wait方法。如果每个线程调用不同对象的wait方法,那么Lock类就可以决定哪些对象调用notify方法,这样就可以选择性的唤醒线程。

    公平锁

    public class FairLock {
        private boolean           isLocked       = false;
        private Thread            lockingThread  = null;
        private List<QueueObject> waitingThreads =
                new ArrayList<QueueObject>();
    
      public void lock() throws InterruptedException{
        QueueObject queueObject           = new QueueObject();
        boolean     isLockedForThisThread = true;
        synchronized(this){
            waitingThreads.add(queueObject);
        }
    
        while(isLockedForThisThread){
          synchronized(this){
            isLockedForThisThread =
                isLocked || waitingThreads.get(0) != queueObject;
            if(!isLockedForThisThread){
              isLocked = true;
               waitingThreads.remove(queueObject);
               lockingThread = Thread.currentThread();
               return;
             }
          }
          try{
            queueObject.doWait();
          }catch(InterruptedException e){
            synchronized(this) { waitingThreads.remove(queueObject); }
            throw e;
          }
        }
      }
    
      public synchronized void unlock(){
        if(this.lockingThread != Thread.currentThread()){
          throw new IllegalMonitorStateException(
            "Calling thread has not locked this lock");
        }
        isLocked      = false;
        lockingThread = null;
        if(waitingThreads.size() > 0){
          waitingThreads.get(0).doNotify();
        }
      }
    }
    public class QueueObject {
    
      private boolean isNotified = false;
    
      public synchronized void doWait() throws InterruptedException {
        while(!isNotified){
            this.wait();
        }
        this.isNotified = false;
      }
    
      public synchronized void doNotify() {
        this.isNotified = true;
        this.notify();
      }
    
      public boolean equals(Object o) {
        return this == o;
      }
    }

    FairLock类会给每个调用lock方法的线程创建一个QueueObject对象,当线程调用unlock方法时队列中的第一个。
    QueueObject出列并且调用doNotify方法激活对应的线程。这种方式可以保证只有一个线程被唤醒而不是所有等待线程。

    注意到FairLock在同步块中设置了状态检测来避免失控。

    QueueObject实际上就是一个信号量(semaphore),QueueObject对象内部保存了一个信号isNotified.这样做是为了防止信号丢失。queueObject.wait方法是被放在了synchronized(this)块的外部来避免嵌套监视器闭环。这样当没有线程运行lock方法中的synchronized同步块时其他线程可以调用unlock方法。

    最后我们注意到lock方法用到了try-catch块,这样当发生InterruptedException时线程将退出lock方法,这个时候我们应该将对应的QueueObject对象出列。

    效率
    FairLock的执行效率相比Lock类要低一些。它对你的应用程序的影响取决于FairLock所保证的临界区代码的执行时间,这个时间越长,那么影响就越小;同时也取决于这段临界区代码的执行频率。

    3、嵌套管程锁死

    嵌套管程锁死与死锁类似,场景如下所示:

    线程1获得A对象的锁。
    线程1获得对象B的锁(同时持有对象A的锁)。
    线程1决定等待另一个线程的信号再继续。
    线程1调用B.wait(),从而释放了B对象上的锁,但仍然持有对象A的锁。

    线程2需要同时持有对象A和对象B的锁,才能向线程1发信号。
    线程2无法获得对象A上的锁,因为对象A上的锁当前正被线程1持有。
    线程2一直被阻塞,等待线程1释放对象A上的锁。

    线程1一直阻塞,等待线程2的信号,因此,不会释放对象A上的锁,
    而线程2需要对象A上的锁才能给线程1发信号……

    代码示例:

    //lock implementation with nested monitor lockout problem
    public class Lock{
        protected MonitorObject monitorObject = new MonitorObject();
        protected boolean isLocked = false;
    
        public void lock() throws InterruptedException{
            synchronized(this){
                while(isLocked){
                    synchronized(this.monitorObject){
                        this.monitorObject.wait();
                    }
                }
                isLocked = true;
            }
        }
    
        public void unlock(){
            synchronized(this){
                this.isLocked = false;
                synchronized(this.monitorObject){
                    this.monitorObject.notify();
                }
            }
        }
    }

    区别

    在死锁中我们已经对死锁有了个大概的解释,死锁通常是因为两个线程获取锁的顺序不一致造成的,线程1锁住A,等待获取B,线程2已经获取了B,再等待获取A。如死锁避免中所说的,死锁可以通过总是以相同的顺序获取锁来避免。

    但是发生嵌套管程锁死时锁获取的顺序是一致的。线程1获得A和B,然后释放B,等待线程2的信号。线程2需要同时获得A和B,才能向线程1发送信号。所以,一个线程在等待唤醒,另一个线程在等待想要的锁被释放。

    1. 死锁中,二个线程都在等待对方释放锁。

    2. 嵌套管程锁死中,线程1持有锁A,同时等待从线程2发来的信号,线程2需要锁A来发信号给线程1。

    4、Slipped Conditions

    从一个线程检查某一特定条件到该线程操作此条件期间,这个条件已经被其它线程改变,导致第一个线程在该条件上执行了错误的操作。这里有一个简单的例子:

    public class Lock {
        private boolean isLocked = true;
    
        public void lock(){
          synchronized(this){
            while(isLocked){
              try{
                this.wait();
              } catch(InterruptedException e){
                //do nothing, keep waiting
              }
            }
          }
    
          synchronized(this){
            isLocked = true;
          }
        }
    
        public synchronized void unlock(){
          isLocked = false;
          this.notify();
        }
    }

    假如在某个时刻isLocked为false,有两个线程同时访问lock方法。如果第一个线程先进入第一个同步块,这个时候它会发现isLocked为false,若此时允许第二个线程执行,它也进入第一个同步块,同样发现isLocked是false。现在两个线程都检查了这个条件为false,然后它们都会继续进入第二个同步块中并设置isLocked为true。

    为避免slipped conditions,条件的检查与设置必须是原子的,也就是说,在第一个线程检查和设置条件期间,不会有其它线程检查这个条件。

    public class Lock {
        private boolean isLocked = true;
    
        public void lock(){
          synchronized(this){
            while(isLocked){
              try{
                this.wait();
              } catch(InterruptedException e){
                //do nothing, keep waiting
              }
            }
            isLocked = true;
          }
        }
    
        public synchronized void unlock(){
          isLocked = false;
          this.notify();
        }
    }

    5、信号量

    Semaphore(信号量) 是一个线程同步结构,用于在线程间传递信号,以避免出现信号丢失,或者像锁一样用于保护一个关键区域。

    public class Semaphore {
        private boolean signal = false;
    
        public synchronized void take() {
            this.signal = true;
            this.notify();
        }
    
        public synchronized void release() throws InterruptedException{
            while(!this.signal) wait();
            this.signal = false;
        }
    }

    Take 方法发出一个被存放在 Semaphore内部的信号,而Release方法则等待一个信号,当其接收到信号后,标记位 signal 被清空,然后该方法终止。

    使用这个 semaphore 可以避免错失某些信号通知。用 take 方法来代替 notify,release 方法来代替 wait。如果某线程在调用 release 等待之前调用 take 方法,那么调用 release 方法的线程仍然知道 take 方法已经被某个线程调用过了,因为该 Semaphore 内部保存了 take 方法发出的信号。而 wait 和 notify 方法就没有这样的功能。

    6、阻塞队列

    阻塞队列与普通队列的区别在于

    当队列是空的时,从队列中获取元素的操作将会被阻塞。
    当队列是满时,往队列里添加元素的操作会被阻塞。

    试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来。
    image_1bc6sq76nmnh1o9v1181hqbh99.png-11.9kB

    public class BlockingQueue {
    
        private List queue = new LinkedList();
    
        private int  limit = 10;
    
        public BlockingQueue(int limit){
            this.limit = limit;
        }
    
        public synchronized void enqueue(Object item) throws InterruptedException  {
            while(this.queue.size() == this.limit) {
                wait();
            }
            
             if(this.queue.size() == 0) {
                notifyAll();
            }
    
             this.queue.add(item);
    }
    
        public synchronized Object dequeue() throws InterruptedException{
    
             while(this.queue.size() == 0){
                  wait();
            }
    
            if(this.queue.size() == this.limit){
                notifyAll();
            }   
    
            return this.queue.remove(0);
        }
    }

    必须注意到,在 enqueue 和 dequeue 方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。
    如果队列的大小既不等于上限,也不等于下限,任何线程调用 enqueue 或者 dequeue 方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

    7、线程池

    线程池(Thread Pool)对于限制应用程序中同一时刻运行的线程数很有用。因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等。

    我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。

    线程池经常应用在多线程服务器上。每个通过网络到达服务器的连接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。

    public class PoolThread extends Thread {
    
      private BlockingQueue<Runnable> taskQueue = null;
      private boolean       isStopped = false;
    
      public PoolThread(BlockingQueue<Runnable> queue) {
        taskQueue = queue;
      }
    
      public void run() {
        while (!isStopped()) {
          try {
            Runnable runnable =taskQueue.take();
            runnable.run();
          } catch(Exception e) {
            // 写日志或者报告异常,
            // 但保持线程池运行.
          }
        }
      }
    
      public synchronized void toStop() {
        isStopped = true;
        this.interrupt(); // 打断池中线程的 dequeue() 调用.
      }
    
      public synchronized boolean isStopped() {
        return isStopped;
      }
    }


沪ICP备19023445号-2号
友情链接