Secondary Indexes – How Would You Do It (In Riak)?

Every database has secondary indexes, right? Not quite. It turns out that some databases don’t support them. Secondary indexes are important because they make it possible to perform more, quick, queries on a given chunk of data. What if we want to add secondary indexes to a database, how would we go about doing it?

Looking at queries, there are a few basic ways that we actually query data:

  • Equality predicates
  • Inequality predicates
  • Multi-predicate queries

We’re going to be using Riak as our example database

Equality Predicates

The easiest type of query is an equality predicate. With an equality predicate, we’re just matching any given search term. Looking at an example, what if we want to find every user born on July 4, 1976?

function(value, keyData, arg){
  var data = Riak.mapValuesJson(value)[0];

  if (data.Birthdate == '1976-07-04') {
    return [ data ];

With an RDBMS, we’d just create an index on the table to support our queries. But with a key-value store, we don’t have that ability. How do we do it?

To create an index in Riak, we’d create another bucket, users_by_birthdate. The keys for this bucket would be the value of the birthdate. We could use links (this will fall apart pretty rapidly above about 1000 links) or store a list of keys in the users bucket. To satisfy our MapReduce query we can use the users_by_birthdate bucket to get the IDs of our users for retrieval rather than scanning the entire bucket. Depending on the amount of data we have, being able to use equality predicates could reduce the number of reads we have to perform.

Multi-Predicate Equality Searches

I’m going to skip inequality predicates for a second and talk about multi-predicate equality searches. This is easy to accomplish. If we’re searching for everyone with the last name of ‘Smith’ who was born on July 4th, 1976 we might use the following MapReduce function:

function(value, keyData, arg){
  var data = Riak.mapValuesJson(value)[0];
  if (data.Birthdate == '1976-07-04' && data.LastName = 'Smith') {
    return [ data ];

With a relational database, we could create a multi-column index. That isn’t possible here – with Riak we only have a bucket name (the index name), a key (the key), a value.

Two Equality Buckets

One option would be to use two equality buckets and then perform a bitwise comparison of the results, only keeping user keys that exist in both buckets. In this case, we would have two buckets users_by_birthdate and users_by_lastname. The problem is that we might have large data skew on one of these key-value combinations. So, if there are only 1,000 users born on July 4th, 1976 and 1,000,000 Smiths in our database, this could cause some inefficiencies in our queries when we go to merge the two result sets. Worse, there could be 1,000,000 users that match our search conditions in each bucket, but only 10 in common.

Using a bitwise combination of two buckets is, from a development standpoint, the easiest way to implement multi-predicate comparison. However, this approach creates problems with any significant volume of data.

MapReduce Over An Index Bucket

What if instead of using two or more buckets and performing a bitwise comparison of the keys that we find, instead we create a single index bucket. Inside that bucket, we’ll still use one value as the key for each key-value pair. This would look like the users_by_birthdate bucket – every key-value pair has the birthdate as the key.

Each key contains a list of values. Unlike our first example we’re going to store more than a simple list of keys. Instead we’re going to store a list of lists of tuples – a multi-dimensional data structure. If we’re using the birthdate, last name example, it might look like this:

1976-07-04    [
                [176, {last_name: Smith}],
                [195, {last_name: Harrison}]

What does this buy us? Well, for starters we don’t have to perform bitwise comparisons between two or more potentially massive lists of keys. Instead we can examine the MapReduce query that’s coming in and re-write it on the fly to give us a MapReduce job over our new index. The upside of this index is that it can perform double duty; we can search for users by birthdate or by birthdate and last name.

One of the things that we need to consider when we’re designing our key-value database is the queries that we’re going to be running, not the way that the data should be structured. When we’re structuring our data, it may make more sense to use a complex data structure, like this index bucket, instead of many single column indexes. Updates to the data will require fewer updates with this model. Only one write is needed when a user signs up, not two; one for the compound index as opposed to two for the simple indexes – one for users_by_birthdate and one for users_by_lastname.

Once we start updating data using an index bucket can reduce index maintenance. When we update a record we’ll need to update any index buckets. This could lead to a large number of data writes, tombstone writes, and compactions over time. Using a single index bucket with more complex structure in the index removes the need for a large number of writes. We only have to write to the original key-value pair as well as writing to the index bucket.

Inequality Predicates

Inequality predicates are much trickier than equality predicates. At first glance it seems like it would be easy to perform a MapReduce over the keys and determine which key-value pairs meet the search conditions. And we would be right: that is an easy solution when there are a small number of keys. What happens when there are a large number of keys? We need to read more data to get to the results we need. Digging further, what happens when very little of our data actually matches? The benefits of using this approach become very slim; we need to perform a large number of reads to retrieve very little data.

There’s a trick to implementing inequality predicates: in some cases it will be faster to retrieve all keys that match our inequality predicate – birthdate between 1976-07-01 and 1976-07-31. In other cases, a scan is going to be faster – users with a last name starting with ‘A’.

How do we figure out which approach is going to be faster? Let’s take a look at a concrete example using users and birthdates for our example. We already know how to find every user with a specific birthdate using an index; that’s a simple seek. How do we find all users whose birthday falls between a range of dates? That’s what we’ve been asking all along, right?

What if we maintain some simple information about our users_by_birthdate index? Let’s say we track the minimum value, the maximum value, and the number of keys in the bucket. Just from this information we can guess roughly how many keys will be in any given range. This is pretty easy with dates; we can assume that there’s a an even distribution of birthdates in any range of dates.

Things get trickier with names. How many distinct last names are there between Aaronson and Bartleby? What about between Xara and Yoder? Knowing the min and max last names and the number of keys won’t help us figure out the distribution; the data isn’t uniformly distributed. We need a way to make educated guesses about the data in a range. One way to do this is to create a histogram of our data. Histograms are data samplings that let us make better guesses about the number of specific values we can expect to find in a given range.

Relational databases accomplish this by maintaining separate statistics tables. We can accomplish something similar in Riak by creating a new bucket of statistics where the stats key is the high value of a range of keys. The value of the key-value pair would be data about the range of values – number of rows, number of keys, and the average number of distinct values per key. We want to collect statistics so we can easily determine when we need to seek a small number of keys and when we need to perform a MapReduce over the bucket.

So far I’ve proposed two range search approaches that work well with different use cases. Ideally we could create an indexing mechanism in Riak that lets us state an assumption about the distribution of our data in Riak. If we know there’s a constant distribution of keys, we can skip any kind of heuristics and just assume that for July, we’ll be reading 31 keys. Likewise, if we want to pull back every user whose last name is between Scalia and Smith, then we’ll need to MapReduce over the index bucket to get the data that we need.

Creating Indexes – Simple Indexes

Simple, single column indexes are easy to create. The syntax to create an index with SQL is pretty straightforward:

CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ name ] ON table [ USING method ]
    ( { column | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
    [ WITH ( storage_parameter = value [, ... ] ) ]
    [ TABLESPACE tablespace ]
    [ WHERE predicate ]

With Riak, we won’t need that much information about the index. Working from the simplest use case possible, we only need

  • index name
  • bucket
  • value name to index
  • data storage mechanism

We need to uniquely name our index, otherwise we can’t create a new bucket for it. Likewise, we have to know which bucket we’re indexing. It also makes sense that we want to know which value we’ll be indexing.

We need to define the data storage mechanism for indexes so we are able to index the data. It’s for the same reason that we have to tell Riak Search how the data is stored; unless we know what we’re indexing, it’s impossible to index it.

Looking at riak_search, it should be possible to re-use the basic principles behind riak_search_kv_extractor and riak_search_kv_hook to define how indexes are created and how data is retrieved. When we create a full-text index with Riak search, we supply the datatype (JSON, XML, plain-text, or something else). This gives us an incredible amount of flexibility without having to define a schema for the documents. We’re only indexing the fields we want. This can be incredibly useful if we’re storing larger objects but we know we’ll only perform look ups on a few fields – indexing time and space is reduced drastically.

Re-using the full-text index model for secondary indexes makes it easy to maintain consistent indexing behavior throughout Riak – all of our indexes (full-text or otherwise) can be defined using the same commands.

Creating Indexes – Complex Indexes

Complex indexes shouldn’t be much more difficult to implement than simple indexes. We only need one additional piece of information to collect our index: additional fields to index. Ideally the additional fields would be a text representation of how we access the fields in a program – “parent.child.property”. Supporting XML could make this trickier, but it should be easy to use XQuery/XPath to determine which XML elements, attributes, and values should be indexed. The intelligence of knowing how to perform the query will come later; indexes don’t need to know how to query themselves.

Staying up to Date

Right now, this index will only be accurate once: when we create it. Once again, we can borrow from the Riak Search implementation and create hooks to make sure the index is kept up to date as data is written. Keeping our indexes in sync with the data will cause some overhead on the cluster, but it’s worth it to know that we can accurately retrieve data. Like any other database, we need to take care not to create too many indexes: every write to the bucket means we end up writing to each index.

Ideally the index maintenance hooks would be the last hooks to run in a sequence of pre-commit hooks. If there are a number of pre-commit hooks to validate data we need to make sure that the data is valid before it is written to an index. If our primary worry is data validity, it is possible to implement the index maintenance code as a post-commit hook in Erlang (rather than as a pre-commit hook in either Erlang or JavaScript).

The Trouble with Data Locality

One potential problem we can run into is data locality. Depending on how the data is distributed in the key space, we could end up writing key-value and index data on the same physical node. The problem is that we could double, or more, the physical I/O that we’re performing on those nodes. If an index is not on the same node as the data we might incur tremendous network overhead querying remote indexes before querying data on the local node. On a busy network, this increased bandwidth could have an adverse effect on operations.

Driving Toward Our Goal

We’re clearly heading toward something. Let’s take stock of what we’ve talked about so far:

There are two types of search predicate to support

  • Equality predicates – easy to index
  • Inequality predicates – trickier to index

There several ways to use indexes

  • A single index
  • Multiple indexes with a bitwise comparison
  • Complex indexes

We can read from indexes through

  • Key seeks
  • Key scans (MapReduce)
  • Key/value scans (MapReduce)

We can figure out what type of read to use by using data statistics

  • Uniformly distributed keys are easy
  • Randomly distributed keys are harder
  • A small number of seeks can be more efficient than a single MapReduce

Astute readers will have probably guess what we need: a query engine.

This Ain’t No Car, It’s a Key-Value Store!

Why do we need a query engine? Key-value stores are simple; data is stored in key-value pairs and retrieved by a simple key lookup. A query engine of some type just adds complexity, right? Well, without a query engine, we can’t use secondary indexes without explicitly writing queries to use them.

In a hypothetical Riak query engine, we could process a MapReduce query like this:

Parse map phases and extract search conditions

Examine existing indexes

  1. Do indexes exist?
  2. Examine index type – evaluate statistics if needed
  3. Evaluate data access path needed to retrieve data – this is where we determine if we can use one index, multiple indexes with a bitwise compare, or use a multi-column index
  4. If no indexes exist, does a full-text index exist?

Re-write MapReduce functions on-the-fly

  1. Early Map phases are added to query use indexes
  2. Early Map phase output is used for key selection in MapReduce functions

While Riak doesn’t need anything as complex as an RDBMS’s query engine, this is still a gross simplification of how a query engine would function – additional functionality is needed to perform costing estimates of seeks vs MapReduce scans, statistics will need analyzed, and the query engine will need to be aware of vast portions of Riak in order to make good guesses so it can re-write MapReduce queries on the fly.

Features, Features, Features

I left some topics out of this discussion intentionally. They don’t directly apply to implementing indexes, but they would be very helpful to have down the road.

  • Checking updated values to avoid needless tombstones. When a client updates a key with the same value it had before, there’s no reason to overwrite the stored value: nothing changed.
  • Saving query plans. We are re-writing MapReduce queries on the fly, why not save them off internally and opt to store them for re-use when the cluster comes online?
  • Missing indexes. Frequently used queries should be optimized, either with secondary indexes for faster querying or as standalone entities in Riak. THe easiest way to monitor for this is to store metadata about the query history.
  • Index use data. How often are our indexes being used? Do they meet our needs or should we replace them with different indexes?
  • Index only queries. If a MapReduce can be satisfied by the data in the index, don’t query the main bucket.
  • Running statistics. Riak is a long running process. Over time we will be able to collect a lot of data about Riak. Why not collect data about how our Riak is being used so we can tune our code and hardware to get better performance?

Summing it all Up

Indexing is a tricky thing to think about and an even trickier thing to get right. When I first started on this as a thought experiment, I thought that it was going to be easy. Assuming that indexes are simple led to a lot of dead ends, frustration, and restarts. This came about from talking to Mark Phillips (blog | twitter) and Dan Reverri (twitter) last week at the Basho offices about Riak, HTML, and the sound of farm machinery pulling itself apart.

Sound off in the comments.

TL;DR version – indexes are hard, let’s go shopping.

Querying Hive with Toad for Cloud Databases

I recently talked about using the Toad for Cloud Databases Eclipse plug-in to query an HBase database. After I finished up the video, I did some work loading a sample dataset from Retrosheet into my local Hive instance.

This 7 minute tutorial shows you brand new functionality in the Toad for Cloud Databases Eclipse plug-in and how you can use it to perform data warehousing queries against Hive.

New Uses for NoSQL

We all know that you can use NoSQL databases to store data. And that’s cool, right? After all, NoSQL databases can be massively distributed, are redundant, and really, really fast. But some of the things that make NoSQL database really interesting aren’t just the redundancy, performance, or their ability to use all of those old servers in the closet. Under the covers, NoSQL databases are supported by complex code that makes these features possible – things like distributed file systems.

What’s a Brackup?

Brackup is a backup tool. There are a lot of backup tools on the market, what makes this one special?

First, it’s free.

Second, it’s open source; which means it’s always going to be free.

Third, it can chunk your files – files will be crammed into chunks for faster access and distributed across your backup servers. Did you know that opening a filehandle is one of the single most expensive things you can ever do in programming?

Fourth, it supports different backends.

It Can Backup to Riak

I’ve mentioned Riak a few times around here. Quick summary: Riak is a distributed key-value database.


So, this means that when you take a backup, Brackup is going to split your data into different chunks. These chunks are going to be sent to the backup location. In this case, the backup location is going to be your Riak cluster. As Brackup goes along and does its work, it sends the chunks off to Riak.

Unlike sending your data to an FTP server or Amazon S3, it’s going to get magically replicated in the background by Riak. If you lose a backup server, it’s not a big deal because Riak will have replicated that data across multiple servers in the cluster. Backing up your backups just got a lot easier.

Why Is the NoSQL Part Important?

NoSQL can be used for different things. It’s not a just a potential replacement for an RDBMS (and the beginning of another nerd holy war). Depending on the data store and your purpose, you can use a NoSQL database for a lot of different things – most notably as a distributed file system. This saves time and money since you don’t have to buy a special purpose product, you can use what’s already there.

Comparing MongoDB and SQL Server Replication

MongoDB has replication built in. So does SQL Server, Oracle, DB2, PostgreSQL, and MySQL. What’s the difference? What makes each MongoDB a unique and special snowflake?

I recently read a three part series on MongoDB repication (Replication Internals, Getting to Know Your Oplog, Bending the Oplog to Your Will) in an effort to better understand MongoDB’s replication compared to SQL Server’s replication.

Logging Sidebar

Before we get started, it’s important to distinguish between the oplog and MongoDB’s regular log. By default, MongoDB pipes its log to STDOUT… unless you supply the --logpath command line flag. Logging to STDOUT is fine for development, but you’ll want to make sure you log to a file for production use. The MongoDB log file is not like SQL Server’s log. It isn’t used for recovery playback. It’s an activity log. Sort of like the logs for your web server.

What’s The Same?

Both MongoDB and SQL Server store replicated data in a central repository. SQL Server stores transactions to be replicated in the distribution database. MongoDB stores replicated writes in the oplog collection. The most immediate difference between the two mechanisms is that SQL Server uses the transaction as the demarcation point while MongoDB uses the individual command as the demarcation point.

All of our transactions (MongoDB has transactions… they’re just only applied to a single command) are logged. That log is used to ship commands over to a subscriber. Both SQL Server and MongoDB support having multiple subscribers to a single database. In MongoDB, this is referred to as a replica set – every member of the set will receive all of commands from the master. MongoDB adds some additional features: any member of a replica set may be promoted to the master server if the original master server dies. This can be configured to happen automatically.

The Ouroboros

The Ouroboros is a mythical creature than devours its own tail. Like the Ouroboros, the MongoDB oplog devours its own tail. In ideal circumstances, this isn’t a problem. The oplog will happily write away. The replica servers will happily read away and, in general, keep up with the writing to the oplog.

The oplog file is a fixed size so, like the write ahead log in most RDBMSes, it will begin to eat itself again. This is fine… most of the time.

Unfortunately, if the replicas fall far enough behind, the oplog will overwrite the transactions that the replicas are reading. Yes, you read that correctly – your database will overwrite undistributed transactions. DBAs will most likely recoil in horror. Why is this bad? Well, under extreme circumstances you may have no integrity.

Let’s repeat that, just in case you missed it the first time:

There is no guarantee of replica integrity.

Now, before you put on your angry pants and look at SQL Server Books Online to prove me wrong, this is also entirely possible with transactional replication in SQL Server. It’s a little bit different, but the principle still applies. When you set up transactional replication in SQL Server, you also need to set up a retention period. If your replication is down for longer than X hours, SQL Server is going to tell you to cram it up your backside and rebuild your replication from scratch.

Falling Behind

Falling behind is easy to do when a server is under heavy load. But, since MongoDB avoids writing to disk to increase performance, that’s not a problem, right?

Theoretically yes. In reality that’s not always the case.

When servers are under a heavy load, a lot of weird things can happen. Heavy network traffic can result in TCP/IP offloading – the network card can offload work to the CPU. When you’re using commodity hardware with commodity storage, you might be using software RAID instead of hardware RAID to simulate one giant drive for data. Software RAID can be computationally expensive, especially if you encounter a situation where you start swapping to disk. Before you know it, you have a perfect storm of one off factors that have brought your shiny new server to its knees.

In the process, your oplog is happily writing away. The replica is falling further behind because you’re reading from your replica and writing to the master (that’s what we’re supposed to do, after all). Soon enough, your replicas are out of sync and you’ve lost data.

Falling Off a Cliff

Unfortunately, in this scenario, you might have problems recovering because the full resync also uses a circular oplog to determine where to start up replication again. The only way you could resolve this nightmare storm would be to shut down your forward facing application, kill incoming requests, and bring the database back online slowly and carefully.

Stopping I/O from incoming writes will make it easy for the replicas to catch up to the master and perform any shard reallocation that you need to split the load up more effectively.

Climbing Gear, Please

I’ve bitched a lot in this article about MongoDB’s replication. As a former DBA, it’s a scary model. But I’ve bitched a lot in the past about SQL Server’s transactional replication – logs can grow out of control if a subscriber falls behind or dies – but it happens with good reason. The SQL Sever dev team made the assumption that a replica should be consistent with the master. In order to keep a replica consistent, all of the undistributed commands need to be kept somewhere (in a log file) until all of the subscribers/replicas can be brought up to speed. This does result in a massive hit to your disk usage, but it also keeps your replicated databases in sync with the master.

Just like with MongoDB, there are times when a SQL Server subscriber may fall so far behind that you need to rebuild the replication. This is never an easy choice, no matter which platform you’re using, and it’s a decision that should not be taken lightly. MongoDB makes this choice a bit easier because MongoDB might very well eat its own oplog. Once that happens, you have no choice but to rebuild replication.

Replication is hard to administer and hard to get right. Be careful and proceed with caution, no matter what your platform.

At Least There is a Ladder

You can climb out of this hole and, realistically, it’s not that bad of a hole. In specific circumstances you may end up in a situation where you will have to take the front end application offline in order to resync your replicas. It’s not the best option, but at least there is a solution.

Every feature has a trade off. Relational databases trade integrity for performance (in this case) whereas MongoDB trades immediate performance for potential maintenance and recovery problems.

Further Reading


SQL Server

Open Sourcing Sawzall – What Does It Mean?

For Data Analytics or automotive modification, you will find no finer tool.

While perusing twitter, I saw that Google has open sourced Sawzall, one of their internal tools for data processing. WTF does this mean?

Sawzall, WTF?

Apart from a tool that I once used to cut the muffler off of my car (true story), what is Sawzall?

Sawzall is a procedural language for analyzing excessively large data sets. When I say “excessively large data sets”, think Google Voice logs, utility meter readings, or the network traffic logs for the Chicago Public Library. You could also think of anything where you’re going to be crunching a lot of data over the course of many hours on your monster Dell R910 SQL Server.

There’s a lengthy paper about how Sawzall works, but I’ll summarize it really quickly. If you really want to read up on all the internal Sawzall goodness, you can check it out on Google code – Interpreting the Data: Parallel Analysis with Sawzall.

Spell It Out for Me

At its most basic, Sawzall is a MapReduce engine, although the Google documentation goes to great pains to not use the word MapReduce, so maybe it’s not actually MapReduce. It smells oddly like MapReduce to me.

I’ll go into more depth on the ideas behind MapReduce in the future, but here’s the basics of MapReduce as far as Sawzall is concerned:

  1. Data is split into partitions.
  2. Each partition is filtered. (This is the Map.)
  3. The results of the filtering operation are used by an aggregation phase. (This is the Reduce.)
  4. The results of the aggregation are saved to a file.

It’s pretty simple. That simplicity makes it possible to massively parallelize the analysis of data. If you’re in the RDBMS world, think Vertica, SQL Server Parallel Data Warehouse, or Oracle Exadata. If you are already entrenched and in love with NoSQL, you already know all about MapReduce and probably think I’m an idiot for dumbing it down so much.

The upside to Sawzall’s approach is that rather than write a Map program and a Reduce program and a job driver and maybe some kind of intermediate aggregator, you just write a single program in the Sawzall language and compile it.

… And Then?

I don’t think anyone is sure, yet. One of the problems with internal tools is that they’re part of a larger stack. Sawzall is part of Google’s internal infrastructure. It may emit compiled code, but how do we go about making use of those compiled programs in our own applications? Your answer is better than mine, most likely.

Sawzall uses something called Protocol Buffers – PB is a cross language way to efficiently move objects and data around between programs. It looks like Twitter is already using Protocol Buffers for some of their data storage needs, so it might only be a matter of time before they adopt Sawzall – or before some blogger opines that they might adopt Sawzall ;) .

So far nobody has a working implementation of Sawzall running on top of any MapReduce implementations – Hadoop, for instance. At a cursory glance, it seems like Sawzall could be used in Hadoop Streaming jobs. In fact, Section 10 of the Sawzall paper seems to point out that Sawzall is a record by record analytical language – your aggregator needs to be smart enough to handled the filtered records.

Why Do I Need Another Language?

This is a damn good question. I don’t program as much as I used to, but I can reasonably write code in C#, Ruby, JavaScript, numerous SQL dialects, and Java. I can read and understand at least twice as many languages. What’s the point of another language?

One advantage of a special purpose language is that you don’t have to worry shoehorning domain specific functionality into existing language constructs. You’re free to write the language the way it needs to be written. You can achieve a wonderful brevity by baking features into the language. Custom languages let developers focus on the problems at hand and ignore implementation details.

What Now?

You could download the code from the Google Code repository, compile it, and start playing around with it. It should be pretty easy to get up and running on Linux systems. OS X developers should look at these instructions from a helpful Hacker News reader. Windows developers should install Linux on a VM or buy a Mac.

Outside of downloading and installing Sawzall yourself to see what the fuss is about, the key is to keep watching the sky and see what happens.

Upcoming Talks – Next Week

Next week I’ll be in the San Francisco Bay area. More specifically, I’ll be giving three lightning talks at three separate Cloud Camps. It’s the same talk each time, I’ll be giving a general intro to NoSQL and cloud databases.

Silicon Valley Cloud Camp in Santa Clara, CA.
Cloud Camp Santa Clara also in Santa Clara, CA.
Cloud Camp SF @ QCon in San Francisco, CA.

All of these events start at 6:30PM, the Lightning Talks start at 6:45PM, and I have no idea when I’m going to take the stage, but it promises to be good. If you’re in the area and would like to hang out, hit me up in the comments and we can arrange a time to talk.

Foursquare and MongoDB: What If

People have chimed in and talked about the Foursquare outage. The nice part about these discussions is that they’re focusing on the technical problems with the current set up and Foursquare. They’re picking it apart and looking at what is right, what went wrong, and what needs to be done differently in MongoDB to prevent problems like this in the future.

Let’s play a “what if” game. What if Foursquare wasn’t using MongoDB? What if they were using something else?


Riak is a massively scaleable key/value data store. It’s based on Amazon’s Dynamo. If you don’t want to read the paper, that just means that it uses some magic to make sure that data is evenly spread throughout the database cluster and that adding a new node makes it very easy to rebalance the cluster.

What would have happened at Foursquare if they had been using Riak?

Riak still suffers from the same performance characteristics around disk access as MongoDB – once you have to page to disk, operations become slow and throughput dries up. This is a common problem in any software that needs to access disks – disks are slow, RAM is fast.

Riak, however, has an interesting distinction. It allocates keys inside the cluster using something called a consistent hash ring – this is just a convenient way to rapidly allocate ranges of keys to different nodes within a cluster. The ring itself isn’t interesting. What’s exciting is that the ring is divided into partitions (64 by default). Every node in the system claims an equal share of those 64 partitions. In addition, each partition is replicated so there are always three copies of a give key/value pair at any time. Because there are multiple copies of the data, it’s unlikely that any single node will fail or become unbalanced. In theory, if Foursquare had used Riak it is very unlikely that we’ll run into a problem were a single node becomes full.

How would this consistent hash ring magical design choice have helped? Adding a new node causes the cluster to redistribute data in the background. The new node will claim and equal amount of space in the cluster and the data will be redistributed from the other nodes in the background. The same thing happens when a node fails, by the way. Riak also only stores keys in memory, not all of the data. So it’s possible to reference an astronomical amount before running out of RAM.

There’s no need to worry about replica sets, re-sharding, or promoting a new node to master. Once a node joins a riak cluster, it takes over its share of the load. As long as you have the network throughput on your local network (which you probably do), then this operation can be fairly quick and painless.


Cassandra, like Riak, is based on Amazon’s Dynamo. It’s a massively distributed data store. I suspect that if Foursquare had used Cassandra, they would have run into similar problems.

Cassandra makes use of range partitioning to distribute data within the cluster. The Foursquare database was keyed off of the user name which, in turn, saw abnormal growth because some groups of users were more active than others. Names also tend to clump around certain letters, especially when you’re limited to a Latin character set. I know a lot more Daves that I know Zachariahs, and I have a lot of friends whose names start with the letter L. This distribution causes data within the cluster to be overly allocated to one node. This would, ultimately, lead to the same problems that happened at Foursquare with their MongoDB installation.

That being said, it’s possible to use a random partitioner for the data in Cassandra. The random partitioner makes it very easy to add additional nodes and distribute data across them. The random partitioner comes with a price. It makes it impossible to do quick range slice queries in Cassandra – you can no longer say “I want to see all of the data for January 3rd, 2010 through January 8th, 2010”. Instead, you would need to build up custom indexes to support your querying and build batch processes to load the indexes. The tradeoffs between the random partitioner and the order preserving partitioner are covered very well in Dominic Williams’s article Cassandra: RandomPartitioner vs OrderPreservingPartitioner.

Careful use of the random partitioner and supporting batch operations could have prevented the outage that Foursquare saw, but this would have lead to different design challenges, some of which may have been difficult to overcome without resorting to a great deal of custom code.


HBase is a distributed column-oriented database built on top of HDFS. It is based on Google’s Bigtable database as described in “Bigtable: A Distributed Storage System for Structured Data”. As an implementation of Bigtable, HBase has a number of advantages over MongoDB – write ahead logging, rich ad hoc querying, and redundancy

HBase is not going to suffer from the same node redistribution problems that caused the Foursquare outage. When it comes time to add a new node to the cluster data will be migrated in much larger chunks, one data file at a time. This makes it much easier to add a new data node and redistribute data across the network.

Just like a relational database, HBase is designed so that all data doesn’t need to reside in memory for good performance. The internal data structures are built in a way that makes it very easy to find data and return it to the client. Keys are held in memory and two levels of caching make sure that frequently used data will be in memory when a client requests it.

HBase also has the advantage of using a write ahead log. In the event of a drive failure, it is possible to recover from a backup of your HBase database and play back the log to make sure data is correct and consistent.

If all of this sounds like HBase is designed to be a replacement for an RDBMS, you would be close. HBase is a massively distributed database, just like Bigtable. As a result, data needs to be logged because there is a chance that a hardware node will fail and will need to be recovered. Because HBase is a column-oriented database, we need to be careful not to treat it exactly like a relational database, but

A Relational Database

To be honest, Foursquare’s data load would be trivial in any relational database. SQL Server, Oracle, MySQL, and PostgreSQL can all handle orders of magnitude more data than the 132GB of data that Foursquare was storing at the time of the outage. This begs the question “How we could handle the constant write load?” Foursquare is a write-intensive application.

Typically, in the relational database world, when you need to scale read and write loads we add more disks. There is a finite amount of space in a server chassis and these local disks don’t provide the redundancy necessary for data security and performance; software RAID is also CPU intensive and slow. A better solution is to purchase a dedicated storage device, either a SAN, NAS, or DAS. All of these devices offer read/write caching and can be configured with in a variety of RAID levels for performance and redundancy.

RDBMSes are known quantities – they are easy to scale to certain points. Judging by the amount of data that Foursquare reports to have, they aren’t likely to reach the point where an RDBMS can no longer scale for a very long time. The downside to this approach is that an RDBMS is much costlier per TB of storage (up to ten times more expensive) than using MongoDB, but if your business is your data, then it’s important to keep the data safe.


It’s difficult to say if a different database solution would have prevented the Foursquare outage. But it is a good opportunity to highlight how different data storage systems would respond in the same situation.

Hadoop World Follow Up

I should have written this right when I got back from Hadoop World, instead of a week or so later, but things don’t always happen the way you plan. Before I left to go to Hadoop World (and points in between), I put up a blog post asking for questions about Hadoop. You guys responded with some good questions and I think I owe you answers.

What Is Hadoop?

Hadoop isn’t a simple database; it’s a bunch of different technologies built on top of the Hadoop common utilities, MapReduce, and HDFS (Hadoop Distributed File System). Each of these products serves a simple purpose – HDFS handles storage, MapReduce is a parallel job distribution system, HBase is a distributed database with support for structured tables. You can find out more on the Apache Hadoop page. I’m bailing on this question because 1) it wasn’t asked and 2) I don’t think it’s fair or appropriate for me to regurgitate these answers.

How Do You Install It?

Installing Hadoop is not quite as easy as installing Cassandra. There are two flavors to choose from:

Cloudera flavors

Homemade flavors

  • If you want to run Hadoop natively on your local machine, you can go through the single node setup from the Apache Hadoop documentation.
  • Hadoop The Definitive Guide also has info on how to get started.
  • Windows users, keep in mind that Hadoop is not supported on Windows in production. If you want to try it, you’re completely on your own and in uncharted waters.

What Login Security Model(s) Does It Have?

Good question! NoSQL databases are not renowned for their data security.

Back in the beginning, at the dawn of Hadoopery, it was assumed that everyone accessing the system would be a trusted individual operating inside a secured environment. Clearly this happy ideal won’t fly in a modern, litigation fueled world. As a result, Hadoop has support for a Kerberos authentication (now meaning all versions of Hadoop newer than version 0.22 for the Apache Hadoop distribution, Cloudera’s CDH3 distribution, or the 0.20.S Yahoo! Distribution of Hadoop). The Kerberos piece handles the proper authentication, but it is still up to Hadoop and HDFS(it’s a distributed file system, remember?) to make sure that an authenticated user is authorized to mess around with a file.

In short: Kerberos guarantees that I’m Jeremiah Peschka and Hadoop+HDFS guarantee that I can muck with data.

N.B. As of 2010-10-19 (when I’m writing this), Hadoop 0.22 is not available for general use. If you want to run Hadoop 0.22, you’ll have to build from the source control repository yourself. Good luck and godspeed.

When Does It Make Sense To Use Hadoop Instead of SQL Server, Oracle, or DB2?

Or any RDBMS for that matter. Personally, I think an example makes this easier to understand.

Let’s say that we have 1 terabyte of data (the average Hadoop cluster is reported as being 114.5TB in size) and we need to process this data nightly – create reports, analyze trends, detect anomalous data, etc. If we are using an RDBMS, we’d have to batch this data up (to avoid transaction log problems). We would also need to deal with the limitations of parallelism in your OS/RDBMS combination, as well as I/O subsystem limitations (we can only read so much data at one time). SQL dialects are remarkably bad languages for loop and flow control.

If we were using Hadoop for our batch operations, we’d write a MapReduce program (think of it like a query, for now). This MapReduce program would be distributed across all of the nodes in your cluster and then run in parallel using local storage and resources. So instead of hitting a single SAN across 8 or 16 parallel execution threads, we might have 20 commodity servers all processing 1/20 of the data simultaneously. Each server is going to process 50GB. The results will be combined once the job is done and then we’re free to do whatever we want with it – if we’re using SQL Server to store the data in tables for report, we would probably bcp the data into a table.

Another benefit of Hadoop is that the MapReduce functionality is implemented in Java, C, or another imperative programming language. This makes it easy to solve computationally intensive operations in MapReduce programs. There are a large number of programming problems that cannot be easily solved in a SQL dialect; SQL is designed for retrieving data and performing relatively simple transformations on it, not for complex programming tasks.

Hadoop (Hadoop common + HDFS + MapReduce) is great for batch processing. If you need to consume massive amounts of data, it’s the right tool for the job. Hadoop is being used in production for point of sale analysis, fraud detection, machine learning, risk analysis, and fault/failure prediction.

The other consideration is cost: Hadoop is deployed on commodity servers using local disks and no software licensing fees (Linux is free, Hadoop is free). The same thing with an RDBMS is going to involve buying an expensive SAN, a high end server with a lot of RAM, and paying licensing fees to Microsoft, Oracle, or IBM as well as any other vendors involved. In the end, it can cost 10 times less to store 1TB of data in Hadoop than in an RDBMS.

HBase provides the random realtime read/write that you might be used to from the OLTP world. The difference, however, is that this is still NoSQL data access. Tables are large and irregular (much like Google’s Big Table) and there are no complex transactions.

Hive is a data warehouse toolset that sits on top of Hadoop. It supports many of the features that you’re used to seeing in SQL, including a SQL-ish query language that should be easily learned but also provides support for using MapReduce functionality in ad hoc queries.

In short, Hadoop and SQL Server solve different sets of problems. If you need transactional support, complex rule validation and data integrity, use an RDBMS. If you need to process data in parallel, perform batch and ad hoc analysis, or perform computationally expensive transformations then you should look into Hadoop.

What Tools Are Provided to Manage Hadoop?

Unfortunately, there are not a lot of management tools on the market for Hadoop – the only tools I found were supplied by Cloudera. APIs are available to develop your own management tools. From the sound of discussions that I overheard, I think a lot of developers are going down this route – they’re developing monitoring tools that meet their own, internal, needs rather than build general purpose tools that they could sell to third parties. As the core product improves, I’m sure that more vendors will be stepping up to the plate to provide additional tools and support for Hadoop.

Right now, there are a few products on the market that support Hadoop:

  • Quest’s Toad for Cloud makes it easy to query data stored using HBase and Hive.
  • Quest’s OraOop is an Oracle database driver for Sqoop – you can think of Sqoop as Hadoop’s equivalent of SQL Server’s bcp program
  • Karmasphere Studio is a tool for writing, debugging, and watching MapReduce jobs.

What Is The State of Write Consistency?

As I mentioned earlier, there is no support for ACID transactions. On the most basic level, Hadoop processes data in bulk from files stored in HDFS. When writes fail, they’re retried until the minimum guaranteed number of replicas is written. This is a built-in part of HDFS, you get this consistency for free just by using Hadoop.

As far as eventual consistency is concerned, Hadoop uses an interesting method to ensure that data is quickly and effectively written to disk. Basically, HDFS finds a place on disk to write the data. Once that write has completed, the data is forwarded to another node in the cluster. If that node fails to write the data, a new node is picked and the data is written until the minimum number of replicas have had data written to them. There are additional routines in place that will attempt to spread the data throughout the data center.

If this seems like an overly simple explanation of how HDFS and Hadoop write data, you’re right. The specifics of write consistency can be found in the HDFS Architecture Guide (which is nowhere near as scary as it sounds).

I’m Going to Hadoop World

Sounds like I’m bragging, right? There is a free book involved. Or maybe you don’t care because it’s all NoSQLs and stuff and you’re a SQL Server DBA. And that’s where we differ.

When I first heard about NoSQL databases, I had the same reaction that a lot of people are having right now: disbelief and mockery. I remember making fun of MySQL when I first ran into it. It was such an odd database: it didn’t have foreign keys, joins didn’t work, it sometimes ate all of your data, and writes put locks on tables. It was no comparison for SQL Server, Oracle, or PostgreSQL. When I hit publish, this blog post is going to get dumped into a MySQL database on my server. I’ll probably hit up slashdot or read some blogs. That’s going to hit MySQL as well. These days, I try to keep a passing familiarity with MySQL because you never know when you’re going to need to use such a cheap, powerful, tool.

I see a filthy commie on the other side of this intertubes

This is what I think about NoSQL databases; especially Hadoop.

So, I’m going to Hadoop World and I’m going to learn as much as I can about Hadoop and the technology that surrounds it. The fun part is that I’m going to take some notes. And I’m going to share those notes.

That’s all well and good. As Senator Joe McCarthy used to say, “sharing is caring and caring is for commies.”

So, instead of just going and learning about what is interesting to me, I want to know what kind of questions DBAs have about this new-fangled Hadoop thing. I want to find the answers to the questions that people have about Hadoop, its place in the enterprise, and how it may or may not change the DBA’s job.

So, sound off in the comments. If you’ve got a question, type in the box and hit send. I’ll give you props when I post my write up of Hadoop World and I’ll do my best to find the answer.

I owe this entire idea to Andy Warren (blog | twitter). He mentioned that he was curious about Hadoop and that sparked this blog post. Here are his questions:

  • How do you install it?
  • What login security model(s) does it have
  • When does it make sense to use Hadoop vs SQL Server?
  • What tools are provided to manage it?

Cassandra Tutorial – Installing Cassandra

I was feeling a little bit crazy the other day and I installed Cassandra on my home computer. Twice. Just because. It wasn’t really a “just because” moment. I installed Cassandra twice so I could get the hang of it in case anyone asked me how they could go ahead and get started. The process was relatively painless, but there are some prerequisites that you need to have installed before you can get started. I thought that it would be good to create this short Cassandra installation tutorial to help you get started.

This video walks through everything you need to do get Cassandra installed and running on your computer. Apart from installing the OS. If you don’t have an OS, I don’t know how you’re reading this, probably witchcraft.

[media id=3 width=800 height=600]

This site is protected with Urban Giraffe's plugin 'HTML Purified' and Edward Z. Yang's Powered by HTML Purifier. 531 items have been purified.