First Experiences with Scalding

Recently, I’ve been evaluating Scalding as a replacement for some parts of our ETL. Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. It is built on top of Cascading, a Java library that abstracts away low-level Hadoop details. You specify your jobs in clean, expressive Scala syntax, and out comes MapReduce that runs on your Hadoop cluster.

Options for Cluster Processing

There are several options for running a custom processing task on a Hadoop cluster without actually writing Java MapReduce code. The major ones are Pig, Hive, Scalding, and Spark (I’m sure I’m missing some that you may consider significant). All of them except Spark let you write in an easy-to-use, expressive DSL that is later compiled to Java MapReduce. Spark has its own engine for running workloads across the cluster and is gaining massive popularity. I’ve decided to give Spark a little more time to mature, although it already seems pretty strong and well supported as of this writing.

Scalding Is Used by Big Companies

Another reason I’m particularly interested in Scalding is that it is used at several large companies, such as Etsy and Twitter. Twitter runs most of its backend batch tasks on Scalding.

Getting Scalding

You can get Scalding by cloning and building https://github.com/twitter/scalding

On the twitter/scalding GitHub pages, the tutorial uses scald.rb to trigger jobs. Please don’t use it: the code is hideous, and it will take you forever to make a simple change. Instead, I use the project here: https://github.com/Cascading/scalding-tutorial/. The advantage of the former (twitter/scalding) is that you get a REPL to play with, which can be very useful. To kick off jobs from your local machine, you will need the Hadoop client installed. If you don’t want to do that, you can always run in --local mode.

Simple Use Case

We had a problem where one of the HDFS folders backing an external Hive JSON table contained bad or incomplete JSON. Any Hive query on the table would fail because of the bad JSON.

I decided to write a Scalding job that looks at each line of each file in the HDFS folder and finds the offending JSON. I used a regex to flag any line that does not end with a “}”. It’s not the best JSON check, but it gives a good idea of how prevalent the problem is. I wrote this class in the tutorial directory.

Note: This code uses the Fields API, which is not typed. The Typed API is recommended.

import com.twitter.scalding._

class FindBadJson(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    // Keep only the lines whose last character is not "}"
    .filter('line) { line: String => line.matches(".*[^}]$") }
    .write(Tsv(args("output")))
}
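
Because the predicate is just String.matches, it can be sanity-checked in plain Scala before anything is submitted to the cluster. The following is a minimal sketch; the object name, the helper name, and the sample lines are made up for illustration and are not part of the original job.

```scala
// Hypothetical helper wrapping the same regex used in FindBadJson.
object BadJsonCheck {
  // True when the line's last character is anything other than '}'.
  // Empty lines are NOT flagged, because the regex needs at least one character.
  def looksTruncated(line: String): Boolean = line.matches(".*[^}]$")

  def main(args: Array[String]): Unit = {
    // Sample lines invented for this sketch.
    val samples = Seq(
      """{"id": 1, "name": "ok"}""",
      """{"id": 2, "name": "cut o""",
      """{"id": 3} """, // well-formed JSON followed by a trailing space
      ""
    )
    // Print the verdict next to each sample line.
    samples.foreach { s => println(s"${looksTruncated(s)}\t[$s]") }
  }
}
```

Two limits of the check show up quickly: a well-formed line with trailing whitespace is flagged, and so would be a line holding a top-level JSON array ending in "]". Trimming the line first would handle the former.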

Then, from the scalding-tutorial directory:

scalding-tutorial git:(wip-2.6) ✗ sbt assembly
scalding-tutorial git:(wip-2.6) ✗ yarn jar target/scalding-tutorial-0.11.2.jar FindBadJson --input hdfs:///user/yranadive/data/json --output hdfs:///user/yranadive/data/output --hdfs

Exception in thread "main" cascading.flow.FlowException: step failed: (1/1) ...ser/yranadive/data/output, with job id: job_1423699617785_0038, please see cluster logs for failure messages
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:221)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)

Digging further in the ResourceManager UI, I found this:

Diagnostics:    
MAP capability required is more than the supported max container capability in the cluster. Killing the Job. mapResourceReqt: 2048 maxContainerCapability:1222
Job received Kill while in RUNNING state.

Believable, since I was running this on a small QA cluster that was probably resource starved. I changed mapreduce.map.memory.mb and mapreduce.reduce.memory.mb in yarn-site.xml to 1024 (tiny), which fits within the cluster’s maximum container size. The job now ran, but the client threw an error. It looks like my client was not able to get updates from the YARN server about the status of the job.
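
For reference, the map and reduce container sizes would look roughly like this in Hadoop’s XML configuration format. This is only a sketch: the value 1024 comes from the text above, and stock Hadoop normally reads mapreduce.* properties from mapred-site.xml, so the right file and values for a given cluster are assumptions to verify.

```xml
<configuration>
  <!-- Sketch only: memory requested per map task container, in MB -->
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>1024</value>
  </property>
  <!-- Sketch only: memory requested per reduce task container, in MB -->
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>1024</value>
  </property>
</configuration>
```

Whatever values are chosen, they have to stay at or below the maxContainerCapability reported in the diagnostics (1222 here), otherwise YARN kills the job as it did above.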

ERROR hadoop.HadoopStepStats: unable to get remote counters, no cached values, throwing exception
No enum constant org.apache.hadoop.mapreduce.JobCounter.MB_MILLIS_MAPS

The MR job keeps chugging and succeeds. Aah..finally some data!!!

But wait! Since we didn’t specify a reducer, we get one output file per mapper, roughly as many as the input files that were read. Bad MR… bad. The output files are named part-00001, part-00002, etc. Too much to go through. Time to declare a reducer:

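import com.twitter.scalding._

class FindBadJson(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    // Keep only the lines whose last character is not "}"
    .filter('line) { line: String => line.matches(".*[^}]$") }
    // Send everything to a single reducer; _.size counts the tuples it receives
    .groupAll { _.size }
    .write(Tsv(args("output")))
}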

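And voilà! A single output file! Note that _.size reduces the group to a count, so this file holds the number of offending lines rather than the lines themselves.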
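
One caveat worth noting: in the Fields API, _.size aggregates the group down to a row count, so the job above writes a single number. If the goal is a single file that still contains the offending lines, one option is groupAll with no aggregation, which Scalding describes as sending all tuples to one reducer. The class name below is hypothetical, and this is a sketch rather than the code that was actually run.

```scala
import com.twitter.scalding._

// Hypothetical variant of FindBadJson that keeps the lines instead of counting them.
class FindBadJsonLines(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    // Same check as before: flag lines whose last character is not '}'.
    .filter('line) { line: String => line.matches(".*[^}]$") }
    // The no-argument groupAll funnels every tuple through a single reducer,
    // so the job writes one part file that still holds 'offset and 'line.
    .groupAll
    .write(Tsv(args("output")))
}
```

Routing everything through one reducer kills parallelism, which is fine for a handful of bad lines but not for a large result set.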

Conclusion

Using Scalding was really easy. The fact that I was able to kick off an MR job that ran across the cluster and did useful work in only 5 lines is pretty cool. However, I do find people wary of functional programming languages and of using Scala. To them I can say that if you are only using the Scalding DSL, you are going to be fine for the most part, and you really won’t have to learn the nitty-gritty details of Scala. I’m going to update this space with more Scalding-related posts as I go through my journey.

Note: This example uses the Fields API, which is not typed. The Typed API is recommended.
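
For comparison, here is roughly what the first version of the job might look like with the Typed API. This is a sketch under assumptions: the class name is hypothetical, it has not been run against the cluster described here, and it relies on TypedPipe.from and TypedTsv as they appear in Scalding’s typed word-count examples.

```scala
import com.twitter.scalding._

// Hypothetical typed counterpart of FindBadJson.
class FindBadJsonTyped(args: Args) extends Job(args) {
  // TypedPipe.from(TextLine(...)) yields a TypedPipe[String], one element per line.
  TypedPipe.from(TextLine(args("input")))
    // The compiler knows each element is a String, so no field symbols are needed.
    .filter { line => line.matches(".*[^}]$") }
    // Writes the offending lines only; the byte offset is not carried along.
    .write(TypedTsv[String](args("output")))
}
```

The main gain is that a misspelled field name or a wrong element type becomes a compile error instead of a failure at job run time.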

 