Setting up Camus - LinkedIn’s Kafka to HDFS pipeline

Few days ago I started tinkering with Camus to evaluate its use for dumping raw data from Kafka=>HDFS. This blog post will cover my experience and first impressions with setting up a Camus pipeline. Overall I found Camus was easy to build and deploy.

What is Camus? #

Camus is LinkedIn’s open source project that can dump raw/processed data from Kafka to HDFS. It does this by a map-reduce job which when kicked off can -

The github readme has a details on how this is achieved.

Building Camus #

To build Camus:

How Camus does Date Partitioning #

Camus achieves date partitioning by introspecting the message and extracting the timestamp field from the messages. It uses this date to then determine the folder in HDFS where it lands the message. Camus creates folders in HDFS partitioned by date of the message. E.g. a json message with a timestamp of “2014-12-19T01:00:59Z” will land in folder 2014/12/19. You may choose to not introspect the message and assign a timestamp = currentTime(). In this case the data will land in folder which represents the time the job was run. You may want to do this if you are just trying out Camus and don’t want to invest in writing a custom decoder for a specific message format.

Setting up - What you will need #

There are two touch-points where you will have to possibly write your own code.

  1. Reading Messages from Kafka - You will write a class which extends com.linkedin.camus.coders.MessageDecoder that will tell Camus what’s the timestamp of the message.
  2. Writing to HDFS - You will write a class which extends com.linkedin.camus.etl.RecordWriterProvider that will tell Camus what’s the payload that should be written to HDFS.

Writing you own decoder #

Now, to actually run the jar you will need to create a message decoder or use one of the supplied classes e.g. KafkaAvroMessageDecoder, JSONStringMessageDecoder class. This will obviously depend on what kind of data you are reading from Kafka. The following is an example of a String Message Decoder which reads string messages from Kafka and writes them to HDFS:

camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/StringMessageDecoder.java

package com.linkedin.camus.etl.kafka.coders;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import org.apache.log4j.Logger;
import java.util.Properties;


/**
 * MessageDecoder class that will convert the payload into a String object,
 * System.currentTimeMillis() will be used to set CamusWrapper's
 * timestamp property

 * This MessageDecoder returns a CamusWrapper that works with Strings payloads,
 */
public class StringMessageDecoder extends MessageDecoder<byte[], String> {
  private static final Logger log = Logger.getLogger(StringMessageDecoder.class);

  @Override
  public void init(Properties props, String topicName) {
    this.props = props;
    this.topicName = topicName;
  }

  @Override
  public CamusWrapper<String> decode(byte[] payload) {
    long timestamp = 0;
    String payloadString;

    payloadString = new String(payload);
    timestamp = System.currentTimeMillis();

    return new CamusWrapper<String>(payloadString, timestamp);
  }
}

You can also use a ByteArray decoder if you are reading binary data.
UPDATE: Before you do this make sure you can read the data back from HDFS. For example, if you want to write protobufs, then perhaps its a wiser solution to first convert it a format such as parquet. Parquet will allow you to project columns off arbitrarily nested data.

camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/ByteArrayMessageDecoder.java

package com.linkedin.camus.etl.kafka.coders;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import org.apache.log4j.Logger;

import java.util.Properties;


/**
 * MessageDecoder class that will convert the payload into a ByteArray object,
 * System.currentTimeMillis() will be used to set CamusWrapper's
 * timestamp property

 * This MessageDecoder returns a CamusWrapper that works with ByteArray payloads,
 */
public class ByteArrayMessageDecoder extends MessageDecoder<byte[], byte[]> {
  private static final Logger log = Logger.getLogger(ByteArrayMessageDecoder.class);

  @Override
  public void init(Properties props, String topicName) {
    this.props = props;
    this.topicName = topicName;
  }

  @Override
  public CamusWrapper<byte[]> decode(byte[] payload) {
    //Push the raw payload and add the current time
    return new CamusWrapper<byte[]>(payload, System.currentTimeMillis());
  }
}

The date for each message in this case is the time the Kafka job is run (or more specifically the time the message is fetched from Kafka).

Writing your own RecordWriter #

RecordWriter interface has methods which tell Camus the payload that will be written to HDFS. Here you can tell Camus what record terminator will be used. You may wish to choose a String “\n” or a (byte)0x0. You can also specify if you want the output to be compressed - good thing to do if you are using HDFS as a backup of your kafka topics.

camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ByteArrayRecordWriterProvider.java

package com.linkedin.camus.etl.kafka.common;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;


/**
 * Provides a RecordWriter that uses FSDataOutputStream to write
 * a Byte record as bytes to HDFS without any reformatting or compression.
 *
 * Null byte is used as record delimiter unless a string is specified
 */
public class ByteArrayRecordWriterProvider implements RecordWriterProvider {
  public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
  public static final String DEFAULT_RECORD_DELIMITER = "null";

  protected String recordDelimiter = null;

  private String extension = "";
  private boolean isCompressed = false;
  private CompressionCodec codec = null;

  public ByteArrayRecordWriterProvider(TaskAttemptContext context) {
    Configuration conf = context.getConfiguration();

    if (recordDelimiter == null) {
      recordDelimiter = conf.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
    }

    isCompressed = FileOutputFormat.getCompressOutput(context);

    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass = null;
      if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
        codecClass = SnappyCodec.class;
      } else if ("gzip".equals((EtlMultiOutputFormat.getEtlOutputCodec(context)))) {
        codecClass = GzipCodec.class;
      } else {
        codecClass = DefaultCodec.class;
      }
      codec = ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
  }

  @Override
  public String getFilenameExtension() {
    return extension;
  }

  @Override
  public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context, String fileName,
      CamusWrapper camusWrapper, FileOutputCommitter committer) throws IOException, InterruptedException {

    // If recordDelimiter hasn't been initialized, do so now
    if (recordDelimiter == null) {
      recordDelimiter = context.getConfiguration().get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
    }

    // Get the filename for this RecordWriter.
    Path path =
        new Path(committer.getWorkPath(), EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));

    FileSystem fs = path.getFileSystem(context.getConfiguration());
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(path, false);
      return new ByteRecordWriter(fileOut, recordDelimiter);
    } else {
      FSDataOutputStream fileOut = fs.create(path, false);
      return new ByteRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), recordDelimiter);
    }
  }

  protected static class ByteRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {
    private DataOutputStream out;
    private byte[] recordDelimiter;

    public ByteRecordWriter(DataOutputStream out, String recordDelimiterString) {
      this.out = out;
      this.recordDelimiter = recordDelimiterString.toUpperCase().equals("NULL") ?
              new byte[] {(byte)0x0} :
              recordDelimiterString.getBytes();
    }

    @Override
    public void write(IEtlKey ignore, CamusWrapper value) throws IOException {
      boolean nullValue = value == null;
      if (!nullValue) {
        ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
        byte[] record = (byte[]) value.getRecord();
        outBytes.write(record);
        outBytes.write(recordDelimiter);
        out.write(outBytes.toByteArray());
      }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
      out.close();
    }
  }
}

Run Camus #

Great, you’re all setup to run Camus. Time to change the configs and point it to your Kafka cluster.

config.properties


# The job name.
camus.job.name=Camus Fetch

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=/camus/topics
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/camus/exec
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/camus/exec/history

camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.ByteArrayMessageDecoder

# The record writer for Hadoop
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.ByteArrayRecordWriterProvider

# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=10
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topic are pulled. Nothing on the blacklist is pulled
kafka.blacklist.topics=
kafka.whitelist.topics=mytopic
log4j.configuration=false

# Name of the client as seen by kafka
kafka.client.name=camus
# The Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
kafka.brokers=kafka.test.org:6667,kafka.test.org:6667

#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=false
#monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

#etl.hourly=hourly
etl.daily=daily

# Should we ignore events that cannot be decoded (exception thrown by MessageDecoder)?
# `false` will fail the job, `true` will silently drop the event.
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
mapred.output.compress=false
etl.output.codec=gzip
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000

Now, submit the shaded jar to hadoop. The shaded jar exists in the camus-example/target directory.

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties

As the job runs it will store the temporary data in the camus/exec HDFS directory. Once the job completes the files will be moved to the topics directory in HDFS.

Output Compression #

To enable compression set mapred.output.compress=true and set
etl.output.codec=gzip in the config file. The advantage of using gzip is that anyone can see contents of the file very quickly by unzipping the file using ubiquitous gunzip from the command line. Deflate (which is the underlying compression for gzip) and snappy will require you to download additional tools to uncompress data.

Conclusion #

The project is being actively committed to and new documentation is being added everyday. My experience with Camus has been good so far. Another advantage of Camus is that it can be used for basic transformations on your data. I’m looking forward to working more with this project.

 
350
Kudos
 
350
Kudos

Now read this

First Experiences with Scalding

Recently, I’ve been evaluating using Scalding to replace some parts of our ETL. Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. Scalding is built on top of Cascading, a Java library that abstracts away... Continue →