Tuesday, December 16, 2014

Finding old messages in ActiveMQ queues

ActiveMQ doesn't have a direct way to find old messages.  There shouldn't really be old messages, but it happens, especially when you have a hundred developers running a variety of things and not always cleaning up DLQs or other queues without consumers.  Below is some Python code to find the old messages.  Its output can be fed to a script (in another blog entry on this site) that pulls the old messages off the queues.

Why would you do this? ActiveMQ holds on to a message data file in KahaDB as long as even one unconsumed message in it is present.  That can lead to disk storage issues which, if not managed, can cause queue producers to block.  We have that problem.  Yes, you could do this queue checking via JMX or Jolokia, but we're running a variety of ActiveMQ instances, not all of which have Jolokia, and I wanted to write something that would be easier for Linux sysadmins to update if needed (assuming Python is more to their liking).  For using Jolokia and HawtIO check this blog.

Meanwhile, the Python code below works regardless of which version of ActiveMQ you're using, as long as you're running the front-end resources (the web console on port 8161).

Here's the code:
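import calendar
import sys
import time
import urllib.request
from xml.dom.minidom import parse

if len(sys.argv) < 3:
    print("run with parameters: activemq_url(without :8161) time_in_secs_in_past")
    sys.exit(1)
url_root = sys.argv[1]
time_delta = int(sys.argv[2])
print("Running with:", url_root, time_delta)
# ASSUMED path: the queue list XML served by the stock web console
url = url_root + ":8161/admin/xml/queues.jsp"
print(url)
try:
    dom = parse(urllib.request.urlopen(url))
except Exception:
    print("error pulling queue list, stopping")
    sys.exit(1)
queues_msgs = {}  # queue name -> size, for queues with messages and no consumers
for node in dom.getElementsByTagName('queue'):
    # childNodes[1] is the stats element (childNodes[0] is whitespace text)
    con_count = int(node.childNodes[1].getAttribute('consumerCount'))
    queue_size = int(node.childNodes[1].getAttribute('size'))
    queue_name = node.getAttribute('name')
    # print(queue_name, con_count, queue_size)
    if queue_size > 0 and con_count < 1:
        print("queue with messages and no consumers:", queue_name, queue_size)
        queues_msgs[queue_name] = queue_size
# now check each queue for old messages
ts = calendar.timegm(time.gmtime())  # get current time
ts_old = ts - time_delta  # timestamp for time_delta seconds ago
queues_old_msgs = {}  # queue name -> number of old messages
for k in queues_msgs:
    # ASSUMED path: the RSS browse view of a queue in the web console
    url = url_root + ":8161/admin/queueBrowse/" + k + "?view=rss&feedType=rss_2.0"
    try:
        queue_dom = parse(urllib.request.urlopen(url))
        count = 0
        for node in queue_dom.getElementsByTagName('pubDate'):
            time_str = node.childNodes[0].nodeValue
            # example: Tue, 16 Dec 2014 08:30:44 GMT
            time_tup = time.strptime(time_str, '%a, %d %b %Y %H:%M:%S %Z')
            time_secs = calendar.timegm(time_tup)
            if time_secs < ts_old:
                count += 1
                print("old msg on queue:", k, "msg time:", time_str, count)
        if count > 0:
            queues_old_msgs[k] = count
    except Exception:
        print("failed to retrieve all messages for queue:", k)
for k, v in queues_old_msgs.items():
    print("old messages in queue,count:", k, v)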
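
The two checks the script performs (which queues have messages but no consumers, and which message dates are too old) can also be tried offline, without a broker. The following is only a minimal sketch under stated assumptions: the sample XML is hand-written to carry the attributes the script reads (name, size, consumerCount) and is not captured broker output, and the helper names idle_queues and is_older_than are made up for illustration. It looks up the stats element by tag name rather than by child position, which should be a little more tolerant of whitespace changes in the XML.

```python
import calendar
import time
from xml.dom.minidom import parseString

# Hand-written sample (an assumption, not real broker output).
SAMPLE_QUEUES_XML = """<queues>
  <queue name="orders.DLQ"><stats size="3" consumerCount="0"/></queue>
  <queue name="orders"><stats size="5" consumerCount="2"/></queue>
  <queue name="empty"><stats size="0" consumerCount="0"/></queue>
</queues>"""

PUB_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S %Z'  # e.g. Tue, 16 Dec 2014 08:30:44 GMT


def idle_queues(xml_text):
    """Return {queue name: size} for queues with messages but no consumers."""
    result = {}
    for queue in parseString(xml_text).getElementsByTagName('queue'):
        stats = queue.getElementsByTagName('stats')[0]
        size = int(stats.getAttribute('size'))
        consumers = int(stats.getAttribute('consumerCount'))
        if size > 0 and consumers < 1:
            result[queue.getAttribute('name')] = size
    return result


def is_older_than(pub_date, max_age_secs, now_secs):
    """True if the pubDate string is more than max_age_secs before now_secs."""
    msg_secs = calendar.timegm(time.strptime(pub_date, PUB_DATE_FORMAT))
    return msg_secs < now_secs - max_age_secs


if __name__ == "__main__":
    print(idle_queues(SAMPLE_QUEUES_XML))
    now = calendar.timegm(time.gmtime())
    # one day = 86400 seconds
    print(is_older_than("Tue, 16 Dec 2014 08:30:44 GMT", 86400, now))
```

Passing the current time in as a parameter, instead of reading the clock inside the function, makes the age check easy to test with fixed values.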

Monday, March 3, 2014

Avro - a simple example

When moving data from one place to another, or just storing it, there are loads of options, from plain text to specialized binary formats. Somewhere in the middle are XML, JSON, Protocol Buffers, Thrift and a newer entry, Avro. Avro is a binary format like protobufs and Thrift but, unlike those two, it also stores the schema with the file. It is easy to use, being very similar to protobufs and Thrift, or even to XSD-derived classes.

Follow the detail below (or the simple tutorial on the Avro pages):

Download, or use dependency management (Maven/Gradle/etc.) to get, avro-1.7.6.jar and avro-tools-1.7.6.jar plus the Jackson JSON library: specifically the core-asl and mapper-asl jars (those are the 1.9.x jar names), or core for v2.x of Jackson. Make sure they're on the build path.

Create a schema (as in example.avsc):
 {"namespace": "example.avro",
  "type": "record", "name": "MyExample",
  "fields": [
    {"name": "title_of_doc", "type": "string"},
    {"name": "author_name", "type": ["string", "null"]},
    {"name": "number_pages", "type": ["int", "null"]}
  ]
 }
... and run the avro command line tool to generate the class:
cd .../workspace/avro-example/
java -jar /path/to/avro-tools-1.7.6.jar compile schema example.avsc .
which will create a MyExample.java file in the example/avro folder.

Move the example/avro folder under src, or move the newly created file into the example.avro package under src, or add the new file to the build path.
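
Before writing code against the generated class, it can help to see which fields are optional and roughly what the setter names will look like. Since the schema is plain JSON, a few lines of standard-library Python are enough for a quick look. This is only a sketch: the helper names are made up for illustration, and the snake_case to CamelCase mapping is an assumption based on the title_of_doc to TitleOfDoc renaming seen in the generated class, not a statement of Avro's exact naming rules.

```python
import json

# Same schema as example.avsc, inlined so the sketch is self-contained.
SCHEMA_JSON = """
{"namespace": "example.avro",
 "type": "record", "name": "MyExample",
 "fields": [
   {"name": "title_of_doc", "type": "string"},
   {"name": "author_name", "type": ["string", "null"]},
   {"name": "number_pages", "type": ["int", "null"]}
 ]
}
"""


def accessor_suffix(field_name):
    """title_of_doc -> TitleOfDoc (assumed mapping for simple snake_case names)."""
    return "".join(part[:1].upper() + part[1:] for part in field_name.split("_"))


def is_nullable(field_type):
    """A field is nullable when its type is a union (JSON list) containing "null"."""
    return isinstance(field_type, list) and "null" in field_type


def describe(schema_text):
    """Return (setter name, nullable) pairs in schema order."""
    schema = json.loads(schema_text)
    return [("set" + accessor_suffix(field["name"]), is_nullable(field["type"]))
            for field in schema["fields"]]


if __name__ == "__main__":
    for setter, nullable in describe(SCHEMA_JSON):
        print(setter, "nullable" if nullable else "required")
```

The field order matters as well: the generated all-arguments constructor takes its parameters in the order the fields appear in the schema.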

Put the schema to use by pulling it in as a class and creating a few instances (note the different constructors).  Then open a writer and file writer to write out the data, and a reader and file reader to pull it back in. That should cover the basics!  Note that the reader and writer, and their corresponding file reader and file writer, can have differing schemas, in case you have versioning and want to open a file with one schema but operate on the data with another.

 package example.avro;
 import java.io.File;
 import java.io.IOException;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 public class AvroEx {
      public static void main(String args[]){
           MyExample exmplDoc = new MyExample(); //basic constructor, class from the record name in avsc file
           exmplDoc.setTitleOfDoc("Testing for fun"); //notice it replaced title_of_doc with TitleOfDoc
           //alt constructor; arguments follow the field order in the schema
           MyExample exmplDoc2 = new MyExample("Growing Green Software","Mr Green",322);
           MyExample exmplDoc3 = MyExample.newBuilder().setTitleOfDoc("Forget Testing") //using builder requires setting
                     .setAuthorName("Miss Read").setNumberPages(null) //all fields even if null
                     .build();
           //Write out an AVRO file
           File file = new File("Example-out-in.avro");
           DatumWriter<MyExample> userDatumW = new SpecificDatumWriter<MyExample>(MyExample.class); //serialize in memory
           DataFileWriter<MyExample> dataFW = new DataFileWriter<MyExample>(userDatumW); //allows a different schema if necessary
           try {
                dataFW.create(exmplDoc.getSchema(), file);//write schema and records to file
                dataFW.append(exmplDoc);
                dataFW.append(exmplDoc2);
                dataFW.append(exmplDoc3);
                dataFW.close();
                System.out.println("Wrote:\n" + exmplDoc + "\n" + exmplDoc2 + "\n" + exmplDoc3);
           } catch (IOException e) {
                e.printStackTrace();
           }
           //Read in AVRO data
           DatumReader<MyExample> userDR = new SpecificDatumReader<MyExample>(MyExample.class);
           try {
                DataFileReader<MyExample> dataFR = new DataFileReader<MyExample>(file,userDR); //schema option again
                MyExample userReadIn = null;
                while (dataFR.hasNext()){
                     userReadIn = dataFR.next(userReadIn); //reuse the object to avoid extra allocation
                     System.out.println("Read: " + userReadIn);
                }
                dataFR.close();
           } catch (IOException e) {
                e.printStackTrace();
           }
      }
 }

Run AvroEx.java as an application.  It will create the Avro file, write to it, read it back, and print out the data that was written out and read back in.

Saturday, February 22, 2014

Akka in 100 lines - Actors and Message Passing

Here is a quick program in Akka (2.2, in case it matters).  The program is kept short, at 100 lines, to make it easier to understand what is going on, which is also why there are many println statements and a few unused variables.  It's a starting point for getting something bigger running!

Akka uses the actor model and message passing to run a distributed processing system.  This model has been a standard approach in high-performance computing for decades (see MPI for examples that started in the 1980s or the similar, although heavier, idea of fork() from Unix in the 1970s). Akka brings an excellent version of this model to Java in a more native fashion (there are also MPI bindings for Java). Message passing is a different approach to concurrent processing than multi-threading. Threads share memory, which brings the risk of deadlocks over those shared objects, and they are harder to distribute because multi-threading usually happens within one JVM. With message passing, the workers are independent and each has its own memory (fork() is somewhat different, as it also clones the parent's memory, which workers don't usually do).

This program starts 12 workers and collects their results for final display.  You may get messages about dead letters (and the number may vary between runs) - see the Akka docs on this (snapshot; v2.2.1 is here) to understand, but the code is still working fine in this case.

Remember to add a few jars to your library/classpath: scala-library.jar, akka-actor_2.10-2.2.0.jar, and config-1.0.2.jar which are all taken from the akka-2.2.0 distribution in /lib.

 package tutorial;
 import akka.actor.*;
 import akka.japi.*;
 import akka.routing.RoundRobinRouter;
 public class Example {
      public static void main(String args[]) {
           System.out.println("Starting main ...");
           Example xam = new Example();
           xam.callActorsToCalculate();
           // Main will finish before all of the threads finish
           System.out.println("Finishing main ..."); //note when this is displayed in output
      }
      public void callActorsToCalculate() {
           // create the basic Akka system:
           ActorSystem xamSys = ActorSystem.create("example-system"); // no "_" allowed
           // set up actor that will control the worker actors via messages; props first, then actor
           Props pMaster = Props.create(MyMasterActor.class, "master_hello", 12);
           final ActorRef masterRef = xamSys.actorOf(pMaster);
           System.out.println("...(properties for master) and now MasterActor ready");
           // start the master off with empty msg; should use ActorRef.noSender() leaving as is for clarity
           masterRef.tell(new StartCalculationMessage(), masterRef.noSender());
           System.out.println("just sent tell 'StartCalculationMessage' to Master...");
      }
      public static class MyMasterActor extends UntypedActor {
           int numberOfWorkers;
           int numberOfResults = 0;
           int resultSum = 0;
           private final ActorRef workActionsRouter;
           public MyMasterActor(String name, int numOfSubWorkers) { // normal constructor
                numberOfWorkers = numOfSubWorkers;
                System.out.println("constructing "+ numberOfWorkers + " workers with str:"+name);
                // create the router to send 'call to action' msg to workers
                workActionsRouter = this.getContext()
                     .actorOf(new Props(MyActorWorker.class).withRouter(new RoundRobinRouter(
                                    numberOfWorkers)), "workerRouter"); //use Props.create as above
                System.out.println("workerRouter ready");
           }
           public void onReceive(Object message) throws Exception { // required method
                if (message instanceof StartCalculationMessage) {
                     // the starting message has been received - kick off the actors
                     // to do the work
                     System.out.println("starting workers");
                     for (int numWork = 0; numWork < numberOfWorkers; numWork++) {
                          workActionsRouter.tell(new WorkStartMsg(), getSelf());
                     }
                } else if (message instanceof WorkerResult) {
                     WorkerResult workerResult = (WorkerResult) message;
                     System.out.println("master received message back from worker");
                     // count this result; the placement of the increment is an assumption,
                     // it only has to happen before the completion check below
                     numberOfResults++;
                     resultSum += workerResult.result*numberOfResults;
                     if (numberOfResults == numberOfWorkers) { // have all results come back?
                          System.out.println("A message from MasterActor:"
                               + (numberOfResults + 100)+" sum="+resultSum);//could have called a FinalActor with Final msg
                          getContext().stop(getSelf()); //stopping the master (and thus its children)
                          getContext().system().shutdown(); // stop the system - seems like it
                          //should be in the method that started it, but that method has exited already
                     }
                } else {
                     System.out.println("Unhandled message in master");
                }
           }
      }
      static class StartCalculationMessage {
      } // empty class to use for messaging the start of work
      static class WorkStartMsg {
      } // message sent to workers, in this case to start off the work
      static class WorkerResult { // data class to store the result info
           private final int result;
           WorkerResult(int value) { result = value; }
           public int getResult() { return result; }
      }
      static class MyActorWorker extends UntypedActor {
           public void onReceive(Object message) throws Exception {
                if (message instanceof WorkStartMsg) {
                     WorkStartMsg workMsg = (WorkStartMsg) message; //unused, but could be to set starting point, e.g.
                     int result = 10; // set result to some value
                     getSender().tell(new WorkerResult(result), getSelf());
                     System.out.println("worker:start msg:"+this.toString()+"; & msg'd master");
                } else {
                     System.out.println("Unhandled message in worker");
                }
           }
      }
 }
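
For comparison, the same master/worker message flow can be sketched without Akka at all. The following is a rough, hedged illustration in standard-library Python, not a translation of the Java program: each worker gets its own inbox (a queue.Queue), the master has one too, and nothing is exchanged except messages placed on those queues. The names worker, run_master and WORK_START are made up for this sketch, the workers are plain threads in a single process rather than actors, and the master simply adds up the results instead of weighting them as the Java code does.

```python
import queue
import threading

NUM_WORKERS = 12            # same worker count as the Akka program
WORK_START = "work-start"   # stands in for WorkStartMsg


def worker(inbox, master_inbox):
    """Wait for one start message, then reply to the master with a result."""
    message = inbox.get()
    if message == WORK_START:
        master_inbox.put(("worker-result", 10))  # fixed result, as in MyActorWorker
    else:
        print("Unhandled message in worker")


def run_master(num_workers=NUM_WORKERS):
    """Start the workers, message each one, and sum the replies."""
    master_inbox = queue.Queue()
    inboxes = [queue.Queue() for _ in range(num_workers)]
    threads = [threading.Thread(target=worker, args=(inbox, master_inbox))
               for inbox in inboxes]
    for thread in threads:
        thread.start()
    for inbox in inboxes:           # one start message per worker
        inbox.put(WORK_START)
    result_sum = 0
    for _ in range(num_workers):    # block until every result has arrived
        _kind, value = master_inbox.get()
        result_sum += value
    for thread in threads:
        thread.join()
    return result_sum


if __name__ == "__main__":
    print("sum =", run_master())
```

The part worth noticing is that the master never touches a worker's state directly; it only counts replies, which is the same way MyMasterActor decides that all the results are in.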

For other, quick introductions, look here: http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html and http://java.dzone.com/articles/your-first-message-discovering

Details on some of the specifics around actors can be found here: http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html

A more detailed introduction to actors is here: http://www.javaworld.com/article/2078775/scripting-jvm-languages/open-source-java-projects-akka.html

A simple example for load testing:  http://www.javacodegeeks.com/2012/05/processing-10-million-messages-with.html