遇到一个Akka进程没法被正常停止的情况,程序退出时调用了system.awaitTermination
,发现阻塞在了某个actor线程上:
"AkkaActorSystem-akka.actor.default-dispatcher-8" #25 prio=5 os_prio=0 tid=0x00007f319c002800 nid=0x76b2 waiting on condition [0x00007f31fbcfb000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000c0645828> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at xxx.KafkaMessageReceiver.prepareTask(KafkaMessageReceiver.scala:78)
...
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.wacai.csw.bridge.service.KafkaMessageReceiver.aroundReceive(KafkaMessageReceiver.scala:21)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
这个程序里主要有2个Actor,一个负责从kafka读取数据,另一个处理这些数据。为了避免两边的流速不匹配,采用了pull模式,kafka消费者Actor跟DataProcessor之间通过一个BlockingQueue来维持平衡;而ProcessorActor在从BlockingQueue获取数据时因为使用了take
方法导致了Actor线程被阻塞,系统关闭时等待这个Actor的消息处理完,也被阻塞住了。
按说KafkaConsumer
端在往BlockqingQueue
放数据(put方法)时也可能存在类似阻塞的情况,但这个Consumer并未使用Akka Actor的线程调度,而是一个独立的线程,所以并不影响system.awaitTermination
。
解决的方式是将take
替换为poll
阻塞一个可以接受的时间依然拿不到数据返回None
,在另一方Actor的receive方法里针对None
类型数据再做轮询。
使用Actor模型有个原则是“never ever block an actor”,切记。