当Executor遇上BlockingQueue
在系统中实现一个生产者和消费者的模式,最简单的方法就是使用Executor框架了。如果向Executor框架中提交一批希望得到结果的任 务,需要每个任务在执行关联Future,然后不断的调用timeout为0的get来检查任务是否完成。这样做可以实现功能,但是有点乏味。为了更好的 实现这种需求,Concurrent包中定义了CompletionService,并有实现类ExecutorCompletionService。
案例:
在积分计算的过程中,数据的读取和规则的计算和后来对分数和处罚结果的入库时2步不同的操作,可以简单的看成一个生产者和消费者模式,但是生产者的产出有消费者需要的结果,每个服务提供商的计算结果信息都是独立的,所以可以并发的处理这些事情。
处理过程中的步骤:
(1)定义实现Callable接口的类CreditComputeConcuurent和定义来传递计算的结果Domain对象;
public class CallBack {
(2)使用ExecutorCompletionService的构造函数构造线程池,提交CreditComputeConcuurent;
private static final int nThreads = 10;//启动的积分计算的线程数
private ExecutorService exec = Executors.newFixedThreadPool(nThreads);
CompletionService
new ExecutorCompletionService
(3)通过CompletionService的take()方法获取返回的数据,然后处理数据。
try{
for(int i=0;i
Future
CallBack callBack = future.get() ;
Credit creditTmp = callBack.getCredit();
String creditId = null;
//写入积分信息
if(creditTmp != null){
creditId = creditBiz.insertCredit(creditTmp);
}
//写入积分指标项信息
CreditItem creditItemIns = callBack.getCreditItem();
if(creditItemIns != null){
creditBiz.insertCreditItem(creditItemIns);
}
//写入处罚信息;
String punishMeasures = callBack.getPunishMeasures();
if(StringUtils.isNotEmpty(punishMeasures)){
creditPunishStrategy.insertCreditPunish(creditId, punishMeasures);
}
}
}catch(InterruptedException e){
logger.error("获取completionService出错,请检查原因!",e);
throw new BusinessException("获取completionService出错,请检查原因!",e);
}catch (ExecutionException e) {
logger.error("获取future中的信息失败,请检查原因!",e);
throw new BusinessException("获取future中的信息失败,请检查原因!",e);
}
注意事项:
A:生产者的需要处理的条数和消费者需要处理的条数数据需要一致,如果单条处理失败,则需要抛出异常,终止线程池的操作,返回错误;
B:事务控制尽量不要放在多线程中,Spring对多线程中的事务控制没有详细测试过!