
Developing an Akka edge special code

Chapter 8: Diving Deeper into Futures

In the second chapter we walked you through some code that used futures, but we didn't spend much time on them. We'd like to develop the idea a bit further here and show you how futures can be used effectively to interact with actors and to sequence operations (or, speaking more formally, computations).

Clarifying our definition

Earlier we defined futures very generally, but let's be a bit more specific. A future represents the value of some expression or computation that may not have yet completed or which is not yet known. This mechanism was first proposed as a model for obtaining the result of parallel evaluation of expressions in a programming language.

As you saw earlier, futures are useful when you need to get a response back from an actor. This is precisely the scenario described in the definition of futures: we want to send a message off to an actor and get a response, but we don't know when or whether that response will come back. Our future gives us a handle by which to get that response.

While the most obvious use of futures in the context of Akka is for receiving responses from actors when outside of an actor context, you can also simply create futures directly. This is useful if you have a block of code you'd like to be run asynchronously from the rest of your code. Yes, you could create an actor to do this, but that requires a lot more work when you have access to futures essentially for free. Here, in the simplest form, is how you can execute a block of code as a future:

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

future { 1 + 2 }

Of course, this example is not really complete — we're not yet doing anything with the result.

Here's a more complete example, again using the global ExecutionContext (more on this topic momentarily). This time we compute the sum of three random numbers (each between 0 and 99). We've also added a type annotation to show the type of the future being created:

import scala.util.Random._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

val sums: Future[Int] =
  Future.reduce(Seq(future(nextInt(100)), future(nextInt(100)), future(nextInt(100))))(_ + _)
sums.foreach(x => println(s"Sum is ${x}"))

Execution Context

As we hinted at earlier, an ExecutionContext is an abstraction for something that provides a way to execute computations. For example, it might represent a variable-sized thread pool that scales appropriately depending on the number of CPU cores available.

There are a variety of ways to provide or get hold of an ExecutionContext. If you're familiar with the java.util.concurrent API, you know that ExecutorService is very similar to what we've described. Conveniently, you can wrap one using the ExecutionContext.fromExecutorService method, passing in the ExecutorService you want to use. Keep in mind that if you use your own ExecutorService instance, you still need to shut it down yourself. You can do this in the same place you shut down your ActorSystem (for example, as we showed in the servlet example earlier, by overriding the servlet's destroy method).
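The following is a minimal sketch of the fromExecutorService approach. It wraps a fixed four-thread pool (the size is arbitrary), runs a few futures on it, and shuts the pool down in a finally block. The object and method names are hypothetical. The Await call is there only to keep the demonstration short. `Future(...)` is the `Future.apply` form of the `future { ... }` helper used in this chapter.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object DedicatedPool {
  // Doubles each input on a dedicated four-thread pool, then shuts the pool down.
  def doubleAll(xs: Seq[Int]): Seq[Int] = {
    val pool: ExecutorService = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)
    try {
      val doubled: Seq[Future[Int]] = xs.map(x => Future(x * 2))
      Await.result(Future.sequence(doubled), 5.seconds) // blocking here only to keep the demo short
    } finally {
      pool.shutdown() // the pool is ours, so releasing its threads is our job
    }
  }
}
```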

As we showed you earlier, there's also a default, global ExecutionContext available by importing scala.concurrent.ExecutionContext.Implicits.global. And if you're running within an environment using Akka, as you likely will be at some point, you can use the dispatcher of the actor system or of the actor's context (more about dispatchers later in the book) by using one of the following forms:

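import akka.actor.ActorSystem

class MyApp {
  val system = ActorSystem("MySystem")
  import system.dispatcher // the system's default dispatcher becomes the implicit ExecutionContext
}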

This form should only be used when you are not within an actual actor, but where you do have access to an ActorSystem instance. The next form would be used within an actor:

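import akka.actor.Actor
class MyActor extends Actor {
  import context.dispatcher // the context's dispatcher becomes the implicit ExecutionContext
  def receive = { case msg => () } // message handling omitted
}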

Choosing the appropriate ExecutionContext is important because it determines the context in which work is done. In particular, ExecutionContext lets you select either a fork-join pool or a thread-pool implementation by supplying the appropriate Executor or ExecutorService (Java 7 provides both, but Java 6 provides only the thread pool). Without getting too deep into specifics, the two differ in work stealing. In a fork-join pool, idle workers can take over queued work from busy ones, which makes it more suitable for coarse-grained (potentially long-running) tasks. In a thread-pool implementation, the same work is handled by its assigned thread regardless of how long it takes. Because creating your own Executor or ExecutorService is a non-trivial exercise, treat it as a last resort and explore other techniques for distributing the work first.
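To make the choice concrete, here is a sketch that runs the same tasks on either kind of pool through ExecutionContext.fromExecutorService. It assumes Java 7 or later for java.util.concurrent.ForkJoinPool. The helper names and thread counts are illustrative only. The calling code does not change between the two pools, which is the point: the ExecutionContext decides how the work is scheduled.

```scala
import java.util.concurrent.{ExecutorService, Executors, ForkJoinPool}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolChoice {
  // Fork-join (Java 7+): idle workers steal queued tasks from busy ones.
  def forkJoin(threads: Int): ExecutorService = new ForkJoinPool(threads)
  // Fixed thread pool: a task stays on whichever thread picked it up.
  def fixed(threads: Int): ExecutorService = Executors.newFixedThreadPool(threads)

  // Runs each task as a future on the given pool, collects the results in order,
  // then shuts the pool down.
  def runAll(pool: ExecutorService)(tasks: Seq[() => Int]): Seq[Int] = {
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(Future.traverse(tasks)(task => Future(task())), 10.seconds)
    finally pool.shutdown()
  }
}
```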

For comprehensions

The futures in Scala's standard library provide a set of very useful methods that you may have already seen on a number of other classes in Scala: map, flatMap, filter, and foreach.

These methods collectively allow the Scala compiler to provide a bit of syntactic sugar in the form of for comprehensions. Syntactic sugar, if you're not familiar with the term, is syntax that the compiler rewrites into another, equivalent form. Generally, it's used to turn something somewhat cumbersome into a nicer form.

First, let's just look at a very simple example of using map.

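import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

def asyncAction(): Future[String] = future {
  "some response" // placeholder for real asynchronous work
}
val delayedResponse: Future[String] = asyncAction()
// only called if the future returns a successful result
val transformedResponse: Future[Int] = delayedResponse.map {
  response => response.length
}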

We have included a few unnecessary type annotations here to help clarify what's happening. Of particular note is the return type of map: it will always be another future. But the type parameter of that future may differ from that of the initial future if you perform some kind of transformation within the map call.

Another important thing to realize is that the map block will not be called until the initial future completes successfully. In the following example, one of the futures sleeps for five seconds before returning, so you'll notice a five-second delay before the sum is printed:

val sums = Future.reduce(Seq(future(nextInt(100)),
                             future(nextInt(100)),
                             future { Thread.sleep(5000); 5 }, // sleeps for 5 seconds, returns 5
                             future(nextInt(100))))(_ + _)
sums.map(x => println(x)) // runs only after all four futures have completed

If the initial future fails, or the function you pass to map throws an exception, the future returned by map completes with a Failure containing whatever exception caused it to fail.
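Here is a small sketch of both failure paths, using only the standard library. In the first future, the function passed to map throws. In the second, the future fails before map is ever reached. Await.ready is used only so the outcome can be inspected as a Try.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object MapFailures {
  // The original future succeeds, but the function passed to map throws.
  val thrownInMap: Future[Int] =
    Future("hello").map(s => if (s.nonEmpty) throw new IllegalStateException("boom") else s.length)

  // The original future fails, so the map function is never called.
  val failedUpstream: Future[Int] =
    Future.failed[String](new RuntimeException("upstream")).map(_.length)

  // Waits for completion (demo only) and exposes the outcome as a Try.
  def outcome(f: Future[Int]): Try[Int] = Await.ready(f, 2.seconds).value.get
}
```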

flatMap is very similar to map. What differentiates the two is that flatMap actually expects the code called within its block to return a Future, directly, as opposed to returning a value which is then wrapped in a new Future. Otherwise, the semantics are essentially the same, including the handling of exceptions, of course.
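Here is a sketch of the difference, using two hypothetical asynchronous steps, lookupId and fetchName, that each return a future. With flatMap, the result is a flat Future[String]. Passing the same function to map leaves a Future[Future[String]].

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlatMapDemo {
  // Hypothetical asynchronous steps: look up an id, then fetch a name for that id.
  def lookupId(email: String): Future[Int] = Future(email.length)
  def fetchName(id: Int): Future[String] = Future(s"user-$id")

  // flatMap: the function returns a Future, and the result is flattened to Future[String].
  val name: Future[String] = lookupId("a@b.com").flatMap(id => fetchName(id))

  // map with the same function leaves the futures nested.
  val nested: Future[Future[String]] = lookupId("a@b.com").map(id => fetchName(id))
}
```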

filter takes the successful result of a future and, based on a predicate you pass to it, returns either the original value in another future or a Failure containing a NoSuchElementException. If an exception occurs either in the original future or while evaluating the predicate, a Failure containing that exception is returned instead.
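The following sketch shows both outcomes of filter. It also includes an if guard inside a for comprehension, which the compiler turns into a call to withFilter; on futures, withFilter behaves like filter.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FilterDemo {
  val kept: Future[Int] = Future(42).filter(_ % 2 == 0)    // predicate holds: succeeds with 42
  val rejected: Future[Int] = Future(42).filter(_ % 2 == 1) // predicate fails: NoSuchElementException

  // An if guard in a for comprehension desugars to withFilter.
  val guarded: Future[Int] = for { n <- Future(42) if n > 100 } yield n
}
```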

Let's look at how we can take advantage of the awesome sugared syntax we've been offered. Let's say you have requests to make to two different actors and you want to calculate some final value based on the results of calling both those actors. You can do this using a for comprehension, essentially composing the final result from the individual results:

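import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

implicit val timeout = Timeout(5.seconds) // ask (?) needs an implicit timeout
val itemId = 998
val buyersCurrency = "GBP"
val currentPrice: Future[Double] =
  (pricingActor ? GetPrice(itemId)).mapTo[Double]
val conversionRate: Future[Double] =
  (conversionRateActor ? GetConversion(buyersCurrency)).mapTo[Double]
val convertedPrice: Future[Double] =
  for { price <- currentPrice
        rate <- conversionRate } yield {
    price * rate
  }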

You'll note that, again, we added some type annotations to make clear what types the calls return. The final result of the for comprehension is still a future, which is important to recognize.

Also, notice that we're making the requests to the actors before the for comprehension. If the calls were made directly inside, we'd be effectively sequencing the actual calls. That is, one call would be made to pricingActor and then it would sit waiting for the result. Only then would the second call to conversionRateActor be made.

To understand why, it helps to see how the Scala compiler transforms the for comprehension. We won't explain all the rules here (curious readers can find them in the Scala Language Specification), but essentially, the for comprehension is turned into the following code:

val convertedPrice: Future[Double] = currentPrice.flatMap {
  case price => {
    conversionRate.map {
      case rate => {
        price * rate
      }
    }
  }
}

Looking at the example in this form hopefully makes it clearer why you want to create your futures before entering the for comprehension. To be clear, here's the desugared code the compiler would produce if the ask calls were written inside the for comprehension instead of before it:

val convertedPrice: Future[Double] =
  (pricingActor ? GetPrice(itemId)).mapTo[Double].flatMap {
    case price => {
      (conversionRateActor ? GetConversion(buyersCurrency)).mapTo[Double].map {
        case rate => {
          price * rate
        }
      }
    }
  }

If you look closely at this form, you'll see how the GetConversion message is not sent to the conversionRateActor until after the price has already been returned by the pricingActor.
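To observe the difference without any actors, the sketch below replaces the two asks with hypothetical getPrice and getRate calls that each sleep for about 300 ms. It times both styles on a dedicated two-thread pool; the threads are daemon threads, so the JVM can still exit. When the futures are created before the for comprehension, the calls overlap. When they are created inside it, the second call waits for the first. Exact timings will vary by machine.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object WhereFuturesStart {
  // Two daemon threads, so both slow calls can run at once and the JVM can still exit.
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(
    Executors.newFixedThreadPool(2, new ThreadFactory {
      def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
    }))

  // Hypothetical stand-ins for the two actor requests; each takes about 300 ms.
  def getPrice(): Future[Double] = Future { Thread.sleep(300); 10.0 }
  def getRate(): Future[Double] = Future { Thread.sleep(300); 0.5 }

  // Returns the result and the elapsed wall-clock time in milliseconds.
  def timed(body: => Future[Double]): (Double, Long) = {
    val start = System.nanoTime()
    val result = Await.result(body, 5.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Both requests start before the for comprehension, so they overlap.
  def startedBefore: (Double, Long) = timed {
    val price = getPrice()
    val rate = getRate()
    for { p <- price; r <- rate } yield p * r
  }

  // getRate() is only called inside flatMap, after the price has arrived.
  def startedInside: (Double, Long) = timed {
    for { p <- getPrice(); r <- getRate() } yield p * r
  }
}
```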

Sequencing futures

Sometimes you really need to call a series of futures as a sequence of operations. There are a couple of approaches to this. One is to simply invoke the next future in the sequence in the callback of the current future your code is waiting on. We won't even bother showing an example of this; it's cumbersome and quickly gets ugly. Another approach is to use the for comprehensions we just demonstrated. This works well, but in some cases it results in code that isn't exactly fluid. Thankfully, the Scala 2.10 API added a few new features to futures that make this a lot nicer.

Speaking of sequencing futures, another mechanism is provided in the form of andThen. This method is useful when you want to perform a side effect on the completion of another future. It differs from onSuccess (or onFailure) in two ways. First, the execution order of callbacks registered with onSuccess/onFailure is not defined. Second, those methods do not produce a new future, whereas andThen does. The andThen method takes a partial function whose input type is Try[T], so you can restrict the match to Success or Failure or, if you don't care, match anything. The future returned by andThen completes with the original future's result, not the result of your partial function. And, finally, you can chain together as many calls to andThen as you like; they run in the order in which you chain them.

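import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

val f = future { 1 + 2 }
f andThen {
  case Success(i) => println("The result is: " + i)
} andThen {
  case _ => doSomethingElse() // don't care what we've got
}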
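As a rough check of the ordering guarantee, the sketch below records each side effect in a synchronized buffer. Each andThen returns a future that completes only after its partial function has run. As a result, the entries appear in chain order, and the final future still holds the original value. The partial function in the last step doesn't match a Success, so it is simply skipped.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AndThenOrder {
  private val log = ListBuffer.empty[String] // side effects, recorded in the order they run
  private def record(entry: String): Unit = log.synchronized { log += entry }
  def entries: List[String] = log.synchronized { log.toList }

  val result: Future[Int] = Future(1 + 2)
    .andThen { case Success(i) => record(s"first saw $i") }
    .andThen { case _ => record("second ran") }
    .andThen { case Failure(_) => record("skipped: there was no failure") }
}
```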

There are cases where you want to make a series of calls and use the result of whichever completes first. In this situation, you can use Future.firstCompletedOf. It accepts as many futures as you want to give it and returns a future containing the first result, whether a success or a failure. This can be very helpful when you're calling a service that frequently times out.

val tryOne, tryTwo, tryThree = future { makeSomeRequest() }
val first = Future.firstCompletedOf(Seq(tryOne, tryTwo, tryThree))
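Here is a self-contained sketch, assuming a hypothetical replica function that answers after a fixed delay. Wrapping the sleep in scala.concurrent.blocking tells the global pool it may add threads, so all three replicas can wait at the same time. Note that firstCompletedOf does not cancel the slower futures; they keep running in the background.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstToAnswer {
  // Hypothetical replica of a service that answers with its name after a delay.
  // blocking { ... } lets the global pool add threads, so the sleeps overlap.
  def replica(name: String, delayMs: Long): Future[String] =
    Future { blocking(Thread.sleep(delayMs)); name }

  val fastest: Future[String] = Future.firstCompletedOf(
    Seq(replica("slow", 1000), replica("fast", 50), replica("medium", 500)))
}
```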

Finally, you can transform a sequence of futures into a future of a sequence. Let's say you have a sequence of URLs that you want to retrieve and then process once they have all been downloaded, but not before the entire sequence has completed. You could do something like this:

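import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.{Failure, Success}

val urls = Seq("http://google.com", "http://twitter.com", "http://typesafe.com")
val pages: Seq[Future[String]] = urls.map { url => future { Source.fromURL(url).mkString } }
Future.sequence(pages).onComplete {
  case Success(contents) => // process the Seq[String] of page bodies
  case Failure(e)        => // at least one of the downloads failed
}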
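Downloading real pages makes for an unreliable example, so this sketch substitutes a hypothetical fetchLength function that completes with each URL's length. It shows Future.sequence and the equivalent Future.traverse. It also shows that a single failed element fails the combined future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo {
  // Hypothetical stand-in for fetching a page: completes with the URL's length instead.
  def fetchLength(url: String): Future[Int] = Future(url.length)

  val urls = Seq("http://google.com", "http://twitter.com", "http://typesafe.com")

  // Seq[Future[Int]] becomes Future[Seq[Int]], completing once every element has completed.
  val allLengths: Future[Seq[Int]] = Future.sequence(urls.map(url => fetchLength(url)))

  // Future.traverse performs the map and the sequence in one step.
  val viaTraverse: Future[Seq[Int]] = Future.traverse(urls)(url => fetchLength(url))

  // A single failed element fails the combined future.
  val oneFails: Future[Seq[Int]] =
    Future.sequence(Seq(Future(1), Future.failed[Int](new RuntimeException("down"))))
}
```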

Error handling and exceptions

The last, but definitely not least, topic we need to cover is error handling with futures. We've already shown you how you can use onFailure as part of this, but that's just one of the tools available. We also now know that onFailure, like its cousin onSuccess, is non-deterministic: its callbacks are not executed in program order. We need to make two additional observations about what we already know and then show a couple of helpers that are available to you.

The first observation concerns the lack of ordering among callbacks registered with methods like onComplete, onSuccess, and onFailure. In the last section we showed one remedy, andThen, which executes handlers deterministically. We can generalize that further with combinators such as map and flatMap (along with recover, covered below). These not only handle the successes and failures of our futures but can also produce new futures, if we wish. As an example, suppose we have a sequence of futures generating random numbers, as before, but we've planted an exception in one of them. We'd still like to compute the sum of all the other, valid numbers. The following illustrates this approach:

val ss = Seq(future(throw new Exception("haha!")), future(nextInt(100)), future(nextInt(100)))
// map each success to Some(value) and recover each failure to None
val guarded = ss.map(_.map(Option(_)).recover { case _: Exception => None })
// combine the guarded futures and sum only the valid numbers, without blocking
val total: Future[Int] = Future.sequence(guarded).map(_.flatten.sum)

The second thing to discuss is how onFailure can be used in cases where it might not be so obvious. Although we've already shown how callbacks can be used with futures even when using other mechanisms for handling the results, such as for comprehensions, we haven't really commented on an important idea here. Notice how for comprehensions and many of the other structures the library provides still give you a final future to deal with. This outermost future can still have callbacks assigned to it:

val firstFuture = someCallReturningAFuture()
val secondFuture = anotherCallReturningAFuture()
val thirdFuture = aFinalCallReturningAFuture() 
val sequencedResult = for { first <- firstFuture
                            second <- secondFuture
                            third <- thirdFuture } yield {
  first + second + third // we'll assume the results can all be added together
} 
 
sequencedResult.onFailure {
  case e: DomainException => {
    // do some cleanup here
  }
}

This is not revolutionary, but again, it's important to recognize that after the for comprehension returns, we're still dealing with a future. Any error occurring anywhere within the sequence of calls will bubble up to that final future. You could instead handle each future's errors individually without a for comprehension, but if you do use one, it makes sense to handle the errors at the top.
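The sketch below makes this concrete, with plain futures standing in for the three calls; the second one fails with a hypothetical DomainException. A single recover on the outermost future handles the failure, whichever step it came from. Recover is described in more detail shortly. We use it rather than onFailure because onFailure was deprecated in Scala 2.12.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TopLevelErrors {
  // Hypothetical domain error type standing in for DomainException above.
  case class DomainException(msg: String) extends Exception(msg)

  val firstFuture: Future[Int] = Future(1)
  val secondFuture: Future[Int] = Future(throw DomainException("second call failed"))
  val thirdFuture: Future[Int] = Future(3)

  val sequencedResult: Future[Int] = for {
    first  <- firstFuture
    second <- secondFuture
    third  <- thirdFuture
  } yield first + second + third

  // One handler on the outermost future covers a failure in any step.
  val handled: Future[Int] = sequencedResult.recover { case DomainException(_) => -1 }
}
```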

Another common scenario you might run into occurs when you want to attempt to get some value as a future and, in the case of a failure, fall back to another future. This is provided by the appropriately named fallbackTo method, which simply takes a future as its argument:

val firstTry = future { doSomethingQuestionable() }
val backup = future { doSomethingLessQuestionable() }
val result = firstTry fallbackTo backup
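Here is a quick sketch of fallbackTo's behavior, using the standard library alone. Both futures are ordinary values, so both start running before fallbackTo is called. When both fail, the resulting future carries the first future's exception rather than the backup's.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FallbackDemo {
  // Both futures are created (and therefore started) before fallbackTo is called.
  val firstTry: Future[Int] = Future(throw new IllegalStateException("questionable"))
  val backup: Future[Int] = Future(42)
  val result: Future[Int] = firstTry fallbackTo backup

  // When both fail, the first future's exception is the one reported.
  val bothFail: Future[Int] =
    Future.failed[Int](new IllegalStateException("first"))
      .fallbackTo(Future.failed[Int](new RuntimeException("second")))
}
```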

The other mechanisms you'll likely want to look at are recover and recoverWith. With recover, the handler returns a plain value, whose type must be the same as (or a supertype of) the type parameter of the future you started with. With recoverWith, the handler returns another future, parameterized with that same type.

Here's a quick example of each:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.Source
import java.net.{MalformedURLException, UnknownHostException}

val candidateURLOne = "htt://google.xyz"
val candidateURLTwo = "http://google.com"
val getOne = future { Source.fromURL(candidateURLOne) }
val getTwo = future { Source.fromURL(candidateURLTwo) }

getOne.recover {
  case _: MalformedURLException => // the "htt" protocol is unknown
    Source.fromURL(candidateURLOne.replace("htt", "http"))
}.recoverWith {
  case _: UnknownHostException => // raised if the corrected host can't be resolved
    getTwo
}

As you can see, recover allows us to try a new operation, without it needing to be itself a future. The second approach, using recoverWith, is useful when you have an operation that could take time to perform and an alternate fallback approach that should be applied when the first doesn't work.
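The URLs above depend on network access, so here is an offline sketch instead. It uses a hypothetical parsePort function that fails with a NumberFormatException on non-numeric input. It shows recover returning a plain value and recoverWith returning another future. It also shows that an exception the handler doesn't match passes through unchanged.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverDemo {
  // Hypothetical parser that fails on non-numeric input.
  def parsePort(s: String): Future[Int] = Future(s.toInt)

  // recover: the handler returns a plain value (an Int here).
  val withDefault: Future[Int] = parsePort("eighty").recover {
    case _: NumberFormatException => 80
  }

  // recoverWith: the handler returns another Future[Int].
  val withRetry: Future[Int] = parsePort("eighty").recoverWith {
    case _: NumberFormatException => parsePort("8080")
  }

  // Exceptions the handler doesn't match pass through unchanged.
  val unmatched: Future[Int] = parsePort("eighty").recover {
    case _: IllegalStateException => 0
  }
}
```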

Note the difference between fallbackTo, which we showed you a little while ago, and recoverWith, which we just demonstrated. fallbackTo takes a future and falls back to its result on any failure of the original future. recoverWith, in contrast, lets you match on the exception the original future failed with and return a suitable future based on it. Lastly, recover and recoverWith are, like map and flatMap, functions applied over a future, so you can freely combine all four in the same chain.

Handling actor responses

We titled this section in a way that implies it's all about the response we get back from sending messages to actors. But the fact is that this applies to any futures you might be using. This might feel like we're rehashing old ground, but there's actually a lot more to handling results from futures than you might think. We've already seen a bit of this previously, but let's go over some of the subtleties here to make sure we understand what's going on.

Another way to get the result from a Future, mentioned briefly in the second chapter, is to use one of the callbacks provided. Of these, onComplete is the catch-all: it gives you the result whether the call resulted in a success or a failure. As you can probably deduce, onSuccess and onFailure handle the individual cases of success and failure.

There are a couple of interesting behavioral details to know about before you start using these:

  • You can create an arbitrary number of callbacks on any future.
  • These callbacks will not be called in a specific order. To put it another way, do not assume the callbacks will be called in the order you have defined them.
  • If any given callback throws an exception, the other callbacks will still be called.
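The following sketch checks two of these rules using the standard library alone. It registers three onComplete callbacks, one of which throws, and uses a latch to wait for the other two; onSuccess and onFailure are avoided because they were deprecated in Scala 2.12. The exception is handed to the ExecutionContext's reportFailure method, which for the global context prints a stack trace. It does not stop the remaining callbacks. The assertions deliberately don't depend on the order in which the callbacks ran.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object CallbackRules {
  val seen = new ConcurrentLinkedQueue[String]() // thread-safe record of which callbacks ran
  private val done = new CountDownLatch(2)        // counted down by the two well-behaved callbacks

  val f: Future[Int] = Future(21 * 2)
  f.onComplete { _ => seen.add("a"); done.countDown() }
  f.onComplete { _ => throw new RuntimeException("callback failed") } // reported, not rethrown
  f.onComplete { _ => seen.add("c"); done.countDown() }

  // True if both well-behaved callbacks ran within two seconds.
  def otherCallbacksRan(): Boolean = done.await(2, TimeUnit.SECONDS)
}
```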

Now that you understand the ground-rules, let's look at an example:

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}

case class Message(msg: String)
case class Fail(msg: String)

implicit val timeout = Timeout(5.seconds) // ask (?) needs an implicit timeout
val system = ActorSystem("callbacks")
import system.dispatcher // ExecutionContext used to run the callbacks
val responder = system.actorOf(Props(new Actor {
  def receive = {
    case Message(msg) => sender ! msg
    // note: we need to respond with a Status.Failure here!
    case Fail(msg) => sender ! Status.Failure(new Exception(msg))
  }
}))

val responseOne = responder ? Message("will succeed")
val responseTwo = responder ? Fail("will fail")
responseOne.onComplete {
  case Success(result) => println(result)
  case Failure(failure) => println("Oops! Something went wrong: " + failure)
}
responseTwo.onSuccess {
  case msg => println("This will never get called.")
}
responseTwo.onFailure {
  case e: Exception => println("This will get called.")
}

In this example, we are creating a very simple actor just to give us a response. In this case, we're dictating what type of response we get based on the type of message we send. We're calling the actor twice and registering a single onComplete callback on the first response and both an onSuccess and onFailure callback on the second response.

As you can see, onSuccess and onFailure each expect a PartialFunction, while onComplete takes a function of Try, which a block of case clauses like the one above satisfies. The onComplete callback will always receive either a Success or a Failure. In a similarly rigid manner, onFailure will always receive some type of Throwable, typically an Exception. onSuccess can receive pretty much anything; that's defined by your code or the code of the actor you're interacting with.

All these callbacks have a return type of Unit. This is important, since it means you can't chain the callbacks together. This isn't really a limitation, and it should help you remember that they are not going to be called in any predetermined order.

We should also mention that, in the rare case that you need to wait for a response, you can perform a blocking operation to get the result from a future. You should really only encounter this when you're using futures alongside synchronous code. Often there is a non-blocking solution: the pipeTo pattern from akka.pattern.PipeToSupport, which sends the result to an actor instead. First, let's look at how you can wait on a response from a future. You must provide a timeout so it won't block forever:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val responseFuture = future { longRunningOperation() }
val response = Await.result(responseFuture, 5.seconds)
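To see what Await.result does when things go wrong, here is a sketch with two futures: a hypothetical one-second operation and one that fails outright. Each Await call is wrapped in a Try so the exceptions can be inspected.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object AwaitDemo {
  // Hypothetical slow operation: takes about one second.
  val slow: Future[Int] = Future { blocking(Thread.sleep(1000)); 1 }
  val broken: Future[Int] = Future(throw new IllegalArgumentException("bad input"))

  // Waiting less time than the operation takes: Await.result throws a TimeoutException.
  def tooImpatient: Try[Int] = Try(Await.result(slow, 100.millis))
  // Waiting on a failed future: its own exception is rethrown.
  def failedResult: Try[Int] = Try(Await.result(broken, 1.second))
}
```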

If the future fails through its own logic, Await.result rethrows that exception. If it doesn't complete within the timeout, Await.result throws a TimeoutException. Be prepared for either possibility if you need to handle those errors. Now let's look at how you can achieve something similar, without blocking, by using pipeTo:

import akka.actor.{Actor, ActorLogging, Props, Status}
import akka.pattern.pipe
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

class SomeActor extends Actor with ActorLogging {
  def receive = {
    // a failed future arrives wrapped in Status.Failure
    case Status.Failure(cause) => log.error(cause, "error in computation")
    // a successful future's value arrives as-is
    case result => log.info("success: {}", result)
  }
}

val actorRef = system.actorOf(Props[SomeActor], "SomeActor")
future { longRunningOperation() } pipeTo actorRef

Wrap-up

This has been a bit of a whirlwind tour of the use of futures in both Scala and Akka, but it's important foundation material for you to understand as you start making use of these libraries. The fundamental nature of asynchronous computation is that some results will not be known until some future time, so having a general but usable mechanism for dealing with this is important. Futures are a developer's way to introduce concurrency (blocking and non-blocking) into an otherwise sequential operation. They are also a technique for improving responsiveness by offloading tasks that could be long-running. An example is the Action.Async construct in the Play Framework, which improves concurrency by using a Future to serve HTTP requests. There is a downside: futures can easily become the golden hammer for every problem and use case. We urge you to step back from futures and think about whether the goal you're trying to accomplish can be achieved with actors instead, and to factor futures into your design where they are needed.
