Data Durability

A friend of mine half-jokingly says that the only reason to put data into a database is to get it back out again. In order to get data out, we need to ensure some kind of durability.

Relational databases offer single server durability through write-ahead logging and checkpoint mechanisms. These are tried and true methods: writes are recorded in a replay log on disk and cached in memory, and whenever a checkpoint occurs, dirty data is flushed to disk. The benefit of a write-ahead log is that we can always recover from a crash (so long as we have the log files, of course).
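To make the idea concrete, here is a minimal sketch of write-ahead logging with checkpoints. The class name TinyStore, the JSON-lines log format, and the file names are all invented for illustration; a real database engine is far more careful than this.

```python
import json
import os
import tempfile


class TinyStore:
    """Toy key-value store: log first, then apply in memory (illustrative only)."""

    def __init__(self, directory):
        self.log_path = os.path.join(directory, "wal.log")
        self.data_path = os.path.join(directory, "data.json")
        self.data = {}
        self.recover()

    def put(self, key, value):
        # 1) Append the change to the log and force it to disk.
        with open(self.log_path, "a") as log:
            log.write(json.dumps({"key": key, "value": value}) + "\n")
            log.flush()
            os.fsync(log.fileno())
        # 2) Only then apply the change to the in-memory copy.
        self.data[key] = value

    def checkpoint(self):
        # Flush the in-memory state to the data file, then truncate the log.
        with open(self.data_path, "w") as f:
            json.dump(self.data, f)
            f.flush()
            os.fsync(f.fileno())
        open(self.log_path, "w").close()

    def recover(self):
        # Start from the last checkpoint, then replay whatever is in the log.
        if os.path.exists(self.data_path):
            with open(self.data_path) as f:
                self.data = json.load(f)
        if os.path.exists(self.log_path):
            with open(self.log_path) as log:
                for line in log:
                    entry = json.loads(line)
                    self.data[entry["key"]] = entry["value"]


def demo():
    with tempfile.TemporaryDirectory() as d:
        store = TinyStore(d)
        store.put("a", 1)
        store.checkpoint()
        store.put("b", 2)        # logged, but not yet checkpointed
        crashed = TinyStore(d)   # simulate a restart after a crash
        return crashed.data
```

Calling demo() simulates a crash after the second write. The restarted store finds "a" in the checkpointed data file and "b" in the log, so nothing is lost.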

How does single server durability work with non-relational databases? Most of them don’t have write-ahead logging.

MongoDB currently has limited single server durability. While some people consider this a weakness, it has some strengths – writes complete very quickly since there is no write-ahead log that needs to immediately sync to disk. MongoDB also has the ability to create replica sets for increased durability. There is one obvious upside to replica sets – the data is in multiple places. Another advantage of replica sets is that it’s possible to use getLastError({w:...}) to request acknowledgement from multiple replica servers before a write is reported as complete to a client. Just keep in mind that getLastError is not used by default – application code will have to call the method to force the sync.

Setting a w-value for writes is something that was mentioned in Getting Faster Writes with Riak, although in that article we were decreasing durability to increase write performance. In Amazon Dynamo inspired systems, writes are not considered complete until multiple nodes have responded. The advantage is that durable replication is enforced at the database, and clients have to elect to use less durability for their data. Refer to the Cassandra documentation on Writes and Consistency or the Riak Replication documentation for more information on how Dynamo inspired replication works. Datastores using HDFS for storage can take advantage of HDFS’s built-in data replication.
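As a rough illustration of what a w-value means, the sketch below counts acknowledgements from a set of replicas and reports success once w of them have answered. The function name write_with_w and the list-of-booleans model of a cluster are assumptions made for this example; they are not the Riak, Cassandra, or MongoDB API.

```python
def write_with_w(replicas_up, w):
    """Return True when at least `w` replicas acknowledge the write.

    `replicas_up` holds one boolean per replica: True means the replica
    is reachable and acknowledges, False means it never answers.
    """
    acks = 0
    for up in replicas_up:
        if up:
            acks += 1
        if acks >= w:
            return True   # enough acknowledgements; report success
    return False          # too few replicas answered


# Three replicas, one of them down.
cluster = [True, False, True]
```

With one of three replicas down, write_with_w(cluster, 2) still succeeds while write_with_w(cluster, 3) fails. That is the trade: a higher w buys durability at the cost of availability and latency.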

Even HBase, a column-oriented database, uses HDFS to handle data replication. The trick is that rows may be chopped up based on columns and split into regions. Those regions are then distributed around the cluster on what are called region servers. HBase is designed for real-time read/write random-access. If we’re trying to get real-time reads and writes, we can’t expect HBase to immediately sync files to disk – there’s a commit log (RDBMS people will know this as a write-ahead log). Essentially, when a write comes in from a client, the write is first written to the commit log (which is stored using HDFS), then it’s written in memory and when the in-memory structure fills up, that structure is flushed to the filesystem. Here’s something cunning: since the commit log is being written to HDFS, it’s available in multiple places in the cluster at the same time. If one of the region servers goes down it’s easy enough to recover from – that region server’s commit log is split apart and distributed to other region servers which then take up the load of the failed region server.

There are plenty of HBase details that have been grossly oversimplified or blatantly ignored here for the sake of brevity. Additional details can be found in HBase Architecture 101 – Storage as well as this Advanced HBase presentation. As HBase is inspired by Google’s Bigtable, additional information can be found in Chang et al. Bigtable: A distributed storage system for structured data and The Google File System.

Interestingly enough, there is a proposed feature for PostgreSQL 9.1 to add synchronous replication to PostgreSQL. Current replication in PostgreSQL is more like asynchronous database mirroring in SQL Server, or the default replica set write scenario with MongoDB. Synchronous replication makes it possible to ensure that data is being written to every node in the RDBMS cluster. Robert Haas discusses some of the pros and cons of replication in PostgreSQL in his post What Kind of Replication Do You Need?.

Microsoft’s Azure environment also has redundancy built in. Much like Hadoop, the redundancy and durability are baked into Azure at the filesystem level. Building the redundancy at such a low level makes it easy for every component of the Azure environment to use it to achieve higher availability and durability. The Windows Azure Storage team have put together an excellent overview. Needless to say, Microsoft have implemented a very robust storage architecture for the Azure platform – binary data is split into chunks and spread across multiple servers. Each of those chunks is replicated so that there are three copies of the data at any given time. Future features will allow for data to be seamlessly geographically replicated.

Even SQL Azure, Microsoft’s cloud based relational database, takes advantage of this replication. In SQL Azure when a row is written in the database, the write occurs on three servers simultaneously. Clients don’t even see an operation as having committed until the filesystem has responded from all three locations. Automatic replication is designed into the framework. This prevents the loss of a single server, rack, or rack container from taking down a large number of customers. And, just like in other distributed systems, when a single node goes down, the load and data are moved to other nodes. For a local database, this kind of durability is typically only obtained using a combination of SAN technology, database replication, and database mirroring.

There is a lot of solid technology backing the Azure platform, but I suspect that part of Microsoft’s ultimate goal is to hide the complexity of configuring data durability from the user. It’s foreseeable that future upgrades will make it possible to dial up or down durability for storage.

While relational databases are finding more ways to spread load out and retain consistency, there are changes in store for MongoDB to improve single server durability. MongoDB has been highly criticized for its lack of single server durability. Until recently, the default response has been that you should take frequent backups and write to multiple replicas. This is still a good idea, but it’s promising to see that the MongoDB development team are addressing single server durability concerns.

Why is single server durability important for any database? Aside from guaranteeing that data is correct in the instance of a crash, it also makes it easier to increase adoption of a database at the department level. A durable single database server makes it easy to build an application on your desktop, deploy it to the server under your desk, and move it into the corporate data center as the application gains importance.

Logging and replication are critical technologies for databases. They guarantee data is durable and available. There are also just as many options as there are databases on the market. It’s important to understand the requirements of your application before choosing mechanisms to ensure durability and consistency across multiple servers.


Getting Started with Hive

This is a follow up to the last two posts I made about querying HBase and Hive. The set up for this was a bit trickier than I would have liked, so I’m documenting my entire process for three reasons.

  1. To remind myself for the next time I have to do this.
  2. To help someone else get started.
  3. In the hope that someone will know a better way and help me improve this.

Installing Hadoop and Hive

This was the easiest part of the installation process. I used the Cloudera distribution of Hadoop for everything in this stack. There are a few ways to go about getting the Cloudera Distribution of Hadoop (CDH for short) on your machine. The Hadoop Quick Start is a good place to get started.


I’m pretty sure we’ve covered this before, but you need to have Java 1.6 installed in order to use any part of the Hadoop ecosystem.


If you’re using Windows, I recommend using the Hadoop VMWare image that Cloudera make available. If you want to run Hadoop natively on Windows, you need to set up Cygwin and create a UNIX-like environment on your local machine. Setting up Cygwin can be a bit tricky the first time you do it. Luckily, you’re not alone and documentation does exist: Setting up a Single-Node Hadoop “Cluster” on Windows XP.


Linux users have it the easiest out of any platform, in my opinion. Most of the documentation assumes that you’re running some UNIX-based operating system. If you’re willing to use the CDH version of Hadoop, you can tell your package manager to talk to the Cloudera repository. Cloudera have thoughtfully provided documentation for you in the installation guide.

You issue a few commands and you’ll have a working Hadoop set up on your machine in no time.

Other *NIX

This is the route that I took with OS X. You could also do the same on Linux if you want to use the stock distribution of Hadoop. The Hadoop Quick Start guide covers this in some detail, but here’s what I did:

wget http://archive.cloudera.com/cdh/3/hadoop-0.20.2+737.tar.gz
wget http://archive.cloudera.com/cdh/3/hive-0.5.0+32.tar.gz
wget http://archive.cloudera.com/cdh/3/pig-0.7.0+9.tar.gz

I copied these three files into my /opt folder and uncompressed them. As you can see in the screenshot, I renamed the hadoop, hive, and pig folders to use a cdh_ prefix. This makes it easier for me to identify where the installation came from, just in case I need to upgrade or raise a stink with someone.

Aliasing Hadoop for Easy Upgrades


Nothing is ever quite as easy as it seems at first, right? Now that we have our software installed, we need to supply some configuration information. Hadoop has three modes.

  1. Standalone: In standalone mode, nothing runs as a service or daemon. This requires no configuration on our part, but it does make things trickier with Hive because only one Hive database connection can exist at a time in standalone mode.
  2. Pseudo-distributed: This is what we’re aiming for. In pseudo-distributed mode everything is running on one machine as a service process. What we’re actually doing (sort of) is configuring our Hadoop installation to be a one-node cluster. It’s not reliable or failure proof (or a good idea), but it does let us develop on our workstations and laptops instead of buying a lot of servers for the house.
  3. Fully distributed: This is a real live cluster running on many servers. Books are being written about this.

Hadoop is, by default, set up to run in standalone mode. Having a standalone Hadoop/HDFS installation is nice, but it doesn’t do what we need in the long-term.

There are Linux packages for CDH that will set up your server to run in pseudo-distributed mode. If you used one of those, you can skip ahead. Personally, I’d stick around just so you can read up on what you’re missing and so you make me feel better (I’ll know if you skip ahead).


The core-site.xml file is used for machine specific configuration details. The default configuration is set up in core-default.xml. Settings in core-default.xml are overridden by core-site.xml, so there’s no reason to go messing around in there. If you want to check out the contents of the file, it’s located in build/classes/core-default.xml.

Once you’ve opened, or created, your own core-site.xml, you want to put the appropriate information in there. Mine looks like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->


    <description>The name of the default file system.  A URI whose
    scheme and authority determine the FileSystem implementation.  The
    uri's scheme determines the config property (fs.SCHEME.impl) naming
    the FileSystem implementation class.  The uri's authority is used to
    determine the host, port, etc. for a filesystem.</description>

    <description>The maximum number of tasks that will be run simultaneously by
    a task tracker.</description>

    <description>Default block replication.
    The actual number of replications can be specified when the file is created.
    The default is used if replication is not specified in create time.</description>

I’ll summarize these three settings here. A full explanation of the core-site.xml file can be found in the Hadoop wiki: http://hadoop.apache.org/common/docs/current/cluster_setup.html.


The first setting, the name of the default file system (fs.default.name), tells Hadoop to use HDFS running on the local host for storage. It could also point to a local file or to something in Amazon S3.


The second setting caps the number of tasks that a task tracker will run at once. We want to set this to a sane value because this is the maximum number of MapReduce tasks that Hadoop will fire up. If we set this value too high, Hadoop could theoretically start up a large number of tasks and overwhelm our CPU.


Distributed systems like Hadoop get their resiliency by replicating data throughout the cluster. Since we’re running in pseudo-distributed mode (it’s a cluster with only one node), we want to set the default block replication (dfs.replication) to 1.


I changed a few variables in my hadoop-env.sh file based on settings I found in the Hadoop Wiki article Running Hadoop on OS X 10.5 64-bit. You can set these environment variables anywhere in your operating system, but since they’re Hadoop-specific, it’s probably best to set them in the hadoop-env.sh file. Based on my own experience, I did not change the HADOOP_CLASSPATH variable. Here’s what the first 15 lines of my hadoop-env.sh look like:

# Set Hadoop-specific environment variables here.

# The only required environment variable is JAVA_HOME.  All others are
# optional.  When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.

# The java implementation to use.  Required.
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/

# Extra Java CLASSPATH elements.  Optional.
# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"

# The maximum amount of heap to use, in MB. Default is 1000.

I’ve also added some additional configuration to my .profile that makes working with Hadoop and Hive a lot easier:

export CLASSPATH=$CLASSPATH:/Library/PostgreSQL/9.0/share/java/postgresql-9.0-801.jdbc4.jar

export JAVA_HOME=$(/usr/libexec/java_home)
export HADOOP_HOME=/opt/hadoop
export HIVE_HOME=/opt/hive
export PATH=$HIVE_HOME/bin:$PATH
export PIG_HOME=/opt/pig
export PATH=$PIG_HOME/bin:$PATH
export HIVE_AUX_JARS_PATH=/Library/PostgreSQL/9.0/share/java/

# CLASSPATH variables don't seem to like symbolic links on OS X
# setting up the CLASSPATH here makes it easy to change things around
# in the future as I upgrade between Hadoop/Hive/HBase/Pig versions

export CLASSPATH=$CLASSPATH:${hadoop_dir}/hadoop-${hadoop_version}-core.jar:${hadoop_dir}/hadoop-${hadoop_version}-tools.jar:${hadoop_dir}/hadoop-${hadoop_version}-ant.jar:${hadoop_dir}/lib/commons-logging-1.0.4.jar:${pig_dir}/pig-${pig_version}.jar:${pig_dir}/pig-${pig_version}-core.jar

One Last Step

Before we can start anything running on Hadoop we have to get our namenode ready. The namenode is the part of Hadoop that manages information for the distributed filesystem. A namenode knows where all of the data is stored across our distributed filesystem. Without a running namenode, we can’t find the files we’re storing.

Before we get started, we have to format our filesystem. This process creates empty directories to be used for the namenode’s data. Datanodes aren’t involved because they can be added and removed from the cluster dynamically.

hadoop namenode -format

You’ll see some output from this command and, once it’s all said and done, you’ll have a formatted namenode that’s ready to go.

Starting Hadoop

Once you have everything set up and installed, you should be able to start Hadoop by running start-all.sh from the command prompt. You should see some messages that tell you a bunch of nodes are being started with output going to a specific directory.

Starting Hadoop on the Command Prompt

Verifying That It’s Running

You can verify that everything is configured and running by typing hadoop dfs -ls / on the command line. You should see a basic directory listing that looks something like this image.

Verifying that Hadoop is Running

What About Hive?

Astute readers will notice that we’ve only configured Hadoop and haven’t done anything to get Hive up and running. This is, unfortunately, a separate task. Hadoop and Hive are separate tools, but they’re part of the same tool chain.

Configuring the Metastore

Hive, like Hadoop, uses XML configuration files. But, since Hive is a database, it also needs a place to store metadata about the database – it’s called the metastore. By default, Hive uses what’s called an embedded metastore. The problem with the default configuration is that it only allows for one user or process at a time to connect to a Hive database. This isn’t that good for production databases. It’s also no good if you’re like me and forget which virtual desktop you left that query window open on. We’re going to fix this.

It’s possible to store the metastore’s information in a persistent external database. Hive comes configured to use the Derby database, however any JDBC-compliant database can be used by setting a few options. This is precisely what we’re going to do. There’s a delicious irony in storing one database’s configuration settings in another database. Since I’m a huge fan of PostgreSQL, that’s what we’re going to use to get started.

Creating the Metastore User

I don’t want Hive talking to PostgreSQL as me, or any other random user account, so we’re going to set up a new user.

Creating the Metastore Database

Now that we have our user, let’s create the database:

Database Driver

Of course, we also need a database driver. Up in the changes to my .profile I set up a HIVE_AUX_JARS_PATH variable. This is a : delimited path where we can tell Hive to look for extra functionality to load. This folder specifically contains the PostgreSQL JDBC driver. You can download the appropriate PostgreSQL JDBC driver from the PostgreSQL JDBC driver site. Appropriate documentation is also there. It’s pretty much as simple as copying the JAR file into an appropriate directory.


Now that we’ve created our metastore user and database as well as installed a JDBC driver, the last thing to do is to make some changes in hive-site.xml. The hive-site.xml overrides the settings in hive-default.xml – just like the Hadoop configuration we did earlier. One thing to keep in mind is that you can also override Hadoop configuration in this file as well. We’re not going to, but it’s good to know that it’s possible.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

        <description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>

The javax.jdo.option.* settings are where we set up our metastore database connection. Everything else should be, largely, self-explanatory.

Creating Tables

Let’s go ahead and create a table in Hive:

CREATE TABLE leagues (
  league_id STRING,
  league_name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

This creates a table named leagues with two columns: league_id and league_name. The last two lines might look a bit funny to people familiar with SQL. They are specific to the Hive Query Language (HiveQL). These lines say that each row in the underlying data file will be stored as tab-delimited text.

Getting Data Into Hive

We’ve configured Hadoop and Hive and we’ve created a table. How do we get data into the table? That’s a trickier matter.

We’re going to be using sample data from Retrosheet to populate our Hive database. The download takes a bit of time, so start it now and keep reading.

Loading Data With Pig

Pig is a way to execute data flows written in a language called Pig Latin. In this use case, I’ve written very primitive data clean-up scripts using Pig Latin (it would have been just as easy, if not easier, to write them in Ruby or Perl).

The first section of this Pig Latin script registers the PiggyBank contributed functionality and creates convenient aliases for the functions. We use DEFINE to set up the aliases so we don’t have to type the fully qualified function name every time we want to use it. It’s a lot easier to type replace(state, 'Ohio', 'OH') than it is to type org.apache.pig.piggybank.evaluation.string.REPLACE(state, 'Ohio', 'OH').

REGISTER /opt/pig/contrib/piggybank/java/piggybank.jar;
-- Create a function alias because nobody wants to type in the full function 
-- name over and over again.
DEFINE replace org.apache.pig.piggybank.evaluation.string.REPLACE(); 
DEFINE substring org.apache.pig.piggybank.evaluation.string.SUBSTRING(); 
DEFINE s_split org.apache.pig.piggybank.evaluation.string.Split(); 
DEFINE reverse org.apache.pig.piggybank.evaluation.string.Reverse();

-- Read the first CSV file and dump it as a tab-separated file
league_codes = LOAD 'league-codes.csv'
USING PigStorage(',')
AS (league_id:chararray, league_name);

STORE league_codes 
INTO 'league-codes' 
USING PigStorage('\t');

names = LOAD 'retrosheet-CurrentNames.csv'
USING PigStorage(',')
AS (franchise_id:chararray,

-- We don't need to explicitly specify the PigStorage mechanism.
-- Pig will use tab separated files by default.
STORE names
INTO 'team_names';

-- It's our first transformation:
-- 1) Load the data from the text file.
-- 2) Remove double-quote characters from the data.
-- 3) Write the data to the filesystem.
ballparks = LOAD 'retrosheet-ballparkcodes.txt'
USING PigStorage(',')
AS (park_id:chararray,

formatted_parks = foreach ballparks GENERATE
  replace(notes, '"', '');

STORE formatted_parks
INTO 'ballparks';

personnel = LOAD 'retrosheet-umpire.coach.ids.txt'
USING PigStorage(',')
AS (last_name:chararray,

-- the debut date is the only weirdly formatted date in here, so we'll reformat
-- it to be sortable in YYYYMMDD format
-- I had originally intended to do something with this, but I never
-- got around to writing it. As it stands, it's a good way to show 
-- how the string splitting function works (it returns an array)
-- and how that can be combined with flatten to turn it into a tuple
split_personnel = FOREACH personnel GENERATE
  flatten(s_split(debut_date, '/'))
AS (  

-- These subsequent x_, y_, z_ stages load our personnel 
-- relations into intermediate relations while we're doing
-- work on the data. This could have been done in a single
-- step, but it's much easier to read this way.
x_personnel = FOREACH split_personnel GENERATE
  reverse(CONCAT('000', month)) AS month, 
  reverse(CONCAT('000', day)) AS day, 

y_personnel = FOREACH x_personnel GENERATE
  reverse(substring($3, 0, 2)) AS month,
  reverse(substring($4, 0, 2)) AS day,

formatted_personnel = FOREACH y_personnel GENERATE
  CONCAT(CONCAT(year, month), day) AS debut;

STORE formatted_personnel
INTO 'personnel';
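The reverse/CONCAT/substring dance in the personnel section is easy to misread, so here is the same idea as a small Python sketch. It pads a month or day to two digits exactly the way the Pig steps do. The function names are made up for this example, and the M/D/YYYY field order is an assumption based on the aliases used in the script.

```python
def zero_pad_2(value):
    """Mirror the Pig steps: prepend zeros, reverse, keep two characters, reverse back."""
    padded = "000" + value   # CONCAT('000', month)   '5'    -> '0005'
    flipped = padded[::-1]   # reverse(...)           '0005' -> '5000'
    kept = flipped[0:2]      # substring(..., 0, 2)   '5000' -> '50'
    return kept[::-1]        # reverse(...)           '50'   -> '05'


def sortable_debut(debut_date):
    """Turn 'M/D/YYYY' (assumed field order) into a sortable 'YYYYMMDD' string."""
    month, day, year = debut_date.split("/")
    return year + zero_pad_2(month) + zero_pad_2(day)
```

Reversing the string first means the two characters we keep are always the last two digits, which is why a two-digit month such as 12 passes through unchanged.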

You might think that each of the STORE statements would produce a file named personnel or ballparks. Unfortunately, you’d be wrong. You see, Pig turns our Pig Latin script into a MapReduce job on the fly. The final output is written into the appropriately named folder in a part file. The part files have names like part-m-00000. This tells us that it’s part of a MapReduce job, specifically the map phase, and it’s the first part. If there were multiple map operations that were performed (if we had a full cluster), there would be multiple files with incrementing part numbers.
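If you ever need to pick those part files apart in a script, the naming convention can be parsed with a short regular expression. This is only a sketch: describe_part_file is a hypothetical helper, and it assumes the five-digit part numbers shown here, with r marking output from a reduce phase.

```python
import re

# 'part-m-00000' -> phase letter 'm', part number '00000'
PART_FILE = re.compile(r"^part-(?P<phase>[mr])-(?P<index>\d{5})$")


def describe_part_file(name):
    """Split a name like 'part-m-00000' into its phase and part number."""
    match = PART_FILE.match(name)
    if match is None:
        raise ValueError(f"not a part file name: {name!r}")
    phase = "map" if match.group("phase") == "m" else "reduce"
    return phase, int(match.group("index"))
```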

There is a second Pig Latin program that will load the standings. I’ve left that out of the article due to the sheer length of the code. However, it’s a relatively trivial script and should be easy enough to follow. The interesting thing to note is that the Pig Latin program to load the standings makes use of variable substitution. The driver script for this whole process, load_standings.sh, passes parameters to Pig on the command line and they are interpreted at run time.

For more information on Pig and Pig Latin, you should check out the Pig project page or the Pig Latin Cookbook.

Since we’ve formatted our data, how do we load it? It’s pretty simple to do with a Hive LOAD statement:

LOAD DATA LOCAL INPATH 'personnel/part-m-00000'

LOAD DATA LOCAL INPATH 'team_names/part-m-00000'

LOAD DATA LOCAL INPATH 'ballparks/part-m-00000'

LOAD DATA LOCAL INPATH 'league-codes/part-m-00000'

Because the data for the standings is separated out by year in the Retrosheet data set, we have to drive the insertion through a shell script:

echo "  Loading box scores..."
for d in stats/*/
do
    # Assumption: each directory under stats/ is named for its year
    year=$(basename "$d")
    echo "    Loading box scores for $year"
    for f in "$d"part*
    do
      hive -e "LOAD DATA LOCAL INPATH '$f' INTO TABLE raw_standings PARTITION (year='$year');"
    done
done

This loops through the output folders for the standings one at a time and loads the data into Hive. The raw_standings uses partitioning to separate out the data by year. Similar to partitioning in a relational database, this lets us store data with the same partition key in the same physical file. This improves query performance when we’re querying on a limited range of data. In this case, we’re only partitioning by year, but it’s possible to partition on multiple columns for greater granularity.
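Here is a toy model of why partitioning helps, written as a sketch rather than anything Hive actually runs. Rows are grouped into one bucket per year, and a query for a single year only ever looks at that bucket. The function names and sample rows are invented for illustration.

```python
from collections import defaultdict


def partition_by_year(rows):
    """Group rows into one bucket per year, the way a partitioned table keeps them apart."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["year"]].append(row)
    return partitions


def query_year(partitions, year):
    """Read only the bucket for `year`; other years are never touched."""
    return partitions.get(year, [])


rows = [
    {"year": 1998, "home_score": 5},
    {"year": 1999, "home_score": 3},
    {"year": 1998, "home_score": 7},
]
partitions = partition_by_year(rows)
```

A query restricted to 1998 reads two rows and skips the 1999 bucket entirely. On a real data set, that skipped work is where the performance gain comes from.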

To improve long term performance further, we’ve gone ahead and created a second table, rc_standings. The rc_standings table is partitioned by year, but it’s also using a ROW FORMAT setting to create an RCFile, a columnar file format. The data in this table will be stored in a column-oriented layout. This makes it possible to skip reading from columns that aren’t used in our query. With a table as wide as the rc_standings table, that’s going to be beneficial for our query performance.
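The difference between row and column layouts is easier to see with a tiny example. The sketch below pivots a list of rows into one list per column, then computes an average by touching a single column. It illustrates the idea only, not the RCFile format itself, and the visitor_score column is made up for the example.

```python
rows = [
    {"year": 1998, "home_score": 5, "visitor_score": 2},
    {"year": 1998, "home_score": 7, "visitor_score": 4},
]


def to_columns(rows):
    """Pivot row-oriented records into one list of values per column."""
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columns(rows)

# Averaging home_score reads one column; year and visitor_score are never touched.
average_home_score = sum(columns["home_score"]) / len(columns["home_score"])
```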

In order to get the data into the RCFile table, we have to copy it using a second driver in the shell script:

for d in stats/*/
do
    # Assumption: each directory under stats/ is named for its year
    year=$(basename "$d")
    # This only overwrites the $year partition in the standings table
    # We also make use of the REGEX Column Specification to avoid pulling back the
    #  partitioning column(s) in the query. 
    #  See http://wiki.apache.org/hadoop/Hive/LanguageManual/Select#REGEX_Column_Specification
    #  for more about the REGEX Column Specification.
    hive -S -e "INSERT OVERWRITE TABLE rc_standings PARTITION (year=$year) SELECT \`(year)?+.+\` FROM raw_standings WHERE year = $year;"
done

Loading the standings took an incredible amount of time on my test system, despite using an SSD and having 8GB of memory available. I’m not sure why, and I would love to know what I did wrong, so feel free to sound off in the comments so we can make this process faster for people to get started.

Querying Hive

Once we have the data loaded, querying becomes possible. To connect to Hive, open up a command prompt and type hive. You should see a blank Hive prompt waiting for your commands.

The Hive Prompt

Let’s take a look at the contents of a single table. Type select * from leagues; and hit enter. You should see something like the following:

A Simple Query

Let’s try something trickier and use a JOIN.

select l.league_name, t.franchise_id, t.first_game_date, t.last_game_date, t.nickname
from leagues l
join teams t on l.league_id = t.league_id;

Query Results - With a Join

So far, so good. These have both been very small queries. What if we want to look at some aggregated data? It turns out that it’s just as easy to do as any other query.

select l.league_name, s.year, avg(s.home_score)
from rc_standings s
join leagues l on s.home_league_id = l.league_id
group by l.league_name, s.year;

Now We're Using GROUP BY

Once again, Hive is responding exactly as we would expect. That’s because HiveQL was designed to be as close to SQL as possible. The Hive engine is transforming our HiveQL into MapReduce jobs in the back end. Depending on the type of queries that we’re running, our queries may execute on as few as one node in the cluster or as many as all nodes in the cluster.

While HiveQL looks like SQL, some additions have been made that make sense for Hive – SORT BY guarantees the order of results within a single reducer, while ORDER BY guarantees the order of results in the total output. There is also a DISTRIBUTE BY clause that determines how our data will be distributed to the reducers that are handling the query.
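The difference between these three is easiest to see in a small simulation. This is a sketch of the semantics only: the function names are invented, and picking a reducer with a simple modulo on the key is a stand-in for whatever hashing Hive really does.

```python
def distribute_by(rows, key, reducers):
    """Send each row to a reducer chosen from its key (DISTRIBUTE BY)."""
    buckets = [[] for _ in range(reducers)]
    for row in rows:
        buckets[row[key] % reducers].append(row)
    return buckets


def sort_by(buckets, key):
    """Sort inside each reducer only (SORT BY)."""
    return [sorted(bucket, key=lambda r: r[key]) for bucket in buckets]


def order_by(rows, key):
    """One total ordering over all rows (ORDER BY)."""
    return sorted(rows, key=lambda r: r[key])


rows = [{"year": y, "score": s} for y, s in [(1999, 4), (1998, 9), (1999, 1), (1998, 2)]]

per_reducer = sort_by(distribute_by(rows, "year", 2), "score")
total = order_by(rows, "score")
```

Each reducer’s output in per_reducer is sorted by score, but reading the reducers one after another does not give a globally sorted list. Only total, the ORDER BY equivalent, does.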


The Pig Latin, shell script, and HiveQL files used in this article can be found here, on Amazon S3.

The source data can be found here. Please note that you’ll need 7-zip to get to the source data, which, by the way, was obtained free of charge and is copyrighted by Retrosheet. For more information, please contact www.retrosheet.org.

Querying Hive with Toad for Cloud Databases

I recently talked about using the Toad for Cloud Databases Eclipse plug-in to query an HBase database. After I finished up the video, I did some work loading a sample dataset from Retrosheet into my local Hive instance.

This 7 minute tutorial shows you brand new functionality in the Toad for Cloud Databases Eclipse plug-in and how you can use it to perform data warehousing queries against Hive.

Querying HBase with Toad for Cloud Databases

We recently released a new version of Toad for Cloud Databases as an Eclipse plug-in. While this functionality has been in Toad for Cloud since the last release, this video shows Toad for Cloud running in Eclipse and demonstrates some basic querying against HBase using Toad for Cloud’s ability to translate between ANSI compliant SQL code and native database calls.

[Video: Querying HBase with Toad for Cloud Databases]

If the video above doesn’t work for you, everything is available full size at NoSQLPedia – Querying HBase.

Facebook Messaging – HBase Comes of Age

Update: I want to thank Ben Black, Todd Lipcon, and Kelley Reynolds for pointing out the inaccuracies in the original post. I’ve gone through the references they provided and made corrections.

Facebook, the data giant of the internet, recently unveiled a new messaging system. To be fair, I would normally ignore feature roll outs and marketing flimflam, but Facebook’s announcement is worthy of attention not only because of the underlying database but how the move was done.

Historically Speaking

Long ago, the engineers at Facebook developed a product named Cassandra to meet their needs for inbox search. This was a single purpose application, and a lot of Facebook’s data continued to live in MySQL clusters. Cassandra solves the needs of the original Facebook Inbox search very well.

Building a New Product

Whenever we solve a new problem, it’s a good idea to investigate all of the options available before settling on a solution. By their own admission, the developers at Facebook evaluated several platforms (including Cassandra, MySQL, and Apache HBase) before deciding to use Apache HBase. What makes this decision interesting is not just the reasons that Apache HBase was chosen, but also the reasons that MySQL and Cassandra were not chosen.

Why Not Use MySQL?

It’s not that MySQL can’t scale. MySQL can clearly scale – many high traffic, high data volume sites and applications use MySQL. The problem comes when you are scaling a relational database to a massive size – Facebook estimate that they will be storing well over 135 billion messages every month (15 billion person-to-person messages plus 120 billion chat messages). Even if these messages were limited in size to a single SMS (160 characters), that’s 21,600,000,000,000 bytes of data per month. Or, as we say where I come from, a buttload of data.
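The arithmetic is worth spelling out. This short calculation assumes one byte per character, which is the same simplification the estimate above makes.

```python
person_to_person = 15_000_000_000   # messages per month
chat = 120_000_000_000              # messages per month
messages_per_month = person_to_person + chat

bytes_per_message = 160             # one SMS, assuming one byte per character
bytes_per_month = messages_per_month * bytes_per_message

terabytes_per_month = bytes_per_month / 10**12   # decimal terabytes
```

That works out to 21.6 terabytes a month before counting indexes, metadata, or replicas.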

When you start storing that amount of data, strange things start to happen. Updates to indexes take a long time, data statistics get updated very rarely. With a use case like tracking email and messaging, prolific users of the messaging platform could cause massive data skew which would cause queries for other users to perform poorly.

Data sharding is a difficult problem to solve with an RDBMS at any scale, but when you move to the massive scales that Facebook is targeting, data sharding moves from a difficult problem to an incredibly difficult problem. Sharding algorithms need to be carefully chosen to ensure that data is spread evenly across all of the database servers in production, and the algorithm needs to be flexible enough to ensure that new servers can be easily added into the database without disrupting the existing set up.
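To see why adding servers is disruptive, consider the most naive scheme: hash the key and take it modulo the number of servers. The sketch below measures how many keys change servers when a fifth server joins a cluster of four. The key names and helper functions are invented for illustration, and this is not a claim about how anyone shards in production.

```python
import hashlib


def shard_for(key, servers):
    """Naive sharding: hash the key, then take it modulo the server count."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % servers


def fraction_moved(keys, before, after):
    """Fraction of keys that land on a different server after resizing."""
    moved = sum(1 for k in keys if shard_for(k, before) != shard_for(k, after))
    return moved / len(keys)


keys = [f"user-{i}" for i in range(10_000)]
moved = fraction_moved(keys, 4, 5)
```

With this scheme a key only stays put when its hash gives the same remainder for 4 and for 5, so in expectation about four out of five keys have to move. Smarter approaches such as consistent hashing exist precisely to shrink that number.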

As Todd Lipcon points out in the comments, one of the issues with scaling an RDBMS is the write pattern of B+-tree indexes. Writing data to a B+-tree index with an arbitrarily increasing integer key is fairly trivial – it’s effectively an append to the index. Things get trickier when the index is used to support various searches. Keying by email address or subject line will cause rows to be inserted in the middle of the index leading to all kinds of undesirable side effects like page splits and index fragmentation. Performing maintenance on large indexes is problematic in terms of time to complete, CPU time, and I/O issues.
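To make the append-versus-middle distinction concrete, here is a rough sketch that uses a plain sorted Python list as a stand-in for the leaf level of an index. This is not how a B+-tree is implemented; the function name is made up for illustration and all it counts is how many inserts land somewhere other than the end.

```python
import bisect
import random


def count_mid_inserts(keys):
    """Insert keys one at a time into a sorted list and count how many
    land somewhere other than the end (a stand-in for mid-index inserts)."""
    index = []
    mid_inserts = 0
    for key in keys:
        pos = bisect.bisect_right(index, key)
        if pos != len(index):
            mid_inserts += 1
        index.insert(pos, key)
    return mid_inserts


# Monotonically increasing keys: every insert is an append.
sequential_ids = list(range(1000))

# The same keys in arbitrary order, like email addresses or subject lines.
shuffled = sequential_ids[:]
random.Random(42).shuffle(shuffled)

print("sequential:", count_mid_inserts(sequential_ids))
print("shuffled:  ", count_mid_inserts(shuffled))
```

The sequential run never inserts into the middle, while the shuffled run does so almost every time. In a real index, those mid-index inserts are the ones that trigger page splits and fragmentation.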

Why Not Use Cassandra?

If MySQL can’t handle this kind of load, why not use Cassandra? Cassandra was purpose built to handle Facebook’s Inbox Search feature, after all.

Cassandra, unfortunately, has its own problems dealing with scale. One of the difficulties scaling Cassandra is a poorly designed partitioning scheme. It is safe to assume that the Facebook engineering team has enough experience with Cassandra to know how to effectively partition their data. If partitioning data isn’t a likely problem, what is?

Is replication the problem?

When a record is written to Cassandra, the data is immediately, and asynchronously, written to every other server that is receiving a replica of the data. Cassandra allows you to tune the replication settings, varying from no write confirmation through full write confirmation. Different consistency levels are appropriate for different workloads, but they all carry different risks. For example, using a consistency level of ConsistencyLevel.ALL would require every replica to respond before the write returns to the client as successful. Needless to say, this isn’t a recommended best practice. Benjamin Black (blog | twitter) informs me that the recommended best practice is to use ConsistencyLevel.ONE or ConsistencyLevel.QUORUM with a replication factor of 3.
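Here is a minimal sketch of what those consistency levels mean in terms of replica acknowledgements. The level names mirror Cassandra’s, but the functions are hypothetical helpers written for this post, not part of any Cassandra client.

```python
def required_acks(consistency_level: str, replication_factor: int) -> int:
    """How many replicas must acknowledge a write before it is reported
    as successful to the client."""
    if consistency_level == "ONE":
        return 1
    if consistency_level == "QUORUM":
        return replication_factor // 2 + 1   # a strict majority
    if consistency_level == "ALL":
        return replication_factor
    raise ValueError(f"unknown consistency level: {consistency_level}")


def write_succeeds(consistency_level: str, replication_factor: int,
                   live_replicas: int) -> bool:
    """True if enough replicas are up to satisfy the consistency level."""
    return live_replicas >= required_acks(consistency_level, replication_factor)


RF = 3
for level in ("ONE", "QUORUM", "ALL"):
    acks = required_acks(level, RF)
    ok = write_succeeds(level, RF, live_replicas=2)  # one replica is down
    print(f"{level:<6} needs {acks} of {RF} acks; one node down -> success={ok}")
```

With a replication factor of 3, ONE and QUORUM both survive the loss of a single replica, while ALL fails the write. That is a big part of why ALL isn’t the recommended default.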

Of course, thinking about Cassandra’s implementation could be a non-issue because the Facebook implementation of Cassandra is proprietary and may have very few similarities to the current open source Apache Cassandra codebase.

Cassandra’s replication can be configured to meet the needs of a variety of workloads and deployment patterns and we can rule that out as the reason to not choose Cassandra. In this post from Facebook Engineering, the only concrete statement is “We found Cassandra’s eventual consistency model to be a difficult pattern to reconcile for our new Messages infrastructure.” Cassandra is designed to be constantly available and eventually consistent – reads will always succeed but they may not always return the same data – the inconsistencies are resolved through a read repair process. HDFS (the data store underlying HBase) is designed to be strongly consistent, even if that consistency causes availability problems.

Choosing HBase

Cassandra is a known quantity at Facebook. Cassandra was, after all, developed at Facebook. The operations teams have a great deal of experience with supporting and scaling Cassandra. Why make the switch?


HBase’s replication model (actually, it’s the HDFS replication model) differs from Cassandra’s replication through replication pipelining. Replication pipelining makes it possible to have guarantees of data consistency – we know that every node will have the same data once a write has completed. We can guarantee write order (which is important for knowing when a message arrives in your inbox).

Because of HDFS’s data replication strategy, it’s possible to gain some automatic fault tolerance. The load will be pipelined through multiple servers – if any single server fails, the query can be served by another HDFS node with the data. Likewise, data replication makes it easy to handle node failures. If a node fails, queries for that specific region of data can be routed to one of the remaining replica servers. A new server can be added and the data from the failed server can easily be replicated to the new server.

In the spirit of full disclosure, this replication is not a feature specific to HBase/HDFS – Cassandra is able to do this as well. Todd Lipcon’s comment includes some information on the differences between HDFS and Cassandra replication. It’s important to remember that HBase/HDFS and Cassandra use two different replication methodologies. Cassandra is a highly available, eventually consistent system. HBase/HDFS is a strongly consistent system. Each is appropriate for certain types of tasks.


One of the criticisms of many NoSQL databases is that they do not log data before it is written. This is, clearly, not the case here. Both HBase and Cassandra use a write-ahead log to make sure that data is safely written to the log on disk before it is persisted to the data files. This allows either data store, much like a relational database, to recover from crashes without having to flush the data files to disk every time a new row is inserted.
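The idea is easy to sketch. The toy key/value store below appends every write to a log file before touching its in-memory data, and replays that log on startup. The class name and file layout are invented for illustration; neither HBase nor Cassandra formats its log this way, and syncing on every write is a simplification.

```python
import json
import os
import tempfile


class TinyStore:
    """Toy key/value store that logs each write before applying it."""

    def __init__(self, log_path):
        self.log_path = log_path
        self.data = {}
        self._replay()

    def _replay(self):
        # On startup, rebuild the in-memory state from the log.
        if not os.path.exists(self.log_path):
            return
        with open(self.log_path, "r", encoding="utf-8") as log:
            for line in log:
                record = json.loads(line)
                self.data[record["key"]] = record["value"]

    def put(self, key, value):
        # 1. Append to the log and force it to disk.
        with open(self.log_path, "a", encoding="utf-8") as log:
            log.write(json.dumps({"key": key, "value": value}) + "\n")
            log.flush()
            os.fsync(log.fileno())
        # 2. Only then apply the write in memory.
        self.data[key] = value


log_path = os.path.join(tempfile.mkdtemp(), "wal.log")
store = TinyStore(log_path)
store.put("msg:1", "hello")
store.put("msg:2", "world")

# Simulate a crash: throw the store away and start a new one from the log.
recovered = TinyStore(log_path)
print(recovered.data)
```

The log is only ever appended to, which is cheap sequential I/O. The expensive work of organizing data files can happen later, in bulk.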

Existing Expertise

The technology behind HBase – Hadoop and HDFS – is very well understood and has been used previously at Facebook. During the development of their data warehouse, Facebook opted to use Hive. Hive is a set of tools for data analytics built on top of Hadoop and HDFS. While Hive performs very well with analytic queries on largely static data, it does not perform well with rapidly changing data. This makes Hive poorly suited for the new Social Messaging feature at Facebook. Since Hive makes use of Hadoop and HDFS, these shared technologies are well understood by Facebook’s operations teams. As a result, the same technology that allows Facebook to scale their data will be the technology that allows Facebook to scale their Social Messaging feature. The operations team already understands many of the problems they will encounter.

Other Technologies

There are, of course, other technologies involved, but the most interesting part to me is the data store. Facebook’s decision to use HBase is a huge milestone. HBase has come of age and is being used by a prominent customer. Facebook developers are contributing their improvements back to the open source community – they’ve publicly said that they are running the open source version of Hadoop.

Open Sourcing Sawzall – What Does It Mean?

For Data Analytics or automotive modification, you will find no finer tool.

While perusing twitter, I saw that Google has open sourced Sawzall, one of their internal tools for data processing. WTF does this mean?

Sawzall, WTF?

Apart from a tool that I once used to cut the muffler off of my car (true story), what is Sawzall?

Sawzall is a procedural language for analyzing excessively large data sets. When I say “excessively large data sets”, think Google Voice logs, utility meter readings, or the network traffic logs for the Chicago Public Library. You could also think of anything where you’re going to be crunching a lot of data over the course of many hours on your monster Dell R910 SQL Server.

There’s a lengthy paper about how Sawzall works, but I’ll summarize it really quickly. If you really want to read up on all the internal Sawzall goodness, you can check it out on Google code – Interpreting the Data: Parallel Analysis with Sawzall.

Spell It Out for Me

At its most basic, Sawzall is a MapReduce engine, although the Google documentation goes to great pains to not use the word MapReduce, so maybe it’s not actually MapReduce. It smells oddly like MapReduce to me.

I’ll go into more depth on the ideas behind MapReduce in the future, but here are the basics of MapReduce as far as Sawzall is concerned:

  1. Data is split into partitions.
  2. Each partition is filtered. (This is the Map.)
  3. The results of the filtering operation are used by an aggregation phase. (This is the Reduce.)
  4. The results of the aggregation are saved to a file.
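The four steps above can be sketched in a few lines of Python. This is not Sawzall code and not how Google runs it; the log format and function names are made up to show the shape of the pipeline.

```python
import json
import os
import tempfile
from collections import Counter


def split(records, partition_count):
    """Step 1: split the data into partitions."""
    return [records[i::partition_count] for i in range(partition_count)]


def filter_partition(partition):
    """Step 2 (the Map): look at one record at a time and emit the path
    of every request that failed with a 500."""
    emitted = []
    for line in partition:
        _method, path, status = line.split()
        if status == "500":
            emitted.append(path)
    return emitted


def aggregate(emitted_lists):
    """Step 3 (the Reduce): combine the emitted values into totals."""
    totals = Counter()
    for emitted in emitted_lists:
        totals.update(emitted)
    return totals


def save(totals, path):
    """Step 4: write the aggregated results to a file."""
    with open(path, "w", encoding="utf-8") as out:
        json.dump(totals, out, sort_keys=True)


records = ["GET /a 200", "GET /b 500", "GET /a 500", "GET /c 200", "GET /b 500"]
partitions = split(records, 2)
totals = aggregate(filter_partition(p) for p in partitions)
out_path = os.path.join(tempfile.mkdtemp(), "errors.json")
save(totals, out_path)
print(dict(totals))
```

Because each partition is filtered independently, step 2 can run on as many machines as you have partitions. Only the aggregation needs to see everyone’s output.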

It’s pretty simple. That simplicity makes it possible to massively parallelize the analysis of data. If you’re in the RDBMS world, think Vertica, SQL Server Parallel Data Warehouse, or Oracle Exadata. If you are already entrenched and in love with NoSQL, you already know all about MapReduce and probably think I’m an idiot for dumbing it down so much.

The upside to Sawzall’s approach is that rather than write a Map program and a Reduce program and a job driver and maybe some kind of intermediate aggregator, you just write a single program in the Sawzall language and compile it.

… And Then?

I don’t think anyone is sure, yet. One of the problems with internal tools is that they’re part of a larger stack. Sawzall is part of Google’s internal infrastructure. It may emit compiled code, but how do we go about making use of those compiled programs in our own applications? Your answer is better than mine, most likely.

Sawzall uses something called Protocol Buffers – PB is a cross language way to efficiently move objects and data around between programs. It looks like Twitter is already using Protocol Buffers for some of their data storage needs, so it might only be a matter of time before they adopt Sawzall – or before some blogger opines that they might adopt Sawzall ;) .

So far nobody has a working implementation of Sawzall running on top of any MapReduce implementations – Hadoop, for instance. At a cursory glance, it seems like Sawzall could be used in Hadoop Streaming jobs. In fact, Section 10 of the Sawzall paper seems to point out that Sawzall is a record by record analytical language – your aggregator needs to be smart enough to handle the filtered records.

Why Do I Need Another Language?

This is a damn good question. I don’t program as much as I used to, but I can reasonably write code in C#, Ruby, JavaScript, numerous SQL dialects, and Java. I can read and understand at least twice as many languages. What’s the point of another language?

One advantage of a special purpose language is that you don’t have to worry about shoehorning domain specific functionality into existing language constructs. You’re free to write the language the way it needs to be written. You can achieve a wonderful brevity by baking features into the language. Custom languages let developers focus on the problems at hand and ignore implementation details.

What Now?

You could download the code from the Google Code repository, compile it, and start playing around with it. It should be pretty easy to get up and running on Linux systems. OS X developers should look at these instructions from a helpful Hacker News reader. Windows developers should install Linux on a VM or buy a Mac.

Outside of downloading and installing Sawzall yourself to see what the fuss is about, the key is to keep watching the sky and see what happens.

Foursquare and MongoDB: What If

People have chimed in and talked about the Foursquare outage. The nice part about these discussions is that they’re focusing on the technical problems with the current setup at Foursquare. They’re picking it apart and looking at what is right, what went wrong, and what needs to be done differently in MongoDB to prevent problems like this in the future.

Let’s play a “what if” game. What if Foursquare wasn’t using MongoDB? What if they were using something else?


Riak is a massively scalable key/value data store. It’s based on Amazon’s Dynamo. If you don’t want to read the paper, that just means that it uses some magic to make sure that data is evenly spread throughout the database cluster and that adding a new node makes it very easy to rebalance the cluster.

What would have happened at Foursquare if they had been using Riak?

Riak still suffers from the same performance characteristics around disk access as MongoDB – once you have to page to disk, operations become slow and throughput dries up. This is a common problem in any software that needs to access disks – disks are slow, RAM is fast.

Riak, however, has an interesting distinction. It allocates keys inside the cluster using something called a consistent hash ring – this is just a convenient way to rapidly allocate ranges of keys to different nodes within a cluster. The ring itself isn’t interesting. What’s exciting is that the ring is divided into partitions (64 by default). Every node in the system claims an equal share of those 64 partitions. In addition, each partition is replicated so there are always three copies of a given key/value pair at any time. Because there are multiple copies of the data, it’s unlikely that any single node will fail or become unbalanced. In theory, if Foursquare had used Riak, it is very unlikely that they would have run into a problem where a single node becomes full.

How would this consistent hash ring magical design choice have helped? Adding a new node causes the cluster to redistribute data in the background. The new node will claim an equal amount of space in the cluster and the data will be redistributed from the other nodes in the background. The same thing happens when a node fails, by the way. Riak also only stores keys in memory, not all of the data. So it’s possible to reference an astronomical amount of data before running out of RAM.
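Here is a deliberately simplified sketch of the ring. It keeps the 64 partitions and three copies described above, but the round-robin claim and the modulo placement are my assumptions for illustration; they do not model how Riak decides which partitions to hand off when a node joins.

```python
import hashlib
from collections import Counter

RING_SIZE = 64   # partitions in the ring (the default mentioned above)
N_VAL = 3        # copies of each key/value pair


def build_ring(nodes):
    """Hand out partitions round-robin so every node claims an
    (almost) equal share."""
    return [nodes[i % len(nodes)] for i in range(RING_SIZE)]


def partition_for(key):
    """Hash the key and map it onto one of the partitions."""
    digest = hashlib.sha1(key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % RING_SIZE


def replicas_for(key, ring):
    """The key lives on its partition's owner and on the owners of the
    next partitions around the ring."""
    start = partition_for(key)
    return [ring[(start + i) % RING_SIZE] for i in range(N_VAL)]


ring4 = build_ring(["node1", "node2", "node3", "node4"])
ring5 = build_ring(["node1", "node2", "node3", "node4", "node5"])

print("4 nodes:", dict(Counter(ring4)))
print("5 nodes:", dict(Counter(ring5)))
print("user:42 lives on", replicas_for("user:42", ring4))
```

With four nodes each one owns 16 partitions; with five, each owns 12 or 13. No single node ends up holding a disproportionate slice of the key space, which is exactly the failure mode Foursquare hit.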

There’s no need to worry about replica sets, re-sharding, or promoting a new node to master. Once a node joins a Riak cluster, it takes over its share of the load. As long as you have the network throughput on your local network (which you probably do), then this operation can be fairly quick and painless.


Cassandra, like Riak, is based on Amazon’s Dynamo. It’s a massively distributed data store. I suspect that if Foursquare had used Cassandra, they would have run into similar problems.

Cassandra makes use of range partitioning to distribute data within the cluster. The Foursquare database was keyed off of the user name which, in turn, saw abnormal growth because some groups of users were more active than others. Names also tend to clump around certain letters, especially when you’re limited to a Latin character set. I know a lot more Daves than I know Zachariahs, and I have a lot of friends whose names start with the letter L. This distribution causes data within the cluster to be overly allocated to one node. This would, ultimately, lead to the same problems that happened at Foursquare with their MongoDB installation.

That being said, it’s possible to use a random partitioner for the data in Cassandra. The random partitioner makes it very easy to add additional nodes and distribute data across them. The random partitioner comes with a price. It makes it impossible to do quick range slice queries in Cassandra – you can no longer say “I want to see all of the data for January 3rd, 2010 through January 8th, 2010”. Instead, you would need to build up custom indexes to support your querying and build batch processes to load the indexes. The tradeoffs between the random partitioner and the order preserving partitioner are covered very well in Dominic Williams’s article Cassandra: RandomPartitioner vs OrderPreservingPartitioner.
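A small sketch shows the difference between the two approaches. The four-node cluster, the letter-range split, and the sample names are all made up; the only detail borrowed from Cassandra is that the random partitioner places keys by their MD5 hash.

```python
import hashlib
from collections import Counter

NODES = 4


def range_partition(name):
    """Order preserving: split A-Z into equal letter ranges, one per node.
    Assumes names start with an ASCII letter."""
    offset = ord(name[0].upper()) - ord("A")
    return min(offset * NODES // 26, NODES - 1)


def random_partition(name):
    """Hash based: place the key by its MD5 digest, ignoring sort order."""
    digest = hashlib.md5(name.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % NODES


names = ["Dave", "Dan", "Dana", "Daria", "Lisa", "Liam", "Laura", "Zachariah"]

by_range = Counter(range_partition(n) for n in names)
by_hash = Counter(random_partition(n) for n in names)

print("range partitioned:", dict(by_range))
print("hash partitioned: ", dict(by_hash))
```

Under range partitioning, all of the Daves and their D-named friends pile onto node 0 while one node gets nothing at all. Hashing breaks the link between a name and its node, which spreads keys more evenly over a large data set, but it also means names that sort next to each other are no longer stored next to each other. That is why range slices stop being cheap.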

Careful use of the random partitioner and supporting batch operations could have prevented the outage that Foursquare saw, but this would have led to different design challenges, some of which may have been difficult to overcome without resorting to a great deal of custom code.


HBase is a distributed column-oriented database built on top of HDFS. It is based on Google’s Bigtable database as described in “Bigtable: A Distributed Storage System for Structured Data”. As an implementation of Bigtable, HBase has a number of advantages over MongoDB – write ahead logging, rich ad hoc querying, and redundancy.

HBase is not going to suffer from the same node redistribution problems that caused the Foursquare outage. When it comes time to add a new node to the cluster, data will be migrated in much larger chunks, one data file at a time. This makes it much easier to add a new data node and redistribute data across the network.

Just like a relational database, HBase is designed so that all data doesn’t need to reside in memory for good performance. The internal data structures are built in a way that makes it very easy to find data and return it to the client. Keys are held in memory and two levels of caching make sure that frequently used data will be in memory when a client requests it.

HBase also has the advantage of using a write ahead log. In the event of a drive failure, it is possible to recover from a backup of your HBase database and play back the log to make sure data is correct and consistent.

If all of this sounds like HBase is designed to be a replacement for an RDBMS, you would be close. HBase is a massively distributed database, just like Bigtable. As a result, data needs to be logged because there is a chance that a hardware node will fail and will need to be recovered. Because HBase is a column-oriented database, we need to be careful not to treat it exactly like a relational database, but

A Relational Database

To be honest, Foursquare’s data load would be trivial in any relational database. SQL Server, Oracle, MySQL, and PostgreSQL can all handle orders of magnitude more data than the 132GB of data that Foursquare was storing at the time of the outage. This raises the question: “How could we handle the constant write load?” Foursquare is a write-intensive application.

Typically, in the relational database world, when we need to scale read and write loads we add more disks. There is a finite amount of space in a server chassis and these local disks don’t provide the redundancy necessary for data security and performance; software RAID is also CPU intensive and slow. A better solution is to purchase a dedicated storage device, either a SAN, NAS, or DAS. All of these devices offer read/write caching and can be configured in a variety of RAID levels for performance and redundancy.

RDBMSes are known quantities – they are easy to scale to certain points. Judging by the amount of data that Foursquare reports to have, they aren’t likely to reach the point where an RDBMS can no longer scale for a very long time. The downside to this approach is that an RDBMS is much costlier per TB of storage (up to ten times more expensive) than using MongoDB, but if your business is your data, then it’s important to keep the data safe.


It’s difficult to say if a different database solution would have prevented the Foursquare outage. But it is a good opportunity to highlight how different data storage systems would respond in the same situation.

Hadoop World Follow Up

I should have written this right when I got back from Hadoop World, instead of a week or so later, but things don’t always happen the way you plan. Before I left to go to Hadoop World (and points in between), I put up a blog post asking for questions about Hadoop. You guys responded with some good questions and I think I owe you answers.

What Is Hadoop?

Hadoop isn’t a simple database; it’s a bunch of different technologies built on top of the Hadoop common utilities, MapReduce, and HDFS (Hadoop Distributed File System). Each of these products serves a simple purpose – HDFS handles storage, MapReduce is a parallel job distribution system, HBase is a distributed database with support for structured tables. You can find out more on the Apache Hadoop page. I’m bailing on this question because 1) it wasn’t asked and 2) I don’t think it’s fair or appropriate for me to regurgitate these answers.

How Do You Install It?

Installing Hadoop is not quite as easy as installing Cassandra. There are two flavors to choose from:

Cloudera flavors

Homemade flavors

  • If you want to run Hadoop natively on your local machine, you can go through the single node setup from the Apache Hadoop documentation.
  • Hadoop The Definitive Guide also has info on how to get started.
  • Windows users, keep in mind that Hadoop is not supported on Windows in production. If you want to try it, you’re completely on your own and in uncharted waters.

What Login Security Model(s) Does It Have?

Good question! NoSQL databases are not renowned for their data security.

Back in the beginning, at the dawn of Hadoopery, it was assumed that everyone accessing the system would be a trusted individual operating inside a secured environment. Clearly this happy ideal won’t fly in a modern, litigation fueled world. As a result, Hadoop has support for Kerberos authentication (now meaning all versions of Hadoop newer than version 0.22 for the Apache Hadoop distribution, Cloudera’s CDH3 distribution, or the 0.20.S Yahoo! Distribution of Hadoop). The Kerberos piece handles the proper authentication, but it is still up to Hadoop and HDFS (it’s a distributed file system, remember?) to make sure that an authenticated user is authorized to mess around with a file.

In short: Kerberos guarantees that I’m Jeremiah Peschka and Hadoop+HDFS guarantee that I can muck with data.

N.B. As of 2010-10-19 (when I’m writing this), Hadoop 0.22 is not available for general use. If you want to run Hadoop 0.22, you’ll have to build from the source control repository yourself. Good luck and godspeed.

When Does It Make Sense To Use Hadoop Instead of SQL Server, Oracle, or DB2?

Or any RDBMS for that matter. Personally, I think an example makes this easier to understand.

Let’s say that we have 1 terabyte of data (the average Hadoop cluster is reported as being 114.5TB in size) and we need to process this data nightly – create reports, analyze trends, detect anomalous data, etc. If we are using an RDBMS, we’d have to batch this data up (to avoid transaction log problems). We would also need to deal with the limitations of parallelism in your OS/RDBMS combination, as well as I/O subsystem limitations (we can only read so much data at one time). SQL dialects are remarkably bad languages for loop and flow control.

If we were using Hadoop for our batch operations, we’d write a MapReduce program (think of it like a query, for now). This MapReduce program would be distributed across all of the nodes in your cluster and then run in parallel using local storage and resources. So instead of hitting a single SAN across 8 or 16 parallel execution threads, we might have 20 commodity servers all processing 1/20 of the data simultaneously. Each server is going to process 50GB. The results will be combined once the job is done and then we’re free to do whatever we want with it – if we’re using SQL Server to store the data in tables for reporting, we would probably bcp the data into a table.
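As a rough illustration of that divide-and-combine pattern, the sketch below splits a data set into 20 equal chunks, processes them concurrently, and combines the partial results. Threads on one machine stand in for the 20 servers and summing numbers stands in for the real analysis; none of this is Hadoop API.

```python
from concurrent.futures import ThreadPoolExecutor

WORKERS = 20   # stand-ins for the 20 commodity servers


def chunk(data, parts):
    """Split data into `parts` roughly equal, contiguous slices."""
    size = -(-len(data) // parts)   # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def process(part):
    """The per-server work; here it is just a sum."""
    return sum(part)


data = list(range(1, 1001))

with ThreadPoolExecutor(max_workers=WORKERS) as pool:
    partials = list(pool.map(process, chunk(data, WORKERS)))

total = sum(partials)   # combine the results once every worker is done
print(len(partials), "partial results, combined total =", total)

# The same arithmetic at cluster scale: 1 TB spread over 20 servers.
print(1000 // WORKERS, "GB per server")
```

Each worker only ever sees its own slice. Hadoop takes that one step further by running the work on the machine that already stores the slice.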

Another benefit of Hadoop is that the MapReduce functionality is implemented in Java, C, or another imperative programming language. This makes it easy to solve computationally intensive operations in MapReduce programs. There are a large number of programming problems that cannot be easily solved in a SQL dialect; SQL is designed for retrieving data and performing relatively simple transformations on it, not for complex programming tasks.

Hadoop (Hadoop common + HDFS + MapReduce) is great for batch processing. If you need to consume massive amounts of data, it’s the right tool for the job. Hadoop is being used in production for point of sale analysis, fraud detection, machine learning, risk analysis, and fault/failure prediction.

The other consideration is cost: Hadoop is deployed on commodity servers using local disks and no software licensing fees (Linux is free, Hadoop is free). The same thing with an RDBMS is going to involve buying an expensive SAN, a high end server with a lot of RAM, and paying licensing fees to Microsoft, Oracle, or IBM as well as any other vendors involved. In the end, it can cost 10 times less to store 1TB of data in Hadoop than in an RDBMS.

HBase provides the random realtime read/write that you might be used to from the OLTP world. The difference, however, is that this is still NoSQL data access. Tables are large and irregular (much like Google’s Big Table) and there are no complex transactions.

Hive is a data warehouse toolset that sits on top of Hadoop. It supports many of the features that you’re used to seeing in SQL, including a SQL-ish query language that should be easily learned but also provides support for using MapReduce functionality in ad hoc queries.

In short, Hadoop and SQL Server solve different sets of problems. If you need transactional support, complex rule validation and data integrity, use an RDBMS. If you need to process data in parallel, perform batch and ad hoc analysis, or perform computationally expensive transformations then you should look into Hadoop.

What Tools Are Provided to Manage Hadoop?

Unfortunately, there are not a lot of management tools on the market for Hadoop – the only tools I found were supplied by Cloudera. APIs are available to develop your own management tools. From the sound of discussions that I overheard, I think a lot of developers are going down this route – they’re developing monitoring tools that meet their own, internal, needs rather than build general purpose tools that they could sell to third parties. As the core product improves, I’m sure that more vendors will be stepping up to the plate to provide additional tools and support for Hadoop.

Right now, there are a few products on the market that support Hadoop:

  • Quest’s Toad for Cloud makes it easy to query data stored using HBase and Hive.
  • Quest’s OraOop is an Oracle database driver for Sqoop – you can think of Sqoop as Hadoop’s equivalent of SQL Server’s bcp program.
  • Karmasphere Studio is a tool for writing, debugging, and watching MapReduce jobs.

What Is The State of Write Consistency?

As I mentioned earlier, there is no support for ACID transactions. On the most basic level, Hadoop processes data in bulk from files stored in HDFS. When writes fail, they’re retried until the minimum guaranteed number of replicas is written. This is a built-in part of HDFS; you get this consistency for free just by using Hadoop.

As far as eventual consistency is concerned, Hadoop uses an interesting method to ensure that data is quickly and effectively written to disk. Basically, HDFS finds a place on disk to write the data. Once that write has completed, the data is forwarded to another node in the cluster. If that node fails to write the data, a new node is picked and the data is written until the minimum number of replicas have had data written to them. There are additional routines in place that will attempt to spread the data throughout the data center.
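In sketch form, that write path looks something like the function below. The node names, the `can_write` callback, and the function itself are hypothetical; this mirrors the description above and is not the HDFS client API.

```python
def pipeline_write(block, candidates, min_replicas, can_write):
    """Write `block` to one node after another. If a node fails, move on
    to the next candidate until `min_replicas` copies have been written."""
    written = []
    for node in candidates:
        if len(written) == min_replicas:
            break
        if can_write(node, block):
            written.append(node)   # success: forward to the next node
        # failure: skip this node and pick a new one
    if len(written) < min_replicas:
        raise IOError(
            f"only {len(written)} of {min_replicas} replicas written"
        )
    return written


def flaky_cluster(node, block):
    """Pretend that dn2 is down and every other node is healthy."""
    return node != "dn2"


nodes = ["dn1", "dn2", "dn3", "dn4"]
replicas = pipeline_write(b"some block of data", nodes, 3, flaky_cluster)
print(replicas)
```

The write only reports success once the minimum number of copies exists, even though one of the first-choice nodes was unavailable.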

If this seems like an overly simple explanation of how HDFS and Hadoop write data, you’re right. The specifics of write consistency can be found in the HDFS Architecture Guide (which is nowhere near as scary as it sounds).

I’m Going to Hadoop World

Sounds like I’m bragging, right? There is a free book involved. Or maybe you don’t care because it’s all NoSQLs and stuff and you’re a SQL Server DBA. And that’s where we differ.

When I first heard about NoSQL databases, I had the same reaction that a lot of people are having right now: disbelief and mockery. I remember making fun of MySQL when I first ran into it. It was such an odd database: it didn’t have foreign keys, joins didn’t work, it sometimes ate all of your data, and writes put locks on tables. It was no comparison for SQL Server, Oracle, or PostgreSQL. When I hit publish, this blog post is going to get dumped into a MySQL database on my server. I’ll probably hit up slashdot or read some blogs. That’s going to hit MySQL as well. These days, I try to keep a passing familiarity with MySQL because you never know when you’re going to need to use such a cheap, powerful, tool.

I see a filthy commie on the other side of this intertubes

This is what I think about NoSQL databases; especially Hadoop.

So, I’m going to Hadoop World and I’m going to learn as much as I can about Hadoop and the technology that surrounds it. The fun part is that I’m going to take some notes. And I’m going to share those notes.

That’s all well and good. As Senator Joe McCarthy used to say, “sharing is caring and caring is for commies.”

So, instead of just going and learning about what is interesting to me, I want to know what kind of questions DBAs have about this new-fangled Hadoop thing. I want to find the answers to the questions that people have about Hadoop, its place in the enterprise, and how it may or may not change the DBA’s job.

So, sound off in the comments. If you’ve got a question, type in the box and hit send. I’ll give you props when I post my write up of Hadoop World and I’ll do my best to find the answer.

I owe this entire idea to Andy Warren (blog | twitter). He mentioned that he was curious about Hadoop and that sparked this blog post. Here are his questions:

  • How do you install it?
  • What login security model(s) does it have
  • When does it make sense to use Hadoop vs SQL Server?
  • What tools are provided to manage it?
