为了便于后面的演示,首先定义一些辅助工具
ThreadQueue { var queue: List of Thread; // 处于等待状态下的线程的队列 sleep() { enqueue current thread; // 把当前线程插入到队列里 put current thread to sleep; // 挂起当前线程 } wake() { if (queue not empty) { dequeue thread; // 取出队头的线程 wake thread; // 唤醒它 } } // 唤醒所有线程 wake-all() { while (queue not empty) wake(); } }
这个概念是由荷兰计算机科学家Dijkstra(迪杰斯特拉)发明的,他的灵感来自铁路上的信号灯。在铁路上,信号灯的一个用途是标识前方路段是否有火车,后车根据信号灯的状态决定是停车等待还是继续行进。如下图所示,铁路被分为很多段,每段同时只能有一列火车,当火车进入到某段时,该段的信号灯切换为红灯,示意后方火车停车等待,当火车离开该段时,信号灯切换为绿灯,示意后方列车可以进入该段。
信号量和上面所描述的铁路信号灯是一样的,但有两种形式: binary semaphore(二元信号量)和 counting semaphore(计数信号量),二元信号量相当于每段只能容纳一列火车,计数信号量相当于每段能容纳一定数量的火车。和现实中的信号灯一样,信号量具有两个操作:P(尝试进入)和V(离开)。(注:P和V是荷兰语单词的首字母)
信号量实现的伪代码
Semaphore(counter) { var counter: Unsigned Integer; var waitQueue: ThreadQueue; P() { DisableInterrupts() while (counter == 0) waitQueue.sleep() counter-- RestoreInterrupts() } V() { DisableInterrupts() counter++ waitQueue.wake() RestoreInterrupts() } }
二元信号量可用于保护监界区(critical section),实现互斥访问。计数信号量可用来控制多个线程对资源的访问。
生产者 – 消费者问题
这个问题描述的是多个生产者和多个消费者共同操作同一个固定大小数据缓冲区的场景
解决这个问题需要3个信号量,一个二元信号量,用于避免缓冲区被多个线程并发操作,一个计数信号量,表示缓冲区剩余空间,用于控制生产者的生产与等待,一个计数信号量,表示缓冲区现有数据的数量,用于控制消费者的消费与等待。Java示例:
import java.io.*; import java.lang.*; import java.text.*; import java.util.*; import java.util.concurrent.*; public class Program { public static void main(String[] args) { BoundedBuffer buf = new BoundedBuffer(); for(int i = 0; i < 10; ++i) { new Thread(()->{ while(true) { try { buf.produce(); Thread.sleep((int)(Math.random() * 10 * 1000)); } catch(Exception ex) { ex.printStackTrace(); } } }).start(); } for(int i = 0; i < 10; ++i) { new Thread(()->{ while(true) { try { buf.consume(); Thread.sleep((int)(Math.random() * 10 * 1000)); } catch(Exception ex) { ex.printStackTrace(); } } }).start(); } try { System.in.read(); } catch(IOException ex) { ex.printStackTrace(); } } } class BoundedBuffer { private static final int CAPACITY = 5; private Queue<String> buf = new LinkedList<>(); private Semaphore bufSync = new Semaphore(1); private Semaphore unused = new Semaphore(CAPACITY); private Semaphore occupied = new Semaphore(0); public void produce() throws Exception { unused.acquire(); bufSync.acquire(); String item = new SimpleDateFormat("mm:ss.SSS").format(new Date()); buf.add(item); System.out.printf("生产者%d 产出了:%s,缓冲区数量:%d\n", Thread.currentThread().getId(), item, buf.size()); bufSync.release(); occupied.release(); } public String consume() throws Exception { occupied.acquire(); bufSync.acquire(); String first = buf.remove(); System.out.printf("消费者%d 取走了:%s,缓冲区数量:%d\n", Thread.currentThread().getId(), first, buf.size()); bufSync.release(); unused.release(); return first; } }
锁也叫 mutex(mutual exclusion的合成词,互斥量),伪代码:
Lock { var isHeld: boolean; var waitQueue: ThreadQueue; acquire() { DisableInterrupts() while (isHeld) waitQueue.sleep(); isHeld = true; RestoreInterrupts() } release() { DisableInterrupts() isHeld = false; waitQueue.wake(); RestoreInterrupts() } }
上面的伪代码是对锁的一个基本的实现,所以看起来和二元信号量没有区别,虽然从用途上看它们也确实是一样的,都是用于实现互斥,保护临界区的,但在具体实现中,锁是有所有权的概念的,也就是锁会关联到某个线程,锁的释放只能由获取它的线程执行,而信号量则没有限制,而且锁可以实现为“可重入”,也就是同一个线程可以获取多次,但信号量则不行。
取款操作前需要检查余额是否够,如果够才执行扣款操作,伪代码:
void withdraw(double amount) { if(balance - amount >= 0) balance -= amount; }
问题:如果余额正好为amount,这时多个线程同时执行到判断语句,这时条件为真,然后都同时执行了扣款操作,则余额就被扣成负的了,这里判断、扣款操作其实构成了一个临界区,需要实现互斥,所以需要加锁进行保护:
void withdraw(double amount) { lock(); if(balance - amount >= 0) balance -= amount; unlock(); }
条件变量是用来和锁配合使用的对象,每个条件变量都会关联到一个锁,只有拥有对应锁的线程才能使用与之关联的条件变量,伪代码:
Condition(lock) { var lock: Lock; var waitQueue: ThreadQueue; wait() { DisableInterrupts() lock.release() waitQueue.sleep() lock.acquire() RestoreInterrupts() } signal() { DisableInterrupts() waitQueue.wake(); RestoreInterrupts() } broadcast() { DisableInterrupts() waitQueue.wake-all(); RestoreInterrupts() } }
回顾前面用信号量解决的“生产者 – 消费者”问题,用锁同样也能解决:把二元信号量换锁,去掉另外两个计数信号量,通过循环实现等待。
public void produce() throws Exception { lock.lock(); while(buf.size() == CAPACITY) { lock.unlock(); lock.lock(); } String item = new SimpleDateFormat("mm:ss.SSS").format(new Date()); buf.add(item); System.out.printf("生产者%d 产出了:%s,缓冲区数量:%dn", Thread.currentThread().getId(), item, buf.size()); lock.unlock(); } public String consume() throws Exception { lock.lock(); while(buf.size() == 0) { lock.unlock(); lock.lock(); } String first = buf.remove(); System.out.printf("消费者%d 取走了:%s,缓冲区数量:%dn", Thread.currentThread().getId(), first, buf.size()); lock.unlock(); return first; }
上面思路是在循环里临时释放锁,让出占有权,再获取锁,再判断,直到满足条件才往下继续。但这种方法会产生很多次不必要的循环,严重浪费CPU资源,当然可以在释放锁让线程睡眠一段时间,但睡多长时间又不好确定。
这种情况条件变量就派上用场了,条件变量的作用就是在临界区中临时释放锁并让当前线程进入等待状态。用条件变量解决“生产者 – 消费者”问题:
import java.io.*; import java.lang.*; import java.text.*; import java.util.*; import java.util.concurrent.locks.*; public class Program { public static void main(String[] args) { BoundedBuffer buf = new BoundedBuffer(); for(int i = 0; i < 5; ++i) { new Thread(()->{ while(true) { try { buf.produce(); Thread.sleep((int)(Math.random() * 10 * 1000)); } catch(Exception ex) { ex.printStackTrace(); } } }).start(); } for(int i = 0; i < 5; ++i) { new Thread(()->{ while(true) { try { buf.consume(); Thread.sleep((int)(Math.random() * 10 * 1000)); } catch(Exception ex) { ex.printStackTrace(); } } }).start(); } try { System.in.read(); } catch(IOException ex) { ex.printStackTrace(); } } } class BoundedBuffer { private static final int CAPACITY = 5; private Queue<String> buf = new LinkedList<>(); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public void produce() throws Exception { lock.lock(); while(buf.size() == CAPACITY) notFull.await(); String item = new SimpleDateFormat("mm:ss.SSS").format(new Date()); buf.add(item); System.out.printf("生产者%d 产出了:%s,缓冲区数量:%d\n", Thread.currentThread().getId(), item, buf.size()); notEmpty.signalAll(); lock.unlock(); } public String consume() throws Exception { lock.lock(); while(buf.isEmpty()) notEmpty.await(); String first = buf.remove(); System.out.printf("消费者%d 取走了:%s,缓冲区数量:%d\n", Thread.currentThread().getId(), first, buf.size()); notFull.signalAll(); lock.unlock(); return first; } }
监视器呢就是一种既支持互斥操作,又具有让线程进行条件等待的功能的同步设施,所以 监视器 = 锁+条件变量。
监视器的语义是这样的:
如下图所示,Entry Set表示等待进入的线程集合,The Owner表示监视器的所有者,即当前正在运行的线程,Wait Set表示等待条件的线程集合
其实上面用一个锁 + 两个条件变量 解决“生产者 – 消费者”问题的代码就构成了一个监视器。
在Java中,方法可以加synchronized关键字实现互斥,顶层类Object具有wait、notify、notifyAll方法,所以每个Java对象其实都可以是一个监视器。
如果你仔细分析上面的条件变量示例代码时,你肯定有个疑问:为什么要把await放在while循环里,用if不行吗?
这是因为Java中的条件变量是Mesa-style的(大多数条件变量的实现都是Mesa-style的),如果是Hoare-style的话就可以用if了,这两种风格的区别是
用监视器实现信号量
伪代码
Semaphore(counter) { var counter: Unsigned Integer; var lock: Lock; var notZero: Condition(lock); P() { lock.acquire() while (counter == 0) notZero.wait() counter-- lock.release() } V() { lock.acquire() counter++ notZero.signal() lock.release() } }
在编译器对目标代码进行优化时,可能会为了提高变量的访问速度,在修改变量值后并没有把值重新写回内存,而是放在CPU寄存器里,这在多线程环境中就可能会出现一个线程更新了变量的值,但另一个线程并不能及时读取到最新的值。C#和Java中的volatile关键字正是解决这个问题的,当一个变量由volatile修饰后,编译器便不会对它进行优化,所以能保证线程读取某个变量的值时读取的是最新的值。
再看这条语句++i;
,它不是线程安全的,虽然只有一条语句,但编译为CPU指令实际上是三个步骤:读到寄存器、计算、写回内存,这三个步骤就可能被多个线程交错地执行,比如:i最初为0,如果两个线程a和b都执行了该语句,则理应i的值变为2,但可能a执行到���算时,b执行到读取,因为a还未将新值写入内存,所以b读取的是0,当a和b都完成写入后,i的值最终为1,对于这种情况大多数平台都提供了原子性操作变量的设施,比如.NET的Interlocked和Java中java.util.concurrent.atomic包下的类。
理解常见的线程同步设施,首发于文章 - 伯乐在线。