Visualizing Metrics in Storm using StatsD & Graphite

Storm Metrics API #

Jason Trost from Endgame has written a nice post on how to set up Storm to publish metrics using the Metrics API. Endgame has also open-sourced storm-metrics-statsd, a Storm module that sends those metrics to StatsD.

Build #

If you use Maven, you can add the following snippets to your topology's pom.xml to load the storm-metrics-statsd jar. Alternatively, you can clone the GitHub project and build it yourself.

    <repository>
      <id>central-bintray</id>
      <url>http://dl.bintray.com/lookout/systems</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>

...
    <dependency>
      <groupId>com.timgroup</groupId>
      <artifactId>java-statsd-client</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.lookout</groupId>
      <artifactId>storm-metrics-statsd</artifactId>
      <version>1.0.0</version>
    </dependency>

Registering your Metric #

Next, register the StatsdMetricConsumer by adding the following to your topology class:

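    import java.util.HashMap;
    import java.util.Map;
    // StatsdMetricConsumer comes from the storm-metrics-statsd jar;
    // statsdHost and topologyConfig (your topology's Config) are defined elsewhere

    // Configure the StatsdMetricConsumer
    Map<String, Object> statsdConfig = new HashMap<String, Object>();
    statsdConfig.put(StatsdMetricConsumer.STATSD_HOST, statsdHost);
    statsdConfig.put(StatsdMetricConsumer.STATSD_PORT, 8125); // default StatsD port
    statsdConfig.put(StatsdMetricConsumer.STATSD_PREFIX, "data.storm.metrics");

    // The last argument is the parallelism hint: how many consumer tasks to run
    topologyConfig.registerMetricsConsumer(StatsdMetricConsumer.class, statsdConfig, 2);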

Now, let’s say you want to track successes and errors for a particular message. In the bolt class, you can increment a counter for each, like so:

    // Storm 0.9.x package names; in Storm 1.0+ these live under org.apache.storm
    import backtype.storm.metric.api.CountMetric;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.tuple.Tuple;

    // Metrics - Note: these must be declared transient since they are not Serializable
    transient CountMetric _successCountMetric;
    transient CountMetric _errorCountMetric;

    @Override
    public void prepare(java.util.Map stormConf, TopologyContext context) {
        // Metrics must be initialized and registered in prepare() for bolts, or open() for spouts.
        // Otherwise, an exception is thrown.
        initMetrics(context);
    }

    private void initMetrics(TopologyContext context) {
        _successCountMetric = new CountMetric();
        _errorCountMetric = new CountMetric();
        // The last argument is the time bucket size in seconds
        context.registerMetric("success_count", _successCountMetric, 1);
        context.registerMetric("error_count", _errorCountMetric, 1);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            // some complex logic
            // On success, increment the success counter
            _successCountMetric.incr();
        } catch (Exception ex) {
            // On error, increment the error counter.
            // Note: the exception is swallowed here, so the tuple is still acked.
            _errorCountMetric.incr();
        }
    }
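The last argument to registerMetric is the time bucket size in seconds. At the end of every bucket, Storm calls the metric's getValueAndReset() and passes the result to the registered metrics consumer, so each data point is the count for that window rather than a running total. To make that concrete, here is a JDK-only stand-in. BucketCounter is a hypothetical name, and this is a sketch that mirrors the incr()/incrBy()/getValueAndReset() behaviour, not Storm's own CountMetric.

```java
// Hypothetical JDK-only stand-in mirroring the semantics of Storm's CountMetric;
// it is not the Storm class and does not implement Storm's IMetric interface.
public final class BucketCounter {
    // Plain long, which assumes incr() and getValueAndReset() run on the same thread
    private long value = 0;

    public void incr() {
        value++;
    }

    public void incrBy(long delta) {
        value += delta;
    }

    // Returns the count accumulated since the previous call and starts a new bucket
    public Object getValueAndReset() {
        long current = value;
        value = 0;
        return current;
    }

    public static void main(String[] args) {
        BucketCounter successes = new BucketCounter();
        successes.incr();
        successes.incr();
        System.out.println(successes.getValueAndReset()); // prints 2: count for the first bucket
        System.out.println(successes.getValueAndReset()); // prints 0: nothing happened in the next bucket
    }
}
```

This reset-per-bucket behaviour is what makes it safe to add series from different workers together in Graphite: each worker reports only its own increments for the window.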

Accessing your graph from Graphite #

storm-metrics-statsd sends data to StatsD under the following namespace:
topology_name.host_name.port_number.bolt_name.metric_name. Here the port number is that of the supervisor worker running the bolt, which you can find in the Storm UI. The module also sends the topology's internal Storm metrics to StatsD, e.g. __ack-count, __transfer-count, etc.
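A minimal sketch of that layout, using hypothetical helper names (metricKey, counterLine) that are not part of storm-metrics-statsd: the first joins the five segments in the order above, the second formats an increment in the plain-text StatsD counter syntax, name:value|c. The sample values are illustrative (6700 is Storm's default first worker slot port), and the configured STATSD_PREFIX is left out.

```java
public final class StatsdKeys {
    // Hypothetical helper: topology_name.host_name.port_number.bolt_name.metric_name
    static String metricKey(String topology, String host, int port, String bolt, String metric) {
        return String.join(".", topology, host, Integer.toString(port), bolt, metric);
    }

    // StatsD counter syntax: <name>:<delta>|c
    static String counterLine(String key, long delta) {
        return key + ":" + delta + "|c";
    }

    public static void main(String[] args) {
        String key = metricKey("topology", "worker-1", 6700, "tsv", "success_count");
        System.out.println(key);                 // topology.worker-1.6700.tsv.success_count
        System.out.println(counterLine(key, 1)); // topology.worker-1.6700.tsv.success_count:1|c
    }
}
```

Graphite splits metric names on dots, so a host name that contains dots (such as a fully qualified domain name) would show up as several path levels unless the module rewrites it. It is worth checking how your hosts appear before writing wildcard queries.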

A common requirement is to get the counts regardless of which supervisor host / port is running the bolt. In such cases, you can use wildcard characters and the sumSeries function to aggregate metrics across hosts, topologies, etc. Here’s an example graph link in Graphite:

graphite.endlesspuppies.com/render/?colorList=red%2Cgreen&from=-60minutes&target=sumSeries(storm.metrics.topology...tsv.error_count)&target=sumSeries(storm.metrics.topology...tsv.success_count)
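The same kind of query can be assembled programmatically. The sketch below is JDK-only and uses hypothetical names (GraphiteRender, sumAcrossWorkers, renderUrl); graphite.example.com is a placeholder host, the prefix storm.metrics, topology name topology and bolt tsv are taken from the example link, and there is one * for each of the host and port segments of the namespace. Graphite's render endpoint takes each series in its own URL-encoded target= parameter.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public final class GraphiteRender {
    // One '*' per wildcarded segment (host_name, port_number), so sumSeries
    // adds the metric up across every worker that ran the bolt.
    static String sumAcrossWorkers(String prefix, String topology, String bolt, String metric) {
        return "sumSeries(" + prefix + "." + topology + ".*.*." + bolt + "." + metric + ")";
    }

    // Builds a render URL with one URL-encoded target= parameter per series.
    static String renderUrl(String graphiteHost, String from, List<String> targets) {
        String targetParams = targets.stream()
                .map(t -> "target=" + URLEncoder.encode(t, StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return "http://" + graphiteHost + "/render/?from=" + from + "&" + targetParams;
    }

    public static void main(String[] args) {
        List<String> targets = List.of(
                sumAcrossWorkers("storm.metrics", "topology", "tsv", "error_count"),
                sumAcrossWorkers("storm.metrics", "topology", "tsv", "success_count"));
        System.out.println(renderUrl("graphite.example.com", "-60minutes", targets));
    }
}
```

URLEncoder leaves * and . untouched and encodes the parentheses, so the wildcards reach Graphite intact.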


If you want to display graphs auto-cycled on a TV or dashboard, you can use this JavaScript.

A Word of Caution #

When the Storm topology sends data to StatsD, it is actually sending that data over UDP. UDP is a connectionless protocol, i.e. there is no guarantee that a message sent over UDP will be received by the server at the other end. Depending on your network connection and on how much load your StatsD server is under, a small or significant amount of your data may be dropped, which makes your dashboards unreliable. So be careful not to overload your StatsD boxes, and run StatsD as close (in network terms) to your Storm topologies as possible.
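To see why nothing tells you when data goes missing, here is a minimal JDK-only sketch of a StatsD counter increment on the wire: one UDP datagram, sent without any acknowledgement. The class and method names are hypothetical, and a throwaway socket on the loopback interface stands in for the StatsD server so the example can run locally.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public final class UdpCounter {
    // Fire-and-forget: send() returns as soon as the OS accepts the datagram.
    // StatsD sends nothing back, so a packet dropped on the network or by an
    // overloaded StatsD server is invisible to the sender.
    static void sendCounter(DatagramSocket socket, InetAddress host, int port,
                            String key, long delta) throws IOException {
        byte[] payload = (key + ":" + delta + "|c").getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(payload, payload.length, host, port));
    }

    // Local demo: a socket on an ephemeral loopback port plays the StatsD server.
    static String loopbackDemo(String key) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket fakeStatsd = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            fakeStatsd.setSoTimeout(2000); // give up instead of blocking forever if the packet is lost
            sendCounter(sender, loopback, fakeStatsd.getLocalPort(), key, 1);
            byte[] buffer = new byte[512];
            DatagramPacket received = new DatagramPacket(buffer, buffer.length);
            fakeStatsd.receive(received);
            return new String(received.getData(), 0, received.getLength(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(loopbackDemo("topology.worker-1.6700.tsv.error_count"));
    }
}
```

Over loopback the packet will almost always arrive. Across a busy network, send() returns in exactly the same way whether or not the datagram is later dropped, which is the failure mode described above.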

Debugging Topology Stats #

Once your topology is set up and running, you may want to know exactly what data is being sent to the StatsD server. Log in to a supervisor box and listen to outgoing UDP traffic on port 8125 (the default StatsD port). You can do this with ngrep:

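    # replace en3 with the supervisor's network interface (e.g. eth0 on Linux);
    # -l line-buffers ngrep's output so tail -f shows packets as they arrive
    sudo ngrep -l -W byline -d en3 . udp port 8125 > /tmp/capture.txt
    # in a second shell:
    tail -f /tmp/capture.txt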
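Once you have a capture, a per-metric tally makes it easier to sanity-check what the topology is sending. The sketch below assumes each StatsD payload lands on its own line in the capture (which -W byline aims for) and only handles plain, unsampled counters (name:value|c); any other line, including packet header lines, is skipped. CaptureTally is a hypothetical name, and the sample lines in main are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class CaptureTally {
    // Unsampled StatsD counter: <name>:<integer delta>|c
    private static final Pattern COUNTER = Pattern.compile("([^:\\s]+):(-?\\d+)\\|c");

    // Sums counter deltas per metric name; lines that are not plain counters are ignored.
    static Map<String, Long> sumCounters(List<String> lines) {
        Map<String, Long> totals = new TreeMap<>();
        for (String line : lines) {
            Matcher m = COUNTER.matcher(line.trim());
            if (m.matches()) {
                totals.merge(m.group(1), Long.parseLong(m.group(2)), Long::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "U 10.0.0.5:40001 -> 10.0.0.9:8125", // illustrative header line, skipped
                "topology.worker-1.6700.tsv.success_count:1|c",
                "topology.worker-1.6700.tsv.success_count:1|c",
                "topology.worker-1.6700.tsv.error_count:1|c");
        sumCounters(sample).forEach((name, total) -> System.out.println(name + " " + total));
    }
}
```

To run it on a real capture, pass Files.readAllLines(Paths.get("/tmp/capture.txt")) to sumCounters instead of the sample list.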
 