IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Java并发编程实践:CompletionService和Future的结合

    longhao (longtask@gmail.com)发表于 2009-10-25 01:20:59
    love 0

         当Executor遇上BlockingQueue

         在系统中实现一个生产者和消费者的模式,最简单的方法就是使用Executor框架了。如果向Executor框架中提交一批希望得到结果的任 务,需要每个任务在执行关联Future,然后不断的调用timeout为0的get来检查任务是否完成。这样做可以实现功能,但是有点乏味。为了更好的 实现这种需求,Concurrent包中定义了CompletionService,并有实现类ExecutorCompletionService。

         案例:
         在积分计算的过程中,数据的读取和规则的计算和后来对分数和处罚结果的入库时2步不同的操作,可以简单的看成一个生产者和消费者模式,但是生产者的产出有消费者需要的结果,每个服务提供商的计算结果信息都是独立的,所以可以并发的处理这些事情。
        处理过程中的步骤:
        (1)定义实现Callable接口的类CreditComputeConcuurent和定义来传递计算的结果Domain对象;
                public class CallBack {

        private Credit credit;
        private CreditItem creditItem;
        private String punishMeasures;
        private String errorMessage;
        // get set method;
    }
       //这个类是计算的内部类
       private class CreditComputeConcuurent implements Callable<CallBack>{
            private final CreditOperation creditOperation;
            private final Map<String,Credit> lastMonthCreditMap;
            private final Map<String,Double> creditItemMap;
            private final double initScore;
            private final Sp sp;
            public CreditComputeConcuurent(final CreditOperation creditOperation,
                    final Map<String,Credit> lastMonthCreditMap,
                    final Map<String,Double> creditItemMap,final double initScore,final Sp sp){
                this.creditOperation = creditOperation;
                this.lastMonthCreditMap = lastMonthCreditMap;
                this.creditItemMap = creditItemMap;
                this.initScore = initScore;
                this.sp = sp;
            }
            public CallBack call() throws Exception {
                ……
            }
      }

        (2)使用ExecutorCompletionService的构造函数构造线程池,提交CreditComputeConcuurent;

     

                private static final int nThreads = 10;//启动的积分计算的线程数

     

                private ExecutorService exec = Executors.newFixedThreadPool(nThreads);

     

                CompletionService<CallBack> completionService=

     

                                        new ExecutorCompletionService<CallBack>(exec);

     

            if(size > 0){
                    Iterator<Sp> it = spList.iterator();
                    while(it.hasNext()){
                        sp = it.next();
                        completionService.submit(new CreditComputeConcuurent(creditOperation,
                                lastMonthCreditMap,creditItemMap,initScore,sp));
                    }
            }

        (3)通过CompletionService的take()方法获取返回的数据,然后处理数据。
            try{
                for(int i=0;i<size;i++){
                    Future<CallBack> future = completionService.take();
                        CallBack callBack = future.get() ;

                    Credit creditTmp = callBack.getCredit();
                    String creditId = null;
                    //写入积分信息
                    if(creditTmp != null){
                        creditId = creditBiz.insertCredit(creditTmp);
                    }
                    //写入积分指标项信息
                    CreditItem creditItemIns = callBack.getCreditItem();
                    if(creditItemIns != null){
                        creditBiz.insertCreditItem(creditItemIns);
                    }
                    //写入处罚信息;
                    String punishMeasures = callBack.getPunishMeasures();
                    if(StringUtils.isNotEmpty(punishMeasures)){
                        creditPunishStrategy.insertCreditPunish(creditId, punishMeasures);
                    }
                }
            }catch(InterruptedException e){
                logger.error("获取completionService出错,请检查原因!",e);
                throw new BusinessException("获取completionService出错,请检查原因!",e);
            }catch (ExecutionException e) {
                logger.error("获取future中的信息失败,请检查原因!",e);
                throw new BusinessException("获取future中的信息失败,请检查原因!",e);
            }

        注意事项:

          A:生产者的需要处理的条数和消费者需要处理的条数数据需要一致,如果单条处理失败,则需要抛出异常,终止线程池的操作,返回错误;
          B:事务控制尽量不要放在多线程中,Spring对多线程中的事务控制没有详细测试过!



沪ICP备19023445号-2号
友情链接