Scala: spray-cache not showing new elements after adding them
My aim is to add elements to an existing spray-cache instance. This does not happen, without any obvious reason or error.
My approach is to use the available Scala Future API for the intermediate results, and the mapping methods of Scala futures (map, foreach) to "unpack" the data produced by the futures.
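To recap what I mean by "unpacking": map transforms the eventual value of a Future into a new Future, while foreach runs a side effect once the value is available. A minimal, stand-alone sketch (the names are illustrative only):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// a Future holding an intermediate result
val numbers: Future[List[Int]] = Future(List(1, 2, 3))

// map transforms the eventual value and yields a new Future
val doubled: Future[List[Int]] = numbers.map(list => list.map(_ * 2))

// foreach runs a side effect once the value is available
doubled.foreach(list => println(list.mkString(", ")))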
Here's the code showing the approach (it is complete and can be instantly tested):
import akka.actor.ActorSystem
import spray.caching.LruCache
import scala.util.{Failure, Success}

case class Cached(elements: List[Int])

object TestApp extends App {
  // ignore these, they are just there so the cache gets an execution context
  implicit val system = ActorSystem("TestApp")
  implicit def dispatcher = system.dispatcher

  val cache = LruCache[Cached]()
  val cacheKey = "test"

  cache(cacheKey) {
    new Cached(List(1, 2, 3, 4))
  }

  def merge(key: String, mergeList: List[Int]) = {
    // get the existing cache content
    cache.get(key).foreach { cachedFuture =>
      // wait until it is returned
      cachedFuture.onComplete {
        // merge the extracted elements with mergeList
        case Success(cached) =>
          val mergedCache = cached.elements ++ mergeList
          // before re-constructing the cache element, remove it from spray-cache
          cache.remove(key).foreach { removeFuture =>
            // wait until the remove is complete
            removeFuture.onComplete {
              // re-construct the cache element
              case Success(_) =>
                // reconstructing the cache element under the key
                cache(key) {
                  new Cached(mergedCache)
                }
              case Failure(ex) => println(ex)
            }
          }
        case Failure(ex) => println(ex)
      }
    }
  }

  // merge in the new element 5
  merge(cacheKey, List(5))

  cache.get(cacheKey).map(_.onComplete {
    case Success(cached) =>
      // the new cache should contain 1, 2, 3, 4, 5
      println(s"Cache content is: ${cached.elements.mkString("; ")}")
      sys.exit(0)
    case Failure(ex) => println(ex); sys.exit(0)
  })
}
Below is a working solution to the issues I had with the code above.
I adapted the suggestions by jrudolph, in particular to
- solve the race condition between the merge call and the subsequent get
- solve ignoring the case where cache.get() does not contain an entry yet
- solve the thread-safety of the merge operation
What I was not able to solve is avoiding the usage of Await.result(...), which I use to make def merge appear synchronous to the "outside world".
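A fully asynchronous alternative would be to drop the Await.result call, return the Future itself and let callers compose on it. The following is only a sketch of that idea (the name mergeAsync is made up; it reuses the Cached case class and the recreateCacheKey helper from the full listing below), not what I ended up using:

// sketch only: an asynchronous merge that hands the Future back to the caller
def mergeAsync(key: String, mergeList: List[Int]): Future[Cached] =
  cache.get(key) match {
    case Some(cacheFuture) =>
      cacheFuture.flatMap(cached => recreateCacheKey(key, Cached(cached.elements ++ mergeList)))
    case None =>
      recreateCacheKey(key, Cached(mergeList))
  }

// callers then compose instead of blocking, e.g.:
// mergeAsync(cacheKey, List(5)).foreach(merged => println(merged.elements.mkString("; ")))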
jrudolph's comment about using a Map instead of an LruCache is principally correct. In this particular use case, though, I chose LruCache because in the actual application I put a couple of keys into the cache, and the elements: List[Int] wrapped within Cached is a huge, complex collection that should be evicted if the application adds too many keys at some point.
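For illustration only (the capacity of 100 is an arbitrary value, not taken from my application), this eviction behaviour is what the LruCache constructor parameters are for:

// a bounded cache: once more than maxCapacity entries are stored,
// the least recently used entries are evicted
val boundedCache = LruCache[Cached](maxCapacity = 100)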
import akka.actor.ActorSystem
import spray.caching.LruCache
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

case class Cached(elements: List[Int])

object TestApp extends App {
  // ignore these, they are just there so the cache gets an execution context
  implicit val system = ActorSystem("TestApp")
  implicit def dispatcher = system.dispatcher

  val cache = LruCache[Cached]()
  val cacheKey = "test"

  cache(cacheKey) {
    new Cached(List(1, 2, 3, 4))
  }

  def merge(key: String, mergeList: List[Int]) = {
    // get the existing cache content
    val mergeResult: Future[Cached] = cache.get(key) match {
      case Some(cacheFuture) => cacheFuture.flatMap {
        case cached => recreateCacheKey(key, Cached(cached.elements ++ mergeList))
      }
      case None => recreateCacheKey(key, Cached(mergeList))
    }
    // await the result so that users of def merge get the impression that it is synchronous
    Await.result(mergeResult, 5.seconds)
  }

  // synchronized should eliminate the issue with the thread-safety of the cache modifications
  private def recreateCacheKey(cacheKey: String, newCached: Cached) = synchronized {
    val cacheRecreationFuture = cache.remove(cacheKey)
      .fold(cache(cacheKey) { newCached })(_.flatMap(removedFuture => cache(cacheKey, () => Future { newCached })))
    cacheRecreationFuture
  }

  private def printCacheContent = {
    cache.get(cacheKey).map(_.onComplete {
      case Success(cached) =>
        // the new cache should contain 1, 2, 3, 4, 5
        println(s"Cache content is: ${cached.elements.mkString("; ")}")
        sys.exit(0)
      case Failure(ex) => println(ex); sys.exit(0)
    })
  }

  /*****************/
  /* execute stuff */
  /*****************/

  // merge in the new element 5
  merge(cacheKey, List(5))

  printCacheContent
}