Tag: riak

Introduction to Riak … TONIGHT!

I’ll be speaking at the Columbus Ruby Brigade and giving an introduction to Riak tonight at 6:30PM!

There will be pizza and soda and Ruby and me. You can even stick around afterwards while we all go next door for drinks (you can buy my Diet Cokes all night if you really like the presentation).

Riak: An Overview

This presentation will lead you through an overview of Riak: a flexible, decentralized key-value store. Riak was designed to provide a friendly HTTP/JSON interface and a database that’s well suited for reliable web applications.

Add it to your calendar!

Introduction to Riak – Next Monday

I’ll be speaking at the Columbus Ruby Brigade and giving an introduction to Riak next Monday, February 21, at 6:30PM.

Riak: An Overview

This presentation will lead you through an overview of Riak: a flexible, decentralized key-value store. Riak was designed to provide a friendly HTTP/JSON interface and a database that’s well suited for reliable web applications.

Add it to your calendar!

Introduction to Riak at Columbus Ruby Brigade

I’ll be speaking at the Columbus Ruby Brigade and giving an introduction to Riak on February 21 at 6:30PM.

Riak: An Overview

This presentation will lead you through an overview of Riak: a flexible, decentralized key-value store. Riak was designed to provide a friendly HTTP/JSON interface and a database that’s well suited for reliable web applications.

Add it to your calendar!

Querying Riak – Key Filters and MapReduce

A while back we talked about getting faster writes with Riak. Since then, I’ve been quiet on the Riak front. Let’s take a look at how we can get data out of Riak, especially since I went to great pains to throw all of that data into Riak as fast as my little laptop could manage.

Key filtering is a new feature in Riak that makes it much easier to restrict queries to a subset of the data. Prior to Riak 0.13, it was necessary to write MapReduce jobs that would scan through all of the keys in a bucket. The problem is that those MapReduce jobs end up loading both the key and the value into memory. If we have a lot of data, this can cause a huge performance hit. Instead of loading all of the data, key filtering lets us look at the keys themselves. We’re pre-processing the data before we get to our actual query. This is good because 1) software should do as little work as possible and 2) Riak doesn’t have secondary indexing to make querying faster.

Here’s how it works: Riak holds all keys in memory, but the data remains on disk. The key filtering code scans the keys in memory on the nodes in our cluster. If any keys match our criteria, Riak will pass them along to any map phases that are waiting down the pipe. I’ve written the sample code in Ruby, but this functionality is available through any client.

The Code

We’re using data loaded with load_animal_data.rb. The test script itself can be found in mr_filter.rb. Once again, we’re using the taxoboxes data set.

The Results

             user     system      total        real
mr       0.060000   0.030000   0.090000 ( 20.580278)
filter   0.000000   0.000000   0.000000 (  0.797387)

MapReduce

First, the MapReduce query:

{"inputs":"animals",
 "query":[{"map":{"language":"javascript",
                  "keep":false,
                  "source":"function(o) { if (o.key.indexOf('spider') != -1) return [1]; else return []; }"}},
          {"reduce":{"language":"javascript",
                     "keep":true,
                     "name":"Riak.reduceSum"}}]}

We’re going to iterate over every key value pair in the animals bucket and look for a key that contains the word ‘spider’. Once we find that key, we’re going to return a single element array containing the number 1. Once the map phase is done, we use the built-in function Riak.reduceSum to give us a sum of the values from the previous map phase. We’re generating a count of the records that match our data – how many spiders do we really have?
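
If you want to try the job without any particular client library, you can POST the JSON above straight to Riak’s HTTP interface. Here’s a minimal sketch using only the Ruby standard library; it assumes a node listening on localhost at the default HTTP port (8098).

require 'net/http'
require 'uri'
require 'json'

# The same MapReduce job, submitted to Riak's /mapred endpoint.
query = {
  "inputs" => "animals",
  "query"  => [
    { "map" => { "language" => "javascript", "keep" => false,
                 "source"   => "function(o) { if (o.key.indexOf('spider') != -1) return [1]; else return []; }" } },
    { "reduce" => { "language" => "javascript", "keep" => true,
                    "name"     => "Riak.reduceSum" } }
  ]
}

uri      = URI.parse("http://127.0.0.1:8098/mapred")
http     = Net::HTTP.new(uri.host, uri.port)
response = http.post(uri.path, JSON.generate(query),
                     "Content-Type" => "application/json")
puts response.body    # e.g. [5] -- the count of matching keys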

Key Filtering

The key filtering query doesn’t look that much different:

{"inputs":{"bucket":"animals",
           "key_filters":[["matches","spider"]]},
 "query":[{"map":{"language":"javascript",
                  "keep":false,
                  "source":"function(o) { return [1]; }"}},
          {"reduce":{"language":"javascript",
                     "keep":true,
                     "name":"Riak.reduceSum"}}]}

The map function has been greatly simplified to just return [1] on a match, and the search criteria have been moved into the inputs portion of the query. The big difference is in the performance: the key filter query is 26 times faster.

This is a simple example, but a 26x improvement is nothing to scoff at. What it really means is that the rest of our MapReduce needs to work on a smaller subset of the data which, ultimately, makes things faster for us.

A Different Way to Model Data

Now that we have our querying basics out of the way, let’s look at this problem from a different perspective; let’s say we’re tracking stock performance over time. In a relational database we might have a number of tables, notably a table to track stocks and a table to track daily_trade_volume. Theoretically, we could do the same thing in Riak with some success, but it would incur a lot of overhead. Instead we can use a natural key to locate our data. Depending on how we want to store the data, this could look something like YYYY-MM-DD-ticker_symbol. I’ve created a script to load the stock exchange data. For my tests, I only loaded the data for stocks that began with Q. There’s a lot of data in this data set, so I kept things to a minimum in order to make this quick.

Since our data also contains the stock exchange identifier, we could even go one step further and include the exchange in our key. That would be helpful if we were querying based on the exchange.
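
As a rough sketch of what this looks like with the Ruby riak-client gem (the bucket name, field name, and sample values here are made up for illustration; the real loader lives in the repository), building the natural key is nothing more than string concatenation:

require 'riak'

client = Riak::Client.new(:http_port => 8098)
bucket = client.bucket('stocks')

date, symbol, volume = '2010-02-09', 'QTM', 1_838_600

record = bucket.new("#{date}-#{symbol}")     # natural key: "2010-02-09-QTM"
record.content_type = 'application/json'
record.data = { 'stock_volume' => volume }
record.store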

If you take a look at mr_stocks.rb you’ll see that we’re setting up a query to filter stocks by the symbol QTM and then aggregate the total trade volume by month. The map phase creates a single-cell array with the stock volume traded in the month and returns it. We use the Riak.mapValuesJson function to map the raw data coming in from Riak to a proper JavaScript object. We then get the month that we’re looking at by parsing the key. This is easy enough to do because we have a well-defined key format.

function(o, keyData, arg) {
  var data = Riak.mapValuesJson(o)[0];
  var month = o.key.split('-').slice(0,2).join('-');
  var obj = {};
  obj[month] = data.stock_volume;
  return [ obj ];
}

If we were to look at this output we would see a lot of rows of unaggregated data. While that is interesting, we want to look at trending for stock trades for QTM over all time. To do this we create a reduce function that will sum up the output of the map function. This is some pretty self-explanatory JavaScript:

function(values, arg) {
  return [ values.reduce(function(acc, item) {
             for (var month in item) {
               if (acc[month]) { acc[month] += parseInt(item[month]); }
               else { acc[month] = parseInt(item[month]); }
             }
             return acc;
           }, {}) ];
}

Okay, so that might not actually be as self-explanatory as anyone would like. JavaScript’s reduce method is a relatively recent addition to the language. It accumulates a single result (the acc variable) across all of the elements in the array. You could use this to get a sum, an average, or whatever you want.

One other thing to note is that we use parseInt. We probably don’t have to use it, but it’s a good idea. Why? Riak is not aware of our data structures. We just store arrays of bytes in Riak – it could be a picture, it could be text, it could be a gzipped file – Riak doesn’t care. JavaScript only knows that it’s a string. So, when we want to do mathematical operations on our data, it’s probably wise to use parseInt and parseFloat.

Where to Now?

Right now you probably have a lot of data loaded. You have a couple of options. There are two scripts on github to remove the stock data and the animal data from your Riak cluster. That’s a pretty boring option. What can you learn from deleting your data and shutting down your Riak cluster? Not a whole lot.

You should open up mr_stocks.rb and take a look at how it works. It should be pretty easy to modify the map and reduce functions to output total trade volume for the month, average volume per day, and average price per day. Give it a shot and see what you come up with.

If you have questions or run into problems, you can hit up the comments, the Riak Developer Mailing List, or the #riak IRC room on irc.freenode.net if you need immediate, real-time help with your problem.

Data Durability

A friend of mine half-jokingly says that the only reason to put data into a database is to get it back out again. In order to get data out, we need to ensure some kind of durability.

Relational databases offer single server durability through write-ahead logging and checkpoint mechanisms. These are tried and true methods of writing data to a replay log on disk as well as caching writes in memory. Whenever a checkpoint occurs, dirty data is flushed to disk. The benefit of a write ahead log is that we can always recover from a crash (so long as we have the log files, of course).

How does single server durability work with non-relational databases? Most of them don’t have write-ahead logging.

MongoDB currently has limited single server durability. While some people consider this a weakness, it has some strengths – writes complete very quickly since there is no write-ahead log that needs to immediately sync to disk. MongoDB also has the ability to create replica sets for increased durability. There is one obvious upside to replica sets – the data is in multiple places. Another advantage of replica sets is that it’s possible to use getLastError({w:...}) to request acknowledgement from multiple replica servers before a write is reported as complete to a client. Just keep in mind that getLastError is not used by default – application code will have to call the method to force the sync.
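
As a rough sketch with the 1.x-era Ruby MongoDB driver – the database and collection names are made up, it assumes you are connected to a replica set primary, and the exact option names may vary by driver version – explicitly asking for acknowledgement looks something like this:

require 'mongo'

connection = Mongo::Connection.new('localhost', 27017)
db         = connection.db('checkins')

# The insert itself returns immediately; nothing has been acknowledged yet.
db['visits'].insert({ 'user' => 'jeremiah', 'venue' => 'crb' })

# Ask the server to confirm the write has reached two members before moving on.
db.command({ :getlasterror => 1, :w => 2, :wtimeout => 1000 })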

Setting a w-value for writes is something that was mentioned in Getting Faster Writes with Riak, although in that article we were decreasing durability to increase write performance. In Amazon Dynamo-inspired systems, writes are not considered complete until multiple replicas have responded. The advantage is that durable replication is enforced at the database, and clients have to explicitly elect to use weaker guarantees for the data. Refer to the Cassandra documentation on Writes and Consistency or the Riak Replication documentation for more information on how Dynamo-inspired replication works. Datastores using HDFS for storage can take advantage of HDFS’s built-in data replication.

Even HBase, a column-oriented database, uses HDFS to handle data replication. The trick is that rows may be chopped up based on columns and split into regions. Those regions are then distributed around the cluster on what are called region servers. HBase is designed for real-time read/write random-access. If we’re trying to get real-time reads and writes, we can’t expect HBase to immediately sync files to disk – there’s a commit log (RDBMS people will know this as a write-ahead log). Essentially, when a write comes in from a client, the write is first written to the commit log (which is stored using HDFS), then it’s written in memory and when the in-memory structure fills up, that structure is flushed to the filesystem. Here’s something cunning: since the commit log is being written to HDFS, it’s available in multiple places in the cluster at the same time. If one of the region servers goes down it’s easy enough to recover from – that region server’s commit log is split apart and distributed to other region servers which then take up the load of the failed region server.

There are plenty of HBase details that have been grossly oversimplified or blatantly ignored here for the sake of brevity. Additional details can be found in HBase Architecture 101 – Storage as well as this Advanced HBase presentation. As HBase is inspired by Google’s big table, additional information can be found in Chang et al. Bigtable: A distributed storage system for structured data and The Google File System.

Interestingly enough, there is a proposed feature for PostgreSQL 9.1 to add synchronous replication to PostgreSQL. Current replication in PostgreSQL is more like asynchronous database mirroring in SQL Server, or the default replica set write scenario with MongoDB. Synchronous replication makes it possible to ensure that data is being written to every node in the RDBMS cluster. Robert Haas discusses some of the pros and cons of replication in PostgreSQL in his post What Kind of Replication Do You Need?.

Microsoft’s Azure environment also has redundancy built in. Much like Hadoop, the redundancy and durability is baked into Azure at the filesystem. Building the redundancy at such a low level makes it easy for every component of the Azure environment to use it to achieve higher availability and durability. The Windows Azure Storage team have put together an excellent overview. Needless to say, Microsoft have implemented a very robust storage architecture for the Azure platform – binary data is split into chunks and spread across multiple servers. Each of those chunks is replicated so that there are three copies of the data at any given time. Future features will allow for data to be seamlessly geographically replicated.

Even SQL Azure, Microsoft’s cloud based relational database, takes advantage of this replication. In SQL Azure when a row is written in the database, the write occurs on three servers simultaneously. Clients don’t even see an operation as having committed until the filesystem has responded from all three locations. Automatic replication is designed into the framework. This prevents the loss of a single server, rack, or rack container from taking down a large number of customers. And, just like in other distributed systems, when a single node goes down, the load and data are moved to other nodes. For a local database, this kind of durability is typically only obtained using a combination of SAN technology, database replication, and database mirroring.

There is a lot of solid technology backing the Azure platform, but I suspect that part of Microsoft’s ultimate goal is to hide the complexity of configuring data durability from the user. It’s foreseeable that future upgrades will make it possible to dial up or down durability for storage.

While relational databases are finding more ways to spread load out and retain consistency, there are changes in store for MongoDB to improve single server durability. MongoDB has been highly criticized for its lack of single server durability. Until recently, the default response has been that you should take frequent backups and write to multiple replicas. This is still a good idea, but it’s promising to see that the MongoDB development team are addressing single server durability concerns.

Why is single server durability important for any database? Aside from guaranteeing that data is correct in the instance of a crash, it also makes it easier to increase adoption of a database at the department level. A durable single database server makes it easy to build an application on your desktop, deploy it to the server under your desk, and move it into the corporate data center as the application gains importance.

Logging and replication are critical technologies for databases. They guarantee data is durable and available. There are also just as many options as there are databases on the market. It’s important to understand the requirements of your application before choosing mechanisms to ensure durability and consistency across multiple servers.

Goals for 2011 – Early Update

It’s a bit early to be updating my goals for 2011, but I’m really excited about this one. Over the course of last week, I wrote an article about loading data into Riak. I had a brief conversation with Mark Phillips (blog | twitter) about adding some of the code to the Riak function contrib.

This is where a sane person would say “Yeah, sure Mark, do whatever you want with my code.” Instead I said something like “I’d be happy to share. How about I make a generic tool?” About 40 minutes later I had a working chunk of code. 30 minutes after that I had refactored the code into a driver and a library. I wrote up some documentation and sent everything off to be included in the main Riak function contrib repository. A couple of days and a documentation correction later, you can now see my first code contribution to the open source world: Importing YAML.

While I’m really excited about this, and it’s very important to me, there’s more to take away from this than just “Yay, I did something!” We’re all able to give something back to our community. In this case I took code I had written to perform benchmarks and extracted a useful piece of demonstration code from it. Share your knowledge with the world around you – it’s how we get smarter.

Getting Faster Writes with Riak

While preparing for an upcoming presentation about Riak for the Columbus Ruby Brigade, I wrote a simple data loader. When I initially ran the load, it took about 4 minutes to load the data on the worst run. When you’re waiting to test your data load and write a presentation, 4 minutes is an eternity. Needless to say, I got frustrated pretty quickly with the speed of my data loader, so I hit up the Riak channel on IRC and started digging into the Ruby driver’s source code.

The Results

              user     system      total        real
defaults 63.660000   3.270000  66.930000 (166.475535)
dw => 1  50.940000   2.720000  53.660000 (128.470094)
dw => 0  52.350000   2.740000  55.090000 (120.151827)
n => 2   52.850000   2.790000  55.640000 (132.023310)

The Defaults

Our default load uses no customizations. Riak is going to write data to three nodes in the cluster (n = 3). Since we’re using the default configuration, we can safely assume that Riak will use quorum for write confirmation (w = n/2 + 1). Finally, we can also assume that the durable write value is to use a quorum, since that’s the default for riak-client.

Because we’re writing to n (3) nodes and we’re waiting for w (2) nodes to respond, writes were slower than I’d like. Thankfully, Riak makes it easy to tune how it will respond to writes.

Changing the N Value

The first change that we can make is to the N value (replication factor). Lowering the N value should yield a huge improvement on my test machine – Riak is only running on one of my hard drives, and even solid state drives can only write to one place at a time. When we create the bucket we can change the bucket’s properties and set the N value. Note: it’s important that you set bucket properties when you ‘create’ the bucket. Buckets are created when keys are added to them and they are deleted when the last key is deleted.

b1 = client.bucket('animals_dw1',
                   :keys => false)
b1.props = { :n_val => 1, :dw => 1 }

In this chunk of code we set the N value to 1 and set the durable writes to 1. This means that only 1 replica will have to commit the record to durable storage in order for the write to be considered a success.

On the bright side, this approach is considerably faster. Here’s the bummer: by setting the N value to 1, we’ve removed any hope of durability from our cluster – the data will never be replicated. Any server failure will result in data loss. For our testing purposes, it’s okay because we’re trying to see how fast we can make things, not how safe we can make them.

How much faster? Our run with all defaults enabled took 166 seconds. Only writing to one replica shaved 38 seconds off of our write time. The other thing that I changed was setting returnbody to false. By default, the Ruby Riak client will return the object that was saved. Turning this setting off should make things faster – fewer bytes are flying around the network.

Forget About Durability

What happens when we turn down durability? That’s the dw => 0 result in the table at the beginning of the article. We get an 8 second performance boost over our last load.

What did we change? We set both the dw and w parameters to 0. This means that our client has told Riak that we’re not going to wait for a response from any replicas before deciding that a write has succeeded. This is a fire and forget write – we’re pushing data at Riak as quickly as possible and to hell with the consequences.
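
For reference, here’s roughly what that write looks like from the Ruby client. The bucket and record are stand-ins, and the option names follow riak-client’s store options – double-check them against your client version:

require 'riak'

client = Riak::Client.new(:http_port => 8098)
bucket = client.bucket('animals')

obj = bucket.new('spider-monkey')
obj.content_type = 'application/json'
obj.data = { 'name' => 'spider monkey' }

# Fire and forget: no write or durable-write acknowledgements, and don't
# bother sending the stored object back over the wire.
obj.store(:w => 0, :dw => 0, :returnbody => false)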

So, by eliminating any redundancy, ignoring the saved record coming back from the database, and refusing to wait for any write acknowledgement from the server, we’re able to get a 46.3 second performance improvement over our default values. This is impressive, but it’s roughly akin to throwing our data at a bucket, not into the bucket.

What if I Care About My Data?

What if you care about your data? After all, we got our performance improvement from setting the number of replicas to 1 and turning off write acknowledgement. The fourth and final run that I performed took a look at what would happen if we kept the number of replicas at a quorum (an N value of 2) and ignored write responses. If we’re just streaming data into a database, we may not care if a record gets missed here and there. It turns out that this is only slightly slower than running with scissors. It takes 132 seconds to write the data; only 4 seconds slower than with durable writes set to 1 and still nearly 34.5 seconds faster than using the defaults.

The most recent version of this sample code can be found on github at https://github.com/peschkaj/riak_intro. Sample data was located through Infochimps.com. The data load uses the Taxobox data set.

Secondary Indexes – How Would You Do It (In Riak)?

Every database has secondary indexes, right? Not quite. It turns out that some databases don’t support them. Secondary indexes are important because they make it possible to run more kinds of fast queries against a given chunk of data. If we wanted to add secondary indexes to a database, how would we go about doing it?

Looking at queries, there are a few basic ways that we actually query data:

  • Equality predicates
  • Inequality predicates
  • Multi-predicate queries

We’re going to be using Riak as our example database.

Equality Predicates

The easiest type of query is an equality predicate. With an equality predicate, we’re just matching any given search term. Looking at an example, what if we want to find every user born on July 4, 1976?

function(value, keyData, arg){
  var data = Riak.mapValuesJson(value)[0];

  if (data.Birthdate == '1976-07-04') {
    return [ data ];
  }
  return [];
}

With an RDBMS, we’d just create an index on the table to support our queries. But with a key-value store, we don’t have that ability. How do we do it?

To create an index in Riak, we’d create another bucket, users_by_birthdate. The keys for this bucket would be the value of the birthdate. We could use links (this will fall apart pretty rapidly above about 1,000 links) or store a list of user keys as the value of each index entry. To satisfy our MapReduce query we can use the users_by_birthdate bucket to get the IDs of our users for retrieval rather than scanning the entire users bucket. Depending on the amount of data we have, being able to use equality predicates could drastically reduce the number of reads we have to perform.
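
Here’s a sketch of what that index maintenance and lookup might look like from the Ruby riak-client gem. The bucket names, key format, and the exists? check are all assumptions for illustration, not a prescribed implementation:

require 'riak'

client   = Riak::Client.new(:http_port => 8098)
users    = client.bucket('users')
by_bdate = client.bucket('users_by_birthdate')

def index_birthdate(by_bdate, user_key, birthdate)
  entry = if by_bdate.exists?(birthdate)
            by_bdate.get(birthdate)
          else
            fresh = by_bdate.new(birthdate)
            fresh.content_type = 'application/json'
            fresh.data = []
            fresh
          end
  entry.data = (entry.data + [user_key]).uniq   # append this user's key
  entry.store
end

index_birthdate(by_bdate, 'user:176', '1976-07-04')

# Query time: one fetch for the index entry, then one fetch per matching user.
user_keys = by_bdate.get('1976-07-04').data
born_on_the_fourth = user_keys.map { |k| users.get(k).data }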

Multi-Predicate Equality Searches

I’m going to skip inequality predicates for a second and talk about multi-predicate equality searches. This is easy to accomplish. If we’re searching for everyone with the last name of ‘Smith’ who was born on July 4th, 1976 we might use the following MapReduce function:

function(value, keyData, arg){
  var data = Riak.mapValuesJson(value)[0];
  if (data.Birthdate == '1976-07-04' && data.LastName == 'Smith') {
    return [ data ];
  }
  return [];
}

With a relational database, we could create a multi-column index. That isn’t possible here – with Riak we only have a bucket name (the index name), a key (the key), and a value.

Two Equality Buckets

One option would be to use two equality buckets and then perform a bitwise comparison of the results, only keeping user keys that exist in both buckets. In this case, we would have two buckets users_by_birthdate and users_by_lastname. The problem is that we might have large data skew on one of these key-value combinations. So, if there are only 1,000 users born on July 4th, 1976 and 1,000,000 Smiths in our database, this could cause some inefficiencies in our queries when we go to merge the two result sets. Worse, there could be 1,000,000 users that match our search conditions in each bucket, but only 10 in common.

Using a bitwise combination of two buckets is, from a development standpoint, the easiest way to implement multi-predicate comparison. However, this approach creates problems with any significant volume of data.
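
The client-side intersection itself is trivial, which is part of the appeal – and the sketch below also shows where it hurts: both key lists have to be pulled back to the client before we can compare them. Bucket names follow the earlier examples and are only illustrative:

require 'riak'

client   = Riak::Client.new(:http_port => 8098)
by_bdate = client.bucket('users_by_birthdate')
by_lname = client.bucket('users_by_lastname')
users    = client.bucket('users')

born   = by_bdate.get('1976-07-04').data   # possibly 1,000 keys...
smiths = by_lname.get('Smith').data        # ...possibly 1,000,000 keys

matching_keys = born & smiths              # keep only keys present in both lists
matches = matching_keys.map { |key| users.get(key).data }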

MapReduce Over An Index Bucket

What if, instead of using two or more buckets and performing a bitwise comparison of the keys that we find, we create a single index bucket? Inside that bucket, we’ll still use one value as the key for each key-value pair. This would look like the users_by_birthdate bucket – every key-value pair has the birthdate as the key.

Each key contains a list of values. Unlike our first example we’re going to store more than a simple list of keys. Instead we’re going to store a list of lists of tuples – a multi-dimensional data structure. If we’re using the birthdate, last name example, it might look like this:

1976-07-04    [
                [176, {last_name: Smith}],
                [195, {last_name: Harrison}]
              ]

What does this buy us? Well, for starters we don’t have to perform bitwise comparisons between two or more potentially massive lists of keys. Instead we can examine the MapReduce query that’s coming in and re-write it on the fly to give us a MapReduce job over our new index. The upside of this index is that it can perform double duty; we can search for users by birthdate or by birthdate and last name.
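
Reading that structure back out is a single fetch by birthdate followed by a client-side filter over the attributes we stored next to each key. A sketch, with the same made-up names as above:

require 'riak'

client   = Riak::Client.new(:http_port => 8098)
by_bdate = client.bucket('users_by_birthdate')

entry = by_bdate.get('1976-07-04').data
# => [[176, {"last_name" => "Smith"}], [195, {"last_name" => "Harrison"}]]

# Searching by birthdate alone uses the whole entry; adding last name is just a
# filter over the attributes stored alongside each user key.
smith_keys = entry.select { |user_key, attrs| attrs['last_name'] == 'Smith' }
                  .map    { |user_key, _attrs| user_key }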

One of the things that we need to consider when we’re designing our key-value database is the queries that we’re going to be running, not the way that the data should be structured. When we’re structuring our data, it may make more sense to use a complex data structure, like this index bucket, instead of many single column indexes. Updates to the data will require fewer updates with this model. Only one write is needed when a user signs up, not two; one for the compound index as opposed to two for the simple indexes – one for users_by_birthdate and one for users_by_lastname.

Once we start updating data, an index bucket can also reduce index maintenance. When we update a record we’ll need to update any index buckets, which could lead to a large number of data writes, tombstone writes, and compactions over time. Using a single index bucket with a more complex structure removes the need for a large number of writes: we only have to write to the original key-value pair and to the one index bucket.

Inequality Predicates

Inequality predicates are much trickier than equality predicates. At first glance it seems like it would be easy to perform a MapReduce over the keys and determine which key-value pairs meet the search conditions. And we would be right: that is an easy solution when there are a small number of keys. What happens when there are a large number of keys? We need to read more data to get to the results we need. Digging further, what happens when very little of our data actually matches? The benefits of using this approach become very slim; we need to perform a large number of reads to retrieve very little data.

There’s a trick to implementing inequality predicates: in some cases it will be faster to retrieve all keys that match our inequality predicate – birthdate between 1976-07-01 and 1976-07-31. In other cases, a scan is going to be faster – users with a last name starting with ‘A’.

How do we figure out which approach is going to be faster? Let’s take a look at a concrete example using users and birthdates for our example. We already know how to find every user with a specific birthdate using an index; that’s a simple seek. How do we find all users whose birthday falls between a range of dates? That’s what we’ve been asking all along, right?

What if we maintain some simple information about our users_by_birthdate index? Let’s say we track the minimum value, the maximum value, and the number of keys in the bucket. Just from this information we can guess roughly how many keys will be in any given range. This is pretty easy with dates; we can assume that there’s an even distribution of birthdates in any range of dates.
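
Under that even-distribution assumption, the estimate is simple arithmetic. A sketch, with made-up numbers standing in for the bucket statistics:

require 'date'

min_key   = Date.parse('1920-01-01')   # stats we track about users_by_birthdate
max_key   = Date.parse('2005-12-31')
key_count = 250_000

range_start = Date.parse('1976-07-01')
range_end   = Date.parse('1976-07-31')

total_days = (max_key - min_key).to_i + 1
range_days = (range_end - range_start).to_i + 1

estimated_keys = (key_count.to_f * range_days / total_days).round
# => roughly 250 keys; cheap enough to seek directly instead of MapReducing the bucket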

Things get trickier with names. How many distinct last names are there between Aaronson and Bartleby? What about between Xara and Yoder? Knowing the min and max last names and the number of keys won’t help us figure out the distribution; the data isn’t uniformly distributed. We need a way to make educated guesses about the data in a range. One way to do this is to create a histogram of our data. Histograms are data samplings that let us make better guesses about the number of specific values we can expect to find in a given range.

Relational databases accomplish this by maintaining separate statistics tables. We can accomplish something similar in Riak by creating a new bucket of statistics where the stats key is the high value of a range of keys. The value of the key-value pair would be data about the range of values – number of rows, number of keys, and the average number of distinct values per key. We want to collect statistics so we can easily determine when we need to seek a small number of keys and when we need to perform a MapReduce over the bucket.
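
One possible shape for those statistics entries – the key is the high value of the range it covers, and every bucket and field name below is made up for illustration:

require 'riak'

client = Riak::Client.new(:http_port => 8098)
stats  = client.bucket('users_by_lastname_stats')

entry = stats.new('Bartleby')            # covers the key range 'Aaronson'..'Bartleby'
entry.content_type = 'application/json'
entry.data = {
  'low_key'            => 'Aaronson',
  'key_count'          => 1_842,         # keys in this slice of the index
  'avg_values_per_key' => 3.2            # distinct users stored under each key
}
entry.store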

So far I’ve proposed two range search approaches that work well with different use cases. Ideally we could create an indexing mechanism in Riak that lets us state an assumption about the distribution of our data in Riak. If we know there’s a constant distribution of keys, we can skip any kind of heuristics and just assume that for July, we’ll be reading 31 keys. Likewise, if we want to pull back every user whose last name is between Scalia and Smith, then we’ll need to MapReduce over the index bucket to get the data that we need.

Creating Indexes – Simple Indexes

Simple, single column indexes are easy to create. The syntax to create an index with SQL is pretty straightforward:

CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ name ] ON table [ USING method ]
    ( { column | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
    [ WITH ( storage_parameter = value [, ... ] ) ]
    [ TABLESPACE tablespace ]
    [ WHERE predicate ]

With Riak, we won’t need that much information about the index. Working from the simplest use case possible, we only need

  • index name
  • bucket
  • value name to index
  • data storage mechanism

We need to uniquely name our index, otherwise we can’t create a new bucket for it. Likewise, we have to know which bucket we’re indexing. It also makes sense that we want to know which value we’ll be indexing.

We need to define the data storage mechanism for indexes so we are able to index the data. It’s for the same reason that we have to tell Riak Search how the data is stored; unless we know what we’re indexing, it’s impossible to index it.

Looking at riak_search, it should be possible to re-use the basic principles behind riak_search_kv_extractor and riak_search_kv_hook to define how indexes are created and how data is retrieved. When we create a full-text index with Riak search, we supply the datatype (JSON, XML, plain-text, or something else). This gives us an incredible amount of flexibility without having to define a schema for the documents. We’re only indexing the fields we want. This can be incredibly useful if we’re storing larger objects but we know we’ll only perform look ups on a few fields – indexing time and space is reduced drastically.

Re-using the full-text index model for secondary indexes makes it easy to maintain consistent indexing behavior throughout Riak – all of our indexes (full-text or otherwise) can be defined using the same commands.

Creating Indexes – Complex Indexes

Complex indexes shouldn’t be much more difficult to implement than simple indexes. We only need one additional piece of information to collect our index: additional fields to index. Ideally the additional fields would be a text representation of how we access the fields in a program – “parent.child.property”. Supporting XML could make this trickier, but it should be easy to use XQuery/XPath to determine which XML elements, attributes, and values should be indexed. The intelligence of knowing how to perform the query will come later; indexes don’t need to know how to query themselves.
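
For JSON documents, resolving a dotted path like that is only a few lines of code. A sketch of how an indexing hook might pull out the field to index (the helper name and path syntax are just the dotted form described above):

require 'json'

# Walk a decoded JSON document along a "parent.child.property" path.
def extract_field(document, path)
  path.split('.').reduce(document) do |node, segment|
    node.is_a?(Hash) ? node[segment] : nil
  end
end

doc = JSON.parse('{"name": {"first": "Jeremiah", "last": "Smith"}}')
extract_field(doc, 'name.last')   # => "Smith"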

Staying up to Date

Right now, this index will only be accurate once: when we create it. Once again, we can borrow from the Riak Search implementation and create hooks to make sure the index is kept up to date as data is written. Keeping our indexes in sync with the data will cause some overhead on the cluster, but it’s worth it to know that we can accurately retrieve data. Like any other database, we need to take care not to create too many indexes: every write to the bucket means we end up writing to each index.

Ideally the index maintenance hooks would be the last hooks to run in a sequence of pre-commit hooks. If there are a number of pre-commit hooks to validate data we need to make sure that the data is valid before it is written to an index. If our primary worry is data validity, it is possible to implement the index maintenance code as a post-commit hook in Erlang (rather than as a pre-commit hook in either Erlang or JavaScript).

The Trouble with Data Locality

One potential problem we can run into is data locality. Depending on how the data is distributed in the key space, we could end up writing key-value and index data on the same physical node. The problem is that we could double, or more, the physical I/O that we’re performing on those nodes. If an index is not on the same node as the data we might incur tremendous network overhead querying remote indexes before querying data on the local node. On a busy network, this increased bandwidth could have an adverse effect on operations.

Driving Toward Our Goal

We’re clearly heading toward something. Let’s take stock of what we’ve talked about so far:

There are two types of search predicate to support

  • Equality predicates – easy to index
  • Inequality predicates – trickier to index

There are several ways to use indexes

  • A single index
  • Multiple indexes with a bitwise comparison
  • Complex indexes

We can read from indexes through

  • Key seeks
  • Key scans (MapReduce)
  • Key/value scans (MapReduce)

We can figure out what type of read to use by using data statistics

  • Uniformly distributed keys are easy
  • Randomly distributed keys are harder
  • A small number of seeks can be more efficient than a single MapReduce

Astute readers will have probably guessed what we need: a query engine.

This Ain’t No Car, It’s a Key-Value Store!

Why do we need a query engine? Key-value stores are simple; data is stored in key-value pairs and retrieved by a simple key lookup. A query engine of some type just adds complexity, right? Well, without a query engine, we can’t use secondary indexes without explicitly writing queries to use them.

In a hypothetical Riak query engine, we could process a MapReduce query like this:

Parse map phases and extract search conditions

Examine existing indexes

  1. Do indexes exist?
  2. Examine index type – evaluate statistics if needed
  3. Evaluate data access path needed to retrieve data – this is where we determine if we can use one index, multiple indexes with a bitwise compare, or use a multi-column index
  4. If no indexes exist, does a full-text index exist?

Re-write MapReduce functions on-the-fly

  1. Early Map phases are added to the query to make use of indexes
  2. Early Map phase output is used for key selection in MapReduce functions

While Riak doesn’t need anything as complex as an RDBMS’s query engine, this is still a gross simplification of how a query engine would function – additional functionality is needed to estimate the cost of seeks vs. MapReduce scans, statistics will need to be analyzed, and the query engine will need to be aware of vast portions of Riak in order to make good guesses so it can re-write MapReduce queries on the fly.

Features, Features, Features

I left some topics out of this discussion intentionally. They don’t directly apply to implementing indexes, but they would be very helpful to have down the road.

  • Checking updated values to avoid needless tombstones. When a client updates a key with the same value it had before, there’s no reason to overwrite the stored value: nothing changed.
  • Saving query plans. We are re-writing MapReduce queries on the fly, why not save them off internally and opt to store them for re-use when the cluster comes online?
  • Missing indexes. Frequently used queries should be optimized, either with secondary indexes for faster querying or as standalone entities in Riak. The easiest way to monitor for this is to store metadata about the query history.
  • Index use data. How often are our indexes being used? Do they meet our needs or should we replace them with different indexes?
  • Index only queries. If a MapReduce can be satisfied by the data in the index, don’t query the main bucket.
  • Running statistics. Riak is a long running process. Over time we will be able to collect a lot of data about Riak. Why not collect data about how our Riak is being used so we can tune our code and hardware to get better performance?

Summing it all Up

Indexing is a tricky thing to think about and an even trickier thing to get right. When I first started on this as a thought experiment, I thought that it was going to be easy. Assuming that indexes are simple led to a lot of dead ends, frustration, and restarts. This came about from talking to Mark Phillips (blog | twitter) and Dan Reverri (twitter) last week at the Basho offices about Riak, HTML, and the sound of farm machinery pulling itself apart.

Sound off in the comments.

TL;DR version – indexes are hard, let’s go shopping.

New Uses for NoSQL

We all know that you can use NoSQL databases to store data. And that’s cool, right? After all, NoSQL databases can be massively distributed, are redundant, and really, really fast. But some of the things that make NoSQL databases really interesting aren’t just the redundancy, performance, or their ability to use all of those old servers in the closet. Under the covers, NoSQL databases are supported by complex code that makes these features possible – things like distributed file systems.

What’s a Brackup?

Brackup is a backup tool. There are a lot of backup tools on the market, what makes this one special?

First, it’s free.

Second, it’s open source, which means it’s always going to be free.

Third, it can chunk your files – files will be crammed into chunks for faster access and distributed across your backup servers. Did you know that opening a filehandle is one of the single most expensive things you can ever do in programming?

Fourth, it supports different backends.

It Can Backup to Riak

I’ve mentioned Riak a few times around here. Quick summary: Riak is a distributed key-value database.

So?

So, this means that when you take a backup, Brackup is going to split your data into different chunks. These chunks are going to be sent to the backup location. In this case, the backup location is going to be your Riak cluster. As Brackup goes along and does its work, it sends the chunks off to Riak.

Unlike sending your data to an FTP server or Amazon S3, it’s going to get magically replicated in the background by Riak. If you lose a backup server, it’s not a big deal because Riak will have replicated that data across multiple servers in the cluster. Backing up your backups just got a lot easier.

Why Is the NoSQL Part Important?

NoSQL can be used for different things. It’s not a just a potential replacement for an RDBMS (and the beginning of another nerd holy war). Depending on the data store and your purpose, you can use a NoSQL database for a lot of different things – most notably as a distributed file system. This saves time and money since you don’t have to buy a special purpose product, you can use what’s already there.

Foursquare and MongoDB: What If

People have chimed in and talked about the Foursquare outage. The nice part about these discussions is that they’re focusing on the technical problems with Foursquare’s current setup. They’re picking it apart and looking at what is right, what went wrong, and what needs to be done differently in MongoDB to prevent problems like this in the future.

Let’s play a “what if” game. What if Foursquare wasn’t using MongoDB? What if they were using something else?

Riak

Riak is a massively scalable key/value data store. It’s based on Amazon’s Dynamo. If you don’t want to read the paper, that just means that it uses some magic to make sure that data is evenly spread throughout the database cluster and that adding a new node makes it very easy to rebalance the cluster.

What would have happened at Foursquare if they had been using Riak?

Riak still suffers from the same performance characteristics around disk access as MongoDB – once you have to page to disk, operations become slow and throughput dries up. This is a common problem in any software that needs to access disks – disks are slow, RAM is fast.

Riak, however, has an interesting distinction. It allocates keys inside the cluster using something called a consistent hash ring – this is just a convenient way to rapidly allocate ranges of keys to different nodes within a cluster. The ring itself isn’t interesting. What’s exciting is that the ring is divided into partitions (64 by default). Every node in the system claims an equal share of those 64 partitions. In addition, each partition is replicated so there are always three copies of a given key/value pair at any time. Because the data is spread evenly across partitions and replicated, it’s unlikely that any single node will become full or unbalanced. In theory, if Foursquare had been using Riak, it’s very unlikely that they would have run into a problem where a single node became full.
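
To make the ring slightly less magical, here’s a toy sketch of the idea. Riak’s real ring hashes the bucket/key pair into a 160-bit SHA-1 space and hands contiguous slices of that space to partitions; the details below are deliberately simplified and only illustrative:

require 'digest/sha1'

RING_SIZE  = 2 ** 160   # the SHA-1 key space
PARTITIONS = 64         # Riak's default ring size

# Map a bucket/key pair onto one of the ring's partitions.
def partition_for(bucket, key)
  hash = Digest::SHA1.hexdigest("#{bucket}/#{key}").to_i(16)
  hash / (RING_SIZE / PARTITIONS)
end

partition_for('checkins', 'user:jeremiah')   # => an integer between 0 and 63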

How would this magical consistent hash ring design choice have helped? Adding a new node causes the cluster to redistribute data in the background. The new node will claim an equal amount of space in the cluster and the data will be redistributed from the other nodes in the background. The same thing happens when a node fails, by the way. Riak also only stores keys in memory, not all of the data, so it’s possible to reference an astronomical amount of data before running out of RAM.

There’s no need to worry about replica sets, re-sharding, or promoting a new node to master. Once a node joins a Riak cluster, it takes over its share of the load. As long as you have the network throughput on your local network (which you probably do), this operation can be fairly quick and painless.

Cassandra

Cassandra, like Riak, is based on Amazon’s Dynamo. It’s a massively distributed data store. I suspect that if Foursquare had used Cassandra, they would have run into similar problems.

Cassandra makes use of range partitioning to distribute data within the cluster. The Foursquare database was keyed off of the user name which, in turn, saw abnormal growth because some groups of users were more active than others. Names also tend to clump around certain letters, especially when you’re limited to a Latin character set. I know a lot more Daves than I know Zachariahs, and I have a lot of friends whose names start with the letter L. This distribution causes data within the cluster to be overly allocated to one node. This would, ultimately, lead to the same problems that happened at Foursquare with their MongoDB installation.

That being said, it’s possible to use a random partitioner for the data in Cassandra. The random partitioner makes it very easy to add additional nodes and distribute data across them. The random partitioner comes with a price. It makes it impossible to do quick range slice queries in Cassandra – you can no longer say “I want to see all of the data for January 3rd, 2010 through January 8th, 2010”. Instead, you would need to build up custom indexes to support your querying and build batch processes to load the indexes. The tradeoffs between the random partitioner and the order preserving partitioner are covered very well in Dominic Williams’s article Cassandra: RandomPartitioner vs OrderPreservingPartitioner.

Careful use of the random partitioner and supporting batch operations could have prevented the outage that Foursquare saw, but this would have led to different design challenges, some of which may have been difficult to overcome without resorting to a great deal of custom code.

HBase/HDFS/Hadoop

HBase is a distributed column-oriented database built on top of HDFS. It is based on Google’s Bigtable database as described in “Bigtable: A Distributed Storage System for Structured Data”. As an implementation of Bigtable, HBase has a number of advantages over MongoDB – write-ahead logging, rich ad hoc querying, and redundancy.

HBase is not going to suffer from the same node redistribution problems that caused the Foursquare outage. When it comes time to add a new node to the cluster data will be migrated in much larger chunks, one data file at a time. This makes it much easier to add a new data node and redistribute data across the network.

Just like a relational database, HBase is designed so that all data doesn’t need to reside in memory for good performance. The internal data structures are built in a way that makes it very easy to find data and return it to the client. Keys are held in memory and two levels of caching make sure that frequently used data will be in memory when a client requests it.

HBase also has the advantage of using a write ahead log. In the event of a drive failure, it is possible to recover from a backup of your HBase database and play back the log to make sure data is correct and consistent.

If all of this sounds like HBase is designed to be a replacement for an RDBMS, you would be close. HBase is a massively distributed database, just like Bigtable. As a result, data needs to be logged because there is a chance that a hardware node will fail and will need to be recovered. Because HBase is a column-oriented database, we need to be careful not to treat it exactly like a relational database.

A Relational Database

To be honest, Foursquare’s data load would be trivial in any relational database. SQL Server, Oracle, MySQL, and PostgreSQL can all handle orders of magnitude more data than the 132GB that Foursquare was storing at the time of the outage. This begs the question: how would we handle the constant write load? Foursquare is a write-intensive application.

Typically, in the relational database world, when we need to scale read and write loads we add more disks. There is a finite amount of space in a server chassis, and local disks don’t provide the redundancy necessary for data security and performance; software RAID is also CPU intensive and slow. A better solution is to purchase a dedicated storage device – a SAN, NAS, or DAS. All of these devices offer read/write caching and can be configured in a variety of RAID levels for performance and redundancy.

RDBMSes are known quantities – they are easy to scale to certain points. Judging by the amount of data that Foursquare reports to have, they aren’t likely to reach the point where an RDBMS can no longer scale for a very long time. The downside to this approach is that an RDBMS is much costlier per TB of storage (up to ten times more expensive) than using MongoDB, but if your business is your data, then it’s important to keep the data safe.

Conclusion

It’s difficult to say if a different database solution would have prevented the Foursquare outage. But it is a good opportunity to highlight how different data storage systems would respond in the same situation.
