IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Java并发编程实践:CompletionService和Future的结合

    longhao (longtask@gmail.com)发表于 2009-10-25 01:20:59
    love 0

    当Executor遇上BlockingQueue

    在系统中实现一个生产者和消费者的模式,最简单的方法就是使用Executor框架了。如果向Executor框架中提交一批希望得到结果的任 务,需要每个任务在执行关联Future,然后不断的调用timeout为0的get来检查任务是否完成。这样做可以实现功能,但是有点乏味。为了更好的 实现这种需求,Concurrent包中定义了CompletionService,并有实现类ExecutorCompletionService。

    案例:
    在积分计算的过程中,数据的读取和规则的计算和后来对分数和处罚结果的入库时2步不同的操作,可以简单的看成一个生产者和消费者模式,但是生产者的产出有消费者需要的结果,每个服务提供商的计算结果信息都是独立的,所以可以并发的处理这些事情。
    处理过程中的步骤:
    (1)定义实现Callable接口的类CreditComputeConcuurent和定义来传递计算的结果Domain对象;
    public class CallBack {

    private Credit credit;
    private CreditItem creditItem;
    private String punishMeasures;
    private String errorMessage;
    // get set method;
    }
    //这个类是计算的内部类
    private class CreditComputeConcuurent implements Callable{
    private final CreditOperation creditOperation;
    private final Map lastMonthCreditMap;
    private final Map
    creditItemMap;
    private final double initScore;
    private final Sp sp;
    public CreditComputeConcuurent(final CreditOperation creditOperation,
    final Map
    lastMonthCreditMap,
    final Map
    creditItemMap,final double initScore,final Sp sp){
    this.creditOperation = creditOperation;
    this.lastMonthCreditMap = lastMonthCreditMap;
    this.creditItemMap = creditItemMap;
    this.initScore = initScore;
    this.sp = sp;
    }
    public CallBack call() throws Exception {
    ……
    }
    }

    (2)使用ExecutorCompletionService的构造函数构造线程池,提交CreditComputeConcuurent;

    private static final int nThreads = 10;//启动的积分计算的线程数

    private ExecutorService exec = Executors.newFixedThreadPool(nThreads);

    CompletionService completionService=

    new ExecutorCompletionService(exec);

    if(size > 0){
    Iterator it = spList.iterator();
    while(it.hasNext()){
    sp = it.next();
    completionService.submit(new CreditComputeConcuurent(creditOperation,
    lastMonthCreditMap,creditItemMap,initScore,sp));
    }
    }

    (3)通过CompletionService的take()方法获取返回的数据,然后处理数据。
    try{
    for(int i=0;i
    Future future = completionService.take();
    CallBack callBack = future.get() ;

    Credit creditTmp = callBack.getCredit();
    String creditId = null;
    //写入积分信息
    if(creditTmp != null){
    creditId = creditBiz.insertCredit(creditTmp);
    }
    //写入积分指标项信息
    CreditItem creditItemIns = callBack.getCreditItem();
    if(creditItemIns != null){
    creditBiz.insertCreditItem(creditItemIns);
    }
    //写入处罚信息;
    String punishMeasures = callBack.getPunishMeasures();
    if(StringUtils.isNotEmpty(punishMeasures)){
    creditPunishStrategy.insertCreditPunish(creditId, punishMeasures);
    }
    }
    }catch(InterruptedException e){
    logger.error("获取completionService出错,请检查原因!",e);
    throw new BusinessException("获取completionService出错,请检查原因!",e);
    }catch (ExecutionException e) {
    logger.error("获取future中的信息失败,请检查原因!",e);
    throw new BusinessException("获取future中的信息失败,请检查原因!",e);
    }

    注意事项:

    A:生产者的需要处理的条数和消费者需要处理的条数数据需要一致,如果单条处理失败,则需要抛出异常,终止线程池的操作,返回错误;
    B:事务控制尽量不要放在多线程中,Spring对多线程中的事务控制没有详细测试过!



沪ICP备19023445号-2号
友情链接