In the second chapter we walked you through some code that used futures, but we didn't spend much time on them. We'd like to develop the idea a bit further here and show you how they can be used effectively to interact with actors and to sequence operations (or computations, to speak more formally).
Earlier we defined futures very generally, but let's be a bit more specific. A future represents the value of some expression or computation that may not have yet completed or which is not yet known. This mechanism was first proposed as a model for obtaining the result of parallel evaluation of expressions in a programming language.
As you saw earlier, futures are useful when you need to get a response back from an actor. This is precisely the scenario described in the definition of futures: we want to send a message off to an actor and get a response, but we don't know when or whether that response will come back. Our future gives us a handle by which to get that response.
While the most obvious use of futures in the context of Akka is for receiving responses from actors when outside of an actor context, you can also simply create futures directly. This is useful if you have a block of code you'd like to be run asynchronously from the rest of your code. Yes, you could create an actor to do this, but that requires a lot more work when you have access to futures essentially for free. Here, in the simplest form, is how you can execute a block of code as a future:
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

future { 1 + 2 }
Of course, this example is not really complete — we're not yet doing anything with the result.
Here's a more complete example, again using the global ExecutionContext (more on this topic momentarily). This time we compute the sum of three random numbers (each from 0 up to, but not including, 100). We also added a type annotation to show the type of the future being created:
import scala.util.Random._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

// Each future produces one random number; reduce adds them up once they complete
val sums: Future[Int] = Future.reduce(Seq(
  future(nextInt(100)),
  future(nextInt(100)),
  future(nextInt(100))
))(_ + _)
sums.foreach(x => println(s"Sum is $x"))
As we hinted at earlier, an ExecutionContext is an abstraction for something that provides a way to execute computations. For example, it might represent a variable-sized thread pool that scales appropriately depending on the number of CPU cores available.
There are a variety of ways to provide or get hold of an existing ExecutionContext. If you're familiar with the java.util.concurrent API, you know that ExecutorService is very similar to what we've described. Conveniently, you can use one via the ExecutionContext.fromExecutorService method, passing in the ExecutorService you want to use. Keep in mind that if you use an ExecutorService instance, you will still need to shut it down yourself. This can be done in the same place you shut down your ActorSystem (for example, as we showed in the servlet example earlier, you might do this by overriding the servlet's destroy method).
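To make the ExecutorService route concrete, here is a minimal sketch of it. The object name CustomContextSketch, the method sumOnOwnPool, and the pool size of two threads are our own choices for illustration, and Await.result is used only so the method can hand back a plain value:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object CustomContextSketch {
  // Runs one computation on a caller-owned pool and always shuts the pool down.
  def sumOnOwnPool(a: Int, b: Int): Int = {
    val pool = Executors.newFixedThreadPool(2) // pool size is an arbitrary choice
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(pool)
    try {
      val f = Future { a + b } // picks up ec as its implicit ExecutionContext
      Await.result(f, 5.seconds)
    } finally {
      ec.shutdown() // the wrapper is itself an ExecutorService
    }
  }
}
```

In a real application the shutdown call would more likely live wherever the rest of your system is shut down, rather than in a finally block next to the computation.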
As we showed you earlier, there's also a default, global ExecutionContext available by importing scala.concurrent.ExecutionContext.Implicits.global. And if you're running within an environment using Akka, as you likely will be at some point, you can use the dispatcher local to the system or to the actor context (more about this later in the book) by using one of the following forms:
import akka.actor.ActorSystem

class MyApp {
  val system = ActorSystem("MySystem")
  import system.dispatcher // the system's dispatcher becomes the implicit ExecutionContext
}
This form should only be used when you are not within an actual actor, but where you do have access to an ActorSystem instance. The next form would be used within an actor:
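import akka.actor.Actor

class MyActor extends Actor {
  import context.dispatcher // the actor's dispatcher becomes the implicit ExecutionContext
  def receive = { case _ => } // placeholder behavior
}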
Choosing the appropriate ExecutionContext is important because it determines the context in which work is done. In particular, ExecutionContext lets you select either the fork-join pool or the thread pool implementation by supplying the appropriate Executor or ExecutorService (Java 7 has both, but Java 6 has the thread pool only). Without getting too much into specifics, the two differ with respect to work-stealing: a fork-join pool can redistribute queued work among idle workers, whereas in a thread pool a task stays with the thread it was assigned to, regardless of how long it takes. That makes a fork-join pool the more suitable choice when coarse-grained (potentially long-running) tasks are involved. Considering that creating your own Executor or ExecutorService is a non-trivial exercise, treat it as a last resort and first explore other techniques for distributing the work.
The futures defined within Scala's standard library define a set of very useful methods that you may have already seen on a number of other classes in Scala. These methods are map, flatMap, filter, and foreach.
These methods collectively allow the Scala compiler to provide a bit of syntactic-sugar in the form of for-comprehensions. Syntactic-sugar, if you're not familiar with the term, is a mechanism of the compiler that transforms one form of syntax into another, equivalent form. Generally, this is used to make something that's somewhat cumbersome into a nicer form.
First, let's just look at a very simple example of using map.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def asyncAction: Future[String] = {
  // ...
}
val delayedResponse: Future[String] = asyncAction
// only called if the future returns a successful result
val transformedResponse: Future[Int] = delayedResponse.map {
  response => response.length
}
We have included a few unnecessary type annotations here to help clarify what's happening. Of particular note is the return type of map. It will always return another future, but the type parameter of that future may differ from that of the initial future if you perform some kind of transformation within the map call.
Another important thing to realize is that the map block will not actually be called until the initial future itself completes successfully. In the following example, you'll notice a five-second delay before anything is printed:
val sums = Future.reduce(Seq(
  future(nextInt(100)),
  future(nextInt(100)),
  future { Thread.sleep(5000); 5 }, // sleeps for 5 seconds, returns 5
  future(nextInt(100))
))(_ + _)
sums.map(x => println(x))
If the initial future fails or the function passed to map throws an exception, the final future returned from map will complete with a Failure containing whatever exception caused it to fail.
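Both failure paths can be seen in a small sketch. The names MapFailureSketch, outcome, failedSource, and failedInMap are hypothetical, and we assume it is acceptable to block briefly here so that the completed value can be inspected:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object MapFailureSketch {
  // Await.ready waits for completion without rethrowing, so the Try can be inspected.
  def outcome[T](f: Future[T]): Try[T] =
    Await.ready(f, 5.seconds).value.get

  // The initial future fails, so the function passed to map never runs.
  def failedSource: Future[Int] =
    Future[Int] { throw new IllegalStateException("source failed") }.map(_ + 1)

  // The initial future succeeds, but the function passed to map throws.
  def failedInMap: Future[Int] =
    Future { "not a number" }.map(_.toInt)
}
```

In the first case the Failure carries the IllegalStateException from the initial future; in the second it carries the NumberFormatException raised inside map.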
flatMap is very similar to map. What differentiates the two is that flatMap actually expects the code called within its block to return a Future, directly, as opposed to returning a value which is then wrapped in a new Future. Otherwise, the semantics are essentially the same, including the handling of exceptions, of course.
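The difference shows up most clearly in the types. In this sketch, findUserId and findScore are hypothetical asynchronous lookups that we have stubbed with trivial computations:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FlatMapSketch {
  // Hypothetical asynchronous lookups, stubbed with trivial computations.
  def findUserId(name: String): Future[Int] = Future { name.length }
  def findScore(userId: Int): Future[Int] = Future { userId * 10 }

  // map with a future-returning function nests one future inside another...
  def nested(name: String): Future[Future[Int]] = findUserId(name).map(findScore)

  // ...while flatMap flattens the result into a single future.
  def flat(name: String): Future[Int] = findUserId(name).flatMap(findScore)
}
```

If you ever find yourself holding a Future[Future[T]], that is usually a hint that flatMap was the method you wanted.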
filter takes the successful result of a future and, based on the predicate you pass to it, returns either the original value in another future or a Failure containing a NoSuchElementException. If an exception occurs either in the original future or in the evaluation of the predicate, a Failure is also returned, containing the exception that caused the failure.
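As a quick illustration of filter, consider the following sketch. The object FilterSketch and its methods are our own invention, and the outcome helper blocks only so that the result can be examined:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object FilterSketch {
  // Succeeds with n when n is even; otherwise fails with NoSuchElementException.
  def keepEven(n: Int): Future[Int] = Future { n }.filter(_ % 2 == 0)

  // Waits for completion without rethrowing, then exposes the outcome as a Try.
  def outcome[T](f: Future[T]): Try[T] = Await.ready(f, 5.seconds).value.get
}
```

Here keepEven(4) completes with Success(4), while keepEven(3) completes with a Failure wrapping a NoSuchElementException.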
Let's look at how we can take advantage of the awesome sugared syntax we've been offered. Let's say you have requests to make to two different actors and you want to calculate some final value based on the results of calling both those actors. You can do this using a for comprehension, essentially composing the final result from the individual results:
val itemId = 998
val buyersCurrency = "GBP"
// ask (?) returns Future[Any], so mapTo narrows the result type
val currentPrice: Future[Double] =
  (pricingActor ? GetPrice(itemId)).mapTo[Double]
val conversionRate: Future[Double] =
  (conversionRateActor ? GetConversion(buyersCurrency)).mapTo[Double]
val convertedPrice: Future[Double] =
  for { price <- currentPrice
        rate <- conversionRate } yield {
    price * rate
  }
You'll note that, again, we added some type annotations to make it clear what types are returned by the calls. The final result of the for comprehension is still a future, which is important to recognize.
Also, notice that we're making the requests to the actors before the for comprehension. If the calls were made directly inside, we'd be effectively sequencing the actual calls. That is, one call would be made to pricingActor and then it would sit waiting for the result. Only then would the second call to conversionRateActor be made.
To understand that, it helps to see how this gets transformed by the Scala compiler. We won't explain all the rules here (the curious reader should feel free to read the Scala language specification), but essentially, the for comprehension is turned into the following code:
val convertedPrice: Future[Double] = currentPrice.flatMap {
  case price => {
    conversionRate.map {
      case rate => {
        price * rate
      }
    }
  }
}
Looking at the example in this form hopefully makes it more clear why you want to make your calls to get the futures prior to entering the for comprehension. To be clear, here's the equivalent code without the futures defined outside the for comprehension after it has been de-sugared by the compiler.
val convertedPrice: Future[Double] =
  (pricingActor ? GetPrice(itemId)).mapTo[Double].flatMap {
    case price => {
      // this request is not sent until the price has arrived
      (conversionRateActor ? GetConversion(buyersCurrency)).mapTo[Double].map {
        case rate => {
          price * rate
        }
      }
    }
  }
If you look closely at this form, you'll see how the GetConversion message is not sent to the conversionRateActor until after the price has already been returned by the pricingActor.
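You can observe this ordering without any actors at all. The following sketch replaces each ask with a hypothetical request method that records when the request goes out; the object SequencingSketch, the event labels, and the fixed answers are all assumptions made for the demonstration:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SequencingSketch {
  type Log = ConcurrentLinkedQueue[String]

  // Stand-in for `actor ? message`: notes when the request goes out, then answers.
  private def request(label: String, answer: Double, log: Log): Future[Double] = {
    log.add(s"sent $label") // recorded on the caller's thread
    Future { answer }
  }

  // Both requests go out before either result is examined.
  def eager(log: Log): Future[Double] = {
    val price = request("GetPrice", 10.0, log)
    val rate = request("GetConversion", 1.5, log)
    price.flatMap { p =>
      log.add("received price")
      rate.map(r => p * r)
    }
  }

  // The second request goes out only from inside the first one's callback.
  def nested(log: Log): Future[Double] =
    request("GetPrice", 10.0, log).flatMap { p =>
      log.add("received price")
      request("GetConversion", 1.5, log).map(r => p * r)
    }

  // Runs one variant to completion and returns its events in logged order.
  def trace(variant: Log => Future[Double]): List[String] = {
    val log = new ConcurrentLinkedQueue[String]()
    Await.result(variant(log), 5.seconds)
    log.toArray().toList.map(_.toString)
  }
}
```

Calling SequencingSketch.trace(SequencingSketch.eager) shows both "sent" events ahead of "received price", whereas the nested variant logs "sent GetConversion" only after the price has been received.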
Sometimes you really need to call a series of futures as a sequence of operations. There are a couple of approaches to this. One is to simply invoke the next future in the sequence in the callback of the future your code is currently waiting on. We won't even bother showing an example of this, since it's cumbersome and quickly gets ugly. Another approach is to use the for comprehensions we just demonstrated. This works well, but in some cases it results in code that's not exactly fluid. Thankfully, the Scala 2.10 API has added a few new features to futures that make this a lot nicer.
We were just talking about sequencing futures, and an extra mechanism for this is provided in the form of andThen. This method is useful when you want to do something as a side effect on the completion of another future. It differs from onSuccess (or onFailure) in two ways: the execution order of callbacks registered with onSuccess/onFailure is not defined, and those methods do not produce another future when they complete. andThen takes a partial function whose input type is Try[_], so you can restrict the match to Success or Failure or, if you don't care, ignore the distinction entirely. The result of the original future is still returned as the final value. And, finally, you can chain together as many calls to andThen as you like.
import scala.util.Success

val f = future { 1 + 2 }
f andThen {
  case Success(i) => println("The result is: " + i)
} andThen {
  case _ => doSomethingElse() // don't care what we've got
}
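To check both properties of andThen for yourself (the handlers run in the order they were chained, and the original result passes through untouched), you could use a sketch along these lines. AndThenSketch and the logged strings are hypothetical, and we write Future { ... } with a capital F because the lowercase future helper was deprecated in later Scala releases:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

object AndThenSketch {
  // Returns the future's final value together with the side effects, in order.
  def run(): (Int, List[String]) = {
    val log = new ConcurrentLinkedQueue[String]()
    val f = Future { 1 + 2 }
      .andThen { case Success(i) => log.add(s"first saw $i") }
      .andThen { case _ => log.add("second ran") }
    val result = Await.result(f, 5.seconds) // blocking only for the demonstration
    (result, log.toArray().toList.map(_.toString))
  }
}
```

The value handed back is still 3, even though each handler returned something else (the Boolean from log.add), which andThen simply discards.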
There are cases where you want to make a series of calls and get the result from the first to return a result. In this situation, you can use Future.firstCompletedOf, which will handle as many futures as you want to give it and give you a future containing the first result to arrive (whether a success or a failure). This can be very helpful in situations where you're calling some service that might time out frequently.
val tryOne, tryTwo, tryThree = future { makeSomeRequest() }
val first = Future.firstCompletedOf(Seq(tryOne, tryTwo, tryThree))
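Since makeSomeRequest is not defined here, the following self-contained sketch simulates the situation instead. A promise that is never completed stands in for a request that hangs; the object FirstCompletedSketch and its method names are assumptions of ours:

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

object FirstCompletedSketch {
  // One attempt never answers; the other does. The first to complete decides the result.
  def fastest(): Future[String] = {
    val hangs = Promise[String]().future // stands in for a request that never returns
    val answers = Future { "second attempt" }
    Future.firstCompletedOf(Seq(hangs, answers))
  }

  // If the first future to complete is a failure, that failure is the result.
  def fastestFailure(): Future[String] = {
    val hangs = Promise[String]().future
    val fails = Future.failed[String](new IllegalStateException("service down"))
    Future.firstCompletedOf(Seq(hangs, fails))
  }
}
```

Note the second case: firstCompletedOf does not wait for a success, so a quick failure wins over a slow answer.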
Finally, you can transform a sequence of futures into a future of a sequence. Let's say you have a sequence of URLs that you want to retrieve, and you want to process the results only once the entire sequence has completed. You could do something like this:
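import scala.io.Source
import scala.util.{Failure, Success}

val urls = Seq("http://google.com", "http://twitter.com", "http://typesafe.com")
// Start one download per URL; each future yields the page body as a String
val pages = urls.map { url => future { Source.fromURL(url).mkString } }
Future.sequence(pages).onComplete {
  case Success(bodies) => // process the Seq
  case Failure(e) => // at least one download failed
}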
The last, but most definitely not least, thing we need to cover is error handling with futures. We've already shown you how you can use onFailure as part of this, but that's just one of the tools available, and we now know that onFailure, like its cousin onSuccess, is non-deterministic in that callbacks are not executed in program order. We need to cover two additional observations about what we already know and then show a couple of helpers that are available to you.
The first thing is to remedy the lack of execution order among the callbacks registered with methods like onComplete, onSuccess, and onFailure. We have already shown you one remedy in the last section, using andThen to execute handlers for your futures deterministically. We can generalize that further and use combinators such as map and flatMap (together with recover, which we describe shortly), where we can not only handle the successes and failures of our futures but also generate new futures, if we wish. As an example, assume we have a sequence of futures generating random numbers (as before), but one of them throws an exception. We need to get rid of the failure, yet we'd still like to compute the sum of all the valid numbers in the sequence. The following illustrates this approach:
val ss: Seq[Future[Int]] = Seq(future(throw new Exception("haha!")), future(nextInt(100)), future(nextInt(100)))
// Map each result to Some(n) and each failure to None, so one bad future cannot fail the rest
val sss: Seq[Future[Option[Int]]] = ss.map(f => f.map(n => Option(n)).recover { case _ => None })
// Once every future has completed, drop the Nones and add up what is left
val sum: Future[Int] = Future.sequence(sss).map(_.flatten.sum)
The second thing to discuss is how onFailure can be used in cases where it might not be so obvious. Although we've already shown how callbacks can be used with futures even when using other mechanisms for handling the results, such as for comprehensions, we haven't really commented on an important idea here. Notice how for comprehensions and many of the other structures the library provides still give you a final future to deal with. This outermost future can still have callbacks assigned to it:
val firstFuture = someCallReturningAFuture()
val secondFuture = anotherCallReturningAFuture()
val thirdFuture = aFinalCallReturningAFuture()
val sequencedResult = for { first <- firstFuture
                            second <- secondFuture
                            third <- thirdFuture } yield {
  first + second + third // we'll assume the results can all be added together
}

sequencedResult.onFailure {
  case e: DomainException => {
    // do some cleanup here
  }
}
This is not revolutionary, but again, it's important to recognize that after the for comprehension returns, we're still dealing with a future and any errors happening anywhere within the sequence of calls to each of those methods will bubble up to that final future. You can handle each of those individually and not use the for comprehension, but if you choose to go that route, it makes sense to handle the errors at the top.
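Here is a small sketch of that bubbling behavior, assuming three integer-valued futures. The object BubbleSketch is hypothetical, and the middle future is passed in by the caller purely so that we can make it fail on demand:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object BubbleSketch {
  // The middle future is supplied by the caller so that it can be made to fail.
  def total(second: Future[Int]): Future[Int] = {
    val first = Future { 1 }
    val third = Future { 3 }
    for {
      a <- first
      b <- second
      c <- third
    } yield a + b + c
  }
}
```

Passing Future.successful(2) yields a future of 6, while passing a failed future yields a final future that fails with that same exception, ready to be handled in one place.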
Another common scenario you might run into occurs when you want to attempt to get some value as a future and, in the case of a failure, fall back to another future. This is provided by the appropriately named fallbackTo method, which simply takes a future as its argument:
val firstTry = future { doSomethingQuestionable() }
val backup = future { doSomethingLessQuestionable() }
val result = firstTry fallbackTo backup
The other mechanisms you'll likely want to take a look at are recover and recoverWith. In the case of recover, the outgoing return type needs to be the same type as the original parameter type of the future you started with. In the case of recoverWith, it should be another future, but parameterized with the same type as the original future.
Here's a quick example of each:
import scala.io.Source
import java.net.{MalformedURLException, UnknownHostException}

val candidateURLOne = "htt://google.xyz"
val candidateURLTwo = "http://google.com"
val getOne = future { Source.fromURL(candidateURLOne) }
val getTwo = future { Source.fromURL(candidateURLTwo) }

getOne.recover {
  // match on the exception type; recover returns a plain value
  case _: MalformedURLException =>
    Source.fromURL(candidateURLOne.replace("htt", "http"))
}.recoverWith {
  // recoverWith returns another future
  case _: UnknownHostException =>
    getTwo
}
As you can see, recover allows us to try a new operation, without it needing to be itself a future. The second approach, using recoverWith, is useful when you have an operation that could take time to perform and an alternate fallback approach that should be applied when the first doesn't work.
Note the difference between fallbackTo, which we showed you a little while ago, and recoverWith, which we just demonstrated. fallbackTo takes a future and returns its result on any failure of the original future, while recoverWith allows you to match on the exception with which the original future failed and return a suitable response based on it. Lastly, we can generalize recoverWith by realizing that we are effectively applying a function over the future, and hence you can use map and flatMap in addition to recover and recoverWith.
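The contrast can be sketched side by side. The object FallbackSketch, the choice of TimeoutException as the one error worth handling, and the replacement values are all assumptions made for illustration:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FallbackSketch {
  // Any failure of primary, whatever its cause, yields the backup value.
  def withFallback(primary: Future[String]): Future[String] =
    primary.fallbackTo(Future.successful("backup"))

  // Only a TimeoutException is handled; every other failure passes through.
  def withRecoverWith(primary: Future[String]): Future[String] =
    primary.recoverWith {
      case _: TimeoutException => Future.successful("retried")
    }
}
```

With fallbackTo you give up the ability to distinguish between errors; with recoverWith, an exception that matches no case leaves the original failure in place.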
We titled this section in a way that implies it's all about the response we get back from sending messages to actors. But the fact is that this applies to any futures you might be using. This might feel like we're rehashing old ground, but there's actually a lot more to handling results from futures than you might think. We've already seen a bit of this previously, but let's go over some of the subtleties here to make sure we understand what's going on.
An additional method, mentioned briefly in the second chapter, for getting the result from a Future is to use one of the callbacks provided. Of these, onComplete is basically the catch-all. It lets you get the result whether the call resulted in a success or a failure. As you can probably deduce, onSuccess and onFailure handle the individual cases of success and failure.
There are a couple of interesting behavioral details to know about before you start using these:
Now that you understand the ground-rules, let's look at an example:
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}

case class Message(msg: String)
case class Fail(msg: String)

val system = ActorSystem("callbacks")
import system.dispatcher // ExecutionContext for the callbacks
implicit val timeout: Timeout = Timeout(5.seconds) // required by ask (?); the value is arbitrary

val responder = system.actorOf(Props(new Actor {
  def receive = {
    case Message(msg) => sender ! msg
    // note: we need to respond with a Failure here!
    case Fail(msg) => sender ! Status.Failure(new Exception(msg))
  }
}))

val responseOne = responder ? Message("will succeed")
val responseTwo = responder ? Fail("will fail")
responseOne.onComplete {
  case Success(result) => println(result)
  case Failure(failure) => println("Oops! Something went wrong: " + failure)
}
responseTwo.onSuccess {
  case msg => println("This will never get called.")
}
responseTwo.onFailure {
  case e: Exception => println("This will get called.")
}
In this example, we are creating a very simple actor just to give us a response. In this case, we're dictating what type of response we get based on the type of message we send. We're calling the actor twice and registering a single onComplete callback on the first response and both an onSuccess and onFailure callback on the second response.
As you can see again, each of the callbacks expects a PartialFunction to be passed to it. The onComplete callback will always receive one of Success or Failure. In a similarly rigid manner, onFailure will always receive some type of Throwable, typically some Exception object. onSuccess can receive pretty much anything -- that's defined by your code or the code of the actor you're interacting with.
All these callbacks have a return type of Unit. This is important since it means you can't chain the callbacks together. This isn't really a limitation, and it should help you remember that they are not going to be called in any predetermined order.
We should also mention that, in the rare case that you need to wait for a response, you can also perform a blocking operation to get the result from a future. You should likely only encounter this when you're using futures alongside synchronous code. There is also a non-blocking alternative, which is to use pipeTo from akka.pattern.PipeToSupport. Let's first take a look at how you can wait on a response from a future. You still must provide a timeout so it won't block forever:
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val responseFuture = future { longRunningOperation() }
val response = Await.result(responseFuture, 5.seconds)
If the future fails, either through its own logic or because of a timeout, the code calling Await.result will receive an exception instead of a value, so be prepared for this possibility if you need to handle those errors. Now, let's take a look at how you can achieve much the same thing without blocking by using pipeTo:
import scala.concurrent._
import akka.actor.{Actor, ActorLogging, Props, Status}
import akka.pattern.pipe

class SomeActor extends Actor with ActorLogging {
  def receive = {
    // a failed future arrives wrapped in Status.Failure
    case Status.Failure(cause) => log.error(cause, "error in computation")
    // a successful future arrives as its plain result value
    case result => log.info("success")
  }
}

val actorRef = system.actorOf(Props[SomeActor], "SomeActor")
import system.dispatcher // ExecutionContext for the future and for pipeTo
future { longRunningOperation() } pipeTo actorRef
This has been a bit of a whirlwind tour of the use of futures in both Scala and Akka, but it's important foundation material for you to understand as you start making use of these libraries. The fundamental nature of asynchronous computation is that some results will not be known until some future time, so having a general but usable mechanism for dealing with this is important. Futures are a developer's way to introduce concurrency (blocking and non-blocking) into an otherwise sequential operation, and they are also a technique for improving responsiveness by offloading tasks that could be long-running. An example is the Action.async construct in the Play Framework, which improves concurrency by using a Future to serve HTTP requests. The downside is that futures can become the golden hammer for every problem and use case. We urge you to step back and think about whether the goal you're trying to accomplish can be achieved with actors instead, and to factor futures into your design only where they are needed.