阻塞队列和生产者-消费者模式+ThreadPoolExecutor
介绍:
阻塞队列(blocking queue)提供了可阻塞的put和take的方法,如果队列已满,put方法会被阻塞直到有空间可用为止;如果队列为空,take方法会被阻塞,直到有 元素可用为止。阻塞队列同时提供了可定时的offer和poll方法,如果不设定时间,offer或者是poll将立刻返回操作结果(是否插入队列或者是 取到队列的头元素),在实践中,这两个方法较put和take好用。队列的长度可以设定也可以无限,如果未指定容量,则它等于 Integer.MAX_VALUE。
生产者-消费者的模式是设计分离“需要完成的工作”和“执行工作”的模式。应用该模式,当发现一个工作的时候不会立刻处理,而是把工作放到一个 to do list 中去,以备后续处理。生产者-消费者模式把生产者和消费者之间的活动解耦,从而简化了开发。生产者和消费者可以围绕阻塞队列展开,生产者把处理好的数据发 送到阻塞队列中,消费者到队列中取需要处理的数据。
场景:
短信发送存在两种情况:A:jobs提取的异步发送存放在数据库中的待发送短信;B:用户需要发送的实时短信。两种情况的短信都需要和数据库交互。所有的 短信在提取出来的时候都需要发送网关!待发送短信数据的获取和成功发送到网关,就如同在餐馆洗盘子和把洗好的盘子烘干是一个流程。短信的获取代码可以把处 理好的待发送的短信放到一个阻塞队列中,同时更新数据库的状态为待发送;短信发送的代码可以把短信发送到网关,同时把发送过的消息从发送表删除,然后插入 到发送历史表。
实现:
阻塞队列类:注意点:singlton的模式需要时线程安全的,队列需要是静态的,在内存中只有一份,尽量使用有界队列。如果队列满了,需要把消息写入到消息发送表,状态设置为等待。
public class SmsSendQueueManager {
private final int SEND_QUEUE_MAX_LEN = 10000;
/**
* 系统唯一实例.
*/
private static SmsQueueManager queueManager = new SmsQueueManager();
/**
* 下行队列.
*/
private BlockingQueue
private SmsQueueManager() {
}
//线程安全的singlton
public static SmsQueueManager getInstance() {
return queueManager;
}
/**
* 将短信对象列表加入上行队列中.
* @param item 短信对象
*/
public boolean addSendItem(SmsMessage item) {
return this.sendQueue.offer(item);
}
/**
* 从上行队列中弹出最前面的短信对象.
* @return 短信对象
*/
public SmsMessage popSendQueue() throws InterruptedException {
return this.sendQueue.take();
}
}
生产者:
由quartz的jobs或者是自己实现的系统启动线程来把短信发送表中状态为待发送和重试类型的消息提取到消息队列中去,或者是实时消息发送直接把消息 offer到队列中。为了防止以后把生产者放入到线程池中,需要对读取数据库中的列表操作过程加锁。如果是一直单线程,则该操作不需要。
public class MessageSendProducter extend Thread{
public void run{
private SmsSendQueueManager queueManager = SmsSendQueueManager.getInstance();
private Lock lock = new ReentrantLock();
……
Thread.sleep(5*1000);
try{
lock.lock();
//从数据库中获取数据,修改相关标示位为待发送;
……处理逻辑 sms = ……
queueManager.addSendItem(sms)
}finally{
lock.unlock();
}
}
}
消费者:
发送消息的线程是把队列中的消息发送到网关,用线程来处理;
public class MessageSendCustomer extend Thread{
}
消息发送到网关的线程:
public class SenderThread extends Thread {
/**
* 发送计数器.
*/
private static final AtomicInteger counter = new AtomicInteger(0);
/**
* 待发送的短信对象.
*/
protected Sms sms = null;
public SenderThread(Sms sms){
this.sms = sms;
}
/**
* 线程运行方法.
*/
public void run() {
int i = counter.incrementAndGet();
logger.debug("线程[" + Thread.currentThread().getName() + "]发送第" + i + "条短信,短信ID为:" + this.smsMessage.getSmsId() + "\n");
try {
// 调用下行短信接口
CmppSender cmppSender = (CmppSender)BeanLocator.getInstance().getBean(SmsDispatchConfig.getCmppSender());
cmppSender.send(sms);
// 发送成功,放入历史表
}
catch(Exception se) {
// 发送失败处理处理逻辑
}
}
}
容错处理:
1:如果消息发送失败,需要更新发送表的记录;消息重试最大次数后不再发送该消息;
2:定期把重试次数为最大值的消息重新发送一遍;
3:消息发送的过程中系统重启的情况需要考虑。
生产者和消费者模式是java并发编程中相对比较容易理解的方式,也是研究多线程必须研究的问题。在非实时的需求处理的过程中,这种模式很好的把处理对象的生成和处理对象的处理逻辑分开,让代码更加的简洁。在多核时代,并发编程将是一种趋势!