Developing an Akka edge special code

Chapter 6: Dispatchers

Now, we turn to the matter of dispatchers. Dispatchers and routers are distinct in the sense that the former govern the mechanics of messaging, while the latter apply a strategy to select an actor to route the message to. It's perhaps best to think of dispatchers as embodying two fundamental concepts: management of the actor mailbox and the threading strategy used to allow actors to do actual work. We'll discuss both of these in a bit more depth, as they account for the primary differences between the various dispatcher choices.

Let's look at the mailbox choices first, as they are the simpler part of this picture. There are primarily two important concerns that make up the selections available. That is, you have the choice of whether to assign a distinct mailbox for each actor or to share one for all. Further, you can choose whether the mailboxes should be of unlimited capacity or if they should be bounded to some fixed limit. The default dispatcher gives each actor its own unbounded mailbox. You also have the option of implementing a priority-based mailbox, which requires mapping messages to a numeric priority.

The task management piece is closely related to (and uses, behind the scenes) Java's java.util.concurrent.ExecutorService facility. The purpose is to abstract out how threads are managed and how actors are given time and resources in the form of these threads in which to perform their tasks.

The default dispatcher uses the relatively new Fork/Join framework developed by Doug Lea, one of the key masterminds behind Java's java.util.concurrent package. This framework is included with the Java 7 release, but Scala includes a copy of it, which means you can still use it with the Java 6 JVM. The Fork/Join framework is well suited for actor-based workflows because of the work-stealing approach it uses to keep all threads in the pool busy when possible. This approach tends to work well for systems that spawn lots of small tasks that need to be executed concurrently.

Configuring and using dispatchers

Let's look at how you might typically configure and use the default dispatcher before we dive into other dispatcher choices. The configuration of the default has a number of options (all custom dispatchers do, as well), but we'll focus on a few key points:

akka.actor {

  default-dispatcher {
    # We'll look at other possible values for this setting later
    type = Dispatcher
    # This is the default already, but you also have the option of
    # using "thread-pool-executor", which corresponds to a
    # java.util.concurrent.ThreadPoolExecutor instance
    executor = "fork-join-executor"

    # Throughput defines the "fairness" of the executor. This works by
    # setting the number of messages that the dispatcher will allow to
    # be processed before the thread is returned to the pool
    throughput = 10  

    # Since we defined this type of executor, we can use this section
    # to customize its behavior
    fork-join-executor { 

      # The minimum number of threads that should be allocated
      parallelism-min = 4 

      # The maximum number of threads that should be allocated
      parallelism-max = 32 

      # The executor will use this along with the number of available
      # CPU cores to select how many threads to allocate within the
      # bounds given by the previous settings. The formula used is
      # ceil(cores * factor). For example, if you have 10 cores and
      # the following 4.0 setting, the result will be 40, but since
      # we set an upper bound of 32, only 32 threads will be
      # allocated
      parallelism-factor = 4.0

    } 
  }
}

Let's talk for a moment about how these settings should be used. This is an area that can take a lot of effort to get right, so understanding some basic guidelines will be helpful. The first couple of configuration settings are fairly straightforward, though the executor setting can take a few different values: fork-join-executor (shown above), thread-pool-executor, or the fully qualified class name (FQCN) of a custom implementation of the akka.dispatch.ExecutorServiceConfigurator abstract class. The two built-in names correspond to the ForkJoinExecutorConfigurator and ThreadPoolExecutorConfigurator classes, both of which are derived from that abstract class. This gives you a choice between the Fork/Join concurrency model introduced with Java 7 and the thread-pool model, which is the only one Java 6 provides. The parallelism-min and parallelism-max values are simple bounds that are used in combination with the parallelism-factor setting to determine how many threads the executor is allowed to allocate.

You will need to evaluate the nature of the work being performed by the actors (and futures, as we'll see in chapter 8) on this executor to determine how best to adjust these settings. For instance, if you are performing a lot of CPU-bound work, it doesn't make a lot of sense to allow the executor to use more threads than you have available CPU cores. On the other hand, if your actors spend a lot of time waiting, whether for IO operations, other actors within your system, or anything else, then it is important to allocate enough threads to give them the processor time they need to check whether messages are ready for them.
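The sizing rule described in the configuration comments is simple enough to try out on its own. The following is a minimal sketch of that rule, not Akka's own code; PoolSizing and poolSize are hypothetical names introduced purely for illustration.

```scala
object PoolSizing {
  // Illustrative helper (not part of Akka): applies the rule from the
  // configuration comments, ceil(cores * factor), and then clamps the
  // result to the [min, max] bounds.
  def poolSize(cores: Int, factor: Double, min: Int, max: Int): Int = {
    val scaled = math.ceil(cores * factor).toInt
    math.min(math.max(scaled, min), max)
  }
}
```

With the bounds used above (a minimum of 4 and a maximum of 32) and a factor of 4.0, PoolSizing.poolSize(4, 4.0, 4, 32) gives 16, while PoolSizing.poolSize(16, 4.0, 4, 32) is capped at 32.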

The throughput setting is very useful: it tells the dispatcher how many messages to drain from a given actor's mailbox before it returns the thread to the pool for another actor to use. A setting of 1 allows for maximum fairness, but it also means there may be a lot of context switching as the executor schedules your actors, and you may see higher latencies on older machines. The right value depends on your workload: you want both a fair distribution of work and a responsive application, and there's a good chance you will need to run experiments to find the sweet spot.
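To get a rough feel for the trade-off, you can count how often a thread has to move from one mailbox to the next for a given throughput value. This is a deliberately simplified model, not a description of Akka's scheduler; ThroughputSketch and turnsNeeded are hypothetical names, and the model assumes that no new messages arrive while the mailboxes are being drained.

```scala
object ThroughputSketch {
  // Toy model: `mailboxes` holds the number of pending messages per actor.
  // A turn processes at most `throughput` messages from one mailbox before
  // the thread is handed back, so each mailbox needs
  // ceil(pending / throughput) turns to drain.
  def turnsNeeded(mailboxes: Seq[Int], throughput: Int): Int = {
    require(throughput > 0, "throughput must be positive")
    mailboxes.map(pending => (pending + throughput - 1) / throughput).sum
  }
}
```

For two actors with 100 pending messages each, a throughput of 1 needs 200 turns, while a throughput of 10 needs only 20. The price of the larger value is that each actor can hold on to the thread for up to ten messages in a row.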

It's perhaps not obvious, but now that we've configured the above settings in akka.actor.default-dispatcher, we don't actually have to modify any code to use it. Any actor for which we don't configure a specific dispatcher will use these settings — that's why it's called the default-dispatcher, of course.

To configure a custom dispatcher, you simply need to add a section defining the settings for that dispatcher and give it a suitable name. Here's a minimal configuration for a dispatcher that uses Java's java.util.concurrent.ThreadPoolExecutor (Akka provides a convenient shortcut with the thread-pool-executor name for this dispatcher and a corresponding implementation of an ExecutorServiceConfigurator for it).

my-thread-pool-dispatcher {

  type = Dispatcher

  executor = "thread-pool-executor"

  thread-pool-executor {

    core-pool-size-min = 4
    core-pool-size-max = 32
    core-pool-size-factor = 2.0

    max-pool-size-min = 8
    max-pool-size-max = 84
    max-pool-size-factor = 3.0

    keep-alive-time = 30ms 

    task-queue-type = "linked"
  }
}

These settings are essentially mirrors of the settings defined for the underlying ThreadPoolExecutor class (see the Javadocs for a lot more detail). In case you're not familiar with this class, the first things we need to cover are the core-pool-size and max-pool-size settings. The core pool is essentially the pool of threads that this executor tries to keep available for work, while the maximum pool settings define an outer bound for the number of threads available. If a new task is submitted to the pool and there are fewer threads than the core pool size, an additional thread will be created, even if there are other idle threads in the core pool, as long as the maximum pool size is not exceeded. The min, max and factor settings for each pool provide a way to size these pools dynamically based on the number of CPU cores available. That is, the number of cores is multiplied by the factor and constrained within the bounds of the minimum and maximum sizes specified, and the result is used to size the pool. As an example, suppose you have set the pool minimum size to 4, the maximum size to 32 and the factor to 3.0. With 4 CPU cores this will result in 12 threads being placed in the pool. But with 16 CPU cores, you would get 32 threads, because the maximum size is less than 16 * 3.0. Finally, among the settings related to pool size, the keep-alive-time shown sets the time that the pool will wait before shutting down additional, non-core threads in the pool.

The task-queue-size and task-queue-type settings are used to define the type of queue used to hold incoming tasks that are waiting for an available thread from the pool. Akka provides two implementations for use here: linked which uses a LinkedBlockingQueue and array which uses an ArrayBlockingQueue. The LinkedBlockingQueue is, by default, an unbounded queue that can grow as needed and, as such, it allows tasks to wait until a core pool thread is available before allocating a thread for a task, so no more than core-pool-size-max threads will ever be used in this case. You can also specify an optional task-queue-size setting for LinkedBlockingQueue to limit the number of tasks that can be submitted when the queue is waiting on threads to be available. Tasks that are submitted when the queue is at capacity will be rejected. The ArrayBlockingQueue is a fixed-size, bounded queue, so it also requires the task-queue-size setting.
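The interaction between an unbounded queue and the core pool size can be observed directly on the JDK class, without involving Akka at all. The following sketch builds a ThreadPoolExecutor by hand; LinkedQueueSketch and largestPoolSize are hypothetical names, and the sizes passed in are arbitrary illustration values.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object LinkedQueueSketch {
  // Submits `tasks` blocking tasks to a pool backed by an unbounded
  // LinkedBlockingQueue and returns the largest number of threads the
  // pool ever held.
  def largestPoolSize(core: Int, max: Int, tasks: Int): Int = {
    val pool = new ThreadPoolExecutor(
      core, max, 30L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]())
    val release = new CountDownLatch(1)
    try {
      // Every task parks on the latch, so the submissions pile up at once.
      (1 to tasks).foreach { _ =>
        pool.execute(new Runnable { def run(): Unit = release.await() })
      }
      pool.getLargestPoolSize
    } finally {
      // Let the parked tasks finish and shut the pool down cleanly.
      release.countDown()
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

Calling LinkedQueueSketch.largestPoolSize(2, 8, 20) returns 2: the eighteen tasks that do not get a core thread wait in the queue, and the maximum of 8 is never reached because the queue never fills up.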

You would use the dispatcher just defined using the following code:

import akka.actor.Props

val anActor =
  context.actorOf(Props[MyActor].withDispatcher("my-thread-pool-dispatcher"))

Note that this configuration can be defined anywhere in your configuration. In the case of my-thread-pool-dispatcher, we're assuming that it's a top-level configuration entry. If you instead defined it at myapp.dispatchers.my-thread-pool-dispatcher, the code would need to reference that full path to access it:

context.actorOf(Props[MyActor]
  .withDispatcher("myapp.dispatchers.my-thread-pool-dispatcher"))

You don't need to know what all the values shown above mean unless you find yourself in need of an alternative to the Fork/Join-based dispatcher. If that's the case, we highly recommend reading Java Concurrency in Practice to get a solid understanding of the different executors offered by the Java concurrency libraries. In general, you will usually want to stick with the default, Fork/Join-based dispatcher, as it provides very good performance characteristics for most cleanly designed, non-blocking actor usages.

You should also be aware that these dispatchers can be used as an ExecutionContext since they implement that interface. This means you can use them for executing futures by creating an implicit value that refers to them.

implicit val myThreadPoolDispatcher =
  system.dispatchers.lookup("myapp.dispatchers.my-thread-pool-dispatcher")
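Because a dispatcher is an ExecutionContext, futures treat it like any other execution context. The sketch below uses a plain fixed thread pool as a stand-in for the looked-up dispatcher, which is an assumption made so that the example runs without an actor system; FutureOnPoolSketch and sumOfSquares are hypothetical names.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureOnPoolSketch {
  def sumOfSquares(numbers: Seq[Int]): Int = {
    val pool = Executors.newFixedThreadPool(4)
    // Stand-in for the dispatcher: whichever ExecutionContext is in
    // implicit scope is the one the futures below will run on.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val squares = numbers.map(n => Future(n * n))
      // Blocking here keeps the sketch short; it is not a recommendation.
      Await.result(Future.sequence(squares), 5.seconds).sum
    } finally pool.shutdown()
  }
}
```

In an actor system, the implicit value would be the dispatcher obtained from system.dispatchers.lookup instead of the hand-built pool.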

Provided dispatchers

Akka provides four standard dispatchers for your use: Dispatcher, PinnedDispatcher, BalancingDispatcher, and CallingThreadDispatcher. We've already implicitly covered the default Dispatcher, but to complete the picture, we should describe its other characteristics. As we've already seen, you can use any of fork-join-executor, thread-pool-executor or a FQCN pointing to an implementation of akka.dispatch.ExecutorServiceConfigurator to determine how submitted tasks will be executed on this dispatcher. The dispatcher allocates mailboxes so that each actor on it has its own mailbox.

The PinnedDispatcher is a special dispatcher that actually allocates a single thread for each actor assigned to it. Said another way, this means that for every actor you assign to use this dispatcher, that actor will always be guaranteed to have a full thread available for use. If you're asking yourself why you would want to do this, think about cases where you have a resource or set of resources that need to be given priority over most other parts of the system. The intent of this dispatcher is to attempt to assure that these actors will always have some CPU time available, but it's impossible to really guarantee this, of course. Also keep in mind that you don't want to overuse this pattern. In fact, use it very sparingly. You can configure a dispatcher to be of type PinnedDispatcher with the following configuration:

my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

Another dispatcher you might hear about is BalancingDispatcher. This is what you might be tempted to call a work-stealing dispatcher, though that term is not technically accurate. With this dispatcher, all actors using it will share a single mailbox and the dispatcher will give messages in the mailbox to idle actors as they become available. So it's not really work-stealing in the typical sense, but it does attempt to distribute the load fairly across a set of actors.

One caveat to be aware of with this dispatcher is that it's intended to be used with actors that are all of the same type. There's nothing that will prevent you from attempting to assign it to different actor types, but you will want to make sure you understand what you're doing if you ever try to do so -- at the least, each actor type should assume it will be receiving the same set of possible messages.
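The shared-mailbox idea can be imitated with nothing more than a queue and a few threads. The following is a rough analogy rather than Akka's implementation: the workers stand in for actors of the same type, and SharedMailboxSketch and drain are hypothetical names.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object SharedMailboxSketch {
  // Drains one shared queue with `workers` threads and returns how many
  // messages each worker handled. Whichever worker is idle simply takes
  // the next message.
  def drain(messages: Seq[String], workers: Int): Seq[Int] = {
    val mailbox = new ConcurrentLinkedQueue[String]()
    messages.foreach(m => mailbox.add(m))
    val handled = Vector.fill(workers)(new AtomicInteger(0))
    val pool = Executors.newFixedThreadPool(workers)
    handled.foreach { counter =>
      pool.execute(new Runnable {
        def run(): Unit = {
          // poll() returns null once the shared mailbox is empty.
          while (mailbox.poll() != null) counter.incrementAndGet()
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    handled.map(_.get)
  }
}
```

How the messages are split between the workers varies from run to run, but the counts always add up to the number of messages submitted. Because any worker may pick up any message, every worker has to be able to handle every message, which is the reason for the caveat above.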

Using this dispatcher in your configuration is similar to the PinnedDispatcher. You simply assign it as the type, but you probably want to use the fork-join-executor here:

my-balancing-dispatcher {
  # this is not strictly necessary, as it is the default
  executor = "fork-join-executor"
  type = BalancingDispatcher
}

Mailboxes

The subject of mailboxes has been discussed already, but we haven't really devoted much time to them. As described earlier, mailboxes can be either bounded or unbounded. In the case of bounded mailboxes, you should also be aware that they are typically blocking. That is, the act of adding a message to a bounded mailbox will block for some period of time if the mailbox is currently full. The timeout for this is set in the configuration, as are the size and type of the mailbox. It is also certainly possible to implement a non-blocking, bounded mailbox. In Akka 2.2, the default mailbox type is akka.dispatch.UnboundedMailbox, that is, an unbounded mailbox.

Here are a couple quick configuration examples showing you basic unbounded and bounded configurations.

A typical unbounded mailbox

my-unbounded-mailbox-dispatcher {
 mailbox-capacity = -1
}

Here we are just asking for the most plain vanilla unbounded, nonblocking mailbox.

A mailbox with a 100 message limit

my-bounded-mailbox-dispatcher {
 mailbox-capacity = 100
 mailbox-push-timeout-time = 0
}

And here we create a simple bounded, blocking mailbox. It will accept up to 100 messages and then block indefinitely when new messages are enqueued. The mailbox-capacity key is self-explanatory. The mailbox-push-timeout-time key is the maximum delay you are willing to accept when pushing a message onto a full mailbox; with the value of 0 used here no timeout applies, which is why the push blocks indefinitely.
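The difference between a push that times out and one that waits without limit is easy to see on a JDK blocking queue. The sketch below assumes that a bounded mailbox behaves like such a queue and does not use Akka's mailbox classes; BoundedPushSketch and pushWhenFull are hypothetical names.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object BoundedPushSketch {
  // Fills a queue of the given capacity, then tries one more push with a
  // timeout. Returns true if the extra message was accepted.
  def pushWhenFull(capacity: Int, timeoutMillis: Long): Boolean = {
    val mailbox = new LinkedBlockingQueue[String](capacity)
    (1 to capacity).foreach(i => mailbox.put(s"message-$i"))
    // offer() gives up after the timeout; put() would wait without limit.
    mailbox.offer("one-too-many", timeoutMillis, TimeUnit.MILLISECONDS)
  }
}
```

BoundedPushSketch.pushWhenFull(100, 10) returns false after roughly ten milliseconds, because nothing is consuming from the queue. Replacing offer with put in the same situation would never return.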

We can also create PriorityMailbox instances, though this requires a bit of custom code. The idea with these mailbox types is that each incoming message is given a weighted priority that is used to determine in what order the messages in the mailbox are processed. Assuming we wanted to create a mailbox that would handle AddBookmark messages and prioritize them based on the contents of their URL, the following approach would be one possible solution:

package akkaguide

import akka.dispatch.{PriorityGenerator, UnboundedPriorityMailbox}
import akka.actor.ActorSystem
import com.typesafe.config.Config

object BookmarkPriorityQueueMailbox {
  import akkaguide.BookmarkStore.AddBookmark

  val priorityGenerator = PriorityGenerator {
    case AddBookmark(title, url) if url.contains("typesafe.com") => 0
    case AddBookmark(title, url) if url.contains("oracle.com") => 2
    case _ => 1
  }
}

class BookmarkPriorityQueueMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedPriorityMailbox(BookmarkPriorityQueueMailbox.priorityGenerator)

As you can see, this relies on creating a PriorityGenerator instance that ranks the messages as integer values. The lower the value, the higher the priority the message is given. In this example, URLs containing the value "typesafe.com" are given the highest priority, while those containing "oracle.com" are given the lowest. We assume you can see where our biases lie. To use this mailbox, you can simply specify the fully qualified class name of the new mailbox class in the mailbox-type setting of the dispatcher configuration:

boundedBookmarkDispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  mailbox-type = "akkaguide.BookmarkPriorityQueueMailbox"
}

Now that we've defined this, we can modify the code that starts the BookmarkStoreGuardian we created previously so that it uses this dispatcher to handle all messages that pass through it:

val bookmarkStoreGuardian =
  system.actorOf(Props(new BookmarkStoreGuardian(bookmarkStore))
    .withDispatcher("boundedBookmarkDispatcher"))
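To see the effect of the ranking described earlier without starting an actor system, you can feed the same rules to a JDK priority queue. This is only a stand-in for the priority mailbox: PriorityOrderSketch and dequeueOrder are hypothetical names, and the AddBookmark case class here is a simplified copy assumed for the example.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

object PriorityOrderSketch {
  // Hypothetical stand-in for the BookmarkStore.AddBookmark message.
  final case class AddBookmark(title: String, url: String)

  // Same ranking as the generator shown earlier: lower number, higher priority.
  def priority(message: AnyRef): Int = message match {
    case AddBookmark(_, url) if url.contains("typesafe.com") => 0
    case AddBookmark(_, url) if url.contains("oracle.com")   => 2
    case _                                                   => 1
  }

  // Enqueues everything, then dequeues in priority order.
  def dequeueOrder(messages: Seq[AnyRef]): Seq[AnyRef] = {
    val byPriority = new Comparator[AnyRef] {
      def compare(a: AnyRef, b: AnyRef): Int =
        Integer.compare(priority(a), priority(b))
    }
    val queue = new PriorityBlockingQueue[AnyRef](11, byPriority)
    messages.foreach(m => queue.put(m))
    // poll() returns null once the queue is empty.
    Iterator.continually(queue.poll()).takeWhile(_ != null).toList
  }
}
```

If an "oracle.com" bookmark, an unrelated message and a "typesafe.com" bookmark are enqueued in that order, they come back out with the "typesafe.com" bookmark first and the "oracle.com" bookmark last. Messages with equal priority are not guaranteed to keep their arrival order in this sketch.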

There is another way to achieve this, new in Akka 2.2, which requires two configurations: (a) a deployment configuration and (b) a mailbox configuration. The main idea is to decouple the mailbox configuration from that of the dispatcher; the link between them is the name of the mailbox. Akka 2.2 lets us do this by pointing the value of mailbox at the configuration block that holds the mailbox-type, as shown below:

# Deployment configuration (used by option 1).
# The value of 'mailbox' here is the name of another configuration block,
# which houses the configuration of the particular mailbox.
akka.actor.deployment {
  /boundedMailboxactor {
    mailbox = boundedBookmarkMailbox
    # ... other deployment configuration can be here
  }
}

# Mailbox configuration, referenced by name (used by options 1 and 2)
boundedBookmarkMailbox {
  mailbox-type = "akkaguide.BookmarkPriorityQueueMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 1
}

From the perspective of our example, you now have two further ways to create the actor with the same semantics, and they are equivalent. The first passes a name to the actorOf API. That name not only names the actor but, through the deployment entry of the same name, also lets Akka discover the mailbox's configuration and create the appropriate mailbox. The second refers to the mailbox configuration directly by name.

// (option 1)
val bookmarkStoreGuardian =
  system.actorOf(Props(new BookmarkStoreGuardian(bookmarkStore)), "boundedMailboxactor")

// (option 2)
val bookmarkStoreGuardian =
  system.actorOf(Props(new BookmarkStoreGuardian(bookmarkStore))
    .withMailbox("boundedBookmarkMailbox"))

You should be aware that Akka provides six mailbox types: UnboundedMailbox, SingleConsumerOnlyUnboundedMailbox, BoundedMailbox, UnboundedPriorityMailbox, BoundedPriorityMailbox and the durable mailboxes. The first five are in-memory (see the Akka documentation) and do not persist messages; only the durable mailboxes, through their file-based implementation, give you a way to persist them. Readers interested in making messages durable are invited to read the documentation for the details.

Other mailboxes

You can also create your own custom mailbox types. While this might seem quite appealing at first, we would advise against it unless you have a very strong reason and know for certain that you understand the semantics behind the enqueueing and dequeueing used by whatever container you choose for your messages. Creating a custom mailbox is easy, but getting it right is another matter, and the mailboxes supplied with Akka are most often all you need. If you are still considering it, start by asking yourself: "What kind of properties would my mailbox have?" We suspect that Akka already answers a big part of that question. As you've learnt, Akka lets you configure a mailbox's capacity through mailbox-capacity, throughput via a combination of dispatchers and routers (think of the scatter-gather pattern), enqueue delay via mailbox-push-timeout-time, and message durability through durable mailboxes. The sweet spot is discovering a combination of these techniques that suits your use case.

Finally, you will also likely run across a reference to durable mailboxes, of which there were historically a number of implementations supplied with Akka. For reasons of maintainability, these were all removed from the distribution with the exception of one that is backed by a journaled transaction log on the filesystem. It's easy enough to use this mailbox with the following configuration:

journaled-file-dispatcher {
 mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType
}

Using this mailbox type is just like using any other mailbox (though it provides a wealth of configuration parameters), with the exception that the messages will still be retained if the virtual machine running Akka dies for whatever reason. This can be very handy for protecting yourself against failure, particularly when isolating different resources (local or remote) within your system, such as a remote webservice or an incoming data feed. There are additional third-party durable mailboxes available, including one based on AMQP, which you can find online.

Wrap-up

This whirlwind tour of dispatchers has hopefully given you an idea of the flexibility Akka gives you for creating robust configurations that can handle very different types of workflow, depending upon your needs. There is a huge range of choices available to you, so you might feel overwhelmed, but it's generally best to start with the defaults to get things working. Then, by watching performance and profiling under real workloads, you can get a better sense of where to apply these different tools and understand how they might impact your overall system.
