Category Riak

Introduction to Riak … TONIGHT!

I’ll be speaking at the Columbus Ruby Brigade and giving an introduction to Riak tonight at 6:30PM!

There will be pizza and soda and Ruby and me. You can even stick around afterwards while we all go next door for drinks (you can buy my Diet Cokes all night if you really like the presentation).

Riak: An Overview

This presentation will lead you through an overview of Riak: a flexible, decentralized key-value store. Riak was designed to provide a friendly HTTP/JSON interface and a database that’s well suited for reliable web applications.

Add it to your calendar!

Introduction to Riak – Next Monday

I’ll be speaking at the Columbus Ruby Brigade and giving an introduction to Riak next Monday, February 21, at 6:30PM.

Introduction to Riak at Columbus Ruby Brigade

I’ll be speaking at the Columbus Ruby Brigade and giving an introduction to Riak on February 21 at 6:30PM.

Querying Riak – Key Filters and MapReduce

A while back we talked about getting faster writes with Riak. Since then, I’ve been quiet on the Riak front. Let’s take a look at how we can get data out of Riak, especially since I went to great pains to throw all of that data into Riak as fast as my little laptop could manage.

Key filtering is a new feature in Riak that makes it much easier to restrict queries to a subset of the data. Prior to Riak 0.13, it was necessary to write MapReduce jobs that would scan through all of the keys in a bucket. The problem is that the MapReduce jobs end up loading both the key and the value into memory. If we have a lot of data, this can cause a huge performance hit. Instead of loading all of the data, key filtering lets us look at the keys themselves. We’re pre-processing the data before we get to our actual query. This is good because 1) software should do as little as possible and 2) Riak doesn’t have secondary indexing to make querying faster.

Here’s how it works: Riak holds all keys in memory, but the data remains on disk. The key filtering code scans the keys in memory on the nodes in our cluster. If any keys match our criteria, Riak will pass them along to any map phases that are waiting down the pipe. I’ve written the sample code in Ruby but this functionality is available through any client.
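To make the idea concrete, here is a small local simulation of filtering keys before any values are read. It is only a sketch: Riak does this work inside the cluster, the helper filterKeys and the sample keys are made up for illustration, and only three of Riak's filter names (matches, starts_with, ends_with) are mimicked.

```javascript
// Local simulation only: Riak applies key filters inside the cluster.
// The predicate names mirror Riak's "matches", "starts_with" and "ends_with" filters.
const predicates = {
  matches: (key, pattern) => new RegExp(pattern).test(key),
  starts_with: (key, prefix) => key.startsWith(prefix),
  ends_with: (key, suffix) => key.endsWith(suffix)
};

// Hypothetical helper: keep only the keys that satisfy every filter,
// e.g. [["matches", "spider"]].
function filterKeys(keys, keyFilters) {
  return keys.filter(key =>
    keyFilters.every(([name, arg]) => predicates[name](key, arg)));
}

// Made-up sample keys; no values are loaded at any point.
const sampleKeys = ["wolf_spider", "house_cat", "spider_monkey", "barn_owl"];
const spiderKeys = filterKeys(sampleKeys, [["matches", "spider"]]);
console.log(spiderKeys); // only these keys would be handed to a map phase
```

The point of the sketch is that the filter only ever touches key strings, which is why the values can stay on disk until a key has already matched.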

The Code

We’re using data loaded with load_animal_data.rb. The test script itself can be found in mr_filter.rb. Once again, we’re using the taxoboxes data set.

The Results

             user     system      total        real
mr       0.060000   0.030000   0.090000 ( 20.580278)
filter   0.000000   0.000000   0.000000 (  0.797387)

MapReduce

First, the MapReduce query:

{"inputs":"animals",
 "query":[{"map":{"language":"javascript",
                  "keep":false,
                  "source":"function(o) { if (o.key.indexOf('spider') != -1) return [1]; else return []; }"}},
          {"reduce":{"language":"javascript",
                     "keep":true,
                     "name":"Riak.reduceSum"}}]}

We’re going to iterate over every key value pair in the animals bucket and look for a key that contains the word ‘spider’. Once we find that key, we’re going to return a single element array containing the number 1. Once the map phase is done, we use the built-in function Riak.reduceSum to give us a sum of the values from the previous map phase. We’re generating a count of the records that match our data – how many spiders do we really have?

Key Filtering

The key filtering query doesn’t look that much different:

{"inputs":{"bucket":"animals",
           "key_filters":[["matches","spider"]]},
 "query":[{"map":{"language":"javascript",
                  "keep":false,
                  "source":"function(o) { return [1]; }"}},
          {"reduce":{"language":"javascript",
                     "keep":true,
                     "name":"Riak.reduceSum"}}]}

It’s not that much different – the map query has been greatly simplified to just return [1] on success and the search criteria has been moved into the inputs portion of the query. The big difference is in the performance: the key filter query is 26 times faster.

This is a simple example, but a 26x improvement is nothing to scoff at. What it really means is that the rest of our MapReduce needs to work on a smaller subset of the data which, ultimately, makes things faster for us.

A Different Way to Model Data

Now that we have our querying basics out of the way, let’s look at this problem from a different perspective; let’s say we’re tracking stock performance over time. In a relational database we might have a number of tables, notably a table to track stocks and a table to track daily_trade_volume. Theoretically, we could do the same thing in Riak with some success, but it would incur a lot of overhead. Instead we can use a natural key to locate our data. Depending on how we want to store the data, this could look something like YYYY-MM-DD-ticker_symbol. I’ve created a script to load data from stock exchange data. For my tests, I only loaded the data for stocks that began with Q. There’s a lot of data in this data set, so I kept things to a minimum in order to make this quick.

Since our data also contains the stock exchange identifier, we could even go one step further and include the exchange in our key. That would be helpful if we were querying based on the exchange.
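As a rough illustration of working with a natural key, the sketch below builds and takes apart keys in the YYYY-MM-DD-ticker_symbol layout. The helper names makeStockKey and parseStockKey and the sample date are assumptions made for this example; they are not part of the loader script.

```javascript
// Hypothetical helpers; only the key layout YYYY-MM-DD-ticker_symbol comes from the text.
function makeStockKey(date, symbol) {
  return date + "-" + symbol;
}

function parseStockKey(key) {
  const parts = key.split("-");
  return {
    month: parts.slice(0, 2).join("-"),  // "YYYY-MM"
    day: parts.slice(0, 3).join("-"),    // "YYYY-MM-DD"
    symbol: parts.slice(3).join("-")     // everything after the date
  };
}

// Made-up sample date for the QTM ticker.
const stockKey = makeStockKey("2010-02-08", "QTM");
console.log(stockKey, parseStockKey(stockKey));
```

Because every part of the key sits at a fixed position, the month, day, and symbol can be recovered without reading the stored value at all.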

If you take a look at mr_stocks.rb you’ll see that we’re setting up a query to filter stocks by the symbol QTM and then aggregate the total trade volume by month. The map phase creates a single cell array with the stock volume traded in the month and returns it. We use the Riak.mapValuesJson function to map the raw data coming in from Riak to a proper JavaScript object. We then get the month that we’re looking at by parsing the key. This is easy enough to do because we have a well-defined key format.

function(o, keyData, arg) {
  var data = Riak.mapValuesJson(o)[0];
  var month = o.key.split('-').slice(0,2).join('-');
  var obj = {};
  obj[month] = data.stock_volume;
  return [ obj ];
}
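The map function can be tried outside of Riak with a stand-in for the Riak helper object. This is a sketch under stated assumptions: the stub below only parses the first value, the bucket name "stocks" and the sample record are made up, and the name mapVolumeByMonth is added just so the function can be called.

```javascript
// Stand-in for the Riak object that the server provides to map functions.
// Assumption: this stub parses only the first value, which is all the example needs.
const Riak = {
  mapValuesJson: function (o) {
    return [JSON.parse(o.values[0].data)];
  }
};

// Same body as the map function above, given a name so it can be called locally.
function mapVolumeByMonth(o, keyData, arg) {
  var data = Riak.mapValuesJson(o)[0];
  var month = o.key.split('-').slice(0, 2).join('-');
  var obj = {};
  obj[month] = data.stock_volume;
  return [obj];
}

// Made-up record shaped like the object handed to a map phase.
const sampleObject = {
  bucket: "stocks",
  key: "2010-02-08-QTM",
  values: [{ data: '{"stock_volume": "1500"}' }]
};
const mapped = mapVolumeByMonth(sampleObject);
console.log(mapped); // a one-element array holding an object keyed by month
```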

If we were to look at this output we would see a lot of rows of unaggregated data. While that is interesting, we want to look at the trend in stock trades for QTM over all time. To do this we create a reduce function that will sum up the output of the map function. This is some pretty self-explanatory JavaScript:

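function(values, arg) {
  // Seed the accumulator with {} so that every value, including the first,
  // goes through parseInt instead of being kept as a raw string.
  return [ values.reduce(function(acc, item) {
               for (var month in item) {
                 if (acc[month]) { acc[month] += parseInt(item[month], 10); }
                 else { acc[month] = parseInt(item[month], 10); }
               }

               return acc;
             }, {})
  ]
}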

Okay, so that might not actually be as self-explanatory as anyone would like. The JavaScript reduce method is a newer one. It will accumulate a single result (the acc variable) for all elements in the array. You could use this to get a sum, an average, or whatever you want.

One other thing to note is that we use parseInt. We probably don’t have to use it, but it’s a good idea. Why? Riak is not aware of our data structures. We just store arrays of bytes in Riak – it could be a picture, it could be text, it could be a gzipped file – Riak doesn’t care. JavaScript only knows that it’s a string. So, when we want to do mathematical operations on our data, it’s probably wise to use parseInt and parseFloat.
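A short local experiment shows why the parsing matters. The sketch below is a variant of the reduce function, seeded with an empty object and given the made-up name sumByMonth; the volumes are invented sample values.

```javascript
// Strings concatenate instead of adding, which is why the reduce phase parses its inputs.
const concatenated = "100" + "250";
const added = parseInt("100", 10) + parseInt("250", 10);

// Variant of the reduce function: sum per-month volumes, seeding the accumulator
// with {} so that every value passes through parseInt.
function sumByMonth(values) {
  return [values.reduce(function (acc, item) {
    for (var month in item) {
      acc[month] = (acc[month] || 0) + parseInt(item[month], 10);
    }
    return acc;
  }, {})];
}

// Invented map-phase output: two days in February, one in March.
const totals = sumByMonth([
  { "2010-02": "100" },
  { "2010-02": "250" },
  { "2010-03": "75" }
]);
console.log(concatenated, added, totals);
```

Without the parsing step the February total would come out as the string "100250" rather than the number 350.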

Where to Now?

Right now you probably have a lot of data loaded. You have a couple of options. There are two scripts on github to remove the stock data and the animal data from your Riak cluster. That’s a pretty boring option. What can you learn from deleting your data and shutting down your Riak cluster? Not a whole lot.

You should open up mr_stocks.rb and take a look at how it works. It should be pretty easy to modify the map and reduce functions to output total trade volume for the month, average volume per day, and average price per day. Give it a shot and see what you come up with.

If you have questions or run into problems, you can hit up the comments, the Riak Developer Mailing List, or the #riak IRC room on irc.freenode.net if you need immediate, real-time help with your problem.

Goals for 2011 – Early Update

It’s a bit early to be updating my goals for 2011, but I’m really excited about this one. Over the course of last week, I wrote an article about loading data into Riak. I had a brief conversation with Mark Phillips (blog | twitter) about adding some of the code to the Riak function contrib.

This is where a sane person would say “Yeah, sure Mark, do whatever you want with my code.” Instead I said something like “I’d be happy to share. How about I make a generic tool?” About 40 minutes later I had a working chunk of code. 30 minutes after that I had refactored the code into a driver and a library. I wrote up some documentation and sent everything off to be included in the main Riak function contrib repository. A couple of days and a documentation correction later and you can now see my first code contribution to the open source world on the internet: Importing YAML.

While I’m really excited about this, and it’s very important to me, there’s more to take away from this than just “Yay, I did something!” We’re all able to give something back to our community. In this case I took code I had written to perform benchmarks and extracted a useful piece of demonstration code from it. Share your knowledge with the world around you – it’s how we get smarter.

Getting Faster Writes with Riak

While preparing for an upcoming presentation about Riak for the Columbus Ruby Brigade, I wrote a simple data loader. When I initially ran the load, it took about 4 minutes to load the data on the worst run. When you’re waiting to test your data load and write a presentation, 4 minutes is an eternity. Needless to say, I got frustrated pretty quickly with the speed of my data loader, so I hit up the Riak channel on IRC and started digging into the Ruby driver’s source code.

The Results

              user     system      total        real
defaults 63.660000   3.270000  66.930000 (166.475535)
dw => 1  50.940000   2.720000  53.660000 (128.470094)
dw => 0  52.350000   2.740000  55.090000 (120.151827)
n => 2   52.850000   2.790000  55.640000 (132.023310)

The Defaults

Our default load uses no customizations. Riak is going to write data to three nodes in the cluster (n = 3). Since we’re using the default configuration, we can safely assume that Riak will use quorum for write confirmation (w = n/2 + 1). Finally, we can also assume that the durable write value is to use a quorum, since that’s the default for riak-client.

Because we’re writing to n (3) nodes and we’re waiting for w (2) nodes to respond, writes were slower than I’d like. Thankfully, Riak makes it easy to tune how it will respond to writes.
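The quorum arithmetic is easy to check by hand. This sketch just evaluates w = n/2 + 1 with integer division; the function name quorum is made up for the example.

```javascript
// Quorum as described in the text: w = n/2 + 1, using integer division.
function quorum(n) {
  return Math.floor(n / 2) + 1;
}

// With the default n of 3, two replicas have to respond.
console.log(quorum(3));
```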

Changing the N Value

The first change that we can make is to the N value (replication factor). Lowering the N value should give a huge improvement on my test machine – Riak is only on one of my hard drives. Even solid state drives can only write to one place at a time. When we create the bucket we can change the bucket’s properties and set the N value. Note: it’s important that you set bucket properties when you ‘create’ the bucket. Buckets are created when keys are added to them and they are deleted when the last key is deleted.

b1 = client.bucket('animals_dw1',
                   :keys => false)
b1.props = { :n_val => 1, :dw => 1 }

In this chunk of code we set the N value to 1 and set the durable writes to 1. This means that only 1 replica will have to commit the record to durable storage in order for the write to be considered a success.

On the bright side, this approach is considerably faster. Here’s the bummer: by setting the N value to 1, we’ve removed any hope of durability from our cluster – the data will never be replicated. Any server failure will result in data loss. For our testing purposes, it’s okay because we’re trying to see how fast we can make things, not how safe we can make them.

How much faster? Our run with all defaults enabled took 166 seconds. Only writing to 1 replica shaved 38 seconds off of our write time. The other thing that I changed was setting returnbody to false. By default, the Ruby Riak client will return the object that was saved. Turning this setting off should make things faster – fewer bytes are flying around the network.

Forget About Durability

What happens when we turn down durability? That’s the dw => 0 result in the table at the beginning of the article. We get an 8 second performance boost over our last load.

What did we change? We set both the dw and w parameters to 0. This means that our client has told Riak that we’re not going to wait for a response from any replicas before deciding that a write has succeeded. This is a fire-and-forget write – we’re passing data as quickly as possible to the client and to hell with the consequences.

So, by eliminating any redundancy, ignoring the current record from the database, and refusing to wait for any write acknowledgements from the server, we’re able to get a 46.3 second performance improvement over our default values. This is impressive, but it’s roughly akin to throwing our data at a bucket, not into the bucket.

What if I Care About My Data?

What if you care about your data? After all, we got our performance improvement from setting the number of replicas to 1 and turning off write acknowledgement. The fourth and final run that I performed looked at what would happen if we kept the number of replicas at a quorum (an N value of 2) and ignored write responses. If we’re just streaming data into a database, we may not care if a record gets missed here and there. It turns out that this is only slightly slower than running with scissors. It takes 132 seconds to write the data; only 4 seconds slower than with durable writes set to 1 and still nearly 34.5 seconds faster than using the defaults.
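The differences quoted throughout this article can be re-derived from the "real" column of the results table. The sketch below does only that arithmetic; the object keys (defaults, dw1, dw0, n2) and the helper secondsSaved are names made up for the example.

```javascript
// "real" seconds copied from the results table.
const real = {
  defaults: 166.475535,
  dw1: 128.470094,
  dw0: 120.151827,
  n2: 132.023310
};

// Difference between two runs, rounded to one decimal place.
function secondsSaved(slower, faster) {
  return Number((real[slower] - real[faster]).toFixed(1));
}

console.log("defaults vs dw => 1:", secondsSaved("defaults", "dw1"));
console.log("dw => 1 vs dw => 0:", secondsSaved("dw1", "dw0"));
console.log("defaults vs dw => 0:", secondsSaved("defaults", "dw0"));
console.log("n => 2 vs dw => 1:", secondsSaved("n2", "dw1"));
console.log("defaults vs n => 2:", secondsSaved("defaults", "n2"));
```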

The most recent version of this sample code can be found on github at https://github.com/peschkaj/riak_intro. Sample data was located through Infochimps.com. The data load uses the Taxobox data set.

Secondary Indexes – How Would You Do It (In Riak)?

Every database has secondary indexes, right? Not quite. It turns out that some databases don’t support them. Secondary indexes are important because they make it possible to run more kinds of fast queries on a given chunk of data. What if we want to add secondary indexes to a database? How would we go about doing it?

Looking at queries, there are a few basic ways that we actually query data:

  • Equality predicates
  • Inequality predicates
  • Multi-predicate queries

We’re going to be using Riak as our example database.

Equality Predicates

The easiest type of query is an equality predicate. With an equality predicate, we’re just matching any given search term. Looking at an example, what if we want to find every user born on July 4, 1976?

function(value, keyData, arg){
  var data = Riak.mapValuesJson(value)[0];

  if (data.Birthdate == '1976-07-04') {
    return [ data ];
  }
  // Return an empty array when nothing matches, as in the earlier spider example.
  return [];
}

With an RDBMS, we’d just create an index on the table to support our queries. But with a key-value store, we don’t have that ability. How do we do it?

To create an index in Riak, we’d create another bucket, users_by_birthdate. The keys for this bucket would be the value of the birthdate. We could use links (this will fall apart pretty rapidly above about 1000 links) or store a list of keys in the users bucket. To satisfy our MapReduce query we can use the users_by_birthdate bucket to get the IDs of our users for retrieval rather than scanning the entire bucket. Depending on the amount of data we have, being able to use equality predicates could reduce the number of reads we have to perform.
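Here is a minimal in-memory sketch of that index bucket. JavaScript Map objects stand in for Riak buckets, the user records and keys are made up, and the helper findByBirthdate is hypothetical; a real version would read and write the buckets through a Riak client.

```javascript
// Map objects stand in for Riak buckets; keys and records are illustrative only.
const users = new Map([
  ["176", { LastName: "Smith", Birthdate: "1976-07-04" }],
  ["195", { LastName: "Harrison", Birthdate: "1976-07-04" }],
  ["203", { LastName: "Smith", Birthdate: "1980-01-15" }]
]);

// Build users_by_birthdate: birthdate -> list of keys in the users bucket.
const usersByBirthdate = new Map();
for (const [key, user] of users) {
  const keys = usersByBirthdate.get(user.Birthdate) || [];
  keys.push(key);
  usersByBirthdate.set(user.Birthdate, keys);
}

// One read of the index, then one read per matching user, instead of a full scan.
function findByBirthdate(birthdate) {
  return (usersByBirthdate.get(birthdate) || []).map(key => users.get(key));
}

console.log(findByBirthdate("1976-07-04"));
```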

Multi-Predicate Equality Searches

I’m going to skip inequality predicates for a second and talk about multi-predicate equality searches. This is easy to accomplish. If we’re searching for everyone with the last name of ‘Smith’ who was born on July 4th, 1976 we might use the following MapReduce function:

function(value, keyData, arg){
  var data = Riak.mapValuesJson(value)[0];
  // Compare with ==, not =, so LastName is tested rather than overwritten.
  if (data.Birthdate == '1976-07-04' && data.LastName == 'Smith') {
    return [ data ];
  }
  return [];
}

With a relational database, we could create a multi-column index. That isn’t possible here – with Riak we only have a bucket name (the index name), a key (the key), and a value.

Two Equality Buckets

One option would be to use two equality buckets and then perform a bitwise comparison of the results, only keeping user keys that exist in both buckets. In this case, we would have two buckets users_by_birthdate and users_by_lastname. The problem is that we might have large data skew on one of these key-value combinations. So, if there are only 1,000 users born on July 4th, 1976 and 1,000,000 Smiths in our database, this could cause some inefficiencies in our queries when we go to merge the two result sets. Worse, there could be 1,000,000 users that match our search conditions in each bucket, but only 10 in common.

Using a bitwise combination of two buckets is, from a development standpoint, the easiest way to implement multi-predicate comparison. However, this approach creates problems with any significant volume of data.
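The comparison described above amounts to intersecting two lists of user keys. The sketch below is one way to do it, assuming both key lists have already been read from their index buckets; intersectKeys and the sample keys are made up for illustration.

```javascript
// Intersect two lists of user keys: hash the smaller list, then stream the larger one.
function intersectKeys(a, b) {
  const [small, large] = a.length <= b.length ? [a, b] : [b, a];
  const lookup = new Set(small);
  return large.filter(key => lookup.has(key));
}

// Made-up key lists standing in for users_by_birthdate and users_by_lastname entries.
const bornJuly4 = ["176", "195"];
const namedSmith = ["176", "203", "311"];
console.log(intersectKeys(bornJuly4, namedSmith));
```

Hashing the smaller list keeps memory use down, but the larger list still has to be read in full, which is exactly the skew problem described above.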

MapReduce Over An Index Bucket

What if, instead of using two or more buckets and performing a bitwise comparison of the keys that we find, we create a single index bucket? Inside that bucket, we’ll still use one value as the key for each key-value pair. This would look like the users_by_birthdate bucket – every key-value pair has the birthdate as the key.

Each key contains a list of values. Unlike our first example we’re going to store more than a simple list of keys. Instead we’re going to store a list of lists of tuples – a multi-dimensional data structure. If we’re using the birthdate, last name example, it might look like this:

1976-07-04    [
                [176, {last_name: Smith}],
                [195, {last_name: Harrison}]
              ]

What does this buy us? Well, for starters we don’t have to perform bitwise comparisons between two or more potentially massive lists of keys. Instead we can examine the MapReduce query that’s coming in and re-write it on the fly to give us a MapReduce job over our new index. The upside of this index is that it can perform double duty; we can search for users by birthdate or by birthdate and last name.
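A lookup against this kind of index entry could be sketched as follows. The object compoundIndex stands in for the index bucket, and lookupUsers is a hypothetical helper; the two entries mirror the example data above.

```javascript
// One index entry per birthdate; each item is [userKey, { extra indexed fields }].
const compoundIndex = {
  "1976-07-04": [
    ["176", { last_name: "Smith" }],
    ["195", { last_name: "Harrison" }]
  ]
};

// Search by birthdate alone, or by birthdate and last name, using the same entry.
function lookupUsers(index, birthdate, lastName) {
  const entries = index[birthdate] || [];
  return entries
    .filter(([, fields]) => lastName === undefined || fields.last_name === lastName)
    .map(([userKey]) => userKey);
}

console.log(lookupUsers(compoundIndex, "1976-07-04"));
console.log(lookupUsers(compoundIndex, "1976-07-04", "Smith"));
```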

One of the things that we need to consider when we’re designing our key-value database is the queries that we’re going to be running, not the way that the data should be structured. When we’re structuring our data, it may make more sense to use a complex data structure, like this index bucket, instead of many single column indexes. Updates to the data will require fewer updates with this model. Only one write is needed when a user signs up, not two; one for the compound index as opposed to two for the simple indexes – one for users_by_birthdate and one for users_by_lastname.

Once we start updating data, using an index bucket can reduce index maintenance. When we update a record we’ll need to update any index buckets. This could lead to a large number of data writes, tombstone writes, and compactions over time. Using a single index bucket with more complex structure in the index removes the need for a large number of writes. We only have to write to the original key-value pair as well as writing to the index bucket.

Inequality Predicates

Inequality predicates are much trickier than equality predicates. At first glance it seems like it would be easy to perform a MapReduce over the keys and determine which key-value pairs meet the search conditions. And we would be right: that is an easy solution when there are a small number of keys. What happens when there are a large number of keys? We need to read more data to get to the results we need. Digging further, what happens when very little of our data actually matches? The benefits of using this approach become very slim; we need to perform a large number of reads to retrieve very little data.

There’s a trick to implementing inequality predicates: in some cases it will be faster to retrieve all keys that match our inequality predicate – birthdate between 1976-07-01 and 1976-07-31. In other cases, a scan is going to be faster – users with a last name starting with ‘A’.

How do we figure out which approach is going to be faster? Let’s take a look at a concrete example using users and birthdates for our example. We already know how to find every user with a specific birthdate using an index; that’s a simple seek. How do we find all users whose birthday falls between a range of dates? That’s what we’ve been asking all along, right?

What if we maintain some simple information about our users_by_birthdate index? Let’s say we track the minimum value, the maximum value, and the number of keys in the bucket. Just from this information we can guess roughly how many keys will be in any given range. This is pretty easy with dates; we can assume that there’s an even distribution of birthdates in any range of dates.
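Under the even-distribution assumption, the estimate is simple proportional arithmetic. In this sketch the statistics object, its field names, and the sample values (one key for every day of 1976) are all made up for illustration.

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;

// Number of days in an inclusive range of ISO dates.
function daysInRange(from, to) {
  return (Date.parse(to) - Date.parse(from)) / DAY_MS + 1;
}

// Estimate how many index keys fall in [from, to], assuming keys are spread evenly
// between the tracked minimum and maximum values.
function estimateKeys(stats, from, to) {
  const keysPerDay = stats.keyCount / daysInRange(stats.min, stats.max);
  return Math.round(keysPerDay * daysInRange(from, to));
}

// Made-up statistics: one birthdate key for every day of 1976.
const birthdateStats = { min: "1976-01-01", max: "1976-12-31", keyCount: 366 };
console.log(estimateKeys(birthdateStats, "1976-07-01", "1976-07-31"));
```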

Things get trickier with names. How many distinct last names are there between Aaronson and Bartleby? What about between Xara and Yoder? Knowing the min and max last names and the number of keys won’t help us figure out the distribution; the data isn’t uniformly distributed. We need a way to make educated guesses about the data in a range. One way to do this is to create a histogram of our data. Histograms are data samplings that let us make better guesses about the number of specific values we can expect to find in a given range.

Relational databases accomplish this by maintaining separate statistics tables. We can accomplish something similar in Riak by creating a new bucket of statistics where the stats key is the high value of a range of keys. The value of the key-value pair would be data about the range of values – number of rows, number of keys, and the average number of distinct values per key. We want to collect statistics so we can easily determine when we need to seek a small number of keys and when we need to perform a MapReduce over the bucket.
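A statistics bucket like this could drive the choice between seeking and scanning. The sketch below is purely illustrative: the histogram steps, their key counts, the seek threshold, and the helpers estimateRange and chooseAccess are all assumptions rather than anything Riak provides.

```javascript
// Hypothetical statistics bucket: each entry is keyed by the high value of a range
// and records how many index keys fall at or below it (and above the previous step).
const lastNameStats = [
  { high: "Bartleby", keys: 1200 },
  { high: "Smith", keys: 45000 },
  { high: "Yoder", keys: 900 }
]; // must be sorted by high value

// Sum the key counts of every histogram step that overlaps the range [from, to].
function estimateRange(stats, from, to) {
  let total = 0;
  let low = "";
  for (const step of stats) {
    if (step.high >= from && low < to) total += step.keys;
    low = step.high;
  }
  return total;
}

// Seek when few keys are expected, otherwise MapReduce over the bucket.
function chooseAccess(estimate, seekLimit) {
  return estimate <= seekLimit ? "seek" : "mapreduce";
}

const smallRange = estimateRange(lastNameStats, "Aaronson", "Bartleby");
const largeRange = estimateRange(lastNameStats, "Scalia", "Smith");
console.log(smallRange, chooseAccess(smallRange, 5000));
console.log(largeRange, chooseAccess(largeRange, 5000));
```

The estimate is deliberately coarse: it counts whole histogram steps, so finer steps give better guesses at the cost of a larger statistics bucket.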

So far I’ve proposed two range search approaches that work well with different use cases. Ideally we could create an indexing mechanism in Riak that lets us state an assumption about the distribution of our data in Riak. If we know there’s a constant distribution of keys, we can skip any kind of heuristics and just assume that for July, we’ll be reading 31 keys. Likewise, if we want to pull back every user whose last name is between Scalia and Smith, then we’ll need to MapReduce over the index bucket to get the data that we need.

Creating Indexes – Simple Indexes

Simple, single column indexes are easy to create. The syntax to create an index with SQL is pretty straightforward:

CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ name ] ON table [ USING method ]
    ( { column | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
    [ WITH ( storage_parameter = value [, ... ] ) ]
    [ TABLESPACE tablespace ]
    [ WHERE predicate ]

With Riak, we won’t need that much information about the index. Working from the simplest use case possible, we only need

  • index name
  • bucket
  • value name to index
  • data storage mechanism

We need to uniquely name our index, otherwise we can’t create a new bucket for it. Likewise, we have to know which bucket we’re indexing. It also makes sense that we want to know which value we’ll be indexing.

We need to define the data storage mechanism for indexes so we are able to index the data. It’s for the same reason that we have to tell Riak Search how the data is stored; unless we know what we’re indexing, it’s impossible to index it.

Looking at riak_search, it should be possible to re-use the basic principles behind riak_search_kv_extractor and riak_search_kv_hook to define how indexes are created and how data is retrieved. When we create a full-text index with Riak search, we supply the datatype (JSON, XML, plain-text, or something else). This gives us an incredible amount of flexibility without having to define a schema for the documents. We’re only indexing the fields we want. This can be incredibly useful if we’re storing larger objects but we know we’ll only perform look ups on a few fields – indexing time and space is reduced drastically.

Re-using the full-text index model for secondary indexes makes it easy to maintain consistent indexing behavior throughout Riak – all of our indexes (full-text or otherwise) can be defined using the same commands.

Creating Indexes – Complex Indexes

Complex indexes shouldn’t be much more difficult to implement than simple indexes. We only need one additional piece of information to create our index: additional fields to index. Ideally the additional fields would be a text representation of how we access the fields in a program – “parent.child.property”. Supporting XML could make this trickier, but it should be easy to use XQuery/XPath to determine which XML elements, attributes, and values should be indexed. The intelligence of knowing how to perform the query will come later; indexes don’t need to know how to query themselves.

Staying up to Date

Right now, this index will only be accurate once: when we create it. Once again, we can borrow from the Riak Search implementation and create hooks to make sure the index is kept up to date as data is written. Keeping our indexes in sync with the data will cause some overhead on the cluster, but it’s worth it to know that we can accurately retrieve data. Like any other database, we need to take care not to create too many indexes: every write to the bucket means we end up writing to each index.

Ideally the index maintenance hooks would be the last hooks to run in a sequence of pre-commit hooks. If there are a number of pre-commit hooks to validate data we need to make sure that the data is valid before it is written to an index. If our primary worry is data validity, it is possible to implement the index maintenance code as a post-commit hook in Erlang (rather than as a pre-commit hook in either Erlang or JavaScript).
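The bookkeeping such a hook has to do can be sketched in memory. This is not a Riak commit hook: Map objects stand in for the users bucket and the index bucket, and putUser is a hypothetical function that plays the role of "write the record, then maintain the index".

```javascript
// Map objects stand in for the users bucket and the users_by_birthdate index bucket.
const userBucket = new Map();
const birthdateIndex = new Map(); // birthdate -> Set of user keys

// Hypothetical write path: update the index entry before storing the record.
function putUser(key, user) {
  const previous = userBucket.get(key);
  // If the indexed value changed, remove the key from its old index entry.
  if (previous && previous.Birthdate !== user.Birthdate) {
    const oldKeys = birthdateIndex.get(previous.Birthdate);
    oldKeys.delete(key);
    if (oldKeys.size === 0) birthdateIndex.delete(previous.Birthdate);
  }
  // Add the key to the entry for the new value, creating the entry if needed.
  if (!birthdateIndex.has(user.Birthdate)) birthdateIndex.set(user.Birthdate, new Set());
  birthdateIndex.get(user.Birthdate).add(key);
  userBucket.set(key, user);
}

putUser("176", { LastName: "Smith", Birthdate: "1976-07-04" });
putUser("176", { LastName: "Smith", Birthdate: "1976-07-05" }); // corrected birthdate
console.log(birthdateIndex);
```

Note that a single update to an indexed field touches two index entries as well as the record itself, which is the write overhead the text warns about.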

The Trouble with Data Locality

One potential problem we can run into is data locality. Depending on how the data is distributed in the key space, we could end up writing key-value and index data on the same physical node. The problem is that we could double, or more, the physical I/O that we’re performing on those nodes. If an index is not on the same node as the data we might incur tremendous network overhead querying remote indexes before querying data on the local node. On a busy network, this increased bandwidth could have an adverse effect on operations.

Driving Toward Our Goal

We’re clearly heading toward something. Let’s take stock of what we’ve talked about so far:

There are two types of search predicate to support

  • Equality predicates – easy to index
  • Inequality predicates – trickier to index

There are several ways to use indexes

  • A single index
  • Multiple indexes with a bitwise comparison
  • Complex indexes

We can read from indexes through

  • Key seeks
  • Key scans (MapReduce)
  • Key/value scans (MapReduce)

We can figure out what type of read to use by using data statistics

  • Uniformly distributed keys are easy
  • Randomly distributed keys are harder
  • A small number of seeks can be more efficient than a single MapReduce

Astute readers will have probably guessed what we need: a query engine.

This Ain’t No Car, It’s a Key-Value Store!

Why do we need a query engine? Key-value stores are simple; data is stored in key-value pairs and retrieved by a simple key lookup. A query engine of some type just adds complexity, right? Well, without a query engine, we can’t use secondary indexes without explicitly writing queries to use them.

In a hypothetical Riak query engine, we could process a MapReduce query like this:

Parse map phases and extract search conditions

Examine existing indexes

  1. Do indexes exist?
  2. Examine index type – evaluate statistics if needed
  3. Evaluate data access path needed to retrieve data – this is where we determine if we can use one index, multiple indexes with a bitwise compare, or use a multi-column index
  4. If no indexes exist, does a full-text index exist?

Re-write MapReduce functions on-the-fly

  1. Early Map phases are added to the query to use indexes
  2. Early Map phase output is used for key selection in MapReduce functions

While Riak doesn’t need anything as complex as an RDBMS’s query engine, this is still a gross simplification of how a query engine would function – additional functionality is needed to perform costing estimates of seeks vs MapReduce scans, statistics will need to be analyzed, and the query engine will need to be aware of vast portions of Riak in order to make good guesses so it can re-write MapReduce queries on the fly.
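The index-selection part of those steps can be sketched as a small decision function. Everything here is hypothetical: no such planner exists in Riak, and the function choosePlan, the shape of the index descriptions, and the access labels are invented for this example. Parsing map phases, costing, and statistics are left out entirely.

```javascript
// Hypothetical planner: pick a data access path for the fields found in the
// search conditions. indexes is a list like [{ name: "...", fields: ["..."] }].
function choosePlan(fields, indexes, hasFullTextIndex) {
  if (fields.length === 0) {
    return { access: "scan", indexes: [] };
  }
  // 1. A single index that covers every field in the search conditions.
  const covering = indexes.find(ix => fields.every(f => ix.fields.includes(f)));
  if (covering) {
    return { access: "index", indexes: [covering.name] };
  }
  // 2. One index per field, merged by comparing the resulting key lists.
  const perField = fields.map(f => indexes.find(ix => ix.fields.includes(f)));
  if (perField.every(ix => ix !== undefined)) {
    return { access: "merge", indexes: perField.map(ix => ix.name) };
  }
  // 3. Fall back to a full-text index if one exists, then to a scan of the bucket.
  return hasFullTextIndex
    ? { access: "full-text", indexes: [] }
    : { access: "scan", indexes: [] };
}

const availableIndexes = [
  { name: "users_by_birthdate", fields: ["Birthdate"] },
  { name: "users_by_lastname", fields: ["LastName"] }
];
console.log(choosePlan(["Birthdate", "LastName"], availableIndexes, false));
```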

Features, Features, Features

I left some topics out of this discussion intentionally. They don’t directly apply to implementing indexes, but they would be very helpful to have down the road.

  • Checking updated values to avoid needless tombstones. When a client updates a key with the same value it had before, there’s no reason to overwrite the stored value: nothing changed.
  • Saving query plans. We are re-writing MapReduce queries on the fly, so why not save them off internally and opt to store them for re-use when the cluster comes online?
  • Missing indexes. Frequently used queries should be optimized, either with secondary indexes for faster querying or as standalone entities in Riak. The easiest way to monitor for this is to store metadata about the query history.
  • Index use data. How often are our indexes being used? Do they meet our needs or should we replace them with different indexes?
  • Index only queries. If a MapReduce can be satisfied by the data in the index, don’t query the main bucket.
  • Running statistics. Riak is a long running process. Over time we will be able to collect a lot of data about Riak. Why not collect data about how our Riak is being used so we can tune our code and hardware to get better performance?

Summing it all Up

Indexing is a tricky thing to think about and an even trickier thing to get right. When I first started on this as a thought experiment, I thought that it was going to be easy. Assuming that indexes are simple led to a lot of dead ends, frustration, and restarts. This came about from talking to Mark Phillips (blog | twitter) and Dan Reverri (twitter) last week at the Basho offices about Riak, HTML, and the sound of farm machinery pulling itself apart.

Sound off in the comments.

TL;DR version – indexes are hard, let’s go shopping.
