This is a follow-up to the last two posts I made about querying HBase and Hive. The setup for this was a bit trickier than I would have liked, so I’m documenting my entire process for three reasons.
- To remind myself for the next time I have to do this.
- To help someone else get started.
- In the hope that someone will know a better way and help me improve this.
Installing Hadoop and Hive
This was the easiest part of the installation process. I used the Cloudera distribution of Hadoop for everything in this stack. There are a few ways to go about getting the Cloudera Distribution of Hadoop (CDH for short) on your machine. The Hadoop Quick Start is a good place to get started.
I’m pretty sure we’ve covered this before, but you need to have Java 1.6 installed in order to use any part of the Hadoop ecosystem.
If you’re using Windows, I recommend using the Hadoop VMware image that Cloudera makes available. If you want to run Hadoop natively on Windows, you need to set up Cygwin to create a UNIX-like environment on your local machine. Setting up Cygwin can be a bit tricky the first time you do it. Luckily, you’re not alone and documentation does exist: Setting up a Single-Node Hadoop “Cluster” on Windows XP.
Linux users have it the easiest out of any platform, in my opinion. Most of the documentation assumes that you’re running some UNIX-based operating system. If you’re willing to use the CDH version of Hadoop, you can tell your package manager to talk to the Cloudera repository. Cloudera has thoughtfully provided documentation for you in the installation guide.
You issue a few commands and you’ll have a working Hadoop set up on your machine in no time.
This is the route that I took with OS X. You could also do the same on Linux if you want to use the stock distribution of Hadoop. The Hadoop Quick Start guide covers this in some detail, but here’s what I did:
I copied these three files into my /opt folder and uncompressed them. As you can see in the screenshot, I renamed the hadoop, hive, and pig folders to use a cdh_ prefix. This makes it easier for me to identify where the installation came from, just in case I need to upgrade or raise a stink with someone.
Aliasing Hadoop for Easy Upgrades
Nothing is ever quite as easy as it seems at first, right? Now that we have our software installed, we need to supply some configuration information. Hadoop has three modes.
- Standalone In standalone mode, nothing runs as a service or daemon. This requires no configuration on our part, but it makes things trickier with Hive because only one Hive database connection can exist at a time in standalone mode.
- Pseudo-distributed This is what we’re aiming for. In pseudo-distributed mode, everything runs on one machine as a service process. What we’re actually doing (sort of) is configuring our Hadoop installation to be a one-node cluster. It’s not reliable or failure-proof (or a good idea for production), but it does let us develop on our workstations and laptops instead of buying a lot of servers for the house.
- Fully distributed This is a real live cluster running on many servers. Books are being written about this.
Hadoop is, by default, set up to run in standalone mode. Having a standalone Hadoop/HDFS installation is nice, but it doesn’t do what we need in the long-term.
There are Linux packages for CDH that will set up your server to run in pseudo-distributed mode. If you used one of those, you can skip ahead. Personally, I’d stick around just so you can read up on what you’re missing and so you make me feel better (I’ll know if you skip ahead).
The core-site.xml file is used for machine-specific configuration details. The default configuration is set up in core-default.xml. Settings in core-default.xml are overridden by core-site.xml, so there’s no reason to go messing around in there. If you want to check out the contents of the default file, it’s included with your Hadoop distribution. Once you’ve opened, or created, your own core-site.xml, you want to put appropriate information in there. Mine looks like this:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost/</value>
    <description>The name of the default file system. A URI whose
    scheme and authority determine the FileSystem implementation. The
    uri's scheme determines the config property (fs.SCHEME.impl) naming
    the FileSystem implementation class. The uri's authority is used to
    determine the host, port, etc. for a filesystem.</description>
  </property>
  <property>
    <name>mapred.tasktracker.tasks.maximum</name>
    <value>2</value>
    <description>The maximum number of tasks that will be run simultaneously by
    a task tracker</description>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Default block replication.
    The actual number of replications can be specified when the file is created.
    The default is used if replication is not specified in create time.</description>
  </property>
</configuration>
I’ll summarize these three settings here. A full explanation of the core-site.xml file can be found in the Hadoop wiki: http://hadoop.apache.org/common/docs/current/cluster_setup.html.
The fs.default.name setting tells Hadoop to use HDFS running on the local host for storage. It could also point at a local file path or a location in Amazon S3.
We want to set the maximum task count to a sane value because this is the maximum number of MapReduce tasks that Hadoop will fire up at once. If we set this value too high, Hadoop could theoretically start up a large number of tasks and overwhelm our CPU.
Distributed systems like Hadoop get their resiliency by replicating data throughout the cluster. Since we’re running in pseudo-distributed mode (a cluster with only one node), we want to set the replication factor, dfs.replication, to 1.
I changed a few variables in my hadoop-env.sh file based on settings I found in the Hadoop wiki article Running Hadoop on OS X 10.5 (64-bit). You can set these environment variables anywhere in your operating system, but since they’re Hadoop-specific, it’s probably best to set them in the hadoop-env.sh file. Based on my own experience, I did not change the HADOOP_CLASSPATH variable. Here’s what the first 15 lines of my hadoop-env.sh look like:
# Set Hadoop-specific environment variables here.
# The only required environment variable is JAVA_HOME. All others are
# optional. When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.
# The java implementation to use. Required.
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home
# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
# The maximum amount of heap to use, in MB. Default is 1000.
I’ve also added some additional configuration to my .profile that makes working with Hadoop and Hive a lot easier:
# CLASSPATH variables don't seem to like symbolic links on OS X
# setting up the CLASSPATH here makes it easy to change things around
# in the future as I upgrade between Hadoop/Hive/HBase/Pig versions
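The exact export lines depend on where you unpacked everything; here’s a sketch of what mine amount to, assuming the /opt/cdh_* layout from earlier (the jars directory used for HIVE_AUX_JARS_PATH is an arbitrary choice of mine, not from the original post):

```shell
# Sketch of .profile additions -- all paths are assumptions based on
# the /opt/cdh_* layout described earlier in the post.
export HADOOP_HOME=/opt/cdh_hadoop
export HIVE_HOME=/opt/cdh_hive
export PIG_HOME=/opt/cdh_pig

# Colon-delimited list of places Hive looks for extra jars,
# such as the PostgreSQL JDBC driver we install later on.
export HIVE_AUX_JARS_PATH=$HOME/hadoop_jars

# Put the Hadoop, Hive, and Pig binaries on the PATH.
export PATH=$HADOOP_HOME/bin:$HIVE_HOME/bin:$PIG_HOME/bin:$PATH
```

Because the paths point at the unversioned cdh_ symlink-style names, upgrading later means swapping a folder, not editing the CLASSPATH.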
One Last Step
Before we can start anything running on Hadoop we have to get our namenode ready. The namenode is the part of Hadoop that manages metadata for the distributed filesystem. A namenode knows where all of the data is stored across our distributed filesystem. Without a running namenode, we can’t find the files we’re storing.
Before we get started, we have to format our filesystem. This process creates empty directories to be used for the namenode’s data. Datanodes aren’t involved because they can be added and removed from the cluster dynamically.
hadoop namenode -format
You’ll see some output from this command and, once it’s all said and done, you’ll have a formatted namenode that’s ready to go.
Once you have everything set up and installed, you should be able to start Hadoop by running
start-all.sh from the command prompt. You should see messages telling you that the Hadoop daemons are being started, with their log output going to a specific directory.
Starting Hadoop on the Command Prompt
Verifying That It’s Running
You can verify that everything is configured and running by executing
hadoop dfs -ls / on the command line. You should see a basic directory listing that looks something like this image.
Verifying that Hadoop is Running
What About Hive?
Astute readers will notice that we’ve only configured Hadoop and haven’t done anything to get Hive up and running. This is, unfortunately, a separate task. Hadoop and Hive are separate tools, but they’re part of the same tool chain.
Hive, like Hadoop, uses XML configuration files. But, since Hive is a database, it also needs a place to store metadata about the database – it’s called the metastore. By default, Hive uses what’s called an embedded metastore. The problem with the default configuration is that it only allows for one user or process at a time to connect to a Hive database. This isn’t that good for production databases. It’s also no good if you’re like me and forget which virtual desktop you left that query window open on. We’re going to fix this.
It’s possible to store the metastore’s information in a persistent external database. Hive comes configured to use the Derby database; however, any JDBC-compliant database can be used by setting a few options. This is precisely what we’re going to do. There’s a delicious irony in storing one database’s configuration settings in another database. Since I’m a huge fan of PostgreSQL, that’s what we’re going to use to get started.
I don’t want Hive talking to PostgreSQL as me, or any other random user account, so we’re going to set up a new user.
CREATE USER hadoop WITH PASSWORD 'hadoop';
Now that we have our user, let’s create the database:
CREATE DATABASE hadoop WITH OWNER = hadoop;
Of course, we also need a database driver. Up in the changes to my .profile I set up a HIVE_AUX_JARS_PATH variable. This is a colon-delimited path where we can tell Hive to look for extra functionality to load. This folder specifically contains the PostgreSQL JDBC driver. You can download the appropriate driver, along with documentation, from the PostgreSQL JDBC driver site. It’s pretty much as simple as copying the JAR file into an appropriate directory.
Now that we’ve created our metastore user and database and installed a JDBC driver, the last thing to do is make some changes in hive-site.xml. This file overrides the settings in hive-default.xml, just like the Hadoop configuration we did earlier. One thing to keep in mind is that you can also override Hadoop configuration in this file. We’re not going to, but it’s good to know that it’s possible.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.local</name>
    <value>true</value>
    <description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:postgresql://localhost/hadoop</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.postgresql.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hadoop</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hadoop</value>
  </property>
</configuration>
The javax.jdo.option.* settings are where we set up our metastore database connection. Everything else should be, largely, self-explanatory.
Let’s go ahead and create a table in Hive:
CREATE TABLE leagues (
  league_id STRING,
  league_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
This creates a table named leagues with two columns: league_id and league_name. The last two lines might look a bit funny to people familiar with SQL. They’re specific to the Hive Query Language (HiveQL) and say that each row in the underlying data file will be stored as tab-delimited text.
Getting Data Into Hive
We’ve configured Hadoop and Hive and we’ve created a table. How do we get data into the table? That’s a trickier matter.
We’re going to be using sample data from Retrosheet to populate our Hive database. Start the download now and keep reading; it takes a bit of time.
Loading Data With Pig
Pig is a way to execute data flows written in a language called Pig Latin. In this use case, I’ve written very primitive data clean-up scripts using Pig Latin (it would have been just as easy, if not easier, to write them in Ruby or Perl).
The first section of this Pig Latin script registers the PiggyBank contributed functionality and creates convenient aliases for the functions. We use
DEFINE to set up the aliases so we don’t have to type the fully qualified function name every time we want to use it. It’s a lot easier to type
replace(state, 'Ohio', 'OH') than it is to type
org.apache.pig.piggybank.evaluation.string.REPLACE(state, 'Ohio', 'OH').
-- Create a function alias because nobody wants to type in the full function
-- name over and over again.
DEFINE replace org.apache.pig.piggybank.evaluation.string.REPLACE();
DEFINE substring org.apache.pig.piggybank.evaluation.string.SUBSTRING();
DEFINE s_split org.apache.pig.piggybank.evaluation.string.Split();
DEFINE reverse org.apache.pig.piggybank.evaluation.string.Reverse();
-- Read the first CSV file and dump it as a tab-separated file
league_codes = LOAD 'league-codes.csv' USING PigStorage(',')
    AS (league_id:chararray, league_name);
STORE league_codes INTO 'league-codes';
names = LOAD 'retrosheet-CurrentNames.csv'
-- We don't need to explicitly specify the PigStorage mechanism.
-- Pig will use tab separated files by default.
-- It's our first transformation:
-- 1) Load the data from the text file.
-- 2) Remove double-quote characters from the data.
-- 3) Write the data to the filesystem.
ballparks = LOAD 'retrosheet-ballparkcodes.txt'
formatted_parks = FOREACH ballparks GENERATE
    replace(notes, '"', '');
personnel = LOAD 'retrosheet-umpire.coach.ids.txt'
-- the debut date is the only weirdly formatted date in here, so we'll reformat
-- it to be sortable in YYYYMMDD format
-- I had originally intended to do something with this, but I never
-- got around to writing it. As it stands, it's a good way to show
-- how the string splitting function works (it returns an array)
-- and how that can be combined with flatten to turn it into a tuple
split_personnel = FOREACH personnel GENERATE
-- These subsequent x_, y_, z_ stages load our personnel
-- relations into intermediate relations while we're doing
-- work on the data. This could have been done in a single
-- step, but it's much easier to read this way.
x_personnel = FOREACH split_personnel GENERATE
reverse(CONCAT('000', month)) AS month,
reverse(CONCAT('000', day)) AS day,
y_personnel = FOREACH x_personnel GENERATE
reverse(substring($3, 0, 2)) AS month,
reverse(substring($4, 0, 2)) AS day,
formatted_personnel = FOREACH y_personnel GENERATE
CONCAT(CONCAT(year, month), day) AS debut;
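The reverse/CONCAT/substring shuffle in the x_ and y_ stages is just a roundabout way of left-padding month and day to two digits. Here’s a sketch of the same trick using standard shell tools (rev and cut stand in for the PiggyBank functions; this is an illustration, not part of the original script):

```shell
# Left-pad a one-digit month to two digits using the same
# reverse/truncate/reverse trick as the Pig script above.
month=7
# '000' + '7' gives '0007'; reversed gives '7000'; the first two
# characters give '70'; reversing again gives '07'.
padded=$(echo "000$month" | rev | cut -c1-2 | rev)
echo "$padded"   # -> 07
```

For a two-digit value like 12, the same pipeline yields 12, since only the rightmost two characters survive the truncation.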
You might think that each of the
STORE statements would produce a file named
ballparks. Unfortunately, you’d be wrong. You see, Pig turns our Pig Latin script into a MapReduce job on the fly. The final output is written into the appropriately named folder in a part file. The part files have names like
part-m-00000. This tells us that it’s part of a MapReduce job, specifically the map phase, and it’s the first part. If there were multiple map operations that were performed (if we had a full cluster), there would be multiple files with incrementing part numbers.
There is a second Pig Latin program that loads the standings; I’ve left it out of the article due to the sheer length of the code. However, it’s a relatively trivial script and should be easy enough to follow. The interesting thing to note is that it makes use of variable substitution. The driver script for this whole process, load_standings.sh, passes parameters to Pig on the command line and they are interpreted at run time.
For more information on Pig and Pig Latin, you should check out the Pig project page or the Pig Latin Cookbook.
Since we’ve formatted our data, how do we load it? It’s pretty simple to do with Hive’s LOAD DATA statement:
LOAD DATA LOCAL INPATH 'personnel/part-m-00000'
OVERWRITE INTO TABLE personnel;
LOAD DATA LOCAL INPATH 'team_names/part-m-00000'
OVERWRITE INTO TABLE teams;
LOAD DATA LOCAL INPATH 'ballparks/part-m-00000'
OVERWRITE INTO TABLE ballparks;
LOAD DATA LOCAL INPATH 'league-codes/part-m-00000'
OVERWRITE INTO TABLE leagues;
Because the data for the standings is separated out by year in the Retrosheet data set, we have to drive the insertion through a shell script:
echo " loading Box Scores..."
for d in `ls -d stats/*`
do
    # assumes each directory under stats/ is named for a year
    year=`basename $d`
    echo "  Loading box scores for $year"
    for f in `ls $d/part*`
    do
        hive -e "LOAD DATA LOCAL INPATH '$f' INTO TABLE raw_standings PARTITION (year='$year');"
    done
done
This loops through the output folders for the standings one at a time and loads the data into Hive. The raw_standings table uses partitioning to separate out the data by year. Similar to partitioning in a relational database, this lets us store data with the same partition key in the same physical file, which improves query performance when we’re querying a limited range of data. In this case, we’re only partitioning by year, but it’s possible to partition on multiple columns for greater granularity.
To further improve long-term performance, we’ve gone ahead and created a second table, rc_standings. Like raw_standings, it’s partitioned by year, but it also uses a ROW FORMAT setting to create an RCFile, a columnar file format. The data in this table is stored in a column-oriented layout, which makes it possible to skip reading columns that aren’t used in our query. With a table as wide as rc_standings, that’s going to be beneficial for query performance.
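I haven’t reproduced the full DDL, but the shape of a partitioned, RCFile-backed table looks roughly like this sketch (the column names are hypothetical stand-ins; the ColumnarSerDe row format is what makes the storage columnar):

```sql
CREATE TABLE rc_standings (
  home_team_id STRING,   -- hypothetical columns: the real table is much wider
  home_score INT,
  home_league_id STRING
)
PARTITIONED BY (year STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS RCFILE;
```

Note that the partition column, year, lives in the PARTITIONED BY clause rather than the column list.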
In order to get the data into the RCFile table, we have to copy it using a second loop in the driver shell script:
for d in `ls -d stats/*`
do
    # assumes each directory under stats/ is named for a year
    year=`basename $d`
    # This only overwrites the $year partition in the standings table.
    # We also make use of the REGEX Column Specification to avoid pulling back the
    # partitioning column(s) in the query.
    # See http://wiki.apache.org/hadoop/Hive/LanguageManual/Select#REGEX_Column_Specification
    # for more about the REGEX Column Specification.
    hive -S -e "INSERT OVERWRITE TABLE rc_standings PARTITION (year=$year) SELECT \`(year)?+.+\` FROM raw_standings WHERE year = $year;"
done
Loading the standings took an incredible amount of time on my test system, despite using an SSD and having 8GB of memory available. I’m not sure why, and I would love to know what I did wrong, so feel free to sound off in the comments so we can make this process faster for people to get started.
Once we have the data loaded, querying becomes possible. To connect to Hive, open up a command prompt and type hive. You should see a blank Hive prompt waiting for your commands.
The Hive Prompt
Let’s take a look at the contents of a single table. Type
select * from leagues; and hit enter. You should see something like the following:
A Simple Query
Let’s try something trickier and use a join:
select l.league_name, t.franchise_id, t.first_game_date, t.last_game_date, t.nickname
from leagues l
join teams t on l.league_id = t.league_id;
Query Results - With a Join
So far, so good. These have both been very small queries. What if we want to look at some aggregated data? What then? It turns out that it’s just as easy as any other query.
select l.league_name, s.year, avg(s.home_score)
from rc_standings s
join leagues l on s.home_league_id = l.league_id
group by l.league_name, s.year;
Now We're Using GROUP BY
Once again, Hive is responding exactly as we would expect. That’s because HiveQL was designed to be as close to SQL as possible. The Hive engine transforms our HiveQL into MapReduce jobs in the back end. Depending on the type of queries we’re running, they may execute on as few as one node in the cluster or as many as all of them.
While HiveQL looks like SQL, some additions have been made that make sense for Hive: SORT BY guarantees the order of results within a single reducer, while ORDER BY guarantees the order of results in the total output. There is also a DISTRIBUTE BY clause, which determines how our data will be distributed to the reducers that are handling the query.
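For example, here’s a sketch of a query that sends each team’s rows to the same reducer and sorts within each reducer, without imposing a total order (home_team_id is a hypothetical column name, home_score comes from the earlier aggregate query):

```sql
SELECT home_team_id, home_score
FROM rc_standings
DISTRIBUTE BY home_team_id
SORT BY home_score DESC;
```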
The Pig Latin, shell script, and HiveQL files used in this article can be found here, on Amazon S3.
The source data can be found here. Please note that you’ll need 7-zip to get to the source data. Which, by the way, was obtained free of charge and is copyrighted by Retrosheet. For more information, please contact www.retrosheet.org.