IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    对actor的邮箱计数

    hongjiang发表于 2015-06-29 05:42:36
    love 0
    假设某个actor里消息有下面几种类型:def wrappedReceive: PartialFunction[Any, Unit] = { case "kafkaReady" => sendRequest() case Some(Task(data)) => assign(data) case None => await() case "done" => next() }当我想要查看这个actor的邮箱有没有堆积时,一个简单的方式是通过jmap来看类型的实例数:$ jmap -histo:live 12662 num #instances #bytes class name ---------------------------------------------- ... 3: 51906 9965952 com.wacai.xxx.dataobject.XX ... 6: 149338 3584112 java.util.concurrent.ConcurrentLinkedQueue$Node 7: 149318 3583632 akka.dispatch.Envelope ... 17: 7266 232512 scala.collection.mutable.ListBuffer ... 21: 7274 116384 scala.Some 22: 7266 116256 com.wacai.xxx.bridge.actor.Task注意必须以live方式执行jmap触发一次full gc回收掉无用对象,否则计数不准确。上面大致可以看出Some[Task]对象实例有7266个,Envelop有149318个,因为None是一个单例,所以尽管可能堆积多条这样的消息,实例数里却只有1个,只能通过Envelop的实例数减去其他类型的实例数来判断,但问题是Envelop实例数是所有actor的消息实例数;另外像String类型这样的消息类型也可能存在多个,很难准确判断每个actor的邮箱到底堆积了多少条。Akka在2.0的时候去掉了mailboxSize方法,参考官方的这篇blog, 里面说了为何不再支持这个方法,也提到如果你真的想要获取mailboxSize可以自己扩展一下,于是按照这个思路,我们对enqueue和dequeue做一个计数,以便在某些场景判断mailbox是否有堆积。class MyMailbox extends akka.dispatch.UnboundedMailbox.MessageQueue { private val counter = new java.util.concurrent.atomic.AtomicInteger(0) override def dequeue(): Envelope = { counter.decrementAndGet() super.dequeue() } override def enqueue(receiver: ActorRef, handle: Envelope): Unit = { counter.incrementAndGet() super.enqueue(receiver, handle) } override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { counter.set(0) super.cleanUp(owner, deadLetters) } def getSize(): Int = counter.get() } class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType { override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match { case (Some(o), Some(s)) ⇒ val mailbox = new MyMailbox MailboxExtension(s).register(o, mailbox) mailbox case _ ⇒ throw new Exception("no mailbox owner or system given") } }然后放到一个akka的extension里使用:object MailboxExtension extends ExtensionId[MailboxExtension] with ExtensionIdProvider { override def createExtension(s: ExtendedActorSystem) = new MailboxExtension(s) override def lookup = this def getSize()(implicit context: ActorContext): Int = MailboxExtension(context.system).getSize() } class MailboxExtension(val system: ExtendedActorSystem) extends Extension { private val mboxMap = new java.util.concurrent.ConcurrentHashMap[ActorRef, MyMailbox] def register(actorRef: ActorRef, mailbox: MyMailbox): Unit = mboxMap.put(actorRef, mailbox) def unregister(actorRef: ActorRef): Unit = mboxMap.remove(actorRef) def getSize()(implicit context: ActorContext): Int = { val mbox = mboxMap.get(context.self) if (mbox == null) throw new IllegalArgumentException("Mailbox not registered for: " + context.self) mbox.getSize() } }


沪ICP备19023445号-2号
友情链接