Finding old messages in ActiveMQ queues
ActiveMQ doesn't have a direct way to find old messages. There shouldn't really be old messages, but it happens, especially when you have a hundred developers running a variety of things and not always cleaning up DLQs or other queues without consumers. Below is some Python code to find the old messages. The output of the Python script can be used to feed a script (in another blog entry on this site) to pull off the old messages.
Why would you do this? Well, ActiveMQ holds on to a whole KahaDB message data file as long as just one unconsumed message in it is present. That can lead to disk storage issues, which can cause queue producers to block if not managed. We have that problem. Yes, you could do this queue checking via JMX or Jolokia, but we're running a variety of ActiveMQ instances and not all of them have Jolokia. I also wanted to write something that would be easier for Linux sysadmins to update if needed (assuming Python is more to their liking). For using Jolokia and HawtIO check this blog.
Meanwhile, the Python 2 code below should work regardless of which version of ActiveMQ you're using, as long as the broker's web console (the admin pages on port 8161) is running.
Here's the code:
# Python 2 script (uses urllib2 and print statements)
import calendar
import sys
import time
from xml.dom.minidom import parse
import urllib2

if len(sys.argv) < 3:
    print "run with parameters: activemq_url(without :8161) time_in_secs_in_past"
    sys.exit(1)
else:
    url_root = sys.argv[1]
    time_delta = int(sys.argv[2])
print "Running with: ", url_root, time_delta

# the web console's XML view lists every queue along with its stats
url = url_root + ':8161/admin/xml/queues.jsp'
print url
try:
    dom = parse(urllib2.urlopen(url))
except Exception:
    print "error pulling queue list, stopping"
    sys.exit(2)

queues_msgs = {}
queues_old_msgs = {}
#dom=parse('queues.jsp')
for node in dom.getElementsByTagName('queue'):
    # childNodes[1] is the element carrying the queue stats
    # (childNodes[0] is a whitespace text node)
    con_count = int(node.childNodes[1].getAttribute('consumerCount'))
    queue_size = int(node.childNodes[1].getAttribute('size'))
    queue_name = str(node.getAttribute('name'))
    # print queue_name,con_count,queue_size
    if queue_size > 0 and con_count < 1:
        print "queue with messages and no consumers: ", queue_name, queue_size
        queues_msgs[queue_name] = queue_size

# now check each queue for old messages
ts = calendar.timegm(time.gmtime())  # current time, seconds since the epoch (UTC)
# cutoff: anything published more than time_delta seconds ago counts as old
delta = time_delta
ts_old = ts - delta
for k in queues_msgs.iterkeys():
    count = 0
    url = url_root + ':8161/admin/queueBrowse/' + k + '?view=rss'
    try:
        queue_dom = parse(urllib2.urlopen(url))
        for node in queue_dom.getElementsByTagName('pubDate'):
            time_str = node.childNodes[0].nodeValue
            # example: Tue, 16 Dec 2014 08:30:44 GMT
            time_tup = time.strptime(time_str, '%a, %d %b %Y %H:%M:%S %Z')
            # pubDate is in GMT, so use timegm; mktime would treat it as local time
            time_secs = calendar.timegm(time_tup)
            if time_secs < ts_old:
                count += 1
                print "old msg on queue:", k, "msg time: ", time_str, count
                queues_old_msgs[k] = count
    except Exception:
        print "failed to retrieve all messages for queue:", k

# summary: queue name and number of old messages found
for k, v in queues_old_msgs.iteritems():
    print "old messages in queue,count:", k, v
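If you're on Python 3, the two checks the script performs (queues with messages but no consumers, then counting messages older than a cutoff) can be sketched roughly as below. This is only a sketch under some assumptions: the function names and the sample XML are made up for illustration, the stats element is assumed to be called `stats`, and the real feeds from your broker may carry more elements than shown. It works on XML strings so it can be tried without a broker; fetching the pages (for example with `urllib.request.urlopen`) is left out.

```python
import calendar
import email.utils
import xml.etree.ElementTree as ET


def unconsumed_queues(queues_xml):
    """Return {queue name: size} for queues holding messages with no consumers."""
    result = {}
    for queue in ET.fromstring(queues_xml).iter("queue"):
        stats = queue.find("stats")  # assumed name of the stats child element
        if stats is None:
            continue
        size = int(stats.get("size", 0))
        consumers = int(stats.get("consumerCount", 0))
        if size > 0 and consumers < 1:
            result[queue.get("name")] = size
    return result


def count_old_messages(rss_xml, cutoff_epoch):
    """Count pubDate entries older than cutoff_epoch (seconds since the epoch, UTC)."""
    count = 0
    for pub in ET.fromstring(rss_xml).iter("pubDate"):
        # parsedate_tz keeps the time zone, so GMT dates are not shifted to local time
        parsed = email.utils.parsedate_tz((pub.text or "").strip())
        if parsed is None:
            continue  # skip dates that cannot be parsed
        if email.utils.mktime_tz(parsed) < cutoff_epoch:
            count += 1
    return count


# Hypothetical sample data shaped like the pages the script reads
SAMPLE_QUEUES = """<queues>
  <queue name="orders.DLQ"><stats size="3" consumerCount="0"/></queue>
  <queue name="orders"><stats size="5" consumerCount="2"/></queue>
  <queue name="empty"><stats size="0" consumerCount="0"/></queue>
</queues>"""

SAMPLE_RSS = """<rss><channel>
  <item><pubDate>Tue, 16 Dec 2014 08:30:44 GMT</pubDate></item>
  <item><pubDate>Wed, 17 Dec 2014 08:30:44 GMT</pubDate></item>
</channel></rss>"""

if __name__ == "__main__":
    # cutoff fixed at midnight UTC on 17 Dec 2014 so the sample is repeatable
    cutoff = calendar.timegm((2014, 12, 17, 0, 0, 0))
    print(unconsumed_queues(SAMPLE_QUEUES))
    print(count_old_messages(SAMPLE_RSS, cutoff))
```

In a real run the cutoff would be the current time minus the number of seconds passed on the command line, as in the script above. Looking elements up by name rather than by child position avoids depending on how the page happens to be whitespace-formatted.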