
Developing an Akka Edge

Chapter 2: Working with Actors

In this chapter, we'll get a brief description of how actors are created in Akka, including how we can add actors to an existing project, and then we'll see some details on how to interact with actors beyond just blindly sending messages. Many of the examples in this book assume you are working in a UNIX/Linux environment, though they should generally work in a Windows-based environment with a few minor adjustments. In later chapters, we'll revisit portions of this example and refine it, explaining the features Akka provides along the way.

Determining the scope

It's hard to imagine there are many developers out there right now who haven't written some kind of web-based code, whether it was a Rails application, a Java servlet or whatever. This is a seemingly simple task, but a host of complexities can quickly rear their head. Though some of these systems are fairly trivial, in most cases the incoming requests require interaction with some other system components. This might be a database or it could be other web-services or perhaps a message queue, such as ActiveMQ or RabbitMQ.

In this case, we're going to see a simple service for managing a collection of bookmarks. The service will allow for the creation of new bookmarks and the ability to query the saved bookmarks. We'll leave out editing to keep the example code reasonably simple. As we develop this system, we'll look at what enhancements we might like to add and explore how Akka can help out.

You can imagine this service might start as a simple weekend hack that you put together to keep track of links you see on Twitter, Facebook, and elsewhere. But perhaps over time you show it to a few friends who ask to start using it, and eventually it might grow into something much larger, needing multiple servers and dedicated resources. So, you want to make sure the service is robust and that it is resilient to failures.

We'll look at how to build a simple service like the one just described and then see how to add further functionality as the requirements expand. We'll go over the basics in this chapter and get comfortable with Akka before we move into deeper waters in later chapters.

Laying the foundation

Now the first question is where to start. We're going to keep things somewhat simplified and ignore a few details, like validation, but these are easy enough to add later. We're going to build this using a very simple Java servlet-based approach, since we really don't want to complicate matters by introducing additional dependencies. And we'll use Jetty to handle loading and running the servlet, without requiring a complete container setup.

With that out of the way, let's think about how we might approach this. We can start by defining a simple servlet that accepts an HTTP POST request, parses the parameters given, and updates some data structure. Here's a quick example of how this might work:

import javax.servlet.http.{HttpServletResponse, HttpServletRequest, HttpServlet}
import javax.servlet.annotation.WebServlet
import java.util.UUID
import scala.collection.concurrent.{ Map ⇒ ConcurrentMap }

case class Bookmark(title: String, url: String)

@WebServlet(name = "bookmarkServlet", urlPatterns = Array("/"))
class BookmarkServlet(bookmarks: ConcurrentMap[UUID, Bookmark]) extends HttpServlet {

  override def doPost(req: HttpServletRequest,
                      res: HttpServletResponse) {
    val out = res.getOutputStream()
    val title = req.getParameter("title")
    val url = req.getParameter("url")
    val bookmark = Bookmark(title, url)
    val uuid = UUID.randomUUID()
    bookmarks.put(uuid, bookmark)
    out.print("Stored bookmark with uuid: " + uuid)
  }
  override def doGet(req: HttpServletRequest,
                     res: HttpServletResponse) {
    val out = res.getOutputStream()
    val bookmarkId = req.getParameter("uuid")
    bookmarks.get(UUID.fromString(bookmarkId)) match {
      case Some(bookmark) ⇒ out.println("Retrieved " + bookmark)
      case None           ⇒ out.println("Bookmark with UUID specified does not exist.")
    }
  }
}

This code is fairly typical for a servlet. We're using a scala.collection.concurrent.Map (aliased here as ConcurrentMap, to highlight the type) to store the bookmarks, so it's thread-safe. Let's exercise this example a little just to get a feel for how it works.

Store and Retrieve a bookmark

Navigate to the directory ch2-working-with-actors/Non-Akka and run: > sbt.

Next, enter run on the command prompt: > run

SBT (the Simple Build Tool) will look for our main class, Bookmarker, and start it, at which point you'll notice that a simple web server is running on your machine. Note that most of the code and commands you'll see later on assume an SBT session with dependencies already loaded. Next, we are going to store our first bookmark in our system and retrieve it using a key that will be returned to the user. Assuming you are working with some kind of UNIX system, you would do the following to store our first bookmark (we're using curl, a standard UNIX command-line tool for conducting HTTP operations, in our case POST and GET):

> curl localhost:8080 -d "title=Developing an Akka edge&url=http://somepublisher.com/akka-book"

That command returns the following message with the identifier embedded in it:

> Stored bookmark with uuid: 39d5968f-921e-41f0-a829-42a9cbf91ea9

Next, you can continue to use curl to query our system, or fire up a browser:

> curl "localhost:8080?uuid=39d5968f-921e-41f0-a829-42a9cbf91ea9"

And the following message is rendered on your command prompt or browser:

Retrieved Bookmark(Developing an Akka edge,http://somepublisher.com/akka-book)

This is all well and good until we need to do something besides simply adding and retrieving bookmarks. Let's assume at some point you want to retrieve a bookmark by URL, perhaps in order to avoid adding duplicates. Since the URL is contained within the Bookmark object, we would have to traverse the Map, looking at every object it contains, to find any bookmarks with the specified URL. It's certainly possible to build a set of data structures to store various cross-referencing indexes, but at that point you'll quickly lose the thread-safety of the concurrent map we've used above. Instead, you would need to manually lock the collections all at once, so that any change to one is reflected in the others.
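To make the problem concrete, here is a minimal sketch using only the Scala standard library. The names UrlLookupSketch, findByUrl, byUrl, and addIndexed are hypothetical and are not part of the chapter's example code. It shows the linear scan needed to look up a bookmark by URL, and a second index whose update is not atomic with the first:

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap

object UrlLookupSketch {
  case class Bookmark(title: String, url: String)

  // Primary store, keyed by UUID as in the servlet above
  val bookmarks = TrieMap[UUID, Bookmark]()

  // Hypothetical secondary index from URL to UUID
  val byUrl = TrieMap[String, UUID]()

  // Without an index, a lookup by URL has to scan every entry
  def findByUrl(url: String): Option[(UUID, Bookmark)] =
    bookmarks.find { case (_, bm) => bm.url == url }

  // Each put is thread-safe on its own, but the pair is not atomic:
  // between the two calls another thread can see the bookmark in
  // `bookmarks` while `byUrl` does not yet know about it.
  def addIndexed(bm: Bookmark): UUID = {
    val id = UUID.randomUUID()
    bookmarks.put(id, bm)
    byUrl.put(bm.url, id)
    id
  }
}
```

Keeping the two maps consistent under concurrent access would require a lock around both updates, which is exactly the kind of manual coordination we would like to avoid.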

There's also the issue that you can't easily interact with the same map from other instances of the application or from another machine. A better solution would be to use an external database of some form. Once you begin interacting with external services, though, actors can be very useful, as we'll see shortly (and, to a larger degree, throughout the rest of this book).

Putting actors to work

First, though, we're going to take you on a brief detour to show you how to create and use actors in Akka. As we mentioned in the previous chapter, it all starts with an ActorSystem. This is the top-level entry point which you need to have in place before you can create any actors. Creating one is as simple as giving it a name (we will be using this later):

scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem
scala> val system = ActorSystem("SimpleSystem")
system: akka.actor.ActorSystem = akka://SimpleSystem

But this is not very useful by itself. We still need to create an actual actor, which we can do by defining a class that extends the akka.actor.Actor trait and that contains a receive method:

scala> import akka.actor.Actor
import akka.actor.Actor
scala> class SimpleActor extends Actor {
 |       def receive = {
 |         case "aMessage" => // ...
 |         case i: Int => // ...
 |       }
 |     }
defined class SimpleActor

Now that we have an actor class defined, we can create an instance of it using the actor system we created earlier. This will create an instance of the class, start it (note: this is done asynchronously), and return an instance of an akka.actor.ActorRef.

scala> import akka.actor.Props
import akka.actor.Props
scala> system.actorOf(Props[SimpleActor], name = "mySimpleActor")
res1: akka.actor.ActorRef = Actor[akka://SimpleSystem/user/mySimpleActor]

Don't worry about what Props is doing there for now, but know that it's a configuration object typically used when creating actors. Later, we'll show you a few cases where it serves a clearer purpose, but for now just look at it as a basic factory for actor instances.

An ActorRef is the object you must always use to reference an actor in Akka. When I discussed how actors are always referred to by some address in the previous chapter, this is the corresponding representation of that address as used by Akka. From within the actor itself, you can get this reference using the self field. It's important to understand that this reference is the mechanism through which you interact with an actor. This separation is important for Akka to maintain location transparency and to prevent you from directly calling methods on actor instances, which can break certain properties of the actor model. From outside of the actor you cannot, for instance, access fields or call methods defined on the actor object itself; you can only interact through the interface that is exposed through this ActorRef object. Here's an example of what would happen if you even tried:

scala> class AActor extends Actor { var state = 42L; def changeState(x: Long) = state = x; def receive = { case x => println(s"YOU SENT: ${x}"); sender ! x } }
defined class AActor
scala> system.actorOf(Props[AActor], "a-actor")
res9: akka.actor.ActorRef = Actor[akka://test/user/a-actor#-1660648705]
scala> res9.state
<console>:17: error: value state is not a member of akka.actor.ActorRef
 res9.state
 ^
scala> res9.changeState
<console>:17: error: value changeState is not a member of akka.actor.ActorRef
 res9.changeState

There's another form of this actor initialization that you should know about, though, which comes into play when you have an actor class that takes constructor parameters.

scala> class NamedActor(name: String) extends Actor {
 |       def receive = {
 |         case _ => // ...
 |       }
 |     }
defined class NamedActor
scala> system.actorOf(Props(new NamedActor("myActor")), name = "myNamedActor")
res2: akka.actor.ActorRef = Actor[akka://SimpleSystem/user/myNamedActor]

Beyond this, you will later also encounter cases where you want to create actors from within other actors. This is a very important technique that you'll later discover is fundamental to building systems capable of handling a wide range of tasks. So far, we've been using the ActorSystem instance, generally called system in the examples I've shown, but within an actor instance, there's a private field called context that is defined on the Actor trait, which provides similar functionality. In fact, as with ActorSystem, it extends the ActorRefFactory trait, which defines the actorOf and actorFor methods (the latter of which you will be seeing later in the book). You can also use context to get a reference to the current actor system by calling context.system. The flip side of creating actors, which also starts them, is stopping them. You can use the context to stop an actor by calling context.stop(ref), where ref is an ActorRef instance. Starting and stopping an actor are both done asynchronously.
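As a rough sketch of what creating and stopping a child through context can look like, consider the following. The Manager and Worker classes and the StartWorker and StopWorker messages are hypothetical names invented for this illustration, and the code assumes the same Akka version used elsewhere in this chapter:

```scala
import akka.actor.{Actor, ActorRef, Props}

// Hypothetical messages for this sketch
case object StartWorker
case object StopWorker

class Worker extends Actor {
  def receive = {
    case msg => println("Worker received: " + msg)
  }
}

class Manager extends Actor {
  // Reference to the child, if one is currently running
  var worker: Option[ActorRef] = None

  def receive = {
    case StartWorker =>
      if (worker.isEmpty) {
        // context.actorOf creates a child of this actor and starts it asynchronously
        worker = Some(context.actorOf(Props[Worker]))
      }
    case StopWorker =>
      // context.stop requests that the child be stopped, also asynchronously
      worker.foreach(ref => context.stop(ref))
      worker = None
  }
}
```

The child created here lives beneath the Manager in the actor hierarchy, rather than directly beneath the top-level user guardian as with system.actorOf.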

Finally, the last piece you need to understand for now is how to send messages to actors. Messages can be of any type. Akka will not prevent you from sending whatever you want to send. But it's important to understand the protocol defined for a given actor.


The word protocol we used here was for a reason. A protocol is a well-defined set of rules for how communication occurs between two or more parties. When you're communicating with an actor (whether from another actor or from non-actor code), you should take the time to define your protocol of communication carefully. This is done by defining a set of messages that the actor will accept and possibly respond to. In Akka, we generally use case classes or case objects to define these messages. It's also typical to define them on the companion object for the actor, but we're getting ahead of the game. We'll come back to this later.


You can send messages using the ! method which is used with ActorRef objects. This method, referred to as tell, might look odd, at first, but you'll get used to it quickly and it makes for a very nice and succinct syntax: 

someActor ! MyMessage("hello")

Within an actor, you use the self ActorRef when it needs to send itself a message:

self ! MyMessage("hello")

After that whirlwind tour of using actors in Akka, let's return to our earlier work and see how we can bring Akka into the picture and let it help us out. Rather than showing you a full database-backed example right away, we're going to simply use an actor as a stand-in for the database. Later, we'll make this actor the actual interface for the database, taking advantage of Akka features such as fault tolerance and routing to give us a robust and scalable way to interact with the database. Let's look at our new actor first:

import akka.actor.Actor
import java.util.UUID

case class Bookmark(uuid: UUID, title: String, url: String)

object BookmarkStore {
  case class AddBookmark(title: String, url: String)
  case class GetBookmark(uuid: UUID)
}
class BookmarkStore extends Actor {
  import BookmarkStore.{GetBookmark, AddBookmark}
  var bookmarks = Map[UUID, Bookmark]()
  def receive = {
    case AddBookmark(title, url) ⇒
      // Treat a bookmark with the same title and URL as a duplicate
      val exists = bookmarks.values.exists { bm ⇒
        bm.title == title && bm.url == url
      }
      if (!exists) {
        val id = UUID.randomUUID
        bookmarks += (id → Bookmark(id, title, url))
        sender ! Some(id)
      } else {
        sender ! None
      }
    case GetBookmark(uuid) ⇒
      sender ! bookmarks.get(uuid)
  }
}

There's not much to this simple actor. It's essentially just acting as a wrapper around a Map that stores Bookmark objects referenced by their UUID. The interesting thing to note here is how the actor responds to the messages it receives. The sender method used here is defined within the base Actor class and provides the ActorRef to the originator of the current message. You will use sender frequently when writing actor code. One important thing to note is that sender is a method and the object returned by it will change in different contexts. That is, if you need to reference it later (to pass on to some other actor or for replying later within some other context), you should create a copy of the object in a val and pass that around, as needed.
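The advice about capturing sender matters most when a reply is sent from a callback that runs later. The following is a minimal sketch of that situation, assuming the same Akka version as the rest of the chapter. The SlowLookup actor and its String-based protocol are hypothetical:

```scala
import akka.actor.Actor
import scala.concurrent.Future

class SlowLookup extends Actor {
  // Use the actor's dispatcher as the ExecutionContext for the Future below
  import context.dispatcher

  def receive = {
    case key: String =>
      // Capture the current sender in a val before leaving this context.
      // By the time the callback runs, the actor may be processing a
      // different message and sender would return a different reference.
      val replyTo = sender
      Future {
        "value for " + key
      } foreach { result =>
        replyTo ! result
      }
  }
}
```

Calling sender inside the foreach callback instead of using replyTo would compile, but the reply could be delivered to the wrong actor.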

On the servlet end, we're going to take advantage of the Java Servlet 3.0 support for asynchronous calls. While this requires a bit more setup, it makes for much more natural integration with actors. First, here's the boilerplate to actually create the class and make the necessary imports:

import java.util.UUID
import javax.servlet.http.{HttpServletResponse, HttpServletRequest, HttpServlet}
import javax.servlet.annotation.WebServlet
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success}

@WebServlet(asyncSupported = true)
class BookmarkServlet(system: ActorSystem, bookmarkStore: ActorRef) extends HttpServlet {
  
  // import the case classes we use for communicating with the actor
  import BookmarkStore.{AddBookmark, GetBookmark}
} 

Let's look at the doPost method to see how we interact with the actor:

override def doPost(req: HttpServletRequest,
                    res: HttpServletResponse) {
 
  import ExecutionContext.Implicits.global
 
  val asyncCtx = req.startAsync()
  val writer = asyncCtx.getResponse.getWriter
  val title = req.getParameter("title")
  val url = req.getParameter("url")

  implicit val timeout = Timeout(5 seconds)
  asyncCtx.setTimeout(5 * 1000)
  val uuidFuture = bookmarkStore ? AddBookmark(title, url)
  uuidFuture.mapTo[Option[UUID]].onComplete {
    case Success(uuid) ⇒
      writer.write(s"Successfully created bookmark with uuid = $uuid")
      // signal the container that the asynchronous response is finished
      asyncCtx.complete()
    case Failure(error) ⇒
      writer.write("Failure creating bookmark: " + error.getMessage)
      asyncCtx.complete()
  }
}

Here we first get a reference to an ExecutionContext, an abstraction for defining the context in which some code will be executed. We need to specify one because of the request we're making to the actor. You'll likely notice, after all the discussion earlier about the ! method, that we're not actually using it. Instead, we're using the ? method which, along the same lines as tell, is referred to as ask. This method returns a future, which is a container for some value that may or may not have been computed yet. Since sending a message to an actor is asynchronous, we need a future to get a response in non-actor code. Waiting forever for a reply would be a bad idea, so ask requires a timeout, which we supply via the implicit value timeout. If the future takes longer than this timeout to produce a value, we will receive a Failure in the onComplete callback.

The onComplete method registers a callback on a Future instance. It's this callback that requires the implicit ExecutionContext referenced earlier: because the callback executes asynchronously, the ExecutionContext needs to be available to supply a thread on which it can run. We also use Future's mapTo method to constrain the type of the return value. This is useful since messages sent by actors are inherently dynamically typed, so we can't guarantee the type of the message that will be sent back to us from an actor.
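The behavior of mapTo can be seen without any actors at all. Here is a small sketch using only the Scala standard library's futures. The MapToSketch object and its demo method are hypothetical names, and the Future[Any] stands in for the untyped reply that ask would give us:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object MapToSketch {
  def demo(): (Try[Option[String]], Try[String]) = {
    // Stand-in for the untyped reply we would get back from ask
    val reply: Future[Any] = Future { Some("a-bookmark-id") }

    // The value really is an Option, so this future succeeds
    val typed: Future[Option[String]] = reply.mapTo[Option[String]]

    // The value is not a String, so this future fails with a ClassCastException
    val wrong: Future[String] = reply.mapTo[String]

    // Blocking with Await is only for demonstration purposes here;
    // in the servlet we register an onComplete callback instead
    (Try(Await.result(typed, 2.seconds)), Try(Await.result(wrong, 2.seconds)))
  }

  def main(args: Array[String]): Unit = {
    val (ok, bad) = demo()
    println(ok)
    println(bad.isFailure)
  }
}
```

Note that mapTo checks the runtime class only, so a mismatch in the type parameter of the Option would not be detected at this point.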

You'll also note that we invoke req.startAsync(). This code is part of the Servlet 3.0 API which allows us to get a handle on the current request context so that we can then later return a response to the caller of this servlet asynchronously without having to keep a thread busy. In this case, we get a Writer instance via the asyncCtx.getResponse.getWriter call and then set a timeout (there's a notable similarity here to what we are doing with the Future, for good reason) before our asynchronous code begins executing. This will allow the servlet container to interrupt the flow of execution if the timeout is exceeded and return an appropriate response to the client, hopefully before the client's connection to the server has itself timed out. 




The futures used by Akka are now part of the Scala standard library. They were added with the release of Scala 2.10, but were in large part derived from the futures used in earlier versions of Akka. We brought this up in case you see mention of 'Akka futures' and wonder if they are a different sort of beast.


The servlet's doGet method will look very familiar:

override def doGet(req: HttpServletRequest,
                   res: HttpServletResponse) {

  implicit val ec = ExecutionContext.Implicits.global

  val asyncCtx = req.startAsync()
  val writer = asyncCtx.getResponse.getWriter
  val bookmarkId = UUID.fromString(req.getParameter("uuid"))

  implicit val timeout = Timeout(5 seconds)
  asyncCtx.setTimeout(5 * 1000)
  val bookmarkFuture = bookmarkStore ? GetBookmark(bookmarkId)
 
  bookmarkFuture.mapTo[Option[Bookmark]].onComplete {
    case Success(bm) ⇒
      writer.write(bm.getOrElse("Not found").toString)
      // signal the container that the asynchronous response is finished
      asyncCtx.complete()
    case Failure(error) ⇒
      writer.write("Could not retrieve bookmark: " + error.getMessage)
      asyncCtx.complete()
  }
}

This essentially uses the same pattern we saw in the doPost example. The last method of this servlet we should examine provides a very important bit of functionality: shutting down the ActorSystem when we're done with it. This is why we passed the ActorSystem to the servlet earlier. When designing your Akka-based application, you should be on the lookout for existing hooks such as this that might be available to handle tasks like cleanly shutting down your system:

override def destroy() {
  system.shutdown()
}

Putting our actor to better use

Now that we've shown a basic actor and how to integrate it into a common scenario, let's look a bit more at how we can expand on this to really leverage the capabilities of actors. As previously discussed, actors are excellent at handling concurrency and scenarios where failures are an expected occurrence, so let's consider a situation where these are particularly important. As mentioned earlier, it really makes sense for an application like the one above to use an external database. This database could be your typical relational system, such as MySQL or PostgreSQL, a non-relational store, such as MongoDB, or perhaps even a web service sitting in front of some unknown store (that is, it might not be something whose internals we actually know). To keep things simple, we'll use a stand-in for a data store, but this could just as easily be an external system:

import scala.collection.concurrent.TrieMap

trait Database[DATA, ID] {
  def create(id: ID, data: DATA): ID
  def read(id: ID): Option[DATA]
  def update(id: ID, data: DATA)
  def delete(id: ID): Boolean
  def find(data: DATA): Option[(ID, DATA)]
}

object Database {
 
  def connect[DATA, ID](service: String): Database[DATA, ID] = {
    new Database[DATA, ID] {
 
      // We're using a thread-safe concurrent collection here to avoid
      // odd behavior when you run the example code.
      private val store = TrieMap[ID, DATA]()

      def create(id: ID, data: DATA) = {
        store += (id → data)
        id
      }

      def read(id: ID) = {
        store.get(id)
      }

      // Only overwrite entries that already exist
      def update(id: ID, data: DATA) {
        if (store.contains(id)) {
          store += (id → data)
        }
      }

      def delete(id: ID) = {
        store -= id
        !store.contains(id)
      }

      def find(data: DATA) = {
        store.find(_._2 == data) 
      }
    }
  }
}

As you can see, this data store of ours supports the basic CRUD (create, read, update, and delete) operations as well as a very simple find operation that assumes the candidate value matches precisely. We'll revise our earlier actor to use this implementation:

case class Bookmark(title: String, url: String)

class BookmarkStore(database: Database[Bookmark, UUID]) extends Actor {

  import BookmarkStore.{GetBookmark, AddBookmark}

  def receive = {
    case AddBookmark(title, url) ⇒
      val bookmark = Bookmark(title, url)
      database.find(bookmark) match {
        case Some(found) ⇒ sender ! None
        case None ⇒
          val uuid = UUID.randomUUID
          database.create(uuid, bookmark)
          sender ! Some(uuid)
      }
    case GetBookmark(uuid) ⇒
      sender ! database.read(uuid)
  }
}

You'll notice that the definition of Bookmark was modified slightly. This isn't critical, but it makes our usage of the simple data store we created earlier a bit simpler. You can see that the actor now expects to have a database instance passed to it, and that it is parameterized on the Bookmark type we're storing in it, along with the UUID we're using for our keys in the database. The rest of the functionality here isn't effectively different for the caller of this actor, since it still sends back the same response messages in each case.

Now, in the code that starts our application, we can add one line and modify the line that starts the actor in order to supply the necessary database instance:

val database = Database.connect[Bookmark, UUID]("bookmarkDatabase")

val bookmarkStore = system.actorOf(Props(new BookmarkStore(database)))

This still hasn't really brought anything new to the table. In the real world, if this were an external system as we've described above, we'd be expecting failures, resource contention (how many connections does the database allow or handle before giving us trouble?), etc. Let's add a couple things to both give us a bit of load-balancing and failure handling:

import akka.actor.{OneForOneStrategy, Props, ActorSystem}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._
import akka.routing.RoundRobinRouter

val databaseSupervisorStrategy = 
  OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 30 seconds) {
    case e: Exception ⇒ Restart
    case _ ⇒ Escalate
  }

val bookmarkStore =
  system.actorOf(Props(new BookmarkStore(database)).
    withRouter(RoundRobinRouter(nrOfInstances = 10, 
                                supervisorStrategy = databaseSupervisorStrategy)))

This code needs some explanation, of course, and while we will be going into much more detail later in chapters four and five, we'll give you a brief glimpse here of what's going on. First, let's cover the databaseSupervisorStrategy object that we created. Supervisors are a core feature of Akka and are responsible for determining what to do in the case of exceptions within your actors. In the particular strategy we've defined, we are telling the supervisor to restart an actor that throws a regular exception and to escalate any other errors. As briefly mentioned in the previous chapter, actors in an actor system form a hierarchy and each actor in that hierarchy is the supervisor for its children (this relationship defines the supervision hierarchy). Escalating means passing the failure up to the parent, in which case that parent's own supervisor is responsible for determining how to handle the failure.
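The same kind of strategy can also be attached to an ordinary parent actor by overriding its supervisorStrategy. The sketch below assumes the same Akka version as the rest of the chapter. FlakyWorker and WorkerParent are hypothetical names used only for illustration:

```scala
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._

// Hypothetical child that fails on every message it receives
class FlakyWorker extends Actor {
  def receive = {
    case msg => throw new IllegalStateException("could not handle " + msg)
  }
}

class WorkerParent extends Actor {
  // Applied to failures of this actor's children: restart on ordinary
  // exceptions (at most 5 times within 30 seconds), escalate anything else
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 30.seconds) {
      case _: Exception => Restart
      case _            => Escalate
    }

  val worker = context.actorOf(Props[FlakyWorker], name = "flakyWorker")

  def receive = {
    // Pass every message on to the child, keeping the original sender
    case msg => worker forward msg
  }
}
```

With OneForOneStrategy, only the child that failed is affected by the decision. Any siblings it might have keep running untouched.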

Here, we're inserting a parent actor for the BookmarkStore actor by way of a RoundRobinRouter that's been added to the creation. Routers allow you to determine how messages are sent to one or more child actors. In this case, we decided to use the simple round-robin strategy, which iterates through the actors it is routing for in sequence, sending each new message to the next actor in turn, and restarting at the beginning of the sequence when it has reached the last actor. The parameters we've passed set the number of actors to start within this router (that is, 10 instances of BookmarkStore) and the supervisor strategy I just described.
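To illustrate the selection order only, here is a tiny standalone sketch of round-robin selection in plain Scala. It is not how Akka implements RoundRobinRouter. The RoundRobin class is a hypothetical helper written for this illustration:

```scala
import java.util.concurrent.atomic.AtomicLong

// Illustration only: cycles through a fixed list of routees in order
class RoundRobin[A](routees: Vector[A]) {
  require(routees.nonEmpty, "need at least one routee")

  private val counter = new AtomicLong(0)

  // Return the next routee in sequence, wrapping around at the end
  def next(): A = {
    val index = (counter.getAndIncrement() % routees.size).toInt
    routees(index)
  }
}
```

Given three routees a, b, and c, successive calls to next() return a, b, c, a, b, and so on, which is the order in which the router hands incoming messages to its BookmarkStore instances.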

Wrap up

We've walked through an example showing how Akka can integrate with other libraries like the Servlet API to build interesting combinations of capabilities. So far, we have sidestepped the question of how to actually build this app, though. In the next chapter, we'll cover these details and show you a few useful helpers that make your work easier.
