English |  Español |  Français |  Italiano |  Português |  Русский |  Shqip

Developing an Akka edge special code

Chapter 10 : Clusters

In this chapter, we'll explore a new feature available in the latest Akka 2.2 (soon to be 2.3) release codenamed "Coltrane" which is Akka clusters. Prior to this release clusters in Akka was experimental but we have not been available to conceal our excitement over this feature and we've decided to share with you, the reader, why you should look out for this feature but that is probably a misnomer, since akka-cluster is really more of a framework for building and running long-lasting distributed applications.

Clustering technology isn't something new and at least for the Java world, several clustering implementations have been offered in the past (some survived to this day) and most of them don't call themselves clusters but rather the name middleware was given (see Wikipedia's interpretation). To the Java/Scala developers amongst us, you're probably familiar with some cluster implementations like Oracle WebLogic, IBM WebSphere, RedHat's JBoss etc and those implementations typically focused on implementing the distributed computing constructs derived from the J2EE (dubbed by Sun Microsystems) or what is known now as JEE specifications (driven by present-day Oracle). 

The clustering solutions of yore and present, mostly focused on providing computing abstractions that was common place and some notable features includes the famous two-phase-commits for coordinating transactions over a distributed system; another one is the abstraction that allows the developer to design application dataflows by allowing the business logic to be encapsulated across stateless or stateful workers a.k.a Enterprise JavaBeans (EJBs) with their life-cycles managed by the containers (another name for J2EE/JEE-middleware) that hosted them and many of these implementations provide fail-overs for compute tasks when a node is under significant load, or when a node that went under the bridge. Many of these solutions comes with productivity tools that aids the developer to be more productive in developing applications in J2EE/JEE. The JEE specification covers more than what we have just described here and these implementations might be a perfect match for businesses who have these needs but there is a problem, we think.

Adopting these solutions, in our opinion, comes with a cost. This cost is visibility. In distributed systems, visibility is an intrinsic property that is very much desired. What we mean here is the ability to have transparency in the way things actually work, not how we think they should work and lastly to effect the changes so that others can benefit from it. Akka, being open sourced, allows the developer (or the team) unrestricted access to its source and hence the propensity to achieve this. An example that epitomizes the problem with the lack of visibility is when the commercial cluster system does not behave in the fashion in which it is suppose to. What happens next would be to file a incident report to the commercial entity that is supporting the software to its customers (that is, the customer is you or your organization) and the time to resolution could range from days to months, sometimes. In a situation that some of us (me including) have experienced, we were advised to upgrade to a recent version of that software and had to work out an action plan for migration and that is a pain that we hope we won't relived anytime soon.

Another observation is that we, developers, are living in a world where we no longer build software in isolation/obscurity and together with the proliferation of open source technologies that range from programming languages, cloud operating systems (eucalyptus, open stack etc) and their tooling etc the use case for building technology on closed systems in contrast to open source systems might not be that favourable as this trend of open source development proliferates. This observation is supported by the fact that you've picked up this book and know that you can download and examine not only this book's source code but also the source code from Scala and Akka means that you have already acquired the fundamental means to reason not only how it works but possibly, change the way it works. A nicety of akka-cluster is that it's built on akka-actor technology so in that way, its a extension of Actors that you've already learnt.

Having said that, smart developers like yourselves have begun to wonder what kinds of applications you can build using akka-cluster and the answer is aplenty (see link). 

We, are not advocating that you should drop the ball altogether on those clustering solutions that your organization has spent considerable resources building up on, rather we're encouraging you to explore akka-cluster and see whether you can employ it to use and we believe the prospects are high and even more so when you've dabbled with actors for a while now. Information gathered from water coolers and the company pantries reveal a compelling (somewhat crafty) strategy: discover a sensible clustering approach, implement and test it, finally swap out that J2EE/JEE solution with Akka cluster. Works like a charm.

For the rest of chapter, we are going to explore clusters in Akka by understanding how it works through (relatively) painless examples which should give you a good basic intuition and we'll demonstrate two use cases:

(a) A distributed computing cluster

(b) How you can integrate a REST server to a Akka cluster.

The last example was chosen because prior to akka-cluster, many developers would develop front-end services on a REST/HTTP server framework like Play or Spray possibly, and build front-end consumer services a.k.a. back-end using Actors in Akka and when they discovered the akka-cluster, the more adventurous hakkers amongst us would start migrating those services into clusters.

About Akka Clusters

In 2007, Amazon released a famous paper titled `Dynamo: Amazon's highly available key-value store` where the authors of that work went into detail explaining the design philosophies of Dynamo which covers not only cluster design, data replication design, node recovery and failure recovery etc and as far as we know, Akka derived its design philosophy from Dynamo and Basho's Riak to implement the cluster membership. Nodes in the cluster communicate their liveness, system metrics by gossiping amongst themselves via a gossip protocol; the gossip protocol propagates data like the state of the cluster and vector clocks are used to reconcile and merge the differences detected.

A cluster is made up of member nodes and these nodes can be on the same host (with different port numbers, of course) or on different hosts. Each node is identified by a string that is of the form hostname:port:uid and Akka clusters are versatile in the respect that clusters do not have a strict requirement of hosting only actors, infact you can run good ol' Scala code if you like; however, the typical use case of Akka clusters is to distribute the application over it. Clusters in Akka are fault-tolerant peer-to-peer based cluster membership with no single point of failure or single point of bottleneck. Sweet!

Let's take a look at how they actually work through an example. This example has a simple architecture: A Scala module will start a cluster and starts an Actor; this actor will print a message whenever it detects a node has just joined the cluster. The following code illustrates the module that will host our cluster:

package simplecluster
object SimpleCluster extends App {
  override def main(args: Array[String]): Unit = {
    if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0))
    val system = ActorSystem("Cluster")
    val clusterListener = system.actorOf(Props[SimpleClusterListener],
                                         name = "clusterListener")
    Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])
  }
}

What we have done here is to create a ActorSystem and have that ActorSystem create the listener-actor. At this point in time, we have no cluster yet. The cluster comes to life when we create it through a constructor call Cluster(system) and have our cluster delegate all events that pertains to when a cluster to an object that is represented by the trait ClusterDomainEvent.

ClusterDomainEvent is the super-trait that represents events going on inside a cluster e.g. cluster goes up/down, cluster becomes unreachable, change in cluster behavior, change in cluster resource utilization. 

The listener will be waiting for messages to arrive and once they do, the response is to log some intrinsic information. For example, the listener will log a status whenever a new cluster shows up and when a member went down. Here's what the listener looks like:

package simplecluster
class SimpleClusterListener extends Actor with ActorLogging {
  def receive = {
    case state: CurrentClusterState ⇒
      log.info("Current members: {}", state.members.mkString(", "))
    case MemberUp(member) ⇒
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) ⇒
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) ⇒
      log.info("Member is Removed: {} after {}",
      member.address, previousStatus)
    case _: ClusterDomainEvent ⇒ // ignore
 }
}

You noticed that our listener is watching for certain types of messages to be processed and if you are keen to find out how events, failure detection, routing works in the (current) cluster, you can read the following sub-sections or skip to Start up a cluster just to see how things work!

Failure Detection

Failure detection is essential to the day to day operations of a cluster, because it is in the interests of the service providers and operators alike to be able to respond when one or more nodes in the cluster failed to provide the said service. How Akka clusters accomplished this is by having each node monitored by a few other nodes and when the node being monitored becomes unreachable because of network failure, JVM crashing etc it'll communicate this to the others by gossiping and likewise, if the node become reachable again everyone else would know about it through gossip and this monitoring is done by sending heart beat messages and the interpretation of the arrival times of the heartbeat messages is implemented in Akka by The Phi Accrual Failure Detector by Hayashibara et al. The key formulation is this formula:

The function φ you see measures the level of trust or conversely level of suspicion that any node is up or down (depending on your interpretation); the function F here is a cumulative distribution function of historical heartbeat inter-arrival times. This formulation looks at historical and current data to derive a level of suspicion on this node to predict whether a node is unreachable. You can control the sensitivity of this detector by adjusting the threshold through the configuration i.e. application.conf by adapting the value of akka.cluster.failure-detector.threshold to a suitable value in your production cluster environment; another control you can apply to adjust the sensitivity is through adapting the value of the key akka.cluster.failure-detector.acceptable-heartbeat-pause. You are invited to read about the details here.

Reactive Cluster Routers

Starting from Akka cluster 2.2, a new kind of cluster aware router was added to the family and this particular router was metrics-based i.e. its able to take advantage of the metric data read off the cluster and load balance the messages across the nodes in the cluster, it's known as AdaptiveLoadBalancingRouter.  This router consumes the metrics data (collected from the nodes in the cluster via the gossip protocol) and forms a weighted routees i.e. all routees are given a weight based on the amount of remaining capacity, and messages will be routed to the routee that has the larger available capacity at the time of measure. The following metrics are currently available: HeapMetricsSelector, CpuMetricsSelector, SystemLoadAverageMetricsSelector and MixMetricsSelector; and they are on the remaining capacities of the JVM heap memory, CPU (User + Sys + Nice + Wait on Linux systems), system load respectively and MixMetricsSelector uses a combination of cpu + heap + system load. The routees that are associated with this router would be automatically registered or un-registered depending on whether the node becomes reachable or unreachable.

However, starting from Akka cluster 2.3, the AdaptiveLoadBalancingRouter is deprecated in favour of two routers which carry the adaptivity of load balancing by allowing your choice of metrics-selector (as described in the previous paragraph) during router creation, hence increasing the flexibility of your cluster routing design:

  • AdaptiveLoadBalancingPool - Each router of this type would own its set of routees and different routers do not share routees. You might want to consider using this type of router when you have a master node delegating work to the othe nodes in the cluster through this router.
  • AdaptiveLoadBalancingGroup - Each router of this type would share the routees even if these routers are running on different nodes in the cluster. You might want to consider deploying this type of router in the situation where you have backends with frontends as a bulkhead.
We won't delve too much into the usage of these new routers and invite you to explore them here.

How events in the cluster works

When a cluster starts up, it creates many objects to help manage the state of the cluster, detecting if members are up or down among other activities. In terms of the membership detection, the approach is that an internal actor, ClusterDaemon (which in turn is child of another actor ClusterCoreSupervisor), is the main guy whose watching out for things like membership detection through Akka's gossip-protocol etc. Hence, when we say we want our actor SimpleClusterListener to be the receipient of what goes on around the cluster through this expression

Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])

What effectively happens is that the ClusterDaemon actor delegates all events to this publisher actor ClusterDomainEventPublisher whom is responsible for pushing events related to the cluster onto the event bus. In our example SimpleClusterListener is a subscriber of these events which would otherwise write to void figuratively speaking and when events are being published by the cluster, all subscribers are notified. 

Returning from that deviation, let's try starting up our cluster and have 2 more nodes join the cluster, in turn.

Start up a cluster

Let's start up a cluster of three nodes where two are seed nodes and the third node be associated with either one of the previously started seed nodes. These seed nodes are the entry-points for other nodes to join the cluster and when a new node joins the cluster, the seed nodes communicate amongst themselves to see which one is able to take on this new node. When the new node joins a particular seed node, you would noticed a message that reflects that. Besides getting the relevant dependencies ready in the build.sbt, we need to provide the usual application.conf in the src/main/resources with the following contents:

akka {
 actor {
  provider = "akka.cluster.ClusterActionRefProvider"
 }
 remote {
  log-remote-lifecycle-events = off
  netty.tcp {
   hostname = "localhost"
   port = 0
  }
 }
 cluster {
  seed-nodes = [
   "akka.tcp://Cluster@localhost:2551",
   "akka.tcp://Cluster@localhost:2552"]
  auto-down = on
 }
 log-dead-letters-during-shutdown = off
}

To start this cluster up, we want the seed nodes to start up first (before we allow other nodes to join the cluster) by issuing the following command over a terminal (noticed that the port numbers used corresponds to that found in application.conf):

sbt "run-main simplecluster.SimpleCluster 2551"
sbt "run-main simplecluster.SimpleCluster 2552"

In the time between the issue of two commands, you would notice that the cluster lodged at port 2551 is attempting to connect with the other at port 2552 and fails until the other party starts up and depending on far apart was the issue of the two commands, you would noticed the leadership of the cluster swings from the node at 2551 to the node at port 2552. Next, you can issue a command to bring up a new node which will join either one of these seed nodes by not providing a port number like this (a random port will be assigned to this new node):

sbt "run-main simplecluster.SimpleCluster"

And from this point onwards, you can basically add more nodes by issuing the above command to a terminal and you can bring down a node by either issuing a ctrl-c to the terminal or programmatically by a call like Cluster(system).leave(nodeAddress). The interesting thing you can do upon cluster startup is to run a piece of code and you do that by having an expression Cluster(system).registerOnMemberUp(someComputation) and we'll see in a while how you might apply that idea in the application.

A Distributed Computing Cluster Application

In this section, we are going to demonstrate how you can go about building a compute cluster that does one really important thing: computing the value of Pi i.e. ∏. When you start thinking about coming up with a solution with Scala alone, you can develop a sequential or parallel approach and when you rethink about solution in Akka, you get creative and start thinking of having a master-slave model where the master actor collates the results of multiple slaves (its basically parallel but distributed) and finally in akka-cluster, you may choose to spread the computing over multiple nodes.

Let's start by outlining how the value of Pi can be calculated and we can gradually work out a sequential to parallel approach, and we then transition over to using actors and finally a clustered approach. Calculating the value of Pi is a computationally intensive application or in other words, it's more CPU bound than I/O bound; though the idea of computing this well known value may look trivial, getting it right may not be the case. The formula we will be using for this computation is this:

Formula for calculating `Pi`

and it has the property of being embarassingly parallel, which literally means that each part of the computation can be completed in isolation, and it will be from this formula in which our code will be based upon.

A sequential approach

Like all things, we should establish a base reference implementation upon which to validate whether we are doing the right thing and as a simple example, we have encapsulated the computation of Pi in this function calculatePi and the computation within adopted a tail-recursive implementation which doesn't exhaust the stack and hence we won't experience the nasty StackOverflowException (admittedly, you can do this in other ways). Next, we provide a environment in which we can observe and measure its execution by wrapping this function inside a Scala module, Pi. The code below demonstrates how Pi can be calculated from a single executing thread i.e. the main thread.

package pi_sequential
import scala.annotation.tailrec
object Pi extends App {
  def calculatePi(numberOfElements: Int) : Double = {
    @tailrec
    def calculatePiFor(start: Int, limit: Int, acc: Double) : Double =
      start match {
        case x if x == limit ⇒ acc
        case _ ⇒ calculatePiFor(start + 1, limit, acc + 4.0 * (1 - (start % 2) * 2) / (2 * start + 1))
      }
    calculatePiFor(start, start + numberOfElements - 1, 0.0)
  }
  override def main(args: Array[String]) : Unit = {
    val start = System.currentTimeMillis
    val numberOfIterations = 10000
    val numberOfElements = 10000
    var acc = 0.0
    for(i ← 0 until numberOfIterations)
      acc += calculatePi(i * numberOfElements, numberOfElements)
    println(s"\n\tpi approximation: ${acc}, took: ${System.currentTimeMillis - start} millis")
  }

When you invoke this application via sbt, you would get an output that resembles something similar below

[info] Running pi_sequential.Pi
pi approximation: 3.1435501812459323, took: 739 millis

This is as simple as it gets, really. Compute bound computations when executed by a single thread are typically inefficient from the perspective that it doesn't take advantage of the modern machine's multi-core/threading capabilities and we can improve upon that by using the age-old technique known to most Java developers: running the computation on a pool of threads.

A parallel approach via a Thread Pool

We know that calculating the value of Pi using the above formulation can be achieved in parallel and in isolation. What this means is that we need to provide an abstraction around that individual computation so that we not only delay its evaluation but also to be able to assign each computation to a executor service of our choice; The executor service is useful in modern machine architectures where multiple CPU cores are available and allows the developer to utilize the entire resource available on the system and in Scala, one common technique is to create a pool of threads and when we need work to be done, we simply delegate the task to the pool of resources. We should caution the reader that there are tradeoffs whether to choose a strategy that's based on a single-thread or thread-pool and we invite readers to check out Brian Goetz et al Java Concurrency in Practice for details. The thread pool approach makes sense for a number of reasons and we've named one already, also this approach works because our tasks are homogeneous and independent (we'll see that soon) and when there are more tasks than available threads and creating a pool aprior shaves some latencies & resources off since the computations pick any available thread from the same pool and doesn't have to start/shutdown threads. This strategy works well when the duration of each task is relatively short and you have a considerable amount of compute cores on the machine.

Hence, first thing we need to do is to identify where the isolated task is. In our case, we can use the previous example and if we examine it close enough we can see that this expression 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) comes to mind and this expression evaluates to a Double and we can delay its execution by reformulating this expression into a function such that the type of this function is () ⇒Double. Now that we know this, we can collect all these individual expressions into a collection like List[() ⇒ Double]. This is not enough because we know from standard Scala rules, we cannot dictate the execution preference to a Scala container so we need to build another abstraction that captures the function i.e. () ⇒Double as well as allowing us to dictate our computational preference and we called this container a ThreadStrategy. Finally, we have an expression that reads like the following:

for{
    // iterate through all expressions
  } yield executeStrategy.execute( () ⇒ 4.0 * (1 - (i % 2) * 2) / (2 * i + 1))

For the purpose of illustrating this example, we have created two strategies for executing functions: (a) SameThreadStrategy and (b) ThreadPoolStrategy. The above functionality is encapsulated inside calculatePi which carries the implicit object that indicates our preferred choice of running this calculation. The following code shows you our approach:

def calculatePi(start: Int, numberOfElements: Int)
               (implicit executeStrategy : ThreadStrategy = SameThreadStrategy) : Double = {
      val computations =
        for (i ← start until (start + numberOfElements - 1))
          yield executeStrategy.execute( () ⇒ 4.0 * (1 - (i % 2) * 2) / (2 * i + 1))
      computations.aggregate(0.0)( (acc,f) ⇒ f() + acc, _ + _ )
    }

That function calculatePi is turn wrapped inside a Scala module named Pi in which we can measure and observe its execution when given a ThreadPool versus the single-thread; the following code demonstrates this:

override def main(args: Array[String]) : Unit = {
  val start = System.currentTimeMillis
  val numberOfIterations = 10000
  val numberOfElements = 10000
  var acc = 0.0
  for(i ← 0 until numberOfIterations)
    acc += calculatePi(i * numberOfElements, numberOfElements)(ThreadPoolStrategy)
 
  println(s"\n\tpi approximation: ${acc}, took: ${System.currentTimeMillis - start} millis")
}

A run of this application via sbt would give you an output similar to this (if you have a multi-core machine, you would noticed that the console would indicate different thread ids):

[info] Running pi_parallel.Pi
...
thread-ID: 477
thread-ID: 479
thread-ID: 478
thread-ID: 480
pi approximation: 3.1435501812459323, took: 41592 millis

The careful reader would notice that it took a much longer time as compared to the sequential and the developer might be led to belief that the sequential approach is significantly better! There are plausibly a number of reasons for this: (a) our algorithm implementation is probably not the most optimized (b) the number of cores on the test machine is small (c) speed of each core etc. As an example how changing the algorithm would improve the run times of approximating Pi, we can use the discrete approach which means discovering the integral of the function that approximates Pi by calculating all the rectangles under that function and the following illustration illustrates this idea

and we divide the area underneath the function into bins and the approach we've taken here is to partition the data range and have a task processed the task's bin and we aggregate all the partials sums once all the tasks have completed execution and we would have an reasonably effective way of computing Pi and this is commonly known as the scatter-gather pattern and we'll see in the next section how we can adapt this pattern to the Actors model. The following formula sums it up:

Below, is the code that illustrates the ideas we just talked about and we gave it another name, calculatePiSlightlyClever for the purpose of contrasting this approach with the above and you noticed that we modeled the computation still using the ThreadPoolStrategy trait we created earlier.

def calculatePiSlightlyClever(start: Int, end : Int, step : Double)(implicit executeStrategy : ThreadStrategy = SameThreadStrategy) : Double = {
  val computations =
  for( i ← start until end )
    yield executeStrategy.execute{ () ⇒ val x = (i + 0.5) * step; 4.0 / (1.0 + (x * x)) }
  computations.aggregate(0.0)( (acc, f) ⇒ f() + acc, _ + _ )
}

This time, our runtimes have improved significantly by approximately six-fold by selecting another algorithm and application of simple task and data decomposition ideas for calculating Pi and a sample run of this computation shows

[info] Running pi_parallel.PiSlightlyClever
pi approximation: 3.141592653589859, took: 955 millis

We caution the reader to be judicious when applying a change in algorithm as it may result in a loss of precision and the best advice is to consult with subject matter experts and past-/recent research collaterals. In our example, we experienced a loss of precision to a thousandth and for our purpose in this book, its a tradeoff we can live with but you might not have luxury. Thus far, we have explored a few ways in which we can utilize the compute resources that's available on a single machine/node and let's take a look at how we can model our problem with Actors in the coming section.

An actors approach

In this section, we are going to take a look at how we can implement this solution using actors. Considering that you've been working through this book for a while now, you may have many ideas how to go about doing this. Our approach evolves from the previous section where we apply the scatter-gather pattern to the actor and this term originated in vector/array addressing, to the best of our knowledge, and has been applied to many disciplines in computer science. The Java fork-join model is probably the best analogy to go about understanding this and in our actor implementation, we go about implementing this idea.

We are going to a Master actor that's responsible for starting other Worker actors when the workers received the message Calculate, which also carried the payload encapsulated by a message called Work. >Each Worker will proceed and compute the value of Pi and return the result by bandwagoning the value on the message Result and the master keeps track of how many workers have returned and finally present the result back. The figure below illustrates the scatter-gather approach.

Additionally, we want to know how long it took for the computation to complete so two marks are made in the master actor to measure the duration it took when it started up to the time all the workers have returned from their computation and we have a message that does this named ApproximatedPi. Here are the messages we used for this application:

sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start: Int, numberOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
case class ApproximatedPi(pi: Double, duration: Duration)

and the Worker's implementation is shown as follows:

class Worker extends Actor {
  @tailrec
  private def calculatePiFor(start: Int, limit: Int, acc: Double) : Double =
    start match {
      case x if x == limit ⇒ acc
      case _ ⇒ calculatePiFor(start + 1, limit, acc + 4.0 * (1 - (start % 2) * 2) / (2 * start + 1))
    }
  def receive = {
    case Work(start, numberOfElements) ⇒ sender ! Result(calculatePiFor(start, start + numberOfElements - 1, 0.0))
}}

with the Master actor's implementation which takes care of measuring the duration of the entire computation and once all the results have been aggregated, we notify the listener which simply prints out the result whilst the Master shuts down. All this code should be relatively familiar with you by now and you may want to refresh yourself on the chapters on Working with Actors and Routers.

class Master(numberOfWorkers: Int, numberOfMessages: Int, numberOfElements: Int, listener: ActorRef) extends Actor {
  var pi: Double = _
  var numberOfResults : Int = _
  val start: Long = System.currentTimeMillis
  val workerRouter = context.actorOf(Props[Worker].withRouter(RoundRobinRouter(numberOfWorkers)), name = "routerForWorkers")

class Master(numberOfWorkers: Int, numberOfMessages: Int, numberOfElements: Int, listener: ActorRef) extends Actor {
  var pi: Double = _
  var numberOfResults : Int = _
  val start: Long = System.currentTimeMillis
  val workerRouter = context.actorOf(Props[Worker].withRouter(RoundRobinRouter(numberOfWorkers)), name = "routerForWorkers")
  def receive = {
    case Calculate ⇒ for (i ← 0 until numberOfMessages) workerRouter ! Work(i * numberOfElements, numberOfElements)
    case Result(value) ⇒ pi += value
                          numberOfResults += 1
                          numberOfResults match {
                          case x if x == numberOfMessages ⇒ listener ! ApproximatedPi(pi, duration = (System.currentTimeMillis - start).millis)
                                                             context.stop(self)
                          case _ ⇒
                          }
  }
}
class Listener extends Actor {
  def receive = {
    case ApproximatedPi(pi, duration) ⇒ println(s"\n\tPi approximation: $pi, took: $duration ")
                                         context.system.shutdown()
  }
} 

Other than keeping track of how many workers have completed their job, the master is also responsible to shut down all supervised actors after sending the result, ApproximatedPi, to the listener (another actor which shutdowns after printing out the result); the master shutdowns by stopping itself through invoking context.stop(self) and that casades the shutdown to the other supervised actor which is workerRouter.

Let's take this for a spin! A sample run on our test machine via `sbt` has the following output (depending on your machine specifications, you might have a different runtime) 

[info] Running pi_actors.Pi
Pi approximation: 3.1435501812459323, took: 456 milliseconds

An clustered approach

Earlier in this chapter, we have understood some of the mechanics of the Akka cluster and over here we're going to demonstrate to you how we are going to adapt the design of the Actors approach to the cluster approach. There are several ways to design a solution around this problem i.e. computing the value of Pi and the approach we like to show you is to have a number of nodes start up in our cluster and once the cluster is ready, some front-end service will begin serving those compute requests to the cluster and the manner in which the cluster picks up jobs will be in a FIFO manner.

In our choice of design, we are going to build a cluster that has two basic components: (a) a front-end service & (b) a back-end service. In our context, the front-end service would be responsible for serving the request to compute Pi using our back-end service and in that, we would reuse the scatter-gather design we implemented in the previous section. To help our front-end service manage our backend's availability, we'll apply a simple coordination strategy to mark a backend i.e. cluster as busy or idle. The backend service, PiBackend, has two components: (a) a Cluster backed by an Actor (b) a Scala module that drives the creation of the cluster and we have the same approach for the frontend service as well, named PiFrontend.

object PiBackend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val config =
      (if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}")
        else ConfigFactory.empty).withFallback(
          ConfigFactory.parseString("akka.cluster.roles = [backend]")).
          withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[PiBackend], name = "backend")
  }
}

The above driver for our backend would start up our cluster-backed-actor on the designated port (read in from the commandline) or randomly assigned on the designated host. Simple. What happens next is a little more involved (see accompany code following this paragraph), when the actor PiBackend starts up, it evolves into a cluster by invoking Cluster(context.system) and assigning the returned value to an immutable value conveniently called cluster. During the cluster startup, it also registers itself to listen in when a new node joins the cluster and is ready to receive requests i.e. MemberUp in the function preStart. When the cluster experienced changes either members come and go or become unreachable, then a message of CurrentClusterState is intercepted by our cluster and for every node that is a member of the cluster and is up i.e. MemberUp we'll notify the front-end of this node's presence by passing that member to our function, register; we'll see in a while why this is important. Thus far, we've described mechanisms employed when building a typical cluster and we present the code:

class PiBackend extends Actor {
  var originalSender : ActorRef = _
  var jobDispatcher : ActorRef = _
  var currentJob : CalculatePi = _
  var pi: Double = _
  var numberOfResults : Int = _
  val start: Long = System.currentTimeMillis
  val numberOfWorkers = java.lang.Runtime.getRuntime.availableProcessors
  lazy val workerRouter = context.actorOf(Props[Worker].withRouter(RoundRobinRouter(numberOfWorkers)), name = "routerForWorkers")
  val cluster = Cluster(context.system)
  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  def receive = {
    case (job : CalculatePi, orgSender: ActorRef) ⇒
      originalSender = orgSender // capture the original sender i.e. the module PiFrontend
      jobDispatcher = sender // capture the dispatcher i.e. the actor PiFrontend
      currentJob = job // capture the job the dispatcher wants me to do
      for (i ← 0 until job.numberOfMessages) workerRouter ! Work(i * job.numberOfElements, job.numberOfElements)
    case Result(value) ⇒ pi += value
      numberOfResults += 1
      numberOfResults match {
      case x if x == currentJob.numberOfMessages ⇒ originalSender ! ApproximatedPi(pi, duration = (System.currentTimeMillis - start).millis)
      jobDispatcher ! (currentJob, Idle())
      context.stop(self)
    case _ ⇒
    }
  case state: CurrentClusterState ⇒
    state.members.filter(_.status == MemberStatus.Up) foreach register
  case MemberUp(m) ⇒ register(m)
}
def register(member: Member): Unit =
  if (member.hasRole("frontend"))
    context.actorSelection(RootActorPath(member.address) / "user" / "frontend") ! BackendRegistration
}

Next, our backend service needs to know how to conduct two things: (a) compute Pi, (b) communicate the results back to the frontend service. The functionality is encapsulated in the function receive (as seen above) and over here we need to look at how our frontend service functions in relation to our backend service. Recall earlier that we wanted our frontend service to serve compute requests to our cluster and we should not send requests to the backend when it has finished the previous request. Another way to think about this is that the frontend, PiFrontend, is a gate-keeper of sorts to our cluster. 

PiFrontend attempts to send the job, CalculatePi, to our cluster by checking if any backends are registered and it knows this when any PiBackend sends the message BackendRegistration when it comes up in the cluster (see earlier code). Through this mechanism, PiFrontend is watching the multitude of PiBackends and in our example, PiFrontend is watching for the event when any of the backends leaves the cluster or goes down by keeping an eye for the message Terminated and our reaction to that is to remove that particular node. Any new node will, by default, be given a state Idle and transitions to Busy when it's working on our compute Pi request. The following code expresses these ideas:

class PiFrontend extends Actor {
  var backends = IndexedSeq.empty[(ActorRef, State)]
  var jobCounter = 0
  def receive = {
    case job: CalculatePi if backends.isEmpty ⇒
      sender ! CalculatePiFailed("Service unavailable, try again later", job)
    case job: CalculatePi ⇒
      jobCounter += 1
      val availBEs = backends.filter(_._2 == Idle()) // find `idle` backends
      val index = jobCounter % availBEs.size
      backends(index)._1 ! (job, sender) // send job to `Idle` backend
      val temp = backends(index)
      backends.updated(index, temp._1 → Busy()) // mark backend as `Busy`
    case (_: CalculatePi, s: State) ⇒
      val aBackend = backends.find(_._1 == sender).head // find the backend
      val index = backends.indexOf(aBackend)
      backends.updated(index, aBackend._1 → Idle()) // marked backend as `Idle`
    case BackendRegistration if !backends.contains(sender) ⇒
      context watch sender
      backends = backends :+ (sender, Idle())
    case Terminated(a) ⇒
      backends = backends.filterNot(_._1 == a)
  }
}

We've seen how our frontend would attempt to dispatch the jobs to available i.e. Idle backends for processing but we've not seen how those requests are generated. To generate those requests, we know that we want to launch that avalanche of requests only when the cluster is up (i.e. all nodes should be in the Up-state) and the manner in which we handle this is by configuring akka.cluster.min-nr-of-members to be equal to 3 in our application.conf. Finally, we use a convenience method, registerOnMemberUp, to place our launch code which basically lifts our code into a java.lang.Runnable

object PiFrontend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val config =
      (if (args.nonEmpty) ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args(0)}")
      else ConfigFactory.empty).withFallback(
      ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
      withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[PiFrontend], name = "frontend")
    val cluster = Cluster(system)
    cluster.subscribe(frontend, classOf[ClusterDomainEvent])
    cluster.registerOnMemberUp {
      import scala.util.Random._
      import system.dispatcher
      implicit val timeout = Timeout(10 seconds)
      // Depending on how fast your cluster computes each computation, not all jobs may get to
      // run on the cluster. For purpose of illustration, we'll vary the computation length
      for (n ← 1 to 5) {
        (frontend ? CalculatePi(nextInt(10000), nextInt(10000))) onSuccess {
        case result ⇒ println(result)
      }
      // wait a while until next request,
      // to avoid flooding the console with output
      Thread.sleep(2000)
    }
   }
 }
}

And here's the configuration in our application.conf:

akka {
 actor {
   provider = "akka.cluster.ClusterActorRefProvider"
 }
 remote {
   log-remote-lifecycle-events = off
   netty.tcp {
     hostname = "localhost"
     port = 0
   }
 }
 cluster {
   seed-nodes = [
     "akka.tcp://ClusterSystem@localhost:2551",
     "akka.tcp://ClusterSystem@localhost:2552"]
     auto-down = on
     min-nr-of-members = 3
 }
 log-dead-letters-during-shutdown = off
}

To run our example, open three terminals and execute the following commands in each terminal, in turn:

sbt "run-main pi_cluster.PiBackend 2551"
sbt "run-main pi_cluster.PiBackend 2552"
sbt "run-main pi_cluster.PiFrontend"

When the Akka cluster framework detects that all members of the cluster are up i.e. akka.cluster.min-nr-of-members, it'll run the code that we registered via registerOnMemberUp and here's a sample output captured from the terminal hosting PiFrontend:

CalculatePiFailed(Service unavailable, try again later,CalculatePi(3351,4383))
ApproximatedPi(3.1935353605129233,8057 milliseconds)
ApproximatedPi(3.1438214359957004,11173 milliseconds)
CalculatePiFailed(Service unavailable, try again later,CalculatePi(6921,1532))
CalculatePiFailed(Service unavailable, try again later,CalculatePi(4597,4886))

The following diagram illustrates how our cluster looks like schematically with the kinds of messages transmitted within our cluster:

Our design of choice has deficiencies, naturally and for the purpose of this book we've ignored quite a fair bit but when using clusters in an production environment, you would want to give it further thought and asks questions like:

  • Do my application(s) really need a cluster?
    • Our example could be accomplished through the Actors approach too but depending on the requirements of the business, you may wish to ensure greater fault-tolerance via Akka clusters.
  • How does one package & deploy the cluster in production? Automated? Manual is out of the question, of course
    • We did not cover that in detail or barely since its really a topic that involves organization processes and systems architecture with a wide plethora of approaches and tools  
  • How to manage failures in application/business logic?
    • In our example, we would like our cluster to be able shelve our requests till the resources becomes available but we didn't do that. 
  • How to manage the cluster administratively?
  • How to make the cluster react based on load? In particular how to scale up or down?

That last question is a topic we did not cover in this book and we encourage readers to do so by exploring a particular facet of this problem by employing the cluster aware routers in the akka.cluster.routing package. We've outlined the new routers i.e. AdaptiveLoadBalancingGroup, AdaptiveLoadBalancingPool available in Akka 2.3 in a previous paragraph and its certainly allows a good level of granularity in your current or future cluster designs.

Testing the cluster approach

We are going to leverage the concepts we've developed in the chapter on Testing, in particular applying the multi-node testkit i.e. akka-multi-node-testkit to our cluster design. The two test cases we want to validate against are:

  • A frontend can be started within a time frame (with no backends) and issuing a request to compute Pi would inevitably fail
  • A frontend and two backends are started and issuing a compute Pi request would eventually be successful (once the backends have registered with the frontend, as what we designed)

Hence, we know that we need three nodes (i.e. 1 frontend , 2 backends) and in multi-node testing, we need to assign roles to each of these nodes as well as have as many objects of MultiNodeConfig as there are nodes (required by the test cases), i.e. 3, and the following demonstrates this manifestation:

object PiClusterSpecConfig extends MultiNodeConfig {
  // register the named roles (nodes) of the test
  val frontend1 = role("frontend1")
  val backend1 = role("backend1")
  val backend2 = role("backend2")
  ...
  nodeConfig(frontend1)(
    ConfigFactory.parseString("akka.cluster.roles =[frontend]"))
 
  nodeConfig(backend1, backend2)(
    ConfigFactory.parseString("akka.cluster.roles =[backend]"))
}
 
class PiClusterSpecMultiJvmNode1 extends PiClusterSpec
class PiClusterSpecMultiJvmNode2 extends PiClusterSpec
class PiClusterSpecMultiJvmNode3 extends PiClusterSpec

No surprises there, we hope. Next, we would like to be able to start the first frontend which would be used by the backends in the subsequent test and we like to make sure it can be started up. The test issues a CalculatePi request to an non-existent backend and this would failed and a CalculatePiFailed message would be caught and the following illustrates that:

"start first frontend" in within(15 seconds) {
  runOn(frontend1) {
    // this will only run on the 'first' node
    Cluster(system) join node(frontend1).address
 
    val piFrontend = system.actorOf(Props[PiFrontend], name = "frontend")
    piFrontend ! CalculatePi(10000, 10000)
 
    expectMsgPF() {
    // no backends yet, service unavailble
    case CalculatePiFailed(_, CalculatePi(10000,10000)) ⇒
    }
}
// this will run on all nodes
// use barrier to coordinate test steps
testConductor.enter("frontend1-started")
}

The next test leverages on the created frontend and we start our backends i.e. PiBackend, and lodge them on the two nodes we defined earlier and have a CalculatePi request and this time round, if the test doesn't take more than the time frame i.e. 20 seconds and the computation completes, then a message 3.2454043060553874 would be returned as expected. The following code encapsulates these ideas:

"start two backends which automatically registers, verify service" in within(20 seconds) {
  runOn(backend1) {
    Cluster(system) join node(frontend1).address
    system.actorOf(Props[PiBackend], name = "backend")
  }
  testConductor.enter("backend1-started")
 
  runOn(backend2) {
    Cluster(system) join node(frontend1).address
    system.actorOf(Props[PiBackend], name = "backend")
  }
  testConductor.enter("backend2-started")
 
  runOn(frontend1) {
    assertServiceOk()
  }
 
  testConductor.enter("all-ok")
}

To run this, we would issue the test command on sbt and it would reflect the execution of the two tests in their written order (exiting with a message: All tests passed). A sample output of that run looks like this:

[info] * pi_cluster.PiClusterSpec
[JVM-1] Run starting. Expected test count is: 2
[JVM-1] PiClusterSpecMultiJvmNode1:
[JVM-1] The compute `Pi` service
[JVM-2] Run starting. Expected test count is: 2
[JVM-2] PiClusterSpecMultiJvmNode2:
[JVM-3] Run starting. Expected test count is: 2
[JVM-3] PiClusterSpecMultiJvmNode3:
[JVM-2] The compute `Pi` service
[JVM-3] The compute `Pi` service
[JVM-1] - must start first frontend
[JVM-2] - must start first frontend
[JVM-3] - must start first frontend
[JVM-1]
[JVM-1] No backend is up to service request
[JVM-1]
[JVM-1] !!!BACKEND REGISTERED!!!
[JVM-1]
[JVM-1] !!!BACKEND REGISTERED!!!
[JVM-1]
[JVM-1] YES!!! 3.2454043060553874
[JVM-1] - must start two backends which automatically registers, verify service
[JVM-3] - must start two backends which automatically registers, verify service
[JVM-2] - must start two backends which automatically registers, verify service

Next, we will take a look at how our design can be adapted to the Play framework and in particular, we take a look at how we can front our akka cluster with a Play instance and how we can trigger these computations through a HTTP interface.

Integrating with Play 2.2.x Framework 

The Play Framework is an advanced HTTP server implementation that is not only light-weight, stateless but it's also built on top of Akka; it also supports asynchronous i.e non-blocking I/O, streams, WebSockets, Ajax and has JSON as a first-class citizen within the framework and finally RESTful by default. When you combine that with the capabilities in Akka as a whole, the developer would quickly realize that the choices of design becomes really large and with choice, comes with the agility in design. 

Previously, we had a configuration of three (3) nodes where the frontend i.e. PiFrontend would serve requests to two backends i.e. PiBackend and that worked out alright. Quite possibly in the not so far future, you might want to experiment building out the design to have a Play instance play the role of the frontend and have that instance serve requests to your Pi cluster. There are a few ways you can do this and for the purpose of this book, we like to show you how to reuse most of the components we have illustrated previously in the simplest manner.

Buildout the Play Controller

Play adopts a MVC i.e. model-controller-view model and when a Play instance starts up, it'll load the controller code and it's life begins and our frontend actor, PiFrontend, starts up upon receiving the first HTTP GET request. The controller does this by creating a ActorSystem with its configuration and when the actor starts up, it'll join the the backend which would have been started already. As before, the backends would register itself to the frontend therefore, the first calculatePi request would inevitably fail but the subsequent requests should succeed. The following code demonstrates how our actor code is part of the controller and how a cluster is subsequently started.

package controllers
object Application extends Controller {
  import play.api.Play.current
  import pi_cluster._
  def loadClusterConfiguration() : Config = ConfigFactory.load("ClusterSystem.conf")
  def createActorSystem(configuration: Config) = ActorSystem.create("ClusterSystem", configuration)
  def createFrontendService(actorSystem: ActorSystem) = actorSystem.actorOf(Props[PiFrontend], "frontend")
  def createClusterAndWatch(actorSystem: ActorSystem)(service: ActorRef) = Cluster(actorSystem).subscribe(service, classOf[ClusterDomainEvent])
  val actorSystem = createActorSystem(loadClusterConfiguration)
  val frontend = createFrontendService(actorSystem)
  createClusterAndWatch(actorSystem)(frontend)
  // more code omitted
  def calculatePi() = { .... }
}

Next, calculatePi would be function to activate when a HTTP GET request arrives and we would like to be able to serve requests to the backend and return that result to the requester; in Play all requests are handled by an Action but using that directly leads to synchronous code and that would not go too well if our computation takes a longer time than expected so we use another form i.e. Action.async that improves concurrency and returns a asynchronous result wrapped in a Future i.e. Future[Result] and to guard against longer-than-expected-computation we can apply an common idiom in Play to time-out after a pre-defined duration e.g. we use 5 seconds and we return the result back to the requester be it an error or the value of the computed value of Pi. The following code, calculatePi, in the controller captures these ideas:

def calculatePi() = Action.async {
  import scala.util.Random._
  val limit = 10000
  val catchAll = play.api.libs.concurrent.Promise.timeout("doh!", 5 seconds)
  import scala.concurrent.Future
  val f = frontend ? pi_cluster.CalculatePi(nextInt(limit), nextInt(limit))
  Future.firstCompletedOf(Seq(f, catchAll)) map {
    case msg : String                    ⇒ NotFound
    case f: pi_cluster.CalculatePiFailed ⇒ Ok(f.reason)
    case data: pi_cluster.ApproximatedPi ⇒ Ok(data.pi.toString)
  }
}

Now that we have our function to be invoked, we need to provide a mapping by using a routing file; this mapping maps the HTTP-request type (e.g. POST, GET, PUT, DELETE, TRACE) to a particular implementation in the controller and in our example we would like to invoke through a URI http://<somehost>:<someport>/calculatePi. In Play, you can do this by providing a routing file at conf/routes at the root directory of our Play application i.e. play_with_akka_cluster with the following contents:

GET <tabspace> / <tabspace> controllers.Application.index
GET <tabspace> / <tabspace> calculatePi controllers.Application.calculatePi
GET <tabspace> /assets/*file <tabspace> controllers.Assets.at(path="/public", file) 

A sample run

To get our example to run, we need to get a few things ready and let's walk through how to get the application running. First of all, get the code for the book and deposit it somewhere on a directory of your choice. Next, we need to build the code for this section but before we go about doing that, we need to make the artifacts of our dependency available to our controller by publishing them locally like this

ch10-akka-cluster $> sbt publish-local

Which deposits those artifacts locally and we need to be able to reference that dependency and in sbt we include that dependency in our sbt build file, build.sbt, like this:

libraryDependencies ++= Seq(
 "ch10-akka-cluster" %% "ch10-akka-cluster" % "1.0",

Finally, we build the controller logic using those published artifacts by launching sbt and issue a compile command 

[info] Set current project to play_with_akka_cluster (in build file:/Users/tayboonl/akka-edge/ch10-akka-cluster/play_with_akka_cluster/)
[play_with_akka_cluster] $ compile
[success] Total time: 1 s, completed Nov 12, 2013 1:52:04 PM
[play_with_akka_cluster] $

That completed, we are ready to launch our revised application and to do that we launch two instances of PiBackend and then the Play instance (which would launch our controller, Application) by issuing the following commands in three terminals like this:

ch10-akka-cluster $> sbt "run-main pi_cluster.PiBackend 2551"
ch10-akka-cluster $> sbt "run-main pi_cluster.PiBackend 2552"
ch10-akka-cluster/play_with_akka_cluster $> sbt run

As before, the backends would connect amongst themselves and our cluster, ClusterSystem, is formed. Next, we issue a HTTP GET request either using the UNIX command, curl e.g. curl localhost:9000/calculatePi and eventually you would receive a message on the computed value of Pi.

 

There has been error in communication with Booktype server. Not sure right now where is the problem.

You should refresh this page.