scala - Java, Akka Actor and Bounded Mail Box -
i have following configuration in application.conf:
bounded-mailbox { mailbox-type = "akka.dispatch.boundedmailbox" mailbox-capacity = 100 mailbox-push-timeout-time = 3s } akka { loggers = ["akka.event.slf4j.slf4jlogger"] loglevel = info daemonic = on } this way how configured actor
public class mytestactor extends untypedactor implements requiresmessagequeue<boundedmessagequeuesemantics>{ @override public void onreceive(object message) throws exception { if (message instanceof string){ thread.sleep(500); system.out.println("message = " + message); } else { system.out.println("unknown message " ); } } } now how initate actor:
mytestactor = myactorsystem.actorof(props.create(mytestactor.class).withmailbox("bounded-mailbox"), "simple-actor"); after it, in code i'm sending 3000 messages actor.
(int =0;i<3000;i++){ mytestactor.tell(guestname, null);} what expect see exception queues full, messages printed inside onreceive method every half second, nothing happened. believe mailbox configuration not applied.
what doing wrong?
updated: created actor subscribes dead letter events:
deadletteractor = myactorsystem.actorof(props.create(deadlettermonitor.class),"deadletter-monitor"); and installed kamon queues monitoring:
after sending 3000 messages sent actor, kamin shows me following:
actor: user/simple-actor mailbox size: min: 100 avg.: 100.0 max: 101 actor: system/deadletterlistener mailbox size: min: 0 avg.: 0.0 max: 0 actor: system/deadletter-monitor mailbox size: min: 0 avg.: 0.0 max: 0
by default akka discards overflowing messages deadletters , actor doesn't stop processing: https://github.com/akka/akka/blob/876b8045a1fdb9cdd880eeab8b1611aa976576f6/akka-actor/src/main/scala/akka/dispatch/mailbox.scala#l411
but sending thread blocked on interval configured mailbox-push-timeout-time before discarding message. try decrease 1ms , see following test pass:
import java.util.concurrent.atomic.atomicinteger import akka.actor._ import com.typesafe.config.config import com.typesafe.config.configfactory._ import org.specs2.mutable.specification class boundedactorspec extends specification { args(sequential = true) def config: config = load(parsestring( """ bounded-mailbox { mailbox-type = "akka.dispatch.boundedmailbox" mailbox-capacity = 100 mailbox-push-timeout-time = 1ms } """)) val system = actorsystem("system", config) "some messages should go dead letters" in { system.eventstream.subscribe(system.actorof(props(classof[deadlettermetricsactor])), classof[deadletter]) val mytestactor = system.actorof(props(classof[mytestactor]).withmailbox("bounded-mailbox")) (i <- 0 until 3000) { mytestactor.tell("guestname", null) } thread.sleep(100) system.shutdown() system.awaittermination() deadlettermetricsactor.deadlettercount.get must greaterthan(0) } } class mytestactor extends actor { def receive = { case message: string => thread.sleep(500) println("message = " + message); case _ => println("unknown message") } } object deadlettermetricsactor { val deadlettercount = new atomicinteger } class deadlettermetricsactor extends actor { def receive = { case _: deadletter => deadlettermetricsactor.deadlettercount.incrementandget() } }
Comments
Post a Comment