In this chapter, we'll get a brief description of how actors are created in Akka, including how to add actors to an existing project, and then we'll see some details on how to interact with actors beyond just blindly sending messages. Many of the examples in this book assume you are working in a UNIX/Linux environment; they should generally work in a Windows-based environment with a few minor adjustments. In later chapters, we'll revisit portions of this example and refine it, explaining the features Akka provides along the way.
It's hard to imagine there are many developers out there right now who haven't written some kind of web-based code, whether it was a Rails application, a Java servlet, or whatever. This is a seemingly simple task, but a host of complexities can quickly rear their heads. Though some of these systems are fairly trivial, in most cases the incoming requests require interaction with some other system components. This might be a database, other web services, or perhaps a message queue such as ActiveMQ or RabbitMQ.
In this case, we're going to see a simple service for managing a collection of bookmarks. The service will allow for the creation of new bookmarks and the ability to query the saved bookmarks. We'll leave out editing to keep the example code reasonably simple. As we develop this system, we'll look at what enhancements we might like to add and explore how Akka can help out.
You can imagine this service might start as a simple weekend hack that you put together to keep track of links you see on Twitter, Facebook, and elsewhere. But perhaps over time you show it to a few friends who ask to start using it, and eventually it might grow into something much larger, needing multiple servers and dedicated resources. So, you want to make sure the service is robust and that it is resilient to failures.
We'll look at how to build a simple service like the one just described and then see how to add further functionality as the requirements expand. We'll go over the basics in this chapter and get comfortable with Akka before we move into deeper waters in later chapters.
Now the first question is where to start. We're going to keep things somewhat simplified and ignore a few details, like validation, but these are easy enough to add later. We're going to build this using a very simple Java servlet-based approach, since we really don't want to complicate matters by introducing additional dependencies. And we'll use Jetty to handle loading and running the servlet, without requiring a complete container setup.
With that out of the way, let's think about how we might approach this. We can start by defining a simple servlet that accepts an HTTP POST request, parses the given parameters, and updates some data structure. Here's a quick example of how this might work:
import javax.servlet.http.{HttpServletResponse, HttpServletRequest, HttpServlet}
import javax.servlet.annotation.WebServlet
import java.util.UUID
import scala.collection.concurrent.{ Map ⇒ ConcurrentMap }

case class Bookmark(title: String, url: String)

@WebServlet(name = "bookmarkServlet", urlPatterns = Array("/"))
class BookmarkServlet(bookmarks: ConcurrentMap[UUID, Bookmark]) extends HttpServlet {

  // POST: store a new bookmark and report the UUID assigned to it
  override def doPost(req: HttpServletRequest,
                      res: HttpServletResponse): Unit = {
    val out = res.getOutputStream()
    val title = req.getParameter("title")
    val url = req.getParameter("url")
    val bookmark = Bookmark(title, url)
    val uuid = UUID.randomUUID()
    bookmarks.put(uuid, bookmark)
    out.print("Stored bookmark with uuid: " + uuid)
  }

  // GET: look up a bookmark by the UUID given in the "uuid" parameter
  override def doGet(req: HttpServletRequest,
                     res: HttpServletResponse): Unit = {
    val out = res.getOutputStream()
    val bookmarkId = req.getParameter("uuid")
    bookmarks.get(UUID.fromString(bookmarkId)) match {
      case Some(bookmark) ⇒ out.println("Retrieved " + bookmark)
      case None ⇒ out.println("Bookmark with UUID specified does not exist.")
    }
  }
}
This code is fairly typical for a servlet. We're using a scala.collection.concurrent.Map (aliased here as ConcurrentMap, to highlight the type) to store the bookmarks, so it's thread-safe. Let's exercise this example a little just to get a feel for how it works.
Navigate to the directory ch2-working-with-actors/Non-Akka and run:
> sbt
Next, enter run at the SBT prompt:
> run
SBT (the Simple Build Tool) will look for our main class, Bookmarker, and start it, at which point you'll notice that a simple web server has lodged itself on your machine. A quick note: most of the code and commands you'll see later on assume an SBT session with dependencies already loaded. Next, we are going to store our first bookmark in our system and retrieve it using a key that will be returned to the user. Assuming you are working on some kind of UNIX system, you would do the following to store our first bookmark (we're using curl, a standard UNIX command-line tool for performing HTTP operations, in our case POST and GET):
> curl localhost:8080 --data-urlencode "title=Developing an Akka edge" --data-urlencode "url=http://somepublisher.com/akka-book"
Each parameter is passed separately and URL-encoded by curl, which avoids the shell interpreting an unquoted & as its background operator. The command returns the following message with the identifier embedded in it:
Stored bookmark with uuid: 39d5968f-921e-41f0-a829-42a9cbf91ea9
Next, you can continue to use curl to query our system, or fire up a browser:
> curl "localhost:8080?uuid=39d5968f-921e-41f0-a829-42a9cbf91ea9"
The following message is rendered on your command prompt or in your browser:
Retrieved Bookmark(Developing an Akka edge,http://somepublisher.com/akka-book)
This is all well and good until we need to do something besides simply adding and retrieving bookmarks. Let's assume at some point you want to retrieve a bookmark by URL, perhaps in order to avoid adding duplicates. Since the URL is contained within the Bookmark object, we would have to traverse the Map, looking at every object in it, to find any bookmarks with the specified URL. It's certainly possible to build a set of data structures to store various cross-referencing indexes, but at that point you'll quickly lose the thread-safety of the concurrent map we've used above. Instead, you would need to manually lock the collections all at once, so that any changes to one get reflected in the others.
There's also the issue that you can't easily interact with the same map from other instances of the application or from another machine. A better solution would be to use an external database of some form. Once you begin interacting with external services, though, actors can be very useful, as we'll see shortly (and, to a larger degree, throughout the rest of this book).
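To make the locking point concrete, here is a minimal sketch of what such a cross-referenced store might look like. The class IndexedBookmarks and its methods are hypothetical names invented for this illustration and are not part of the chapter's code; the sketch assumes a single JVM and uses one lock for both indexes.

```scala
import java.util.UUID

// Hypothetical illustration only; these names do not appear in the chapter's code.
final case class Bookmark(title: String, url: String)

final class IndexedBookmarks {
  // Two immutable maps held in vars; both are only touched while holding this object's lock.
  private var byId  = Map.empty[UUID, Bookmark]
  private var byUrl = Map.empty[String, UUID]

  // Adds the bookmark unless its URL is already known.
  // Both indexes change inside one synchronized block, so no other
  // thread can observe one of them updated and the other not.
  def add(bookmark: Bookmark): Option[UUID] = synchronized {
    if (byUrl.contains(bookmark.url)) None
    else {
      val id = UUID.randomUUID()
      byId  += (id -> bookmark)
      byUrl += (bookmark.url -> id)
      Some(id)
    }
  }

  // Lookup by URL goes through the secondary index, then the primary one.
  def findByUrl(url: String): Option[Bookmark] = synchronized {
    byUrl.get(url).flatMap(byId.get)
  }
}
```

Every operation now has to take the same lock, which is exactly the kind of manual coordination the concurrent map was sparing us.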
First, though, we're going to take you on a brief detour to show you how to create and use actors in Akka. As we mentioned in the previous chapter, it all starts with an ActorSystem. This is the top-level entry point which you need to have in place before you can create any actors. Creating one is as simple as giving it a name (we will be using this later):
scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem
scala> val system = ActorSystem("SimpleSystem")
system: akka.actor.ActorSystem = akka://SimpleSystem
But this is not very useful by itself. We still need to create an actual actor, which we can do by defining a class that extends the akka.actor.Actor trait and contains a receive method:
scala> import akka.actor.Actor
import akka.actor.Actor
scala> class SimpleActor extends Actor {
     |   def receive = {
     |     case "aMessage" => // ...
     |     case i: Int => // ...
     |   }
     | }
defined class SimpleActor
Now that we have an actor class defined, we can create an instance of it using the actor system we created earlier. This will create an instance of the class, start it (note: this is done asynchronously), and return an instance of an akka.actor.ActorRef.
scala> import akka.actor.Props
import akka.actor.Props
scala> system.actorOf(Props[SimpleActor], name = "mySimpleActor")
res1: akka.actor.ActorRef = Actor[akka://SimpleSystem/user/mySimpleActor]
Don't worry about what Props is doing there for now; just know that it's a configuration object typically used when creating actors. Later, we'll show you a few cases where it serves a clearer purpose, but for now just look at it as a basic factory for actor instances.
An ActorRef is the object you must always use to reference an actor in Akka. When I discussed in the previous chapter how actors are always referred to by some address, this is the corresponding representation of that address as used by Akka. From within the actor itself, you can get this reference using the self field. It's important to understand that this reference is the mechanism through which you interact with an actor. This separation lets Akka maintain location transparency and prevents you from directly calling methods on actor instances, which can break certain properties of the actor model. From outside of the actor you cannot, for instance, access fields or call methods defined on the actor object itself; you can only interact through the interface exposed by the ActorRef. Here's an example of what would happen if you tried anyway:
scala> class AActor extends Actor { var state = 42L; def changeState(x: Long) = state = x; def receive = { case x => println(s"YOU SENT: ${x}"); sender ! x } }
defined class AActor
scala> system.actorOf(Props[AActor], "a-actor")
res9: akka.actor.ActorRef = Actor[akka://test/user/a-actor#-1660648705]
scala> res9.state
<console>:17: error: value state is not a member of akka.actor.ActorRef
              res9.state
                   ^
scala> res9.changeState
<console>:17: error: value changeState is not a member of akka.actor.ActorRef
              res9.changeState
There's another form of this actor initialization that you should know about, though, which comes into play when you have an actor class that takes constructor parameters.
scala> class NamedActor(name: String) extends Actor {
     |   def receive = {
     |     case _ => // ...
     |   }
     | }
defined class NamedActor
scala> system.actorOf(Props(new NamedActor("myActor")), name = "myNamedActor")
res2: akka.actor.ActorRef = Actor[akka://SimpleSystem/user/myNamedActor]
Beyond this, you will later also encounter cases where you want to create actors from within other actors. This is a very important technique that you'll later discover is fundamental to building systems capable of handling a wide range of tasks. So far, we've been using the ActorSystem instance, generally called system in the examples I've shown, but within an actor instance there's a field called context, defined on the Actor trait and meant to be used from inside the actor, which provides similar functionality. In fact, as with ActorSystem, it extends the ActorRefFactory trait, which defines the actorOf and actorFor methods (you will be seeing actorFor later in the book). You can also use context to get a reference to the current actor system by calling context.system. The flip side of creating actors, which also starts them, is stopping them. You can use the context to stop an actor by calling context.stop(ref), where ref is an ActorRef instance. Both starting and stopping an actor are done asynchronously.
Finally, the last piece you need to understand for now is how to send messages to actors. Messages can be of any type. Akka will not prevent you from sending whatever you want to send. But it's important to understand the protocol defined for a given actor.
We used the word protocol here for a reason. A protocol is a well-defined set of rules for how communication occurs between two or more parties. When you're communicating with an actor (whether from another actor or from non-actor code), you should take the time to define your protocol of communication carefully. This is done by defining a set of messages that the actor will accept and possibly respond to. In Akka, we generally use case classes or case objects to define these messages. It's also typical to define them on the companion object for the actor, but we're getting ahead of the game. We'll come back to this later.
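As a rough sketch of what such a protocol can look like, the following defines the messages for an imaginary counter on its companion object. Everything here (Counter, Add, Reset, GetTotal, Total, handle) is a hypothetical name chosen for illustration. To keep the example free of Akka dependencies, handle is a plain partial function over Any that returns the reply rather than sending it; an actor's receive has a similar shape, being a partial function from Any to Unit.

```scala
// Hypothetical protocol for an imaginary counter; none of these names
// come from the chapter's code.
object Counter {
  // Messages the counter accepts
  final case class Add(amount: Int)
  case object Reset
  case object GetTotal

  // Message the counter replies with
  final case class Total(value: Int)
}

final class Counter {
  import Counter._

  private var total = 0

  // Shaped like an actor's receive: a partial function over Any.
  // It returns the reply (if any) instead of sending it, so that the
  // sketch runs without Akka on the classpath.
  val handle: PartialFunction[Any, Option[Any]] = {
    case Add(amount) => total += amount; None
    case Reset       => total = 0; None
    case GetTotal    => Some(Total(total))
  }
}
```

Anything outside the protocol simply does not match, which makes the set of case classes and case objects a compact description of what the counter understands.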
You can send messages using the ! method which is used with ActorRef objects. This method, referred to as tell, might look odd, at first, but you'll get used to it quickly and it makes for a very nice and succinct syntax:
someActor ! MyMessage("hello")
Within an actor, you use the self ActorRef when it needs to send itself a message:
self ! MyMessage("hello")
After that whirlwind tour of using actors in Akka, let's return to our earlier work and see how we can bring Akka into the picture and let it help us out. Rather than showing you a full database-enabled example right away, we're going to simply use an actor as a stand-in for the database. Later, we'll make this actor the actual interface for the database, taking advantage of Akka features such as fault-tolerance and routing to give us a robust and scalable way to interact with the database. Let's look at our new actor first:
import akka.actor.Actor
import java.util.UUID

case class Bookmark(uuid: UUID, title: String, url: String)

object BookmarkStore {
  case class AddBookmark(title: String, url: String)
  case class GetBookmark(uuid: UUID)
}

class BookmarkStore extends Actor {
  import BookmarkStore.{GetBookmark, AddBookmark}

  var bookmarks = Map[UUID, Bookmark]()

  def receive = {
    case AddBookmark(title, url) ⇒
      // reject exact duplicates (same title and same URL)
      val exists = bookmarks.values.exists { bm ⇒
        bm.title == title && bm.url == url
      }
      if (!exists) {
        val id = UUID.randomUUID
        bookmarks += (id → Bookmark(id, title, url))
        sender ! Some(id)
      } else {
        sender ! None
      }
    case GetBookmark(uuid) ⇒
      sender ! bookmarks.get(uuid)
  }
}
There's not much to this simple actor. It's essentially just acting as a wrapper around a Map that stores Bookmark objects referenced by their UUID. The interesting thing to note here is how the actor responds to the messages it receives. The sender method used here is defined within the base Actor trait and provides the ActorRef of the originator of the current message. You will use sender frequently when writing actor code. One important thing to note is that sender is a method, and the object it returns will change from one message to the next. That is, if you need to reference it later (to pass on to some other actor or to reply later from within some other context), you should capture the returned reference in a val and pass that around, as needed.
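The pitfall with sender can be illustrated without Akka at all. The sketch below uses a hypothetical FakeActor class, invented for this illustration, whose sender method is backed by mutable state in the same spirit as an actor's sender: it reports whoever sent the message being processed at the moment it is called. Callbacks that call sender later see the wrong value, while callbacks that captured it in a val do not.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an actor; this is not Akka code.
final class FakeActor {
  private var currentSender = "nobody"

  // Like an actor's sender, this is a method: it reports the sender of
  // whichever message is being processed when it is called.
  def sender: String = currentSender

  // Callbacks that will only run later, after more messages have arrived.
  val lateLookups     = ListBuffer.empty[() => String]
  val capturedLookups = ListBuffer.empty[() => String]

  def receive(from: String): Unit = {
    currentSender = from
    // Calls sender when the callback eventually runs (too late).
    lateLookups += (() => sender)
    // Captured now, while it still refers to this message's sender.
    val originalSender = sender
    capturedLookups += (() => originalSender)
  }
}
```

After two messages, from "alice" and then from "bob", running the late lookups yields "bob" both times, while the captured lookups yield "alice" and "bob" as intended.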
On the servlet end, we're going to take advantage of the Java Servlet 3.0 support for asynchronous calls. While this requires a bit more setup, it makes for much more natural integration with actors. First, here's the boilerplate to actually create the class and make the necessary imports:
import java.util.UUID
import javax.servlet.http.{HttpServletResponse, HttpServletRequest, HttpServlet}
import javax.servlet.annotation.WebServlet
import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success}

@WebServlet(asyncSupported = true)
class BookmarkServlet(system: ActorSystem, bookmarkStore: ActorRef) extends HttpServlet {

  // import the case classes we use for communicating with the actor
  import BookmarkStore.{AddBookmark, GetBookmark}
}
Let's look at the doPost method to see how we interact with the actor:
override def doPost(req: HttpServletRequest,
                    res: HttpServletResponse): Unit = {

  import ExecutionContext.Implicits.global

  val asyncCtx = req.startAsync()
  val writer = asyncCtx.getResponse.getWriter
  val title = req.getParameter("title")
  val url = req.getParameter("url")

  implicit val timeout = Timeout(5.seconds)
  asyncCtx.setTimeout(5 * 1000)
  val uuidFuture = bookmarkStore ? AddBookmark(title, url)
  uuidFuture.mapTo[Option[UUID]].onComplete {
    case Success(Some(uuid)) ⇒
      writer.write(s"Successfully created bookmark with uuid = $uuid")
      // finish the async request; otherwise the container waits for the timeout
      asyncCtx.complete()
    case Success(None) ⇒
      // the actor replies with None when the bookmark already exists
      writer.write("Bookmark already exists")
      asyncCtx.complete()
    case Failure(error) ⇒
      writer.write("Failure creating bookmark: " + error.getMessage)
      asyncCtx.complete()
  }
}
Here we first get a reference to an ExecutionContext. An ExecutionContext is an abstraction for defining the context in which some code will be executed. We need to specify one here because of the requests we're making to the actor. You'll likely notice, after all the discussion earlier about the ! method, that we're not actually using it. Instead, we're using the ? method which, along the same lines as tell, is referred to as ask. This method returns a future, which is a container for some value that may or may not have been computed yet. Letting that future wait forever would be a bad idea, so we need to supply a timeout, and we do so via the implicit value timeout. If the future takes longer than this timeout to produce a value, we will receive a Failure in the onComplete callback. Since sending a message to an actor is asynchronous, we need to use a future to get a response in non-actor code. The onComplete method registers a callback on a Future instance. It's this callback that requires the implicit ExecutionContext referenced earlier: because the callback executes asynchronously, the ExecutionContext needs to be available to supply a thread on which it can run. We also use Future's mapTo method to constrain the type of the return value. This is useful since messages sent by actors are inherently dynamically typed, so we can't guarantee the type of the message that will be sent back to us from an actor.
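The behavior of mapTo can be tried out with nothing but the Scala standard library. In the sketch below, untypedReply is a hypothetical stand-in for an ask call: like ask, it hands back a future whose static type is only Future[Any]. The object MapToDemo and its helpers are names made up for this illustration, and the blocking outcome helper exists only so that a script can inspect the result; the servlet itself uses onComplete and never blocks.

```scala
import java.util.UUID
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helpers for illustration; no Akka involved.
object MapToDemo {
  // Stands in for `actor ? message`: the static type is only Future[Any].
  def untypedReply(id: UUID): Future[Any] = Future.successful(Some(id))

  // Waits for the future to finish and returns its outcome as a Try.
  def outcome[T](f: Future[T]): Try[T] = Await.ready(f, 2.seconds).value.get

  // The same three cases a caller of the bookmark store has to handle.
  def describe(result: Try[Option[UUID]]): String = result match {
    case Success(Some(uuid)) => "created " + uuid
    case Success(None)       => "duplicate"
    case Failure(error)      => "failed: " + error.getClass.getSimpleName
  }
}
```

Calling mapTo[Option[UUID]] on the reply succeeds, whereas mapTo[String] on the same reply produces a failed future, because the value is not a String. Note that mapTo checks against the runtime class, so only the outer Option is verified, not the type of its contents.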
You'll also note that we invoke req.startAsync(). This code is part of the Servlet 3.0 API which allows us to get a handle on the current request context so that we can then later return a response to the caller of this servlet asynchronously without having to keep a thread busy. In this case, we get a Writer instance via the asyncCtx.getResponse.getWriter call and then set a timeout (there's a notable similarity here to what we are doing with the Future, for good reason) before our asynchronous code begins executing. This will allow the servlet container to interrupt the flow of execution if the timeout is exceeded and return an appropriate response to the client, hopefully before the client's connection to the server has itself timed out.
The futures used by Akka are now part of the Scala standard library. They were added with the release of Scala 2.10, but were in large part derived from the futures used in earlier versions of Akka. We brought this up in case you see mention of 'Akka futures' and wonder if they are a different sort of beast.
The servlet's doGet method will look very familiar:
override def doGet(req: HttpServletRequest,
                   res: HttpServletResponse): Unit = {

  implicit val ec = ExecutionContext.Implicits.global

  val asyncCtx = req.startAsync()
  val writer = asyncCtx.getResponse.getWriter
  val bookmarkId = UUID.fromString(req.getParameter("uuid"))

  implicit val timeout = Timeout(5.seconds)
  asyncCtx.setTimeout(5 * 1000)
  val bookmarkFuture = bookmarkStore ? GetBookmark(bookmarkId)

  bookmarkFuture.mapTo[Option[Bookmark]].onComplete {
    case Success(bm) ⇒
      writer.write(bm.getOrElse("Not found").toString)
      // finish the async request; otherwise the container waits for the timeout
      asyncCtx.complete()
    case Failure(error) ⇒
      writer.write("Could not retrieve bookmark: " + error.getMessage)
      asyncCtx.complete()
  }
}
This is essentially using the same pattern we saw in the doPost example. The last method of this servlet we should examine provides a very important bit of functionality: shutting down the ActorSystem when we're done with it. This is why we passed the ActorSystem to the servlet earlier. When designing your Akka-based application, you should be on the lookout for existing hooks such as this one that might be available to handle tasks like cleanly shutting down your system:
override def destroy(): Unit = {
  system.shutdown()
}
Now that we've shown a basic actor and how to integrate it into a common scenario, let's look a bit more at how we can expand on this to really leverage the capabilities of actors. As previously discussed, actors are excellent at handling concurrency and scenarios where failures are an expected occurrence, so let's consider a situation where these are particularly important. As mentioned earlier, it really makes sense for an application like the one above to use an external database. This database could be your typical relational system, such as MySQL or PostgreSQL, a non-relational store, such as MongoDB, or perhaps even a web service sitting in front of some unknown store (that is, something whose internals we don't actually know). To keep things simple, we'll use a stand-in for a data store, but this could just as easily be an external system:
import scala.collection.concurrent.TrieMap

trait Database[DATA, ID] {
  def create(id: ID, data: DATA): ID
  def read(id: ID): Option[DATA]
  def update(id: ID, data: DATA): Unit
  def delete(id: ID): Boolean
  def find(data: DATA): Option[(ID, DATA)]
}

object Database {

  def connect[DATA, ID](service: String): Database[DATA, ID] = {
    new Database[DATA, ID] {

      // We're using a thread-safe concurrent collection here to avoid
      // odd behavior when you run the example code.
      private val store = TrieMap[ID, DATA]()

      def create(id: ID, data: DATA): ID = {
        store += (id → data)
        id
      }

      def read(id: ID) = {
        store.get(id)
      }

      def update(id: ID, data: DATA): Unit = {
        // only overwrite entries that already exist
        store.replace(id, data)
      }

      def delete(id: ID) = {
        store -= id
        !store.contains(id)
      }

      def find(data: DATA) = {
        store.find(_._2 == data)
      }
    }
  }
}
As you can see, this data store of ours supports the basic CRUD (create, read, update, and delete) operations as well as a very simple find operation that assumes the candidate value matches precisely. We'll revise our earlier actor to use this implementation:
import akka.actor.Actor
import java.util.UUID

case class Bookmark(title: String, url: String)

class BookmarkStore(database: Database[Bookmark, UUID]) extends Actor {

  import BookmarkStore.{GetBookmark, AddBookmark}

  def receive = {
    case AddBookmark(title, url) ⇒
      val bookmark = Bookmark(title, url)
      database.find(bookmark) match {
        // an identical bookmark is already stored
        case Some(_) ⇒ sender ! None
        case None ⇒
          val uuid = UUID.randomUUID
          database.create(uuid, bookmark)
          sender ! Some(uuid)
      }
    case GetBookmark(uuid) ⇒
      sender ! database.read(uuid)
  }
}
You'll notice that the definition of Bookmark was modified slightly. This isn't critical, but it makes our usage of the simple data store we created earlier a bit simpler. You can see that the actor now expects to have a database instance passed to it, and that the database is parameterized on the Bookmark type we're storing in it, along with the UUID type we're using for our keys. From the caller's point of view, the rest of the functionality is effectively unchanged, since the actor still sends back the same response messages in each case.
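One reason the slimmer Bookmark is convenient is that the stand-in data store's find operation compares whole values with ==, and case classes compare field by field. With the UUID no longer part of the Bookmark, a freshly built Bookmark(title, url) can equal a stored one. The following small sketch uses a TrieMap directly to show this; FindDemo is a hypothetical name used only here, and the lookup mirrors the one in the stand-in database.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap

final case class Bookmark(title: String, url: String)

// Hypothetical demo object; it mirrors the stand-in database's find.
object FindDemo {
  val store = TrieMap.empty[UUID, Bookmark]

  // Compare stored values with ==, which for case classes means
  // field-by-field equality.
  def find(data: Bookmark): Option[(UUID, Bookmark)] =
    store.find(_._2 == data)
}
```

The match has to be exact: a bookmark that differs only in the capitalization of its title is treated as a different bookmark.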
Now, in the code that starts our application, we can add one line and modify the line that starts the actor in order to supply the necessary database instance:
val database = Database.connect[Bookmark, UUID]("bookmarkDatabase")

val bookmarkStore = system.actorOf(Props(new BookmarkStore(database)))
This still hasn't really brought anything new to the table. In the real world, if this were an external system as we've described above, we'd be expecting failures, resource contention (how many connections does the database allow or handle before giving us trouble?), and so on. Let's add a couple of things to give us a bit of both load-balancing and failure handling:
import akka.actor.{OneForOneStrategy, Props, ActorSystem}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._
import akka.routing.RoundRobinRouter

val databaseSupervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 30.seconds) {
    case e: Exception ⇒ Restart
    case _ ⇒ Escalate
  }

val bookmarkStore =
  system.actorOf(Props(new BookmarkStore(database)).
    withRouter(RoundRobinRouter(nrOfInstances = 10,
      supervisorStrategy = databaseSupervisorStrategy)))
This code needs some explanation, of course, and while we will be going into much more detail later in chapters four and five, we'll give you a brief glimpse here of what's going on. First, let's cover the databaseSupervisorStrategy object that we created. Supervisors are a core feature of Akka that are responsible for determining what to do in the case of exceptions within your actors. In the particular strategy we've defined, we are telling the supervisor to restart an actor that throws a regular exception and to escalate any other errors. As briefly mentioned in the previous chapter, actors in an actor system form a hierarchy and each actor in that hierarchy is the supervisor for its children (this relationship defines the supervision hierarchy). Escalating means passing the failure up to the parent, in which case that parent's own supervisor is responsible for determining how to handle the failure.
Here, we're inserting a parent actor for the BookmarkStore actor by way of a RoundRobinRouter that's been added to the creation. Routers allow you to determine how messages are sent to one or more child actors. In this case, we decided to use the simple round-robin strategy, which iterates through the actors it is routing for in sequence, sending each new message to the next actor in turn, and starting again at the beginning of the sequence when it has reached the last actor. The parameters we've passed set the number of actors to start within this router (that is, 10 instances of BookmarkStore) and the supervisor strategy I just described.
We've walked through an example showing how Akka can integrate with other libraries like the Servlet API to build interesting combinations of capabilities. So far, we have sidestepped the question of how to actually build this app, though. In the next chapter, we'll cover these details and show you a few useful helpers that make your work easier.
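The round-robin selection itself is easy to sketch in isolation. The RoundRobin class below is a hypothetical helper written for this illustration; it is not how Akka's router is implemented, only a minimal model of the selection rule, with a plain Vector standing in for the routed actors.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper for illustration; not Akka's router implementation.
final class RoundRobin[A](routees: Vector[A]) {
  require(routees.nonEmpty, "need at least one routee")

  // Counts how many selections have been made so far.
  private val counter = new AtomicLong(0L)

  // Returns the routees in order, wrapping around after the last one.
  def next(): A = {
    val index = (counter.getAndIncrement() % routees.size).toInt
    routees(index)
  }
}
```

With three routees a, b, and c, seven consecutive calls to next() pick a, b, c, a, b, c, a. Each routee receives roughly the same share of messages regardless of how long any individual message takes to process, which is what makes the strategy simple but not load-aware.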