假设某个actor里消息有下面几种类型:def wrappedReceive: PartialFunction[Any, Unit] = {
case "kafkaReady" => sendRequest()
case Some(Task(data)) => assign(data)
case None => await()
case "done" => next()
}当我想要查看这个actor的邮箱有没有堆积时,一个简单的方式是通过jmap来看类型的实例数:$ jmap -histo:live 12662
num #instances #bytes class name
----------------------------------------------
...
3: 51906 9965952 com.wacai.xxx.dataobject.XX
...
6: 149338 3584112 java.util.concurrent.ConcurrentLinkedQueue$Node
7: 149318 3583632 akka.dispatch.Envelope
...
17: 7266 232512 scala.collection.mutable.ListBuffer
...
21: 7274 116384 scala.Some
22: 7266 116256 com.wacai.xxx.bridge.actor.Task注意必须以live方式执行jmap触发一次full gc回收掉无用对象,否则计数不准确。上面大致可以看出Some[Task]对象实例有7266个,Envelop有149318个,因为None是一个单例,所以尽管可能堆积多条这样的消息,实例数里却只有1个,只能通过Envelop的实例数减去其他类型的实例数来判断,但问题是Envelop实例数是所有actor的消息实例数;另外像String类型这样的消息类型也可能存在多个,很难准确判断每个actor的邮箱到底堆积了多少条。Akka在2.0的时候去掉了mailboxSize方法,参考官方的这篇blog, 里面说了为何不再支持这个方法,也提到如果你真的想要获取mailboxSize可以自己扩展一下,于是按照这个思路,我们对enqueue和dequeue做一个计数,以便在某些场景判断mailbox是否有堆积。class MyMailbox extends akka.dispatch.UnboundedMailbox.MessageQueue {
private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
override def dequeue(): Envelope = {
counter.decrementAndGet()
super.dequeue()
}
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
counter.incrementAndGet()
super.enqueue(receiver, handle)
}
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
counter.set(0)
super.cleanUp(owner, deadLetters)
}
def getSize(): Int = counter.get()
}
class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) =
(owner, system) match {
case (Some(o), Some(s)) ⇒
val mailbox = new MyMailbox
MailboxExtension(s).register(o, mailbox)
mailbox
case _ ⇒ throw new Exception("no mailbox owner or system given")
}
}然后放到一个akka的extension里使用:object MailboxExtension extends ExtensionId[MailboxExtension] with ExtensionIdProvider {
override def createExtension(s: ExtendedActorSystem) = new MailboxExtension(s)
override def lookup = this
def getSize()(implicit context: ActorContext): Int =
MailboxExtension(context.system).getSize()
}
class MailboxExtension(val system: ExtendedActorSystem) extends Extension {
private val mboxMap = new java.util.concurrent.ConcurrentHashMap[ActorRef, MyMailbox]
def register(actorRef: ActorRef, mailbox: MyMailbox): Unit = mboxMap.put(actorRef, mailbox)
def unregister(actorRef: ActorRef): Unit = mboxMap.remove(actorRef)
def getSize()(implicit context: ActorContext): Int = {
val mbox = mboxMap.get(context.self)
if (mbox == null)
throw new IllegalArgumentException("Mailbox not registered for: " + context.self)
mbox.getSize()
}
}