scala - Java, Akka Actor and Bounded Mail Box -


i have following configuration in application.conf:

bounded-mailbox {   mailbox-type = "akka.dispatch.boundedmailbox"   mailbox-capacity = 100   mailbox-push-timeout-time = 3s }  akka {      loggers = ["akka.event.slf4j.slf4jlogger"]     loglevel = info     daemonic = on } 

this way how configured actor

public class mytestactor extends untypedactor implements requiresmessagequeue<boundedmessagequeuesemantics>{       @override     public void onreceive(object message) throws exception {         if (message instanceof string){               thread.sleep(500);              system.out.println("message = " + message);         }         else {             system.out.println("unknown message " );         }      } } 

now how initate actor:

mytestactor = myactorsystem.actorof(props.create(mytestactor.class).withmailbox("bounded-mailbox"), "simple-actor"); 

after it, in code i'm sending 3000 messages actor.

  (int =0;i<3000;i++){         mytestactor.tell(guestname, null);} 

what expect see exception queues full, messages printed inside onreceive method every half second, nothing happened. believe mailbox configuration not applied.

what doing wrong?

updated: created actor subscribes dead letter events:

deadletteractor = myactorsystem.actorof(props.create(deadlettermonitor.class),"deadletter-monitor"); 

and installed kamon queues monitoring:

after sending 3000 messages sent actor, kamin shows me following:

actor: user/simple-actor   mailbox size:  min: 100   avg.: 100.0  max: 101  actor: system/deadletterlistener   mailbox size:  min: 0   avg.: 0.0  max: 0  actor: system/deadletter-monitor   mailbox size:  min: 0   avg.: 0.0  max: 0 

by default akka discards overflowing messages deadletters , actor doesn't stop processing: https://github.com/akka/akka/blob/876b8045a1fdb9cdd880eeab8b1611aa976576f6/akka-actor/src/main/scala/akka/dispatch/mailbox.scala#l411

but sending thread blocked on interval configured mailbox-push-timeout-time before discarding message. try decrease 1ms , see following test pass:

import java.util.concurrent.atomic.atomicinteger  import akka.actor._ import com.typesafe.config.config import com.typesafe.config.configfactory._ import org.specs2.mutable.specification  class boundedactorspec extends specification {   args(sequential = true)    def config: config = load(parsestring(     """     bounded-mailbox {       mailbox-type = "akka.dispatch.boundedmailbox"       mailbox-capacity = 100       mailbox-push-timeout-time = 1ms     }     """))    val system = actorsystem("system", config)    "some messages should go dead letters" in {     system.eventstream.subscribe(system.actorof(props(classof[deadlettermetricsactor])), classof[deadletter])     val mytestactor = system.actorof(props(classof[mytestactor]).withmailbox("bounded-mailbox"))     (i  <- 0 until 3000) {       mytestactor.tell("guestname", null)     }     thread.sleep(100)     system.shutdown()     system.awaittermination()     deadlettermetricsactor.deadlettercount.get must greaterthan(0)   } }  class mytestactor extends actor {   def receive = {     case message: string =>       thread.sleep(500)       println("message = " + message);     case _ => println("unknown message")   } }  object deadlettermetricsactor {   val deadlettercount = new atomicinteger }  class deadlettermetricsactor extends actor {   def receive = {     case _: deadletter => deadlettermetricsactor.deadlettercount.incrementandget()   } } 

Comments

Popular posts from this blog

java - Spring Data JPA: Why findOne(id) executing delete query internally? -

python - Mongodb How to add addtional information when aggregating? -

java - Incorrect order of records in M-M relationship in hibernate -