Saturday, February 22, 2014

Akka in 100 lines - Actors and Message Passing

Here is a quick program in Akka (2.2, in case it matters).  It is kept to about 100 lines so that it is easy to follow, which is also why it contains many println statements and a few unused variables.  It's a starting point for getting something bigger running!

Akka uses the actor model and message passing to run a distributed processing system.  Message passing has been a standard approach in high-performance computing for decades (see MPI, whose message-passing roots go back to the 1980s, or the similar, although heavier, idea of fork() from Unix in the 1970s). Akka brings an excellent version of this model to Java in a more native fashion than the MPI bindings that exist for Java. Message passing is a different approach to concurrent processing than multi-threading. Threads share memory, which brings problems such as deadlocks over the shared objects, and they are harder to distribute (multi-threading usually stays within one JVM). With message passing, the workers are independent and each has its own memory (fork is somewhat different, as it also clones the parent's memory, which workers don't usually do).
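To make the contrast with shared-memory threading concrete, here is a minimal sketch of the message-passing idea using only the JDK (no Akka). The names MailboxSketch, Result, and sumOfSquares are made up for this illustration. Each worker keeps its own local state and communicates only by dropping an immutable message into a mailbox, so no lock is needed around the running total.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-JDK sketch of message passing; hypothetical names, not Akka API.
class MailboxSketch {
    // Immutable message carrying one worker's result.
    static final class Result {
        final int value;
        Result(int value) { this.value = value; }
    }

    // A worker computes on its own local data and replies with a message.
    static void startWorker(final int input, final BlockingQueue<Result> replyTo) {
        new Thread(() -> {
            int local = input * input;      // state private to this worker
            replyTo.add(new Result(local)); // the only interaction with the collector
        }).start();
    }

    // Starts the workers and sums their replies; only this thread touches 'sum'.
    static int sumOfSquares(int workers) {
        BlockingQueue<Result> mailbox = new LinkedBlockingQueue<>();
        for (int i = 1; i <= workers; i++) {
            startWorker(i, mailbox);
        }
        int sum = 0;
        try {
            for (int received = 0; received < workers; received++) {
                sum += mailbox.take().value; // blocks until the next message arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum of squares 1..12 = " + sumOfSquares(12)); // 650
    }
}
```

The order in which replies arrive varies between runs, but the total does not, because the collector is the only code that ever updates it.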

This program starts 12 workers and collects their results for final display.  You may see log messages about dead letters, and their number may vary between runs. See the Akka docs on dead letters (snapshot or v2.2.1) for an explanation; the code still works fine in this case.
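In Akka, a message that cannot be delivered is sent to dead letters instead. One way this can happen is a message sent to an actor that has already stopped, and that is presumably what happens here while the master and the system shut down. The toy model below shows only that idea in plain Java. The class DeadLetterSketch and its methods are invented for this illustration and are not how Akka implements it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of dead letters; hypothetical class, not Akka's implementation.
class DeadLetterSketch {
    final List<String> delivered = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();
    private boolean stopped = false;

    // Deliver while the recipient is alive; otherwise record a dead letter.
    void tell(String message) {
        if (stopped) {
            deadLetters.add(message);
        } else {
            delivered.add(message);
        }
    }

    void stop() {
        stopped = true;
    }

    public static void main(String[] args) {
        DeadLetterSketch master = new DeadLetterSketch();
        master.tell("result-1");
        master.stop();
        master.tell("result-2"); // arrives after the stop, so it is a dead letter
        System.out.println("delivered=" + master.delivered
                + " deadLetters=" + master.deadLetters);
    }
}
```

In the real program the timing depends on thread scheduling, which would explain why the number of dead-letter messages changes from run to run.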

Remember to add a few jars to your library/classpath: scala-library.jar, akka-actor_2.10-2.2.0.jar, and config-1.0.2.jar, all of which are in the /lib directory of the akka-2.2.0 distribution.


 package tutorial;  
 import akka.actor.*;  
 import akka.japi.*;  
 import akka.routing.RoundRobinRouter;  
 public class Example {  
      public static void main(String args[]) {  
           System.out.println("Starting main ...");  
           Example xam = new Example();  
           xam.callActorsToCalculate();  
           // Main will finish before all of the threads finish  
           System.out.println("Finishing main ..."); //note when this is displayed in output  
      }  
      public void callActorsToCalculate() {  
           // create the basic Akka system:  
           ActorSystem xamSys = ActorSystem.create("example-system"); // no "_" allowed  
           // set up actor that will control the worker actors via messages; props first, then actor  
           Props pMaster = Props.create(MyMasterActor.class, "master_hello", 12);  
           final ActorRef masterRef = xamSys.actorOf(pMaster);  
           System.out.println("...(properties for master) and now MasterActor ready");  
           // start the master off with an empty msg; noSender() is static, so call it on ActorRef  
           masterRef.tell(new StartCalculationMessage(), ActorRef.noSender());  
           System.out.println("just sent tell 'StartCalculationMessage' to Master...");  
      }  
      public static class MyMasterActor extends UntypedActor {  
           int numberOfWorkers;  
           int numberOfResults = 0;  
           int resultSum = 0;  
           private final ActorRef workActionsRouter;  
           public MyMasterActor(String name, int numOfSubWorkers) { // normal constructor  
                numberOfWorkers = numOfSubWorkers;  
                System.out.println("constructing "+ numberOfWorkers + " workers with str:"+name);  
                // create the router to send 'call to action' msg to workers  
                workActionsRouter = this.getContext()  
                     .actorOf(Props.create(MyActorWorker.class).withRouter(new RoundRobinRouter(  
                                    numberOfWorkers)), "workerRouter"); // Props.create, as for the master above  
                System.out.println("workerRouter ready");  
           }  
           @Override  
           public void onReceive(Object message) throws Exception { // required method  
                if (message instanceof StartCalculationMessage) {  
                     // the starting message has been received - kick off the actors  
                     // to do the work  
                     System.out.println("starting workers");  
                     for (int numWork = 0; numWork < numberOfWorkers; numWork++) {  
                          workActionsRouter.tell(new WorkStartMsg(), getSelf());  
                     }  
                } else if (message instanceof WorkerResult) {  
                     WorkerResult workerResult = (WorkerResult) message;  
                     System.out.println("master received message back from worker");  
                     numberOfResults++;  
                     // weighted sum: the k-th result to arrive is multiplied by k  
                     resultSum += workerResult.getResult()*numberOfResults;  
                     if (numberOfResults == numberOfWorkers) { // have all results come back?  
                          System.out.println("A message from MasterActor:"  
                               + (numberOfResults + 100)+" sum="+resultSum);//could have called a FinalActor with Final msg  
                          getContext().stop(getSelf()); //stopping the master (and thus its children)  
                          getContext().system().shutdown(); // stop the system - seems like it  
                          //should be in the method that started it, but that method has exited already  
                     }  
                } else {  
                     System.out.println("Unhandled message in master");  
                     unhandled(message);  
                }  
           }  
      }  
      static class StartCalculationMessage {  
      } // empty class to use for messaging the start of work  
      static class WorkStartMsg {  
      } // message sent to workers, in this case to start off the work  
      static class WorkerResult { // data class to store the result info  
           private final int result;  
           WorkerResult(int value) { result = value; }  
           public int getResult() { return result; }  
      }  
      static class MyActorWorker extends UntypedActor {  
           @Override  
           public void onReceive(Object message) throws Exception {  
                if (message instanceof WorkStartMsg) {  
                     WorkStartMsg workMsg = (WorkStartMsg) message; //unused, but could be to set starting point, e.g.  
                     int result = 10; // set result to some value  
                     getSender().tell(new WorkerResult(result), getSelf());  
                     System.out.println("worker:start msg:"+this.toString()+"; & msg'd master");  
                } else {  
                     System.out.println("Unhandled message in worker");  
                     unhandled(message);  
                }  
           }  
      }  
 }  
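
As a sanity check on the output, here is a single-threaded, Akka-free replay of the master's arithmetic. The class ExpectedOutputSketch and its methods are made up for this check. It assumes the router hands messages to the workers in strict rotation, so 12 messages reach 12 workers once each, and that every worker replies with 10 as in the listing.

```java
// Replay of the arithmetic in MyMasterActor; hypothetical helper, no Akka needed.
class ExpectedOutputSketch {
    // Strict round-robin: message k goes to worker k % workers.
    static int[] messagesPerWorker(int messages, int workers) {
        int[] counts = new int[workers];
        for (int k = 0; k < messages; k++) {
            counts[k % workers]++;
        }
        return counts;
    }

    // Mirrors the master: numberOfResults++; resultSum += result * numberOfResults;
    static String finalLine(int workers, int resultPerWorker) {
        int numberOfResults = 0;
        int resultSum = 0;
        for (int i = 0; i < workers; i++) {
            numberOfResults++;
            resultSum += resultPerWorker * numberOfResults;
        }
        return "A message from MasterActor:" + (numberOfResults + 100)
                + " sum=" + resultSum;
    }

    public static void main(String[] args) {
        // 10 * (1 + 2 + ... + 12) = 10 * 78 = 780
        System.out.println(finalLine(12, 10));
    }
}
```

So the last line from the master should read "A message from MasterActor:112 sum=780". The order of the other println lines can differ between runs, since the actors run concurrently.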

For other quick introductions, see http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html and http://java.dzone.com/articles/your-first-message-discovering

Details on some of the specifics around actors can be found here: http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html

A more detailed introduction to actors is here: http://www.javaworld.com/article/2078775/scripting-jvm-languages/open-source-java-projects-akka.html

A simple example for load testing:  http://www.javacodegeeks.com/2012/05/processing-10-million-messages-with.html