IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Never ever block an actor

    hongjiang发表于 2015-05-23 05:21:05
    love 0

    遇到一个Akka进程没法被正常停止的情况,程序退出时调用了system.awaitTermination,发现阻塞在了某个actor线程上:

    "AkkaActorSystem-akka.actor.default-dispatcher-8" #25 prio=5 os_prio=0 tid=0x00007f319c002800 nid=0x76b2 waiting on condition [0x00007f31fbcfb000]
     java.lang.Thread.State: WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)
      - parking to wait for  <0x00000000c0645828> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
      at xxx.KafkaMessageReceiver.prepareTask(KafkaMessageReceiver.scala:78)
      ...
      at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
      at com.wacai.csw.bridge.service.KafkaMessageReceiver.aroundReceive(KafkaMessageReceiver.scala:21)
      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
      at akka.dispatch.Mailbox.run(Mailbox.scala:221)
      at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    

    这个程序里主要有2个Actor,一个负责从kafka读取数据,另一个处理这些数据。为了避免两边的流速不匹配,采用了pull模式,kafka消费者Actor跟DataProcessor之间通过一个BlockingQueue来维持平衡;而ProcessorActor在从BlockingQueue获取数据时因为使用了take方法导致了Actor线程被阻塞,系统关闭时等待这个Actor的消息处理完,也被阻塞住了。

    按说KafkaConsumer端在往BlockqingQueue放数据(put方法)时也可能存在类似阻塞的情况,但这个Consumer并未使用Akka Actor的线程调度,而是一个独立的线程,所以并不影响system.awaitTermination。

    解决的方式是将take替换为poll阻塞一个可以接受的时间依然拿不到数据返回None,在另一方Actor的receive方法里针对None类型数据再做轮询。

    使用Actor模型有个原则是“never ever block an actor”,切记。



沪ICP备19023445号-2号
友情链接