pero on anything

Integrating MySQL and Hadoop – or – A different approach on using CSV files in MySQL

We use both MySQL and Hadoop a lot. If you use each system for what it does best, they make a powerful combination. One problem we constantly face is making data extracted from our Hadoop cluster available in MySQL.

The problem

Consider this simple example. Let’s say we have a table customer:

CREATE TABLE customer (

    id INT UNSIGNED NOT NULL,
    firstname VARCHAR(100) NOT NULL,
    lastname VARCHAR(100) NOT NULL,
    city VARCHAR(100) NOT NULL,

    PRIMARY KEY(id)
);

In addition, we store the orders our customers placed in Hadoop. An order includes: customerId, date, itemId, price. Note that these structures serve only as a very simplified example.

Let’s say we want to find the first 50 customers, sorted by firstname in ascending order, who placed at least one order. If both tables were in MySQL, we could use a single SQL statement like:

SELECT DISTINCT c.id, c.firstname FROM customer c JOIN `order` o ON c.id = o.customerId ORDER BY c.firstname ASC LIMIT 50

With the orders stored in Hadoop, we basically have two options:

  1. We write a Map-Reduce job that reads all customers from MySQL and joins them with the orders stored in Hadoop’s HDFS. The output is sorted by firstname ascending. From the result we use only the first 50 entries.
  2. We write a Map-Reduce job to extract all distinct customerIds, write them to a table in MySQL and use a SELECT with a JOIN.

In most cases, option 2 is the better choice if our customer table has a non-trivial number of rows, for three reasons:

  1. MySQL is not optimized for streaming rows. Since the Map-Reduce job in option 1 would always have to read the whole customer table, we would stream a lot of rows.
  2. You cannot easily express something like a LIMIT clause in Map-Reduce. Even if you could, you would likely have to read through all customer entries anyway. So the amount of data processed by the Map-Reduce job is significantly higher with approach 1.
  3. If you have only just started moving to Hadoop, most of your data structures, like categories, product information etc., are still kept in MySQL, and most of the business logic relies on SQL. In most applications you would not move all your data to Hadoop anyway. So storing Hadoop’s results in MySQL simply integrates better with your existing application.

So storing Map-Reduce results in MySQL seems to be the better option most of the time. But you still have to write all customerIds extracted by the Map-Reduce job into a table, and that is a performance killer. Even if you use HEAP tables, it puts a lot of pressure on MySQL. Other options, like the CSV storage engine, are not feasible since they do not support keys (indexes). And joining without a key is never a good idea.

Introducing: MySQL UDF csv_find() and csv_get()

One of the big advantages of Map-Reduce is that it can produce output sorted by whatever key we want. So we can output sorted CSV files, and we can perform a binary search on these sorted CSV files. Great!

I have written two MySQL User Defined Functions (UDFs) that provide find and get functionality on sorted CSV files.

How to use it?

Taking our example from above, we transfer the resulting CSV file from HDFS to the local filesystem of our MySQL server and write a query like this:

SELECT * FROM customer WHERE csv_find('/tmp/myHadoopResult.csv', customer.id) = 1 ORDER BY firstname ASC LIMIT 50

This is a lot faster than inserting the Map-Reduce result into a table. Since no JOIN is needed at all, it might even be faster than our original SELECT statement, where we assumed both tables, customer and order, were in MySQL. More on performance later on.

How does it work?

When csv_find is initialized, the CSV file is mapped into memory using mmap. Since the first column of the CSV file is sorted in ascending order, each call to csv_find can simply use a binary search.
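To make the mechanism concrete, here is a minimal C sketch of the same idea. It is not the UDF's actual source. It maps the file read-only with mmap, then binary-searches over byte offsets. For each midpoint it backs up to the start of that line and compares the line's first tab-separated column with the key byte by byte. The helper names map_file, cmp_first_column and csv_bsearch are hypothetical. The sketch assumes a non-empty, tab/newline-delimited file sorted as required under Prerequisites.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: map a whole file read-only. Returns NULL on error
   or for an empty file (mmap cannot map zero bytes). */
static const char *map_file(const char *path, size_t *len) {
    int fd = open(path, O_RDONLY);
    if (fd < 0) return NULL;
    struct stat st;
    if (fstat(fd, &st) != 0 || st.st_size <= 0) {
        close(fd);
        return NULL;
    }
    void *p = mmap(NULL, (size_t)st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    close(fd); /* the mapping stays valid after the descriptor is closed */
    if (p == MAP_FAILED) return NULL;
    *len = (size_t)st.st_size;
    return p;
}

/* Compare the first column of the line at `line` with `key`, byte by byte.
   memcmp compares unsigned chars, which is the LC_ALL=C sort order. */
static int cmp_first_column(const char *line, const char *end,
                            const char *key, size_t keylen) {
    const char *f = line;
    while (f < end && *f != '\t' && *f != '\n') f++;
    size_t flen = (size_t)(f - line);
    int c = memcmp(line, key, flen < keylen ? flen : keylen);
    if (c != 0) return c;
    return (flen > keylen) - (flen < keylen); /* a prefix sorts first */
}

/* Binary search over byte offsets. Invariant: lo and hi are line starts
   (hi may also be len). Returns a pointer to the matching line or NULL. */
static const char *csv_bsearch(const char *buf, size_t len, const char *key) {
    size_t keylen = strlen(key);
    size_t lo = 0, hi = len;
    while (lo < hi) {
        size_t mid = lo + (hi - lo) / 2;
        size_t start = mid;
        /* back up to the start of the line containing mid */
        while (start > lo && buf[start - 1] != '\n') start--;
        int c = cmp_first_column(buf + start, buf + len, key, keylen);
        if (c == 0) return buf + start;
        if (c > 0) {
            hi = start; /* key can only be in an earlier line */
        } else {
            const char *nl = memchr(buf + start, '\n', len - start);
            lo = nl ? (size_t)(nl - buf) + 1 : len; /* key is in a later line */
        }
    }
    return NULL;
}
```

A binary search over byte offsets needs only about log2(file size) probes, so each lookup stays cheap even for large files. It also never needs a separate index of line positions.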

If you need to access other columns of a CSV file, use csv_get(<file expression>, <key expression>, <column expression>). For example:

SELECT customer.lastname, csv_get('purchases.csv', customer.id, 2) AS price FROM customer

This assumes that column 2 of purchases.csv contains the price of the purchased product.
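Once the line for a key has been found, csv_get only has to pick out one column. Here is a minimal C sketch of that step. get_column is a hypothetical helper. It assumes 0-based column numbers, with column 0 being the sorted key column. That may not match how the real csv_get counts columns, so check the source code documentation.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: copy column `col` of the tab-separated line starting
   at `line` into `out` as a NUL-terminated string. ASSUMPTION: columns are
   0-based and column 0 is the key column. Returns the column length, or -1
   if the line has too few columns or `out` is too small. */
static long get_column(const char *line, const char *end, unsigned col,
                       char *out, size_t outsize) {
    const char *p = line;
    /* skip `col` fields, never leaving the current line */
    for (unsigned i = 0; i < col; i++) {
        while (p < end && *p != '\t' && *p != '\n') p++;
        if (p >= end || *p != '\t') return -1; /* ran out of columns */
        p++;                                   /* step over the tab */
    }
    const char *q = p;
    while (q < end && *q != '\t' && *q != '\n') q++;
    size_t n = (size_t)(q - p);
    if (n + 1 > outsize) return -1;
    memcpy(out, p, n);
    out[n] = '\0';
    return (long)n;
}
```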

Prerequisites

The following assumptions are made and must be met by your CSV files:

  • Column delimiter is ‘\t’ and row delimiter is ‘\n’. You can change this at compile time.
  • The first column must be sorted in UTF-8 binary ascending order. “Binary” means that it has to be sorted by byte value, not by a specific collation. For example, ‘ä’ (0xc3 0xa4) comes after ‘z’ (0x7a). In bash you would sort a file like this:
    LC_ALL=C sort < input.csv > ordered.csv

    Remember that sorting comes for free in Map-Reduce.

  • No escaping is done. If you need escaping, first escape everything in your CSV file, say by replacing ‘\n’ with ‘\\n’, and then call csv_find or csv_get like this:
    csv_find(<file expression>, REPLACE(<key expression>, '\n', '\\n'))
  • Some MySQL APIs (at least JDBC) treat the results of a UDF as binary data. You have to explicitly cast the return value of csv_get like this:
    SELECT CAST(csv_get(<file expression>, <key expression>, <column expression>) AS CHAR)
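If you produce such a file yourself rather than through Map-Reduce, the escaping and sorting rules above can be sketched in C as follows. escape_field replaces a tab or newline inside a field with the two-character sequence \t or \n. cmp_bytes is a qsort comparator giving the same byte order as LC_ALL=C sort. Both names are hypothetical, and this escaping scheme is just one possible choice. Whatever scheme you pick, the key passed to csv_find must be escaped the same way, for example with nested REPLACE calls.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: escape tab and newline so a field cannot break the
   tab/newline layout. The caller frees the result. */
static char *escape_field(const char *s) {
    char *out = malloc(2 * strlen(s) + 1); /* worst case: every byte doubled */
    if (out == NULL) return NULL;
    char *o = out;
    for (; *s; s++) {
        switch (*s) {
        case '\t': *o++ = '\\'; *o++ = 't'; break;
        case '\n': *o++ = '\\'; *o++ = 'n'; break;
        default:   *o++ = *s;
        }
    }
    *o = '\0';
    return out;
}

/* qsort comparator for an array of lines: plain byte order. strcmp
   compares bytes as unsigned char, so "ä" (0xc3 0xa4) sorts after
   "z" (0x7a), exactly like LC_ALL=C sort. */
static int cmp_bytes(const void *a, const void *b) {
    return strcmp(*(const char *const *)a, *(const char *const *)b);
}
```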

For more information, take a look at the source code documentation.

Usage patterns other than integrating with Hadoop

We use csv_find and csv_get not only to integrate with Hadoop but also to integrate multiple MySQL servers. To make data from one MySQL server available on another, you could export it like this:

SELECT * FROM customer WHERE  <some condition> ORDER BY BINARY id ASC INTO OUTFILE '/tmp/customer.csv'

Then copy the file over to the other MySQL server (or use NFS). Of course, you could use the FEDERATED storage engine instead. We decided not to because it has (or at least had) some glitches.
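The BINARY in ORDER BY BINARY id matters when the key is numeric. Because the file must be sorted by byte value, the ids have to be ordered as strings, not as numbers. The following C sketch contrasts the two orders for a handful of ids. cmp_bytes and cmp_numeric are hypothetical comparator names.

```c
#include <stdlib.h>
#include <string.h>

/* Byte-wise order, as produced by ORDER BY BINARY id or LC_ALL=C sort. */
static int cmp_bytes(const void *a, const void *b) {
    return strcmp(*(const char *const *)a, *(const char *const *)b);
}

/* Numeric order, as produced by a plain ORDER BY id on an integer column. */
static int cmp_numeric(const void *a, const void *b) {
    long x = strtol(*(const char *const *)a, NULL, 10);
    long y = strtol(*(const char *const *)b, NULL, 10);
    return (x > y) - (x < y);
}
```

Sorted numerically, the ids 2, 9, 10, 100 stay in that order. Sorted by bytes, they become 10, 100, 2, 9. A file written with a plain numeric ORDER BY id therefore violates the sort requirement as soon as the ids have different numbers of digits.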

Another useful application is replacing complicated JOINs or SUBSELECTs. MySQL handles some JOINs well but performs poorly on many others, and especially on SUBSELECTs.

A brief performance evaluation

First we create a test CSV file:

#> seq 1000000 2000000 > /tmp/random.csv
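Before pointing csv_find at a file, it can be worth checking the sort order. For this test file the requirement holds automatically. Every number from 1000000 to 2000000 has seven digits, so numeric and byte order coincide. For other inputs, a small checker like the following C sketch reports the first line whose key breaks byte-wise ascending order. first_unsorted_line is a hypothetical name. The checker tolerates duplicate keys, since the post does not say whether they are allowed.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical checker: returns 0 if the first tab-separated column of
   every line in `f` is in ascending byte order (duplicates allowed), the
   1-based number of the first offending line otherwise, or -1 if memory
   runs out. */
static long first_unsorted_line(FILE *f) {
    char *prev = NULL, *line = NULL;
    size_t prevcap = 0, cap = 0;
    long lineno = 0, bad = 0;
    while (getline(&line, &cap, f) != -1) {
        lineno++;
        size_t klen = strcspn(line, "\t\n"); /* key = text before tab/newline */
        line[klen] = '\0';
        if (prev != NULL && strcmp(prev, line) > 0) { bad = lineno; break; }
        if (klen + 1 > prevcap) {
            char *p = realloc(prev, klen + 1);
            if (p == NULL) { bad = -1; break; }
            prev = p;
            prevcap = klen + 1;
        }
        memcpy(prev, line, klen + 1); /* remember this key for the next line */
    }
    free(line);
    free(prev);
    return bad;
}
```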

Then we load it into a table:

mysql> CREATE TABLE rand (id VARCHAR(255) NOT NULL, PRIMARY KEY(id));
Query OK, 0 rows affected (0.00 sec)

mysql> LOAD DATA INFILE '/tmp/random.csv' INTO TABLE rand;
Query OK, 1000001 rows affected (5.60 sec)

To compare the performance of a JOIN with csv_find, we create a second table containing the same rows:

mysql> CREATE TABLE rand2 (id VARCHAR(255) NOT NULL, PRIMARY KEY(id));
Query OK, 0 rows affected (0.01 sec)

mysql> LOAD DATA INFILE '/tmp/random.csv' INTO TABLE rand2;
Query OK, 1000001 rows affected (5.75 sec)

We see that importing 1 million rows already took 5.75 seconds.

Now let’s compare the actual JOIN and csv_find:

mysql> SELECT COUNT(*) FROM rand JOIN rand2 ON rand.id = rand2.id;
1 row in set (2.37 sec)

mysql> SELECT COUNT(*) FROM rand WHERE csv_find('/tmp/random.csv', id) = 1;
1 row in set (1.83 sec)

We see 1.83 seconds for csv_find vs. 2.37 seconds for a JOIN.

Taking the time spent in LOAD DATA into account, it is even 1.83 seconds vs. 8.12 seconds, meaning csv_find is more than 4 times faster.
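Spelled out with the timings measured above, the comparison is:

```latex
\begin{aligned}
t_{\text{JOIN}} &= t_{\text{LOAD DATA}} + t_{\text{query}} = 5.75\,\text{s} + 2.37\,\text{s} = 8.12\,\text{s}\\
\frac{t_{\text{JOIN}}}{t_{\text{csv\_find}}} &= \frac{8.12\,\text{s}}{1.83\,\text{s}} \approx 4.4
\qquad \text{(query alone: } \tfrac{2.37}{1.83} \approx 1.3\text{)}
\end{aligned}
```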

Since most Map-Reduce jobs do not use LOAD DATA but a ton of INSERT statements, the real-world performance of the JOIN approach is likely even worse, not to mention the load that massive INSERTs put on the MySQL server.

rand2 is an InnoDB table. Let’s retry with a MEMORY (HEAP) table:

mysql> SET max_heap_table_size = 64 * 1024 * 1024;
Query OK, 0 rows affected (0.00 sec)

mysql> CREATE TABLE rand3 (id VARCHAR(10) NOT NULL, PRIMARY KEY(id)) ENGINE=HEAP;
Query OK, 0 rows affected (0.00 sec)

mysql> LOAD DATA INFILE '/tmp/random.csv' INTO TABLE rand3;
Query OK, 1000001 rows affected (1.94 sec)
Records: 1000001  Deleted: 0  Skipped: 0  Warnings: 0

mysql> SELECT COUNT(*) FROM rand JOIN rand3 ON rand.id = rand3.id;
1 row in set (1.80 sec)

As you can see, the execution times of both queries are nearly equal, but we still need 1.94 seconds to load the data into the table. Counting the load, the JOIN on a HEAP table takes 1.94 + 1.80 = 3.74 seconds, so csv_find (1.83 seconds) is still about twice as fast.

But, did you notice this statement?

SET max_heap_table_size = 64 * 1024 * 1024

We had to raise the maximum heap table size since the contents of our 7.7 MB test file would not fit into the default 16 megabytes.

In fact, the HEAP table uses about 50 MB of RAM, compared to exactly 7.7 MB for csv_find.
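Spread over the 1,000,001 rows of the test file, that is roughly:

```latex
\begin{aligned}
\text{HEAP:}\quad & \frac{50\ \text{MB}}{1\,000\,001\ \text{rows}} \approx 50\ \text{bytes/row}\\
\text{csv\_find:}\quad & \frac{7.7\ \text{MB}}{1\,000\,001\ \text{rows}} \approx 8\ \text{bytes/row} \quad \text{(7 digits plus a newline)}\\
\text{ratio:}\quad & \frac{50}{7.7} \approx 6.5
\end{aligned}
```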

And because RAM is a limiting factor, you cannot use HEAP tables all that often anyway. csv_find and csv_get allocate only as much memory as the file size. You can limit the maximum allowed file size at compile time.

Where to download?

Download mysql_udf_csv_binary_search-0.1.tar.gz.

This package includes installation instructions (see README) as well as a comprehensive test suite containing both unit and integration tests (make test).

A note on Windows: since I don’t use Windows, there are no build instructions for this OS. I tried to write portable code, but since I cannot test it, I don’t know whether it works. It would be great if someone out there could contribute a Windows version.

The code has been tested on MySQL versions 5.1.41 and 5.0.83.

Final words

This package provides fast and simple integration of sorted CSV files coming from any source.

Comments and improvements are welcome.
