IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    CompletableFuture 不能被中断

    simonwang发表于 2016-08-21 08:14:24
    love 0

    原文链接 作者:Tomasz Nurkiewicz  译者:simonwang

    我之前写过一篇关于InterruptedException and interrupting threads的文章。总之,如果你调用Future.cancel(),那么Future不仅会终止正在等待的get(),还会试图去中断底层的线程。这是个很重要的特征,它能够使线程池变得更加利于使用。我在之前的文章中也说过,相对于标准的Future,尽量使用CompletableFuture。但事实证明,Future的更加强大的兄弟-CompletableFuture并不能优雅地处理cancel()。

    请思考下面的任务代码,在接下来的测试中会用到:

    class InterruptibleTask implements Runnable {
    
        private final CountDownLatch started = new CountDownLatch(1)
        private final CountDownLatch interrupted = new CountDownLatch(1)
    
        @Override
        void run() {
            started.countDown()
            try {
                Thread.sleep(10_000)
            } catch (InterruptedException ignored) {
                interrupted.countDown()
            }
        }
    
        void blockUntilStarted() {
            started.await()
        }
    
        void blockUntilInterrupted() {
            assert interrupted.await(1, TimeUnit.SECONDS)
        }
    
    }
    

    客户端线程可以检查InterruptibleTask是否已经开始运行或者是被中断了。首先,我们可以从外部查看InterruptibleTask到底会对cancel()作出怎么样的反应:

    def "Future is cancelled without exception"() {
        given:
            def task = new InterruptibleTask()
            def future = myThreadPool.submit(task)
            task.blockUntilStarted()
        and:
            future.cancel(true)
        when:
            future.get()
        then:
            thrown(CancellationException)
    }
    
    def "CompletableFuture is cancelled via CancellationException"() {
        given:
            def task = new InterruptibleTask()
            def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
            task.blockUntilStarted()
        and:
            future.cancel(true)
        when:
            future.get()
        then:
            thrown(CancellationException)
    }
    

    到目前为止一切顺利,Future和CompletableFuture都以几乎相同的方式工作着-在cancel之后取回结果会抛出CancellationException(这里要解释一下,Future.cancel()是不会抛出异常的,而CompletableFuture.cancel()则会以抛出CancellationException强行结束,上面的代码作者都手动抛出了CancellationException)。但在myThreadPool中的线程会怎样呢?我猜会被中断然后被线程池重新回收,我大错特错!

    def "should cancel Future"() {
        given:
            def task = new InterruptibleTask()
            def future = myThreadPool.submit(task)
            task.blockUntilStarted()
        when:
            future.cancel(true)
        then:
            task.blockUntilInterrupted()
    }
    
    @Ignore("Fails with CompletableFuture")
    def "should cancel CompletableFuture"() {
        given:
            def task = new InterruptibleTask()
            def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
            task.blockUntilStarted()
        when:
            future.cancel(true)
        then:
            task.blockUntilInterrupted()
    }
    

    第一个测试提交普通的Runnable给ExecutorService然后等待直到它开始执行,接着我们取消Future等待直到抛出InterruptedException,当底层的线程被中断的时候blockUntilInterrupted()会返回。第二个测试失败了,CompletableFuture.cancel()不会中断线程,尽管Future看起来被取消了,但后台线程仍然在执行,sleep()不会抛出InterruptionException。这是一个bug还是这就是CompletableFuture的特点?你们可以查看此文档,不幸地是这就是它的特点:

    Parameters: mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.

    RTFM(Read The Fucking Manual),但为什么CompletableFuture会以这样的方式工作?首先让我们检查一下“老的”Future的实现与CompletableFuture的有什么不同。FutureTask会在执行ExecutorService.submit()之后返回,而且它的cancel()有如下的实现(我移除了Unsafe以及相似的非线程安全的Java代码,所以仅仅把它当作伪代码看待):

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false;
        state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    state = INTERRUPTED;
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    

    FutureTask的state变量状态如下图:
    state变量的几种状态
    万一执行cancel(),我们要么进入CANCELLED状态,要么通过INTERRUPTING进入INTERRUPTED。这里的核心部分是我们要获取runner线程(如果存在,例如如果task正在被执行)然后试着去中断它。这里要小心对于正在运行的线程的强制中断。最后在finishCompletion()中我们要通知所有阻塞在Future.get()的线程(这一步在这里无关痛痒可以忽略)。所以我们可以直观的看到老的Future是如何取消正在运行的tasks的。那CompletableFuture呢?它的cancel()伪代码如下:

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = false;
        if (result == null) {
            result = new AltResult(new CancellationException());
            cancelled = true;
        }
        postComplete();
        return cancelled || isCancelled();
    }
    

    这相当令人失望,我们很少把result赋值为CancellationException而忽略mayInterruptIfRunning标志。postComplete()的作用和finishCompletion()的作用相似,通知所有注册在future下的正在等待的回调操作。这种实现相当让人不愉快(使用了非阻塞的Treiber stack),但它的确没有中断任何底层的线程。

    Reasons and implications

    CompletableFuture的这种cancel限制并不是bug,而是一种设计决定。CompletableFuture天生就没有和任何线程绑定在一起,但Future却几乎总是代表在后台运行的task。使用new关键字创造一个CompletableFuture(new CompletableFuture<>())就很好,这时没有任何底层的线程去取消。但是仍然有大部分的CompletableFuture和后台的task以及线程有联系,在这种情况下有问题的cancel()就是一个潜在的问题。我不建议盲目地用CompletableFuture替换Future,因为如果程序里面有cancel(),那么替换可能会改变程序的行为。这就意味着CompletableFuture有意地违背了里氏替换原则,我们要认真思考这样做的含义。

    原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: CompletableFuture 不能被中断




沪ICP备19023445号-2号
友情链接