Tag HBase

Data Durability

A friend of mine half-jokingly says that the only reason to put data into a database is to get it back out again. In order to get data out, we need to ensure some kind of durability.

Relational databases offer single server durability through write-ahead logging and checkpoint mechanisms. These are tried and true methods of writing data to a replay log on disk as well as caching writes in memory. Whenever a checkpoint occurs, dirty data is flushed to disk. The benefit of a write ahead log is that we can always recover from a crash (so long as we have the log files, of course).

How does single server durability work with non-relational databases? Most of them don’t have write-ahead logging.

MongoDB currently has limited single server durability. While some people consider this a weakness, it has some strengths – writes complete very quickly since there is no write-ahead log that needs to immediately sync to disk. MongoDB also has the ability to create replica sets for increased durability. There is one obvious upside to replica sets – the data is in multiple places. Another advantage of replica sets is that it’s possible to use getLastError({w:...}) to request acknowledgement from multiple replica servers before a write is reported as complete to a client. Just keep in mind that getLastError is not used by default – application code will have to call the method to force the sync.

Setting a w-value for writes is something that was mentioned in Getting Faster Writes with Riak. Although, in that article we were decreasing durability to increase write performance. In Amazon Dynamo inspired systems writes are not considered complete until multiple clients have responded. The advantage is that durable replication is enforced at the database and clients have to elect to use less security for the data. Refer to the Cassandra documentation on Writes and Consistency or the Riak Replication documentation for more information on how Dynamo inspired replication works. Datastores using HDFS for storage can take advantage of HDFS’s built-in data replication.

Even HBase, a column-oriented database, uses HDFS to handle data replication. The trick is that rows may be chopped up based on columns and split into regions. Those regions are then distributed around the cluster on what are called region servers. HBase is designed for real-time read/write random-access. If we’re trying to get real-time reads and writes, we can’t expect HBase to immediately sync files to disk – there’s a commit log (RDBMS people will know this as a write-ahead log). Essentially, when a write comes in from a client, the write is first written to the commit log (which is stored using HDFS), then it’s written in memory and when the in-memory structure fills up, that structure is flushed to the filesystem. Here’s something cunning: since the commit log is being written to HDFS, it’s available in multiple places in the cluster at the same time. If one of the region servers goes down it’s easy enough to recover from – that region server’s commit log is split apart and distributed to other region servers which then take up the load of the failed region server.

There are plenty of HBase details that have been grossly oversimplified or blatantly ignored here for the sake of brevity. Additional details can be found in HBase Architecture 101 – Storage as well as this Advanced HBase presentation. As HBase is inspired by Google’s big table, additional information can be found in Chang et al. Bigtable: A distributed storage system for structured data and The Google File System.

Interestingly enough, there is a proposed feature for PostgreSQL 9.1 to add synchronous replication to PostgreSQL. Current replication in PostgreSQL is more like asynchronous database mirroring in SQL Server, or the default replica set write scenario with MongoDB. Synchronous replication makes it possible to ensure that data is being written to every node in the RDBMS cluster. Robert Haas discusses some of the pros and cons of replication in PostgreSQL in his post What Kind of Replication Do You Need?.

Microsoft’s Azure environment also has redundancy built in. Much like Hadoop, the redundancy and durability is baked into Azure at the filesystem. Building the redundancy at such a low level makes it easy for every component of the Azure environment to use it to achieve higher availability and durability. The Windows Azure Storage team have put together an excellent overview. Needless to say, Microsoft have implemented a very robust storage architecture for the Azure platform – binary data is split into chunks and spread across multiple servers. Each of those chunks is replicated so that there are three copies of the data at any given time. Future features will allow for data to be seamlessly geographically replicated.

Even SQL Azure, Microsoft’s cloud based relational database, takes advantage of this replication. In SQL Azure when a row is written in the database, the write occurs on three servers simultaneously. Clients don’t even see an operation as having committed until the filesystem has responded from all three locations. Automatic replication is designed into the framework. This prevents the loss of a single server, rack, or rack container from taking down a large number of customers. And, just like in other distributed systems, when a single node goes down, the load and data are moved to other nodes. For a local database, this kind of durability is typically only obtained using a combination of SAN technology, database replication, and database mirroring.

There is a lot of solid technology backing the Azure platform, but I suspect that part of Microsoft’s ultimate goal is to hide the complexity of configuring data durability from the user. It’s foreseeable that future upgrades will make it possible to dial up or down durability for storage.

While relational databases are finding more ways to spread load out and retain consistency, there are changes in store for MongoDB to improve single server durability. MongoDB has been highly criticized for its lack of single server durability. Until recently, the default response has been that you should take frequent backups and write to multiple replicas. This is still a good idea, but it’s promising to see that the MongoDB development team are addressing single server durability concerns.

Why is single server durability important for any database? Aside from guaranteeing that data is correct in the instance of a crash, it also makes it easier to increase adoption of a database at the department level. A durable single database server makes it easy to build an application on your desktop, deploy it to the server under your desk, and move it into the corporate data center as the application gains importance.

Logging and replication are critical technologies for databases. They guarantee data is durable and available. There are also just as many options as there are databases on the market. It’s important to understand the requirements of your application before choosing mechanisms to ensure durability and consistency across multiple servers.

References

Getting Started with Hive

This is a follow up to the last two posts I made about querying HBase and Hive. The set up for this was a bit trickier than I would have liked, so I’m documenting my entire process for three reasons.

  1. To remind myself for the next time I have to do this.
  2. To help someone else get started.
  3. In the hope that someone will know a better way and help me improve this.

Installing Hadoop and Hive

This was the easiest part of the installation process. I used the Cloudera distribution of Hadoop for everything in this stack. There are a few ways to go about getting the Cloudera Distribution of Hadoop (CDH for short) on your machine. The Hadoop Quick Start is a good place to get started.

Java

I’m pretty sure we’ve covered this before, but you need to have Java 1.6 installed in order to use any part of the Hadoop ecosystem.

Windows

If you’re using Windows, I recommend using the Hadoop VMWare image that Cloudera make available. If you want to run Hadoop natively on Windows, you need to set up Cygwin and creating a UNIX-like environment on your local machine. Setting. up Cygwin can be a bit tricky the first time you do it. Luckily, you’re not alone and documentation does exist: Setting up a Single-Node Hadoop “Cluster” on Windows XP.

Linux

Linux users have it the easiest out of any platform, in my opinion. Most of the documentation assumes that you’re running some UNIX-based operating system. If you’re willing to use the CDH version of Hadoop, you can tell your package manager to talk to the Cloudera repository. Cloudera have thoughtfully provided documentation for you in the installation guide.

You issue a few commands and you’ll have a working Hadoop set up on your machine in no time.

Other *NIX

This is the route that I took with OS X. You could also do the same on Linux if you want to use the stock distribution of Hadoop. The Hadoop Quick Start guide covers this in some detail, but here’s what I did:

wget http://archive.cloudera.com/cdh/3/hadoop-0.20.2+737.tar.gz
wget http://archive.cloudera.com/cdh/3/hive-0.5.0+32.tar.gz
wget http://archive.cloudera.com/cdh/3/pig-0.7.0+9.tar.gz

I copied these three files into my /opt folder and uncompressed them. As you can see in the screenshot, I renamed the hadoop, hive, and pig folders to use a cdh_ prefix. This makes it easier for me to identify where the installation came from, just in case I need to upgrade or raise a stink with someone.

Aliasing Hadoop for Easy Upgrades

Configuration

Nothing is ever quite as easy as it seems at first, right? Now that we have our software installed, we need to supply some configuration information. Hadoop has three modes.

  1. Standalone In standalone mode – nothing runs as a service or daemon. This requires no configuration on our part, but it does make things trickier with Hive because only one Hive database connection can exist at a time in standalone mode.
  2. Pseudo-distributed This is what we’re aiming for. In pseudo-distributed mode everything is running on one machine as a service process. What we’re actually doing (sort of) is configuring our Hadoop installation to a be a one node cluster. It’s not reliable or failure proof (or a good idea), but it does let us develop on our workstations and laptops instead of buying a lot of servers for the house.
  3. Fully distributed This is a real live cluster running on many servers. Books are being written about this.

Hadoop is, by default, set up to run in standalone mode. Having a standalone Hadoop/HDFS installation is nice, but it doesn’t do what we need in the long-term.

There are Linux packages for CDH that will set up your server to run in pseudo-distributed mode. If you used one of those, you can skip ahead. Personally, I’d stick around just so you can read up on what you’re missing and so you make me feel better (I’ll know if you skip ahead).

core-site.xml

The core-site.xml file is used for machine specific configuration details. The default configuration is set up in core-default.xml. Settings in core-default.xml are overridden by core-site.xml, so there’s no reason to go messing around in there. If you want to check out the contents of the file, it’s located in build/classes/core-default.xml.

Once you’ve opened, or created your own core-site.xml, you want to put appropriate information in there. Mine looks like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
    <description>The name of the default file system.  A URI whose
    scheme and authority determine the FileSystem implementation.  The
    uri's scheme determines the config property (fs.SCHEME.impl) naming
    the FileSystem implementation class.  The uri's authority is used to
    determine the host, port, etc. for a filesystem.</description>
  </property>

  <property>
    <name>mapred.tasktracker.tasks.maximum</name>
    <value>8</value>
    <description>The maximum number of tasks that will be run simultaneously by a
    a task tracker
    </description>
  </property>

  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Default block replication.
    The actual number of replications can be specified when the file is created.
    The default is used if replication is not specified in create time.
    </description>
  </property>
</configuration>

I’ll summarize these three settings here. A full explanation of the core-site.xml file can be found in the Hadoop wiki: http://hadoop.apache.org/common/docs/current/cluster_setup.html.

fs.default.name

This setting is telling Hadoop to use HDFS running on the local host for storage. This could also be a local file or something in Amazon S3.

mapred.tasktracker.tasks.maximum

We want to set this to a sane value because this is the maximum number of MapReduce tasks that Hadoop will fire up. If we set this value too high, Hadoop could theoretically start up a large number of tasks and overwhelm our CPU.

dfs.replication

Non-relational databases like Hadoop get their resiliency by replicating data throughout the database cluster. Since we’re running in pseudo-distributed mode (it’s a cluster with only one node) we want to set this to 1.

hadoop-env.sh

I changed a few variables in my hadoop-env.sh file based on settings I found in the Hadoop Wiki article Running Hadoop on OS X 10.5 64-bit). You can set these environment variables anywhere in your operating system, but since they’re Hadoop-specific, it’s probably best to set them in the hadoop-env.sh file. Based on my own experience, I did not change the HADOOP_CLASSPATH variable. Here’s what the first 15 lines of my hadoop-env.sh looks like:

# Set Hadoop-specific environment variables here.

# The only required environment variable is JAVA_HOME.  All others are
# optional.  When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.

# The java implementation to use.  Required.
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home/

# Extra Java CLASSPATH elements.  Optional.
# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"

# The maximum amount of heap to use, in MB. Default is 1000.
export HADOOP_HEAPSIZE=2000

I’ve also added some additional configuration to my .profile that make working with Hadoop and Hive a lot easier:

export CLASSPATH=$CLASSPATH:/Library/PostgreSQL/9.0/share/java/postgresql-9.0-801.jdbc4.jar

export JAVA_HOME=$(/usr/libexec/java_home)
export HADOOP_HOME=/opt/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/conf
export PATH=$HADOOP_HOME/bin:$PATH
export HIVE_HOME=/opt/hive
export PATH=$HIVE_HOME/bin:$PATH
export PIG_HOME=/opt/pig
export PIGDIR=$PIG_HOME
export PATH=$PIG_HOME/bin:$PATH
export HIVE_AUX_JARS_PATH=/Library/PostgreSQL/9.0/share/java/

# CLASSPATH variables don't seem to like symbolic links on OS X
# setting up the CLASSPATH here makes it easy to change things around
# in the future as I upgrade between Hadoop/Hive/HBase/Pig versions
hadoop_dir=$HADOOP_HOME;
hadoop_version=0.20.2+737;
pig_dir=$PIG_HOME;
pig_version=0.7.0+16;

export CLASSPATH=$CLASSPATH:${hadoop_dir}/hadoop-${hadoop_version}-core.jar:${hadoop_dir}/hadoop-${hadoop_version}-tools.jar:${hadoop_dir}/hadoop-${hadoop_version}-ant.jar:${hadoop_dir}/lib/commons-logging-1.0.4.jar:${pig_dir}/pig-${pig_version}.jar:${pig_dir}/pig-${pig_version}-core.jar

One Last Step

Before we can start anything running on Hadoop we have to get our namenode ready. The namenode is part of Hadoop that manages information for the distributed filesystem. A namenode knows where all of the data is stored across our distributed filesystem. Without a running namenode, we can’t find the files we’re storing.

Before we get started, we have to format our filesystem. This process creates empty directories to be used for the namenode’s data. Datanodes aren’t involved because they can be added and removed from the cluster dynamically.

hadoop namenode -format

You’ll see some output from this command and, once it’s all said and done, you’ll have a formatted namenode that’s ready to go.

Starting Hadoop

Once you have everything set up and installed, you should be able to start Hadoop by running start-all.sh from the command prompt. You should see some messages that tell you a bunch of nodes are being started with output going to a specific directory.

Starting Hadoop on the Command Prompt

Verifying That It’s Running

You can verify that everything is configured and running hadoop dfs -ls / on the command line. You should see a basic directory listing that looks something like this image.

Verifying that Hadoop is Running

What About Hive?

Astute readers will notice that we’ve only configured Hadoop and haven’t done anything to get Hive up and running. This is, unfortunately, a separate task. Hadoop and Hive are separate tools, but they’re part of the same tool chain.

Configuring the Metastore

Hive, like Hadoop, uses XML configuration files. But, since Hive is a database, it also needs a place to store metadata about the database – it’s called the metastore. By default, Hive uses what’s called an embedded metastore. The problem with the default configuration is that it only allows for one user or process at a time to connect to a Hive database. This isn’t that good for production databases. It’s also no good if you’re like me and forget which virtual desktop you left that query window open on. We’re going to fix this.

It’s possible to store the metastore’s information in a persistent external database. Hive comes configured to use the Derby database, however any JDBC-compliant database can be used by setting a few options. This is precisely what we’re going to do. There’s a delicious irony in storing one database’s configuration settings in another database. Since I’m a huge fan of PostgreSQL, that’s what we’re going to use to get started.

Creating the Metastore User

I don’t want Hive talking to PostgreSQL as me, or any other random user account, so we’re going to set up a new user.

CREATE USER hadoop WITH PASSWORD 'hadoop';
Creating the Metastore Database

Now that we have our user, let’s create the database:

CREATE DATABASE hadoop WITH OWNER = hadoop;
Database Driver

Of course, we also need a database driver. Up in the changes to my .profile I set up a HIVE_AUX_JARS_PATH variable. This is a : delimited path where we can tell Hive to look for extra functionality to load. This folder specifically contains the PostgreSQL JDBC driver. You can download the appropriate PostgreSQL JDBC driver from the PostgreSQL JDBC driver site. Appropriate documentation is also there. It’s pretty much as simple as copying the JAR file into an appropriate directory.

hive-site.xml

Now that we’ve created our metastore user and database as well as installed a JDBC driver, the last thing to do is to make some changes in hive-site.xml. The hive-site.xml overrides the settings in hive-default.xml – just like the Hadoop configuration we did earlier. One thing to keep in mind is that you can also override Hadoop configuration in this file as well. We’re not going to, but it’s good to know that it’s possible.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hive.root.logger</name>
        <value>INFO,console</value>
    </property>
    <property>
        <name>org.jpox.autoCreateSchema</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.local</name>
        <value>true</value>
        <description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:postgresql://localhost:5432/hadoop</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>org.postgresql.Driver</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>hadoop</value>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>hadoop</value>
    </property>
</configuration>

The javax.dbo.option.* settings are where we set up our metastore database connection. Everything else should be, largely, self-explanatory.

Creating Tables

Let’s go ahead and create a table in Hive:

CREATE TABLE leagues (
  league_id STRING,
  league_name STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';

This creates a table named leagues with two columns: league_id and league_name. The last two lines might look a bit funny to people familiar with SQL. They are specific to the Hive Query Language (HiveQL). These lines say that each row in the underlying data file will be stored as tab-delimited text.

Getting Data Into Hive

We’ve configured Hadoop and Hive and we’ve created a table. How do we get data into the table? That’s a trickier matter.

We’re going to be using sample data from Retrosheet to populate our Hive database. You should get started downloading and keep reading, it takes a bit of time.

Loading Data With Pig

Pig is a way to execute data flows written in a language called Pig Latin. In this use case, I’ve written very primitive data clean-up scripts using Pig Latin (it would have been just as easy, if not easier, to write them in Ruby or Perl).

The first section of this Pig Latin script registers the PiggyBank contributed functionality and creates convenient aliases for the functions. We use DEFINE to set up the aliases so we don’t have to type the fully qualified function name every time we want to use it. It’s a lot easier to type replace(state, 'Ohio', 'OH') than it is to type org.apache.pig.piggybank.evaluation.string.REPLACE(state, 'Ohio', 'OH').

REGISTER /opt/pig/contrib/piggybank/java/piggybank.jar;
-- Create a function alias because nobody wants to type in the full function 
-- name over and over again.
DEFINE replace org.apache.pig.piggybank.evaluation.string.REPLACE(); 
DEFINE substring org.apache.pig.piggybank.evaluation.string.SUBSTRING(); 
DEFINE s_split org.apache.pig.piggybank.evaluation.string.Split(); 
DEFINE reverse org.apache.pig.piggybank.evaluation.string.Reverse();



-- Read the first CSV file and dump it as a tab-separated file
league_codes = LOAD 'league-codes.csv'
USING PigStorage(',')
AS (league_id:chararray, league_name);

STORE league_codes 
INTO 'league-codes' 
USING PigStorage('\t');



names = LOAD 'retrosheet-CurrentNames.csv'
USING PigStorage(',')
AS (franchise_id:chararray,
    historical_franchise_id:chararray,
    league_id:chararray,
    division:chararray,
    location_name:chararray,
    nickname:chararray,
    alt_nickname:chararray,
    first_game_date:chararray,
    last_game_date:chararray,
    city:chararray,
    state:chararray);

-- We don't need to explicitly specify the PigStorage mechanism.
-- Pig will use tab separated files by default.
STORE names
INTO 'team_names';



-- It's our first transformation:
-- 1) Load the data from the text file.
-- 2) Remove double-quote characters from the data.
-- 3) Write the data to the filesystem.
ballparks = LOAD 'retrosheet-ballparkcodes.txt'
USING PigStorage(',')
AS (park_id:chararray,
    park_name:chararray,
    park_nickname:chararray,
    city:chararray,
    state:chararray,
    start_date:chararray,
    end_date:chararray,
    league_id:chararray,
    notes:chararray);

formatted_parks = foreach ballparks GENERATE
  park_id,
  park_name,
  park_nickname,
  city,
  state,
  start_date,
  end_date,
  league_id,
  replace(notes, '"', '');

STORE formatted_parks
INTO 'ballparks';



personnel = LOAD 'retrosheet-umpire.coach.ids.txt'
USING PigStorage(',')
AS (last_name:chararray,
    first_name:chararray,
    person_id:chararray,
    debut_date:chararray
    );

-- the debut date is the only weirdly formatted date in here, so we'll reformat
-- it to be sortable in YYYYMMDD format
--
-- I had originally intended to do something with this, but I never
-- got around to writing it. As it stands, it's a good way to show 
-- how the string splitting function works (it returns an array)
-- and how that can be combined with flatten to turn it into a tuple
split_personnel = FOREACH personnel GENERATE
  last_name,
  first_name,
  person_id,
  flatten(s_split(debut_date, '/'))
AS (  
    month:chararray,
    day:chararray,
    year:chararray);

-- These subsequent x_, y_, z_ stages load our personnel 
-- relations into intermediate relations while we're doing
-- work on the data. This could have been done in a single
-- step, but it's much easier to read this way.
x_personnel = FOREACH split_personnel GENERATE
  last_name,
  first_name,
  person_id,
  reverse(CONCAT('000', month)) AS month, 
  reverse(CONCAT('000', day)) AS day, 
  year;

y_personnel = FOREACH x_personnel GENERATE
  last_name,
  first_name,
  person_id,
  reverse(substring($3, 0, 2)) AS month,
  reverse(substring($4, 0, 2)) AS day,
  year;

formatted_personnel = FOREACH y_personnel GENERATE
  last_name,
  first_name,
  person_id,
  CONCAT(CONCAT(year, month), day) AS debut;

STORE formatted_personnel
INTO 'personnel';

You might think that each of the STORE statements would produce a file named personnel or ballparks. Unfortunately, you’d be wrong. You see, Pig turns our Pig Latin script into a MapReduce job on the fly. The final output is written into the appropriately named folder in a part file. The part files have names like part-m-00000. This tells us that it’s part of a MapReduce job, specifically the map phase, and it’s the first part. If there were multiple map operations that were performed (if we had a full cluster), there would be multiple files with incrementing part numbers.

There is a second Pig Latin program that will load the standings, I’ve left that out of the article due to the sheer length of the code. However, it’s a relatively trivial script and should be easy enough to follow. The interesting thing to note is that the Pig Latin program to load the standings makes use of variable substitution. The driver script for this whole process, load_standings.sh, passes parameters to Pig on the command line and they are interpreted at run time.

For more information on Pig and Pig Latin, you should check out the Pig project page or the Pig Latin Cookbook.

Since we’ve formatted our data, how do we load it? It’s pretty simple to do with a Hive LOAD statement:

LOAD DATA LOCAL INPATH 'personnel/part-m-00000'
OVERWRITE INTO TABLE personnel ;

LOAD DATA LOCAL INPATH 'team_names/part-m-00000'
OVERWRITE INTO TABLE teams ;

LOAD DATA LOCAL INPATH 'ballparks/part-m-00000'
OVERWRITE INTO TABLE ballparks ;

LOAD DATA LOCAL INPATH 'league-codes/part-m-00000'
OVERWRITE INTO TABLE leagues ;

Because the data for the standings is separated out by year in the Retrosheet data set, we have to drive the insertion through a shell script:

echo "  loading Box Scores..."
for d in `ls -d stats/*`
  do
    year=${d:6:4}

    echo "    Loading box scores for $year"
    for f in `ls $d/part*`
    do
      `hive -e "LOAD DATA LOCAL INPATH '$f' INTO TABLE raw_standings PARTITION (year='$year'); "`;
    done;
  done;

This loops through the output folders for the standings one at a time and loads the data into Hive. The raw_standings uses partitioning to separate out the data by year. Similar to partitioning in a relational database, this lets us store data with the same partition key in the same physical file. This improves query performance when we’re querying on a limited range of data. In this case, we’re only partitioning by year, but it’s possible to partition on multiple columns for greater granularity.

To improve long term performance further, we’ve gone ahead and created a second table, rc_standings. The rc_standings table is partitioned by year, but it’s also using a ROW FORMAT setting to create an RCFile – a columnar files. The data in this table will be stored in a column-oriented layout. This makes it possible to skip reading from columns that aren’t used in our query. With a table as wide as the rc_standings table, that’s going to be beneficial for our query performance.

In order to get the data into the RCFile table, we have to copy it using a second driver in the shell script:

for d in `ls -d stats/*`
  do
    year=${d:6:4}
    # This only overwrites the $year partition in the standings table
    # We also make use of the REGEX Column Specification to avoid pulling back the
    #  partitioning column(s) in the query. 
    #  See http://wiki.apache.org/hadoop/Hive/LanguageManual/Select#REGEX_Column_Specification
    #  for more about the REGEX Column Specification.
    hive -S -e "INSERT OVERWRITE TABLE rc_standings PARTITION (year=$year) SELECT \`(year)?+.+\` FROM raw_standings WHERE year = $year;"
  done;

Loading the standings took an incredible amount of time on my test system, despite using an SSD and having 8GB of memory available. I’m not sure why, and I would love to know what I did wrong, so feel free to sound off in the comments so we can make this process faster for people to get started.

Querying Hive

Once we have the data loaded, querying becomes possible. To connect to hive, open up a command prompt and type hive. You should see blank hive prompt waiting for your commands.

The Hive Prompt

Let’s take a look at the contents of a single table. Type select * from leagues; and hit enter. You should see something like the following:

A Simple Query

Let’s try something trickier and use a JOIN.

select l.league_name, t.franchise_id, t.first_game_date, t.last_game_date, t.nickname
from leagues l
join teams t on l.league_id = t.league_id;

Query Results - With a Join

So far, so good. These have both been very small queries. What if we want to look at some aggregated data. What then? Turns out that it’s just as easy to do as any other query.

select l.league_name, s.year, avg(s.home_score)
from rc_standings s
join leagues l on s.home_league_id = l.league_id
group by l.league_name, s.year;

Now We're Using GROUP BY

Once again, Hive is responding exactly as we would expect. That’s because HiveQL was designed to be as close to SQL as possible. The Hive engine is transforming our HiveQL into MapReduce jobs in the back end. Depending on the type of queries that we’re running, Our queries may execute on as few as one node in the cluster or as many as all nodes in the cluster.

While HiveSQL looks like SQL, some additions have been made that make sense for Hive – SORT BY guarantees that the order of results within a single reducer while ORDER BY guarantees the order of results in the total output. There is also a DISTRIBUTE BY command determines how our data will be distributed to the reducers that are handling the query.

Resources

The pig latin, shell script, and HiveSQL files used in this script can be found here, on Amazon S3.

The source data can be found here. Please note that you’ll need 7-zip to get to the source data. Which, by the way, was obtained free of charge and is copyrighted by Retrosheet. For more information, please contact www.retrosheet.org.

Querying HBase with Toad for Cloud Databases

We recently released a new version of Toad for Cloud Databases as an Eclipse plug-in. While this functionality has been in Toad for Cloud since the last release, this video shows Toad for Cloud running in Eclipse and demonstrates some basic querying against HBase using Toad for Cloud’s ability to translate between ANSI compliant SQL code and native database calls.

[media id=5 width=680 height=560]

If the video above doesn’t work for you, everything is available full size at NoSQLPedia – Querying HBase.

Facebook Messaging – HBase Comes of Age

Update: I want to thank Ben Black, Todd Lipcon, and Kelley Reynolds for pointing out the inaccuracies in the original post. I’ve gone through the references they provided and made corrections.

Facebook, the data giant of the internet, recently unveiled a new messaging system. To be fair, I would normally ignore feature roll outs and marketing flimflam, but Facebook’s announcement is worthy of attention not only because of the underlying database but how the move was done.

Historically Speaking

Long ago, the engineers at Facebook developed a product named Cassandra to meet their needs for inbox search. This was a single purpose application, and a lot of Facebook’s data continued to live in MySQL clusters. Cassandra solves the needs of the original Facebook Inbox search very well.

Building a New Product

Whenever we solve a new problem, it’s a good idea to investigate all of the options available before settling on a solution. By their own admission, the developers at Facebook evaluated at several platforms (including Cassandra, MySQL, and Apache HBase) before deciding to use Apache HBase. What makes this decision interesting is not just the reasons that Apache HBase was chosen, but also the reasons that MySQL and Cassandra were not chosen.

Why Not Use MySQL?

It’s not MySQL can’t scale. MySQL can clearly scale – many high traffic, high data volume sites and applications use MySQL. The problem comes when you are scaling a relational database to a massive size – Facebook estimate that they will be storing well over 135 billion messages every month (15 billion person-to-person messages plus 120 billion chat messages). Even if these messages were limited in size to a single SMS (160 characters), that’s 21,600,000,000,000 bytes of data per month. Or, as we say where I come from, a buttload of data.

When you start storing that amount of data, strange things start to happen. Updates to indexes take a long time, data statistics get updated very rarely. With a use case like tracking email and messaging, prolific users of the messaging platform could cause massive data skew which would cause queries for other users to perform poorly.

Data sharding is a difficult problem to solve with an RDBMS at any scale, but when you move to the massive scales that Facebook is targeting, data sharding moves from a difficult problem to an incredibly difficult problem. Sharding algorithms needs to be carefully chosen to ensure that data is spread evenly across all of the database servers in production and the algorithm needs to be flexible enough to ensure that new servers can be easily added into the database without disrupting the existing set up.

As Todd Lipcon points out in the comments, one of the issues with scaling an RDBMS is the write pattern of B+-tree indexes. Writing data to a B+-tree index with an arbitrarily increasing integer key is fairly trivial – it’s effectively an append to the index. Things get trickier when the index is used to support various searches. Keying by email address or subject line will cause rows to be inserted in the middle of the index leading to all kinds of undesirable side effects like page splits and index fragmentation. Performing maintenance on large indexes is problematic in terms of time to complete, CPU time, and I/O issues.

Why Not Use Cassandra?

If MySQL can’t handle this kind of load, why not use Cassandra? Cassandra was purpose built to handle Facebook’s Inbox Search feature, after all.

Cassandra, unfortunately, has it’s own problems dealing with scale. One of the difficulties scaling Cassandra is a poorly designed partitioning scheme. It is safe to assume that the Facebook engineering team has enough experience with Cassandra to know how to effectively partition their data. If partitioning data isn’t a likely problem, what is?

Is replication the problem?

When a record is written to Cassandra using the data is immediately, and asynchronously, written to every other server that is receiving a replica of the data. Cassandra allows you to tune the replication settings varying from no write confirmation through full write confirmation. Different consistency levels are appropriate for different workloads, but they all carry different risks. For example, using a consistency level of ConsistencyLevel.ALL would require every replica to respond before the write returns to the client as successful. Needless to say, this isn’t a recommended best practice. Benjamin Black (blog | twitter) informs me that the recommended best practice is to use ConsistencyLevel.ONE or ConsistencyLevel.QUORUM with a replication factor of 3.

Of course, thinking about Cassandra’s implementation could be a non-issue because the Facebook implementation of Cassandra is proprietary and may have very few similarities to the current open source Apache Cassandra codebase.

Cassandra’s replication can be configured to meet the needs of a variety of workloads and deployment patterns and we can rule that out as the reason to not choose Cassandra. In this post from Facebook Engineering, the only concrete statement is “We found Cassandra’s eventual consistency model to be a difficult pattern to reconcile for our new Messages infrastructure.” Cassandra is designed to be constantly available and eventually consistent – reads will always succeed but they may not always return the same data – the inconsistencies are resolved through a read repair process. HDFS (the data store underlying HBase) is designed to be strongly consistent, even if that consistency causes availability problems.

Choosing HBase

Cassandra is a known quantity at Facebook. Cassandra was, after all, developed at Facebook. The operations teams have a great deal of experience with supporting and scaling Cassandra. Why make the switch?

Replication

HBase’s replication model (actually, it’s the HDFS replication model) differs from Cassandra’s replication through replication pipelining. Replication pipelining makes it possible to have guarantees of data consistency – we know that every node will have the same data once a write has completed. We can guarantee write order (which is important for knowing when a message arrives in your inbox).

Because of HDFS’s data replication strategy, it’s possible to gain some automatic fault tolerance. The load will be pipelined through multiple servers – if any single server fails, the query can be served by another HDFS node with the data. Likewise, data replication makes it easy to handle node failures. If a node fails, queries for that specific region of data can be routed to one of the remaining replica servers. A new server can be added and the data from the failed server can easily be replicated to the new server.

In the spirit of full disclosure, this replication is not a feature specific to HBase/HDFS – Cassandra is able to do this as well. Todd Lipcon’s comment includes some information on the differences between HDFS and Cassandra replication. It’s important to remember that HBase/HDFS and Cassandra use two different replication methodologies. Cassandra is an highly available, eventually consistent system. HBase/HDFS is strongly consistent system. Each is appropriate for a certain types of tasks.

Logging

One of the criticisms of many NoSQL databases is that they do not log data before it is written. This is, clearly, not the case. Both HBase and Cassandra use a write ahead logger to make sure that data can be safely written to disk in the log before it is persisted to disk. This allows either data store, much like a relational database, to recover from crashes without having to write to disk every time a new row is inserted.

Existing Expertise

The technology behind HBase – Hadoop and HDFS – is very well understood and has been used previously at Facebook. During the development of their data warehouse, Facebook opted to use Hive. Hive is a set of tools for data analytics built on top of Hadoop and HDFS. While Hive performs very well with analytic queries on largely static data, it does not perform well with rapidly changing data. This makes Hive poorly suited for the new Social Messaging feature at Facebook. Since Hive makes use of Hadoop and HDFS, these shared technologies are well understood by Facebook’s operations teams. As a result, the same technology that allows Facebook to scale their data will be the technology that allows Facebook to scale their Social Messaging feature. The operations team already understands many of the problems they will encounter.

Other Technologies

There are, of course, other technologies involved, but the most interesting part to me is the data store. Facebook’s decision to use HBase is a huge milestone. HBase has come of age and is being used by a prominent customer. Facebook developers are contributing their improvements back to the open source community – they’ve publicly said that they are running the open source version of Hadoop.

This site is protected with Urban Giraffe's plugin 'HTML Purified' and Edward Z. Yang's Powered by HTML Purifier. 531 items have been purified.