Tuesday, October 1, 2013

Persistence Adaptors and ActiveMQ Options

Where do you find ActiveMQ options that aren't listed on the ActiveMQ site pages?  And how do you configure the newer, more pluggable persistence and lock adapters?

Recently, we've had problems around ActiveMQ again - less in terms of the brokers than in the clients opening ever-increasing numbers of connections, eventually causing the broker to crash.  While we try to understand the issue with the clients (unlikely to be caused by ActiveMQ itself), we tried to reconfigure the ActiveMQ brokers to cope with the load.  One thing mentioned previously was turning off Network of Brokers - it fails too easily and ends up in a split brain.  Restarting a broker fixes it, but can kick the clients - consumers and producers - into opening yet more connections.  In other words, we were trapped - increasing connections causing ActiveMQ problems causing increasing connections causing ...

While we were experimenting, we went back to a SQL persistence layer rather than KahaDB.  For pseudo-random testing, it was about 60-70% as performant as KahaDB, but under real workloads it seemed noticeably slower.  The consumers and producers seemed to think so as well, as the spiraling connection counts were much worse with SQL-backed storage, presumably due to slower performance causing more connection issues causing slower performance causing ... you get the picture.
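For context, pointing ActiveMQ at a SQL persistence layer is done with the jdbcPersistenceAdapter - a minimal sketch (the #mysql-ds datasource bean name is an assumption; it refers to a dataSource bean defined elsewhere in the broker XML):

```xml
<persistenceAdapter>
  <jdbcPersistenceAdapter dataDirectory="activemq-data" dataSource="#mysql-ds"/>
</persistenceAdapter>
```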

We're tempted by a solution based completely on NFS shared storage, but some colleagues are nervous about KahaDB stale file issues over NFS and shared between brokers.  If we have KahaDB files that won't clear and are shared between two brokers, it seems even more problematic than our current setup.

A couple of ideas: ActiveMQ supports mKahaDB which allows you to have different KahaDB files for different queues or topics (with "catch all" defaults available).  That way the slow and fast consumers can be separated from each other and the file size can be controlled better.  It might also help with stale file issues.  See this page for more: http://activemq.apache.org/kahadb.html
Another idea is to switch to LevelDB which should be faster than KahaDB according to the ActiveMQ site and also might not suffer from the stale file problems.  Or maybe it does.
Yet another option is to use the ActiveMQ pluggable storage lockers http://activemq.apache.org/pluggable-storage-lockers.html and set the storage to be a local directory for each broker while the lock file lives in a shared directory.**  Ok, this option isn't great because messages could be stranded on a failed broker, especially if you have some slow consumers, but we (well, some of us) might prefer a little manual work after a failover to risking downtime.
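For the first idea, here's a hedged sketch of an mKahaDB setup in activemq.xml (the queue pattern, directory, and journal size are made up for illustration - check the kahadb page above for the exact elements in your version):

```xml
<persistenceAdapter>
  <mKahaDB directory="activemq-data/mKahaDB">
    <filteredPersistenceAdapters>
      <!-- give the slow consumers their own journal files -->
      <filteredKahaDB queue="slow.>">
        <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
      <!-- "catch all" default for everything else -->
      <filteredKahaDB>
        <persistenceAdapter>
          <kahaDB/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>
```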

Regardless of approach, the pluggable storage has the ability to separate locks from data if needed and to set some options.**  Here's a quick example of setting a locker option:
  <kahaDB directory="activemq-data">
       <locker>
            <shared-file-locker lockAcquireSleepInterval="100000"/>
       </locker>
  </kahaDB>
For setting the lock directory, use:
 <shared-file-locker directory="activemq-lock-directory"/>
or similar.  A few of the options for the pluggable storage and lockers are on the main ActiveMQ site, but there are more that aren't.

Finding all the ActiveMQ XML configuration options, including the storage locking ones, is easiest by looking directly at the ActiveMQ XML schema.  Here's the link to the kahaDB store:
and to the shared-file-locker, which is a possible element of that store:
Be aware of the version numbers in the two links above (both are for 5.8.0).
The ActiveMQ guys are in the process of finishing the separation of storage vs locks, but not all of it made it into 5.8 - mixing KahaDB with SQL lease locks might have to wait until 5.9 (and 5.9 is now out with the SQL lease locks).

*** Update - a little delayed in mentioning this, but ActiveMQ 5.9 appears to have everything needed to use shared locking with individual KahaDB stores - we'll test that and report back.
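In 5.9, combining a KahaDB store with a SQL lease lock should look roughly like this sketch (the #mysql-ds datasource bean name is an assumption; element and attribute names per the pluggable storage lockers page):

```xml
<persistenceAdapter>
  <kahaDB directory="activemq-data">
    <locker>
      <lease-database-locker dataSource="#mysql-ds" lockAcquireSleepInterval="10000"/>
    </locker>
  </kahaDB>
</persistenceAdapter>
```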
The options in the <statements/> section are available by looking at the code which also has useful pieces like the SQL create statements.  Hopefully, we can drop the work below now!

**Ok, one problem - the XML supports the shared-file-locker directory="..." syntax, but the code does NOT do anything with it!  After trying this, we realized the lock file was still being put in the same location as the data files.  Reviewing the ActiveMQ code (search for SharedFileLocker.java) made it clear that this hasn't been finished yet.  So, how to get a feature like this?  Linux-based file locking - if you set up a script that either locks or changes ownership of the 'lock' file, then you control ActiveMQ start up.  Detecting and setting the lock requires a little work with NFS, flock, or perhaps something like python - actually, just changing the ownership is easier.  Since you're trying to detect whether the other ActiveMQ is running, looking for a lock is nice, but you could just curl one of the standard URLs on the other broker to see if it is running - not foolproof, but perhaps workable with the right supplemental checks.
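A minimal sketch of the curl idea (assumes the web console is enabled on port 8161 of the other broker; the host name is made up):

```shell
#!/bin/sh
# return 0 if the other broker's web console answers within 5 seconds
broker_up() {
    curl -sf -m 5 "http://$1/admin/" > /dev/null
}

if broker_up "${OTHER_BROKER:-other-broker:8161}" ; then
    echo "other broker appears up - hold off starting activemq here"
else
    echo "other broker not reachable - could be ok to start locally"
fi
```

Pair it with a supplemental check (like the marker files in the script below) since a broker can be alive while its console is unreachable.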

One alternative to a shared lock is to try the DB shared locker, but in the spirit of avoiding the DB (though it would be fine to use it just for a lock!), here's a little script that flags the lock by leaving a file on the NFS mount (msg_dir).  It's set to look at /proc/kmsg as a test, but change it to the activemq lock file instead.

#!/bin/bash
#check if file is locked or at least opened and indicate with another file
#  using this as flock across NFS didn't initially work for me

#these values are environment specific - adjust to suit
fn=/proc/kmsg                  #file to check - change to the activemq lock file
amq_lock_file=$fn
msg_dir=/mnt/nfs/amq_markers   #shared NFS directory for the marker files
hn=`hostname`
standby_server=no              #set to yes on the standby broker

#exit value should be 0 for an opened file and fopened should have a process id
# else file shouldn't be opened and therefore shouldn't be in use by activemq

if [ -e $fn ] ; then
  fopened=`lsof -wt $fn`
  exit_value=$?
else
  echo "no file to check!"
  exit 1
fi

if [ "$fopened" != "" -a $exit_value -eq 0 ] ; then
   echo "file is locked"
   echo $fopened > $msg_dir/file_is_locked_on_$hn
   rm -f $msg_dir/file_NOT_or_unknown
   if [ "$standby_server" = "yes" ] ; then
       sleep 5 #make the standby server sleep waiting to see if race for lock with another server
       remote_lock=`ls -1 $msg_dir | grep locked | grep -v $hn | wc -l`
       if [ $remote_lock -gt 0 ] ; then
          touch $msg_dir/file_NOT_or_unknown
          rm -f $msg_dir/file_is_locked_on_$hn
       else
          echo "still not locked remotely so setting for local startup"
       fi
   fi
else
   echo "not locked or unknown"
   touch $msg_dir/file_NOT_or_unknown
   rm -f $msg_dir/file_is_locked_on_$hn
fi

# Have left a marker that one instance is up or not, now use that to control activemq

remote_lock=`ls -1 $msg_dir | grep locked | grep -v $hn | wc -l`
local_lock=`ls -1 $msg_dir | grep locked | grep $hn | wc -l`
echo "remote lock: $remote_lock; locally locked: $local_lock"
if [ $remote_lock -gt 0 ] ; then
 chown root.root $amq_lock_file #prevent activemq from locking and starting
elif [ $local_lock -eq 1 ] ; then
 chown activemq.activemq $amq_lock_file #allow activemq to lock and start
 # could just start activemq at this point
fi


Monday, September 16, 2013

More ActiveMQ admin URLs

In a previous post, we looked at querying ActiveMQ to get some details about queues.  Those details give a very thorough view of the system.  Here are two more ways of looking into ActiveMQ - one more superficial and the other more detailed.

Given that we run ActiveMQ in a production environment, we need to know that all of our applications are consuming messages and events from ActiveMQ as we'd expect.  We can monitor the apps themselves for their throughput, but that depends on how many messages we have queued up for the app.  Previously, we saw how to use activemq-admin query to find the number of messages enqueued, dequeued, dispatched, etc.  However, there are a couple of other, perhaps easier, ways to get some basic information about the number of messages in a queue.  Using the web UI, you can get the number of messages programmatically just as you would browse yourself:


Below is a way to get an XML list of the queue message ids.  It is still under /demo/ in 5.8, but you'll need to enable demo in the jetty.xml file (look at jetty-demo.xml for the demo section):
This is a quick and easy way to get queue lengths and allows you to 'diff' against a previous check to see if the messages have moved much at all such as:

curl localhost:8161/demo/queueBrowse/my_queue_name > /tmp/id_list.now
sleep 30 # sleeps 30 seconds
mv /tmp/id_list.now /tmp/id_list.30s_old
curl localhost:8161/demo/queueBrowse/my_queue_name > /tmp/id_list.now
diff /tmp/id_list.30s_old /tmp/id_list.now # you can grep for newer entries (grep ^>) and count them

All easy stuff.  In 5.9 and above, the urls have moved a little and you'll still need to enable them as desired.
Viewing and consuming messages (and unsubscribing):
localhost:8161/demo/message/myTestQueue?type=Queue (5.8 and below)
localhost:8161/api/message/myTestQueue?type=Queue (5.9 and above)
Viewing xml message lists:
localhost:8161/demo/queueBrowse/myTestQueue (5.8 and below)
localhost:8161/admin/queueBrowse/myTestQueue (5.9 and above)

Don't forget that you can add the value &clientId=myConsumerId123 to the url to keep session state easily, as in:
localhost:8161/demo/message/myTestQueue?type=Queue&clientId=myNewConsumer (5.8 and below)
and then unsubscribe that client when done (or it will time out eventually):
localhost:8161/demo/message/myTestQueue?clientId=myNewConsumer&action=unsubscribe (5.8 and below)

JMX information
ActiveMQ 5.8 added a nice feature in the form of Jolokia which gives easier access to JMX information.  With ActiveMQ 5.8+, check out URLs like these:
http://localhost:8161/api/jolokia (the root response is really only good for the time stamp)
Read requests against specific MBeans go deeper and show you details such as the consumer count.
There's another example at the bottom of this page as well.
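For example, a Jolokia read URL for a queue attribute can be built like this sketch (the MBean naming here follows newer ActiveMQ versions; brokerName and destinationName are assumptions - adjust for your setup):

```shell
# build a Jolokia read URL for a queue's ConsumerCount attribute
MBEAN="org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=myTestQueue"
URL="http://localhost:8161/api/jolokia/read/$MBEAN/ConsumerCount"
echo "$URL"
# curl "$URL"   # the JSON response carries the attribute under "value"
```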

Monday, August 26, 2013

Using rrdtool

RRDTool and Round Robin Databases

A round-robin database (rrd) is basically a fixed size database that will overwrite the oldest data when all the other spaces have been used.  This behavior is common with caches using the least recently used approach, for example.  However, it seems a little odd for databases - after all, you want to store things and that is why you put it in a database, right?  Well, maybe not for all data - imagine you wanted to store time series data where old values were aggregated together (less detailed information) and stored elsewhere, or where data a year old just doesn't matter anymore.

RRDtool is a round robin database and a suite of tools all in one (graphite is a somewhat nicer alternative).  You can store data in RRDtool and retrieve the data or better yet, retrieve plots of the data.

(Update from 2019: DB-Engines ranks RRDtool still comparable to Graphite and Prometheus although Prometheus has been rising very quickly for the last few years. This is likely due to what RRDtool did to open up this field years ago.)

Here are some simple commands for using RRDtool: 
sudo yum install rrdtool #install or sudo apt-get install rrdtool

man rrdtool #if you want to read man pages
man rrdcreate #more details on creating an rrd file

Create an example rrd for storing counts of updates versus time - it's a made-up metric here, but could be a count of hits on a webpage or number of packets on a network interface:
 rrdtool create counts.rrd --start 1377510000 \
 -s 300 DS:count_updt:COUNTER:600:U:U RRA:AVERAGE:0.5:1:12 RRA:AVERAGE:0.5:3:10
#--start sets the start time if needed; step 300s; COUNTER type; assume unknown after 600s; Unknown for min; Unknown for max
#RRA - round robin archive pattern: AVERAGE values to consolidate; 0.5 (1/2) of values needed to consolidate; every 1 step (maybe silly), 12 times;
# then another set with 0.5, 3 steps consolidated, 10 times (step = 5m, 3 steps = 15m, 10 points = 150m)

If you read the manual pages, then you'll see CF frequently. CF is the 'consolidation function' or how RRDtool consolidates data for storage or display in a plot. Above, the CF is AVERAGE, there are others and more sophisticated approaches like Holt-Winters.

Load some data using the Unix timestamp (date +%s). Below, we're loading a number of points starting at 1377510000 which works with the rrd created above. You may need to update this to the current time if you change things.
  rrdtool update counts.rrd 1377510000:1234 1377510300:1230 1377510600:1200
  rrdtool update counts.rrd 1377510900:1363 1377511200:963 1377511500:1275
  rrdtool update counts.rrd 1377511800:1083 1377512100:999 1377512400:1099
  rrdtool update counts.rrd 1377512700:1500 1377513000:1810 1377513300:840

Pull the data back to double check - first pull it, then over a time range, then a dump:
 rrdtool fetch counts.rrd AVERAGE
 rrdtool fetch counts.rrd AVERAGE --start 1377510000  --end 1377513000
 rrdtool dump counts.rrd

Create an image from the data:
 rrdtool graph count_updt.png --start 1377510000 --end 1377513000   DEF:count_updt=counts.rrd:count_updt:AVERAGE LINE2:count_updt#FF0000 

DEF:virtual_name=rrd_filename:data-source-name:CF defines a virtual_name for the rrd_filename followed by a data-source-name and consolidation function, but here we've only used one data source (graphite is a little easier for things like this, but once you have the format, rrdtool is fine). LINE2:... means use a line, weight 2, data-source-name and HTML color code.

Check the source docs (here and here) for more detailed information. Also, read the man pages:
 man rrdgraph_data
 man rrdgraph_graph
 man rrdgraph_examples

Let's create a little more involved one without the rigid time settings. The type has been switched from COUNTER to GAUGE (counter expects increasing values while gauge can vary up and down):

  rrdtool create counts1.rrd --start now-7200 DS:count_updt:GAUGE:600:U:U RRA:AVERAGE:0.5:1:12 RRA:AVERAGE:0.5:3:10
Fill with some data - in this case increasing values each time:
 i=0; d=`date +%s`
 while [ $i -lt 24 ] ; do echo $i; let i=$i+1; let t1=d-7200+300*$i; c=$(($i*500)); echo $t1 $c ; rrdtool update counts1.rrd $t1:$c ; done

Have a look at the data, if you want:
 rrdtool dump counts1.rrd
Create a plot:
 rrdtool graph count_secd.png --start now-7200 DEF:count_updt=counts1.rrd:count_updt:AVERAGE AREA:count_updt#FF0000

Now, let's create and fill an rrd like the above, but with varying data:
d=`date +%s`
rrdtool create noise.rrd --start=now-7200 DS:noise_meas:GAUGE:600:U:U RRA:AVERAGE:0.5:1:12   RRA:AVERAGE:0.5:3:10
 rrdtool dump noise.rrd
 i=0; let t=$d-7200
 while [ $i -lt 24 ] ; do echo $i; let i=$i+1; let t1=t+300*$i; v=`echo $RANDOM`; rrdtool update noise.rrd $t1:$v ; done
 rrdtool graph noise.png DEF:noise_meas=noise.rrd:noise_meas:AVERAGE LINE2:noise_meas#00FF00:"example_line\l" -t "Sample Graph" -v "values" -w 500 -h 200 -c BACK#AAAAAA -c GRID#333333
 rrdtool graph noise.png --start now-7200 DEF:noise_meas=noise.rrd:noise_meas:AVERAGE AREA:noise_meas#00FF00:"example_line\l" -t "Sample Graph" -v "values" -w 500 -h 200 -c BACK#AAAAAA -c GRID#333333

Now, let's create a plot with the data from the noise and counts1 data:
 rrdtool graph noise_count.png --start now-7200 DEF:noise_meas=noise.rrd:noise_meas:AVERAGE DEF:count_updt=counts1.rrd:count_updt:AVERAGE AREA:noise_meas#00FF00:"example_line\l" LINE2:count_updt#FF0000:"count\l" -t "Sample Graph" -v "values" -w 500 -h 200 -c BACK#AAAAAA -c GRID#333333

Using with Python
To make RRDtool more useful, it's good to link it with a bit of code to insert data as desired. Here, we're using the python-rrdtool package (install python-rrdtool) with info here and here. There are other python packages and pages for other libraries.

Using python with rrdtool will require a package install like this: yum install python-rrdtool

Add some python (fairly straight forward, but details depend on the package):
#quick set of python commands for putting data into an rrd with python-rrdtool:
from rrdtool import update as rrdtool_update
value = "30"
result = rrdtool_update('counts.rrd', '1552217030:%s' % value)
# this was the format for the older rrdtool-python package:
#result = rrdtool_update('test.rrd', 'N:%s' % value) #N means now, otherwise specify a unix timestamp; could send two values if both defined in test.rrd: 'N:%s:%s' % (value1, value2)

Combine this with some web calls (see the python page on this blog) and you'll be storing time series data quickly! Before you write your own simple receiver, though, have a look at rrdcached, which handles receiving metrics for you.

Saturday, August 17, 2013

Using Graphite - metrics data

Graphite is a tool for time series data storage and graphing.  The main page is here: Graphite.

Graphite (and its Carbon receiver and Whisper data store) stores a metric value against a specific time stamp for the metric that you want to track.  In other words, if you want to know the number of requests per minute that a web server is receiving, you'd send the data to your graphite set up with a command like this (from a Unix/Linux/Mac command line):

echo "webserver1.requests_per_minute 201 1376748060" | nc graphite_svr.mycomp.com 2003

Where webserver1.requests_per_minute is the metric to be saved. 201 is the value of the metric and 1376748060 is the time in seconds since Jan 1, 1970.  This simple set of values - metric name, metric value, metric timestamp in seconds is all that's needed by Graphite.
In the example above, this information is then piped into the netcat command to the graphite server at port 2003, the default port for Graphite.  There are other ways to get information into Graphite - python's pickle format, for example (there's also apparently AMQP support).

Graphite will store this data into a whisper directory as set up in the graphite installation. Whisper is the database Graphite uses to store data. It is a round-robin database very similar to rrdtool's storage (rrd meaning round robin database) where Graphite started off; however, limitations in the version of rrd then led to the creation of whisper.

In the Graphite/Whisper storage area, the metric data will be stored in a hierarchy based on the metric name given. In the example above, the metric name is webserver1.requests_per_minute which will lead to a directory called webserver1 in which there will be a file called requests_per_minute.wsp.  Graphite uses the "." as a delimiter to create the hierarchy.
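As a quick sketch of that mapping (the /opt/graphite prefix is an assumption - it depends on where Graphite is installed):

```shell
# turn a metric name into its whisper file path: dots become directories
metric="webserver1.requests_per_minute"
root="/opt/graphite/storage/whisper"
wsp_path="$root/`echo $metric | tr '.' '/'`.wsp"
echo "$wsp_path"
```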

To view the data, use a web browser to go to the graphite front end (for example, graphite_svr.mycomp.com) where you can browse the metrics that Graphite is storing and create graphs and dashboards of graphs for viewing.  Individual charts can be viewed by creating the right URL as in:


This URL will cause Graphite to draw a graph of the requests_per_minute from webserver1 and webserver2 for the last 24 hours until now and with a chart size of 400x250 pixels.
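Such a URL would be built with the Graphite render API along these lines (a sketch - the host and metric names are from the example above):

```shell
# last 24 hours of two metrics on one 400x250 chart
URL="http://graphite_svr.mycomp.com/render?target=webserver1.requests_per_minute&target=webserver2.requests_per_minute&from=-24hours&width=400&height=250"
echo "$URL"
```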

To see the raw data, add "&format=raw" to the end of a request; it will print the data per time slot, but won't show the time stamp. To see the time stamp and the values, you'll need to use some Whisper commands. To view json data, add "&format=json" instead.

whisper-fetch.py requests_per_minute.wsp will show the data with the timestamps.
whisper-info.py requests_per_minute.wsp will show basic information about the wsp file such as the expected time intervals
whisper-resize.py requests_per_minute.wsp 5m:1y 30m:3y will resize the whisper data file to store data every 5 minutes for a year and then start aggregating values to 30 minutes for 3 years.
whisper-update.py requests_per_minute.wsp 1376748060:199 will overwrite the currently stored value at time 1376748060 (201) with the new value (199).  I've had trouble getting this command to work, but have had success resubmitting the information via the netcat command as above.
whisper-dump.py will show a mix of whisper-info.py and whisper-fetch.py data including unfilled slots.
whisper-create.py to create a new metric file - this isn't needed as sending the data to Graphite will cause it to create the file with defaults matching the metric.

Whisper files are created with default values set in the storage-schemas.conf file which has entries like:
[webservers]
pattern = ^webserver
retentions = 60s:90d,5m:3y

which sets the data interval and retention to every 60s for 90 days, then 5-minute points for 3 years (the pattern is a regex matched against the metric name).  The default values are every minute for 24 hours/1 day. Make sure you resize a file or set the defaults before creating it.  When Whisper starts to aggregate the data it requires a certain number of metrics to start the aggregation. The default is xFilesFactor = 0.5 which means that at least 50% of the data points must exist for an aggregated value to be created. If you have flaky data injection, you might want to reduce this amount.
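The xFilesFactor is set in storage-aggregation.conf; a minimal entry looks something like this (the section name and values here are illustrative):

```
[lenient_average]
pattern = .*
xFilesFactor = 0.3
aggregationMethod = average
```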

To stop and start carbon:
carbon-cache.py stop
carbon-cache.py start
For more info on this, look at: https://testing.readthedocs.org/en/latest/monitoring_201.html

Scaling a Graphite system
Graphite can be clustered to provide data and performance and even up-time at a scale that isn't possible on a single system.  The clustering available allows spreading data out and/or duplicating data for availability.  See more here: http://www.aosabook.org/en/graphite.html

Monday, August 12, 2013

elasticsearch Basics

With any data, there's often a need to search for specific pieces, especially when that data comes grouped like documents. elasticsearch is a great tool for building a search mechanism over your data.  This search engine is very easy to start up after downloading:
bin/elasticsearch # add -f to run it in console mode

elasticsearch Queries and Testing:
http://localhost:9200/twitter/_search?pretty  - simple index level search on the twitter example

Test the default analyzer:  
curl -XGET 'localhost:9200/_analyze?pretty' -d 'Test String and Data' # it will drop the "and"

Test with a specific analyzer:  
curl -XGET 'localhost:9200/index_name/_analyze?analyzer=autocomplete&pretty' -d 'New Text'

Test with tokenizers/filters:  
curl 'localhost:9200/index_name/_analyze?tokenizer=whitespace&filters=lowercase,engram&pretty' -d 'Newer data' #two filters, one tokenizer

Explaining a match:  
curl -XGET 'localhost:9200/index_name/type/document/_explain?pretty&q=quiet'

Validate a search:
curl 'localhost:9200/index_name/type/_validate/query?explain&pretty' -d '{...}' #quoting the url saves escaping the & on *nix

Setting up and monitoring a cluster:
cluster.name : nodes with the same cluster name will try to form a cluster
node.name : instance name - one will be chosen automatically on each start up, but it's best to set a unique one
set the open files limit > 32k; set the heap to no more than 1/2 of system memory (to allow for disk caching)
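In elasticsearch.yml, that amounts to entries like these (the names are made up):

```yaml
cluster.name: my_es_cluster
node.name: es_node_1
```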
curl localhost:9200/_cluster/health?pretty
(to turn this into an alert, grep for status and grep for anything that isn't green - that will give you an indication that elasticsearch is reporting a problem.  You might also want to check that 'timed_out' is false.)
To check index or shard level add a parameter to the statement above ?level=indices - see below.

shutdown whole cluster : curl -XPOST localhost:9200/_cluster/nodes/_shutdown

shutdown a node in cluster: curl -XPOST localhost:9200/_cluster/nodes/node_name/_shutdown
get node names from : curl localhost:9200/_cluster/nodes

 ... and cluster/node stats:
curl localhost:9200/index_name,index_name2/_stats?pretty #in addition, can grep for count to find number of docs
curl 'localhost:9200/_cluster/health?pretty&level=indices' #add grep for status and grep -v green to turn into alert
curl 'localhost:9200/_cluster/health?pretty&level=shards' #add grep for status and grep -v green to turn into alert
curl localhost:9200/_stats
curl localhost:9200/_nodes # _nodes will also return OS status
curl localhost:9200/_nodes/SpecificNodeName
curl localhost:9200/_nodes/stats ; curl localhost:9200/_nodes/SpecificNodeName/stats
curl localhost:9200/_cluster/state #can add ?filter_nodes to remove node info (or filter_blocks,filter_indices, etc)

  • XPUT to put a document with a specific id into the system: localhost:9200/index/type/id -d 'document body'
  • POST and ES will generate an id which needs to be read from the response: localhost:9200/index/type -d 'document body' #id in response body
  • XGET to retrieve a known document: localhost:9200/index/type/id
/_update to update
/_search to query - with a body or like curl localhost:9200/index/_search?q=customerId:20020

curl -XGET localhost:9200/_search -d '{ "query":{"match_all":{}}}' #get all data
use PUT if specifying the doc id, use POST if you want ES to do it; diff types can have diff mappings
curl -XPUT localhost:9200/entertainment/movies/1 -d '{"movie_title": "The Monsters", "actor":"some nutty guy", "revenue":2000}'
curl -XPUT localhost:9200/entertainment/movies/2 -d '{"movie_title": "Alien Remake", "actor":"some nutty guy", "revenue":150000}'
curl -XPUT localhost:9200/entertainment/movies/flop -d '{"movie_title": "Slugs", "actor":"some nutty guy as bob", "revenue":123}' #note the change in document naming style - not always a good idea
curl -XPOST localhost:9200/entertainment/movies -d '{"movie_title": "Hairslugs", "actor":"bob as guy", "revenue":12300000}' # ES will return an id in the response - that's the key to finding this document directly

curl -XGET localhost:9200/entertainment/movies/1 to retrieve that doc
curl -XDELETE  localhost:9200/entertainment/movies/1  to remove
curl -XGET localhost:9200/_search - across all indices
curl -XGET localhost:9200/entertainment/_search - across all types in entertainment
curl -XGET localhost:9200/entertainment/movies/_search - doc type movies in entertainment index

simple query:
curl -XGET localhost:9200/_search -d '{"query": {"query_string":{"query":"Monsters"}}}'
curl -XGET localhost:9200/_search -d '{"query": {"query_string":{"query":"Monsters", "fields":["movie_title"]}}}'

Filtered search:
curl -XGET localhost:9200/_search -d '{"query": {"filtered": {"query": {"query_string":{"query":"Monsters", "fields":["movie_title"]}}, "filter": { "term": {"revenue":2000}}}}}'
could switch the query to 'match_all' and just filter over everything
or use constant score like this:
'{"query": { "constant_score":{ "filter":{ "term":{"revenue":2000}}}}}'

Mapping example:
Here is a multi-field (to have normally tokenized analysis and unaltered indexing) mapping (basically a schema) example.
Create the mapping on a new index (you might need to -XDELETE the index if it already exists or try updating an existing index, but you won't be able to update existing documents from what I understand). Below, we're creating a mapping on one type (movies) on an index (entertainment):
curl -XPUT localhost:9200/entertainment/movies/_mapping -d '{
 "movies":{ "properties":{ "actor":{ "type": "multi_field", "fields": {"actor": {"type":"string"},
 "fullname":{"type":"string", "index":"not_analyzed"}}}}}} '
and check your mappings on an index:
curl -XGET localhost:9200/entertainment/movies/_mapping

 This sets actor to be analyzed as normal, but also adds actor.fullname as a field that can be searched on as a single, combined token.

To create a mapping on an entire index (all document types): use a similar query, but include your document types in the mapping:

curl -XPUT localhost:9200/entertainment/ -d '{
"mappings" : {
     "movies": {
          "properties": {
               "actor": { "type": "multi_field", "fields": {
                    "actor": {"type":"string"},
                    "fullname": {"type":"string", "index":"not_analyzed"} } } } },
     "cinemas": {
          "properties": {
               "location": { "type": "string" } } }
} }'
Here we're creating two types of documents (entertainment/movies and entertainment/cinemas) using one mapping file.  Alternatively, this mapping could be loaded by
curl -XPUT localhost:9200/entertainment -d @mapping.json
where mapping.json is a file containing the json above.

Warming queries:
elasticsearch allows you to set warming queries to be run during start up.  The queries are defined
via PUTting a query to localhost:9200/index/_warmer/warmer_query_name -d'...'
A warming script can be retrieved by GETting it: localhost:9200/index/_warmer/warmer_query_name
Warming queries can be deleted like any document and can be disabled by putting a setting ({"index.warmer.enabled": false}) to localhost:9200/index/_settings
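Sketched as commands (the index and warmer names are made up, and these assume the warmer API of the ES versions current at the time - the curls are shown rather than run since they need a live node):

```shell
BASE="localhost:9200/entertainment/_warmer/warm_movies"
echo "curl -XPUT    $BASE -d '{\"query\": {\"match_all\": {}}}'"
echo "curl -XGET    $BASE"
echo "curl -XDELETE $BASE"
```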

A number of useful links:
https://github.com/elasticsearch/elasticsearch - documentation including the twitter example
https://gist.github.com/justinvw/5025854 - auto-suggest example using a custom analyzer
http://www.elasticsearch.org/guide/reference/api/search/term-suggest/ - details on the auto-suggest
http://joelabrahamsson.com/elasticsearch-101/ - similar to some examples above
http://exploringelasticsearch.com/ - good, book like resource

Sunday, July 28, 2013

Java's REST Jersey 2.x examples

Jersey updated to 2.0 (JAX-RS 2.0), but most of the examples on the internet are for Jersey 1.x.  There are a few changes, none are that big, but here are some examples of serving and requesting (client) Jersey 2.0.

First, grab the Jersey jars from here: http://jersey.java.net/download.html - I got the Jersey JAX-RS 2.1 RI bundle which contains what I needed.  Next, as I'm working in Eclipse, I created a Dynamic Web Project (right click in the package explorer area -> New -> Other -> Web -> Dynamic Web Project) and called it DynamicWebP.

Extract the jars from the bundle and copy them into the project's WebContent/WEB-INF/lib directory. Also, add them to the project build path (right click on the project -> build path -> add external libraries).  For these examples, only javax.ws.rs-api-2.0.jar and jersey-container-servlet-core.jar are needed on the build path, but it's easier to add them all.  For deployment, all/nearly all are needed so it's easier to add them from the start to the WEB-INF/lib directory.  (If you have problems with some of the jars added to build path, but not to WEB-INF/lib, because you didn't copy them, then go to project properties, deployment, add and add the jars needed for deployment.)

Now, add a class to the project - here I've put it in a package I called example.jersey and named the class Hello. This class is a simple servlet to greet a request. Note the @Path to specify the path off the root in web.xml (shown next), the @GET for the HTTP verb that a method will respond to, and the @Produces for the media type to produce (there's also @Consumes for the media type to expect in the request).

 package example.jersey;  
 import javax.ws.rs.*;  
 import javax.ws.rs.core.MediaType;  
 @Path("/hello")  
 public class Hello {  
      @GET  
      @Produces(MediaType.TEXT_PLAIN)     // nature of MIME type  
      public String simpleStringResponse() {  
           return "hello from the text response of Hello";  
      }  
      @GET  
      @Produces(MediaType.TEXT_HTML) // nature of MIME type  
      public String simpleHTMLResponse() {  
           return "<html> <title> Simple RESTful Hello</title> "  
                     + "<body><h2>Hello from the html response of class Hello :)</h2></body></html>";  
      }  
 }  

Here is the web.xml to update (in WEB-INF).  One noticeable change from Jersey 1.x to 2.0 is the classes to be referenced in servlet-class and param-name:
As expected, note the url-pattern path (relates to the url needed to find the resource) and param-value (relates to the class package) as that will be important to checking the result.

 <?xml version="1.0" encoding="UTF-8"?>  
 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">  
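 <!-- sketch of the servlet registration: the servlet-class and the packages
      init-param below are the standard Jersey 2 names; the url-pattern and
      param-value match this post's example -->
 <servlet>
  <servlet-name>Jersey REST Service</servlet-name>
  <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
  <init-param>
   <param-name>jersey.config.server.provider.packages</param-name>
   <param-value>example.jersey</param-value>
  </init-param>
  <load-on-startup>1</load-on-startup>
 </servlet>
 <servlet-mapping>
  <servlet-name>Jersey REST Service</servlet-name>
  <url-pattern>/jersey_test/*</url-pattern>
 </servlet-mapping>
 </web-app>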
With that configured, right click on the project (assuming in Eclipse still) and click Run As -> Run on Server to launch the app on a configured server (tomcat).  Then go to the url: http://localhost:8080/DynamicWebP/jersey_test/hello - if you use a browser (instead of curl), the response should be:

      Hello from the html response of class Hello :)

Now, add a REST Jersey client by using the same project and the following JerseyClient class in the package client.jersey.

 package client.jersey;  
 import javax.ws.rs.client.*;  
 import org.glassfish.jersey.client.filter.CsrfProtectionFilter;  
 public class JerseyClient {  
       public static void main(String[] args) {  
            Client restClient = ClientBuilder.newClient();  
            //restClient.register(new CsrfProtectionFilter()); //register a filter, here a predefined one  
            WebTarget target = restClient.target("http://jersey.java.net/");  
            // target.register(new CsrfProtectionFilter()); //or register on a target  
            WebTarget resourceTarget = target.path("download.html"); //change the URI without affecting a root URI  
            String responseString = resourceTarget.request("text/plain").get(String.class);  
            System.out.println("Here is the response: "+responseString);  
       }  
 }  

Note the separation of higher-level web targets (URLs) from lower-level ones - filters and paths can be set at each level.  The commented-out code illustrates setting a filter at two of those levels.
(More info and examples on the simple setup of a client are here: https://jersey.java.net/documentation/latest/user-guide.html#client.)
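Incidentally, the way a root WebTarget is extended with path(...) behaves much like relative URI resolution. As a rough analogue using only the standard library (plain java.net.URI, no Jersey required - this is just to illustrate the composition idea, not Jersey's implementation):

```java
import java.net.URI;

public class TargetComposition {
    public static void main(String[] args) {
        // root, analogous to restClient.target("http://jersey.java.net/")
        URI root = URI.create("http://jersey.java.net/");
        // derived resource, analogous to target.path("download.html")
        URI resource = root.resolve("download.html");
        System.out.println(resource); // http://jersey.java.net/download.html
    }
}
```

The root stays untouched, so several resource URIs can be derived from it independently - the same pattern the Jersey client encourages with its target/path split.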

To run the example, right click on the JerseyClient.java file in Eclipse and Run As -> Java Application to invoke the main(...) in the class.  Your console output should be the html text from the Jersey download page.

That's it - client and server side. To make it more interesting, you could add query parameters and have the client and server call each other recursively (with some changes) until a predetermined limit is reached (or something crashes :)

Jersey 2 client - no pathParam

I came across pathParam in the Jersey 2 client documentation the other day and thought it would be useful in various client requests - specifically, this line presented as the JAX-RS 2.0 way:
String result = target.pathParam("param", "value").get(String.class);

However, coding it up ran into a problem very quickly - namely that the method isn't available on the WebTarget class!  Searching the jar confirmed that, and searching the internet didn't turn up much besides what looks like the beginning of the discussion to remove pathParam :)
So, the lesson is: ignore that line in the documentation.  The supported way to fill in a template parameter is WebTarget.resolveTemplate(...) on a templated path, along the lines of target.path("{param}").resolveTemplate("param", "value").

Thursday, July 18, 2013

Fixing the Akka Java tutorial

The first Akka Java tutorial is a nice introduction to Akka actors and messages.  The actor-and-message model has been around for a long time, and the Akka libraries give the Java community a good way to use it.

However, the tutorial in its current state (July 2013) has some problems that will trip up developers and leave you searching for solutions - here are some answers.  As usual, if you know what the problem is, then it's easy to deal with - in fact, the code is fine, but only when mixed with the right Akka.

First, the links to the tutorial in github are wrong - look on this tree (2.0.2). Alternatively, you can cut and paste the program code from the tutorial here

Which version of Akka do you want to run? The tutorial code as is requires an older version of Akka - the tutorial was written to Akka 2.0.2, but also works with Akka 2.0.5 which can be downloaded here or via the download page.  It won't work as is with Akka 2.2.0.

Akka 2.2.0+ - change one line
The most annoying issue that I came across was the error below.  It's a bit easier to fix when using Eclipse (or similar IDE), but I was using a text editor:
tutorial\Pi.java:31: error: method tell in class ActorRef cannot be applied to given types;
     master.tell(new Calculate());
  required: Object,ActorRef
  found: Calculate
  reason: actual and formal argument lists differ in length 
(compiled with:
 C:\Tools\akka-2.2.0>"c:\Program Files\Java\jdk1.7.0_09\bin\javac.exe" 
          -cp lib\scala-library.jar;lib\akka\akka-actor_2.10-2.2.0.jar tutorial\Pi.java 

In older Akka versions, master.tell() accepted either a message alone or a message plus a sender ActorRef - but the message-only variant was deprecated in Akka 2.1 and removed in Akka 2.2, so the 2.2 signature requires both arguments.  Make sure you find the documentation for the version you're actually running.

To fix, pass a sender as the second argument - ActorRef.noSender() (or null) - when using Akka 2.1 or later, i.e. master.tell(new Calculate(), ActorRef.noSender()); Then recompile and run including the same jars as above.

Second solution - use older version of Akka

For the sake of simplicity, comment out the package statement at the top of the program:
 //package akka.tutorial.first.java;

Now, it's just a matter of compiling and running the code - as easy as it was supposed to be.  Put the tutorial directory in the akka-2.0.5 directory so that it's at the same level as Akka's lib directory.

javac -cp lib/scala-library.jar:lib/akka/akka-actor-2.0.5.jar tutorial/Pi.java 
(for *nix/linux)
javac.exe -cp lib\scala-library.jar;lib\akka\akka-actor-2.0.5.jar tutorial\Pi.java 
(for windows)
or more generically: 
javac -cp path_to_akka/lib/scala-library.jar:path_to_akka/lib/akka/akka-actor-2.0.5.jar path_to_tutorial/tutorial/Pi.java 
(for linux - similarly for windows)
(compiled with java 7)

Running the example requires just a little more:
java.exe -cp lib\scala-library.jar;lib\akka\akka-actor-2.0.5.jar;tutorial;lib\akka\config-0.3.1.jar Pi 
(for windows, run from the akka directory if you've put the tutorial there)
java -cp lib/scala-library.jar:lib/akka/akka-actor-2.0.5.jar:tutorial:lib/akka/config-0.3.1.jar Pi 
(for linux, *nix, run from the akka directory if you've put the tutorial there)
p_a=path_to_akka # set p_a to the path to akka, for example /opt/akka
java -cp \
$p_a/lib/scala-library.jar:$p_a/lib/akka/akka-actor-2.0.5.jar:path_to_tutorial/tutorial:$p_a/lib/akka/config-0.3.1.jar Pi 
(for linux, *nix, specifying exact paths)

When run, the output should look like:
        Pi approximation:               3.1415826535897926
        Calculation time:       1187 milliseconds
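For reference, the series the tutorial's workers evaluate in parallel - each worker summing a chunk of 4*(-1)^k/(2k+1) - can be sketched single-threaded in plain Java. This is a toy version with no Akka involved, and the term count here is illustrative rather than the tutorial's default:

```java
public class PiSeries {
    public static void main(String[] args) {
        // sum of 4 * (-1)^i / (2i + 1) - the same per-element formula
        // the tutorial's Worker uses, just without the actors
        double acc = 0.0;
        for (int i = 0; i < 10_000_000; i++) {
            acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
        }
        System.out.println("Pi approximation: " + acc);
    }
}
```

This converges slowly (the error shrinks roughly like 1/n), which is why the tutorial splits the work across actors in the first place.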

Wednesday, April 24, 2013

Performance Issues in ActiveMQ

There are times that we've seen messages taking a long time to be consumed by active, fast consumers. The consumer isn't working hard and the message queue isn't under load, but the messages aren't being processed quickly (in our case at 20% the normal pace).

Using the activemq-admin command (see the post here), we queried the stats on the queue.  The results confirmed our observations and pointed to the problem - in particular, DispatchCount was many times higher than DequeueCount.

For reference (see this link on some stats meanings and this similar link):
EnqueueCount =  number of messages sent and committed to the queue
DequeueCount = number of messages read from queue and committed or acknowledged by consumer
DispatchCount = number of messages read from the queue (DequeueCount + InFlightCount)
InFlightCount = number of read messages waiting for acknowledgement from consumer

A relatively high DispatchCount or InFlightCount is not good.
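The counters are related by simple arithmetic, which makes the broken case easy to spot - a trivial illustration (the numbers are made up, not real broker output):

```java
public class QueueStatsCheck {
    public static void main(String[] args) {
        // illustrative numbers only
        long dequeueCount = 500;    // delivered and acknowledged
        long inFlightCount = 120;   // delivered, awaiting acknowledgement
        // per the definitions above, dispatch = dequeue + in-flight
        long dispatchCount = dequeueCount + inFlightCount;
        System.out.println("DispatchCount = " + dispatchCount);
    }
}
```

In a healthy queue DispatchCount stays close to DequeueCount; a large and growing gap means messages are being handed out but not acknowledged.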

The most notable instance of this slow message delivery occurred on our network-of-brokers cluster, which runs two ActiveMQ servers with a network connection between them.  Clients connect to either broker as they wish.  The signs of the issue were mostly that:
  • there was a backlog of messages waiting to be consumed (i.e. the producer was working fine)
  • the app had active consumers
  • messages were being consumed at what appeared to be 1/10th the normal speed
We were monitoring the consumer's processing time per message (about 1s per message - each message required plenty of work), which meant the consumer could process messages far faster than the rate at which they were actually arriving.  The consumer app wasn't under heavy load; in fact, it wasn't working enough.  There was one other odd bit of behavior: the consumers were switching back and forth between the two brokers very quickly - every few seconds.

The interesting portion of this is DequeueCount vs DispatchCount - DequeueCount counts messages delivered to consumers and acknowledged, while DispatchCount counts messages sent to consumers regardless of acknowledgement.  Ideally these two would match, and normally DispatchCount will only be a little higher than DequeueCount.  Here DispatchCount was more than 10 times DequeueCount - messages were being sent again and again without being processed.  ActiveMQ has prefetch limits which might allow a consumer to grab hundreds or even a thousand messages at once - generally good for performance.  However, we'd seen the consumers moving from broker to broker every few seconds.  That meant the consumer never processed all those prefetched messages, and on every reconnect it grabbed another batch it couldn't get through fast enough.
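To see how a large prefetch plus rapid reconnects inflates DispatchCount relative to DequeueCount, here's a toy simulation - not ActiveMQ code, and all the numbers are made up:

```java
public class PrefetchToy {
    public static void main(String[] args) {
        int prefetch = 1000;           // messages handed out per (re)connect
        int processedPerConnect = 50;  // messages acked before the consumer hops brokers
        int reconnects = 10;

        long dispatchCount = 0, dequeueCount = 0;
        for (int i = 0; i < reconnects; i++) {
            dispatchCount += prefetch;            // broker dispatches a full prefetch batch
            dequeueCount += processedPerConnect;  // consumer only acks a few before moving on
            // the unacked remainder is redelivered after the next reconnect
        }
        System.out.println("DispatchCount=" + dispatchCount
                + " DequeueCount=" + dequeueCount
                + " ratio=" + (dispatchCount / (double) dequeueCount));
    }
}
```

With these made-up numbers the ratio comes out at 20:1 - the same shape as the 10x-plus gap we saw on the real broker, and it grows with every reconnect.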

For this, we added randomize=false to the connection string on the consumer.  This tells the consumer to prefer the first broker listed and stick to it as much as possible - e.g. failover:(tcp://brokerA:61616,tcp://brokerB:61616)?randomize=false (broker addresses illustrative).  See more here: ActiveMQ Failover Transport.  Setting this solved the problem quickly when we really needed it and has kept things running much better.

Since then, we have occasionally seen consumers with randomize=false still struggling.  Sometimes this has required a restart (of the app or of the broker), but that is rarer.  When it happens, we've typically had a large number of clients disconnect, which appears to do something to the broker connection - a rolling restart of the brokers fixes the problem quickly.  The issue seems to be that the consumer respects randomize=false and sticks to one broker, while the messages sit on the other broker and move slowly; the broker restarts fix the connection and tend to move the messages to the consumer's broker.