Tuesday, December 16, 2014

Finding old messages in ActiveMQ queues

ActiveMQ doesn't have a direct way to find old messages.  There shouldn't be old messages really, but sometimes it happens especially when you have a hundred developers running a variety of things and not always cleaning up DLQs or other queues without consumers.  Below is some python code to find the old messages.  The output of the python script can be used to feed a script (on another blog entry on this site) to pull off the old messages.

Why would you do this? Well, ActiveMQ holds on to message data files in KahaDB when just one unconsumed message is present.  That can lead to disk storage issues which can cause queue producers to block if not managed.  We have that problem.  Yes, you could do this queue checking via JMX or Jolokia, but we're running a variety of ActiveMQ instances and not all have Jolokia and I wanted to write something that would be easier for Linux sys admins to update if needed (assuming Python was more what they'd like).  For using Jolokia and HawtIO check this blog.

Meanwhile, here's the python code regardless of what version of ActiveMQ you're using - but assuming you're running some of the front end resources.

Here's the code:
 import calendar  
 import sys  
 import time  
 from xml.dom.minidom import parse,parseString  
 import urllib2  
 if len(sys.argv) < 3:  
  print "run with parameters: activemq_url(without :8161) time_in_secs_in_past"  
  sys.exit(0)  
 else:  
  url_root=sys.argv[1]  
  time_delta=int(sys.argv[2])  
  print "Running with: ",url_root,time_delta  
 url=url_root+':8161/admin/xml/queues.jsp'  
 print url  
 try:  
   dom=parse(urllib2.urlopen(url))  
 except:  
   print "error pulling queue list, stopping"  
   sys.exit(2)  
 queues_msgs={}  
 queues_old_msgs={}  
 #dom=parse('queues.jsp')  
 for node in dom.getElementsByTagName('queue'):  
   con_count= int(node.childNodes[1].getAttribute('consumerCount'))  
   queue_size= int(node.childNodes[1].getAttribute('size'))  
   queue_name= str(node.getAttribute('name'))  
 #  print queue_name,con_count,queue_size  
   if queue_size > 0 and con_count < 1 :  
    print "queue with messages and no consumers: ",queue_name,queue_size  
    queues_msgs[queue_name]=queue_size  
 #now check each queue for old messages  
 ts=calendar.timegm(time.gmtime()) #get current time  
 #get timestamp for X days ago:  
 delta=time_delta  
 ts_old=ts-delta  
 for k in queues_msgs.iterkeys():  
   count=0  
   url=url_root+':8161/admin/queueBrowse/'+k+'?view=rss'  
   try:   
     queue_dom=parse(urllib2.urlopen(url))  
     for node in queue_dom.getElementsByTagName('pubDate'):  
      time_str= node.childNodes[0].nodeValue  
   #    example: Tue, 16 Dec 2014 08:30:44 GMT  
      time_tup=time.strptime(time_str,'%a, %d %b %Y %H:%M:%S %Z')  
      time_secs=int(time.mktime(time_tup))  
      if time_secs < ts_old:  
        count+=1  
        print "old msg on queue:",k,"msg time: ",time_str,count  
        queues_old_msgs[k]=count  
   except:  
     print "failed to retrieve all messages for queue:",k  
 for k,v in queues_old_msgs.iteritems():  
   print "old messages in queue,count:",k,v