HadoopDB Quick Start Guide

The purpose of this document is to help you get HadoopDB (http://hadoopdb.sourceforge.net) running. The overview of the design is presented in Section 5 of our VLDB paper available at http://db.cs.yale.edu/hadoopdb/hadoopdb.html. The paper also contains the results of the experiments we ran to evaluate Hadoop (with HDFS), HadoopDB, and commercial parallel databases.

Figure: The Architecture of HadoopDB

The basic idea behind HadoopDB is to give Hadoop access to multiple single-node DBMS servers (e.g. PostgreSQL or MySQL) deployed across the cluster. HadoopDB pushes as much data processing as possible into the database engine by issuing SQL queries (usually most of the Map/Combine phase logic is expressible in SQL). The result is a system that resembles a shared-nothing parallel database. Applying techniques taken from the database world leads to a performance boost, especially in more complex data analysis. At the same time, because HadoopDB relies on the MapReduce framework, it achieves scalability and fault/heterogeneity tolerance similar to Hadoop's. Again, the details are described in our VLDB paper.

Last but not least, HadoopDB is built completely from open source components, including Hive, which provides a SQL interface to our system. The regular API for writing custom Hadoop jobs is also available. All code we release is free and open source.


1 Installation

There are two options:
  1. You can install Hadoop/PostgreSQL/HadoopDB etc. on your own cluster.
  2. You can use our EC2 image, which comes with Hadoop/PostgreSQL installed (skip to Section 1.2).

1.1 Pre-requisites

Supported Platforms

GNU/Linux is supported as a development and production platform. HadoopDB has been demonstrated on clusters with 100 nodes and should scale as long as Hadoop scales.

Required Software

  1. Java 1.6.x
  2. Hadoop 0.19.1. (We will refer to the root installation directory of Hadoop as HADOOP_HOME.)
  3. Single-node databases installed on each slave node in the cluster.

To get a Hadoop distribution, download the release from one of the Apache Download Mirrors. Please refer to http://hadoop.apache.org for more information.

We experimented with PostgreSQL and MySQL, although any JDBC-compliant database should work. For details on PostgreSQL or MySQL installation please go to their websites.

For example, PostgreSQL installation on Linux (Fedora 8) is very simple:

yum -y install postgresql postgresql-server

HadoopDB

HadoopDB's binary version (hadoopdb.jar) needs to be placed under HADOOP_HOME/lib on each node. The JDBC driver jar is also required and needs to be placed in the same lib directory.

To copy a jar file to all nodes, you could use the following Python script with HADOOP_HOME/lib/hadoopdb.jar or HADOOP_HOME/lib/jdbc_driver.jar as the input argument. The file is copied to the same path on every node listed in nodes.txt:

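#!/usr/bin/python
# Usage: ./propagate file_path

import sys, os

file_path = sys.argv[1]
hostfile = "nodes.txt"
# nodes.txt lists one host name or IP address per line
nodes = open(hostfile, 'r').readlines()
for i in nodes:
   node = i.strip()
   if not node:
      continue  # skip blank lines
   # "key" is the SSH private key file; adjust the path as needed
   os.system("scp -i key %s %s:%s" % (file_path, node, file_path))

print("File Propagation Completed")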

For Hadoop to recognize these changes, a restart may be required. Issue the following commands:

HADOOP_HOME/bin/stop-all.sh
HADOOP_HOME/bin/start-all.sh


1.2 Amazon EC2

HadoopDB was tested extensively on Amazon EC2 (http://aws.amazon.com/ec2/) during the experiments for the VLDB paper. We created our own image and ran EC2 clusters using the scripts included in the Hadoop distribution (see more on that at http://wiki.apache.org/hadoop/AmazonEC2).

Our image has PostgreSQL 8.2.5 installed and can be found under the S3 bucket yale-vldb with the AMI ID ami-0111f768.

To list public images under our bucket, please execute:

ec2-describe-images -a | grep yale-vldb

Assuming you are in the HADOOP_HOME/src/contrib/ec2/ directory, the usual steps are:

# starts a new 10-node cluster
bin/hadoop-ec2 launch-cluster MY_CLUSTER 10
# copies necessary files (e.g. the latest hadoopdb.jar and SMS Planner)
bin/hadoop-ec2 push MY_CLUSTER file
# logs in to the master node to run your jobs
bin/hadoop-ec2 login MY_CLUSTER
# terminates the cluster
bin/hadoop-ec2 terminate-cluster MY_CLUSTER

To get a list of EC2 internal IP addresses of nodes in the cluster, issue the following:

ec2-describe-instances | java -cp hadoopdb.jar edu.yale.cs.hadoopdb.util.ec2.InstanceProperties -private -group MY_CLUSTER > nodes.txt

To get the public IP addresses:

ec2-describe-instances | java -cp hadoopdb.jar edu.yale.cs.hadoopdb.util.ec2.InstanceProperties -public -group MY_CLUSTER > nodes-public.txt

1.3 Configuration


1.3.1 HadoopDB

The Hadoop configuration file HADOOP_HOME/conf/hadoop-site.xml should include the following properties:

<!-- mandatory -->
<property>
  <name>hadoopdb.config.file</name>
  <value>HadoopDB.xml</value>
  <description>The name of the HadoopDB cluster configuration file</description>
</property>

<!-- optional, default 1000 -->
<!-- MySQL needs -2147483648 (Integer.MIN_VALUE) for large results -->
<property>
  <name>hadoopdb.fetch.size</name>
  <value>1000</value>
  <description>The number of records fetched from JDBC ResultSet at once</description>
</property>

<!-- optional, default false -->
<property>
  <name>hadoopdb.config.replication</name>
  <value>false</value>
  <description>Tells HadoopDB Catalog whether replication is enabled.
  Replica locations need to be specified in the catalog.
  False causes replica information to be ignored.</description>
</property>

To propagate the configuration file to all nodes, use the script above with HADOOP_HOME/conf/hadoop-site.xml.
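
As a quick sanity check before propagating the file, the properties can be read back with a few lines of Python. This is only a sketch: it assumes the file uses the usual Hadoop layout of <property> elements under a <configuration> root, and the helper name read_properties and the inline SAMPLE text are made up for illustration.

```python
import xml.etree.ElementTree as ET

def read_properties(xml_text):
    """Return a dict of name -> value for every <property> element."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        if name is not None:
            props[name.strip()] = (prop.findtext("value") or "").strip()
    return props

# Stand-in for the contents of HADOOP_HOME/conf/hadoop-site.xml
SAMPLE = """<configuration>
<property>
  <name>hadoopdb.config.file</name>
  <value>HadoopDB.xml</value>
</property>
<property>
  <name>hadoopdb.fetch.size</name>
  <value>1000</value>
</property>
</configuration>"""

props = read_properties(SAMPLE)
# hadoopdb.config.file is mandatory; the other two fall back to the defaults listed above
catalog = props["hadoopdb.config.file"]
fetch_size = int(props.get("hadoopdb.fetch.size", "1000"))
replication = props.get("hadoopdb.config.replication", "false") == "true"
print(catalog, fetch_size, replication)
```

To check a real file, pass the text of hadoop-site.xml to read_properties instead of SAMPLE. A missing hadoopdb.config.file then shows up as a KeyError.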

1.3.2 DBMS (PostgreSQL)

Edit pg_hba.conf to be as follows:

local   all         all                               trust
host    all         all         127.0.0.1/0          password
host    all         all         ::1/128               trust

Edit postgresql.conf to be as follows:

listen_addresses = '*'                # what IP address(es) to listen on;
port = 5432                           # (change requires restart)
shared_buffers = 512MB
work_mem = 1024MB

Then, restart Postgres using pg_ctl restart.

If PostgreSQL refuses to start with such large memory settings, you may need to raise the kernel's shared memory limit:

sysctl -w kernel.shmmax=554663936

To keep this setting across reboots, also add it to /etc/sysctl.conf.
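
The shmmax value above is slightly larger than the shared_buffers setting, since PostgreSQL allocates other shared memory structures besides the buffer pool. The arithmetic below only shows how the two numbers from this guide relate. How much headroom a given installation needs depends on its other settings, so treat this as an illustration rather than a sizing rule.

```python
MIB = 1024 * 1024

shared_buffers = 512 * MIB   # shared_buffers = 512MB from postgresql.conf
shmmax = 554663936           # value passed to sysctl above, in bytes

# Space left for PostgreSQL's other shared memory structures
headroom = shmmax - shared_buffers

print("shmmax   = %.2f MiB" % (shmmax / float(MIB)))
print("headroom = %.2f MiB" % (headroom / float(MIB)))
```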

1.4 SMS Planner setup

The SQL to MapReduce to SQL (SMS) Planner consists of a slightly modified Hive build and SMS-specific classes. Download and unzip the SMS_dist.zip package. Follow the Running Hive instructions at http://wiki.apache.org/hadoop/Hive/GettingStarted. Note that HIVE_HOME is the unzipped dist directory.

Should you need to upgrade an existing Hive/SMS deployment to a new HadoopDB version, it is usually sufficient to place the new hadoopdb.jar under HIVE_HOME/lib. The Hadoop installation should contain the same version of hadoopdb.jar under HADOOP_HOME/lib. Make sure you propagate HADOOP_HOME/lib/hadoopdb.jar to all nodes in the cluster using the propagation script above.

Before executing any HadoopDB jobs, you need to prepare and load your data as described in the next section.

2 Data

Typically, the data you want to analyze have some structure. Even if you process text data from HDFS, some kind of structure is assumed since a Map function expects key-value pairs (in most cases each record contains some number of fields). Given that you need to structure your data anyway, you may as well do it sooner using classic data modeling and DB schema design (which is taught in courses on database systems and explained in many books on this subject, e.g. http://www.db-book.com).

Although it is perfectly OK to load your data into a HadoopDB database cluster without much preparation (the bulk-loading SQL command COPY is recommended), a careful analysis of the data and the expected queries results in a significant performance boost at the expense of a longer loading time. Two important things to consider are data partitioning and DB tuning (e.g. creating indices). Again, details of those techniques are covered in textbooks. For some queries, hash-partitioning and clustered indices improve HadoopDB's performance by a factor of 10; see Section 6 of the VLDB paper.

When data need no hash-partitioning, it is sufficient to split them into chunks (blocks). We recommend chunks of 1GB, because a database server can typically process them efficiently. The best chunk size, however, may depend on your hardware configuration. Partitioning data into small chunks (separate databases), as opposed to keeping just one database per node (as is usually done in parallel database installations), gives HadoopDB fault/heterogeneity tolerance similar to that achieved by Hadoop with HDFS.
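
The 1GB recommendation translates into a number of chunks per node with simple arithmetic. The sketch below assumes data is spread evenly over the nodes. The function name and the 2 TB / 100-node figures are hypothetical and serve only as an example.

```python
import math

GB = 1024 ** 3

def chunks_per_node(total_bytes, num_nodes, chunk_bytes=GB):
    """Number of chunks each node needs so that no chunk exceeds chunk_bytes."""
    per_node = total_bytes / float(num_nodes)
    return int(math.ceil(per_node / chunk_bytes))

# Hypothetical example: 2 TB of data on a 100-node cluster
# gives 20.48 GB per node, which rounds up to 21 chunks of at most 1GB.
print(chunks_per_node(2048 * GB, 100))
```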

2.1 Data Loader

In order to push more query logic into databases (e.g. joins), hash-partitioning of data needs to be performed. This may be done in two phases. First, data need to be loaded into HDFS. Then, a custom-made Hadoop job, GlobalHasher, repartitions data into a specified number of partitions (e.g. number of nodes in a cluster).

For example, the following command splits data into 10 partitions by hashing on the first field (index 0) of each line, where fields are separated by the | character.

hadoop jar hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher input_path output_path 10 \| 0

Each partition should be downloaded into a node's local file system and partitioned by LocalHasher into the desired number of files.

For instance, the following commands download a partition from HDFS and split a file into 5 chunks.

hadoop fs -get /output_path/part-00000 local_file_name
java -cp hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.LocalHasher local_file_name 5 \| 0

Each chunked file is then bulk-loaded into a separate database within a node and indexed appropriately.
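
The idea behind hash-partitioning can be sketched in a few lines of Python. This is not the GlobalHasher or LocalHasher implementation: the function names are hypothetical, and crc32 is chosen only because it gives the same result on every run. The point is that records with equal values in the hashed field always land in the same partition, which is what allows joins and aggregations on that field to be pushed into the databases.

```python
import zlib

def partition_of(line, num_partitions, delimiter="|", field_index=0):
    """Map a record to a partition by hashing one of its fields."""
    key = line.rstrip("\n").split(delimiter)[field_index]
    # crc32 is used only because it is deterministic across runs;
    # it is NOT claimed to be the hash function GlobalHasher uses.
    return (zlib.crc32(key.encode("utf-8")) & 0xffffffff) % num_partitions

def hash_partition(lines, num_partitions, delimiter="|", field_index=0):
    """Split records into num_partitions lists; equal keys share a partition."""
    parts = [[] for _ in range(num_partitions)]
    for line in lines:
        index = partition_of(line, num_partitions, delimiter, field_index)
        parts[index].append(line)
    return parts

records = ["alice|10|2009-02-01", "bob|5|2009-03-01", "alice|7|2009-04-01"]
parts = hash_partition(records, 10)
# Both "alice" records end up in the same partition.
print(partition_of(records[0], 10) == partition_of(records[2], 10))
```

One design point worth noting: a second partitioning level cannot simply reuse the same hash with a chunk count that divides the partition count. With 10 partitions and 5 chunks, every record of a partition would fall into the same chunk, so the second level needs a different hash function or seed.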

Let's assume that the databases and desired tables are already created. The following SQL command then loads data into a table. Each line is treated as a new record, with fields delimited by the | character.

COPY your_table FROM 'your_data_text_file' WITH DELIMITER '|';
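
When a node holds many chunk databases, the COPY statements can be generated rather than typed. The sketch below only builds strings and does not connect to any database. The database naming scheme (a prefix followed by an index) and the chunk file pattern are assumptions made for illustration, not names produced by the HadoopDB tools.

```python
def copy_statement(table, data_file, delimiter="|"):
    """Build a PostgreSQL COPY statement like the one shown above."""
    # Single quotes inside SQL string literals are escaped by doubling them.
    path = data_file.replace("'", "''")
    delim = delimiter.replace("'", "''")
    return "COPY %s FROM '%s' WITH DELIMITER '%s';" % (table, path, delim)

def load_plan(table, num_chunks, db_prefix="cdb",
              file_pattern="/my_data/%s_chunk%d"):
    """Pair each chunk database with the statement that loads its file.

    The database names (prefix + index) and the file pattern are
    illustrative assumptions.
    """
    plan = []
    for i in range(num_chunks):
        database = "%s%d" % (db_prefix, i)
        plan.append((database, copy_statement(table, file_pattern % (table, i))))
    return plan

for database, sql in load_plan("UserVisits", 2):
    print(database, sql)
```

Each (database, statement) pair can then be handed to whatever client you use to run SQL on that node.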

To run the same commands on each node in the cluster in parallel we used Python scripts that create threads that connect to each machine and execute shell or SQL commands.

For example, the following script reads the list of nodes from nodes.txt and then creates a directory /my_data on each of them.

#!/usr/bin/python
import os, threading, time

completed = {}

def executeThread(node, *args):
   # Make sure the key is accessible and is the correct key name.
   os.system("ssh -i key -o 'StrictHostKeyChecking=no' %s 'mkdir /my_data'" % (node))
   # You could replace the mkdir command with another command line or add more command lines,
   # as long as you prefix each command with the ssh connection.
   completed[node] = "true"

hostfile = "nodes.txt"
# One host per line; blank lines are ignored
internalips = [line.strip() for line in open(hostfile, 'r') if line.strip()]

for node_n in internalips:
   time.sleep(5)
   # args must be a tuple, hence the trailing comma
   threading.Thread(target=executeThread, args=(node_n,)).start()

# Poll until every node has reported completion
while len(completed) < len(internalips):
   time.sleep(10)

print("Execution Completed")
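
A variation on the script above collects the exit status of each remote command and waits for the threads with join() instead of polling a shared dictionary. This is a sketch, not part of the HadoopDB distribution: the function names are hypothetical, and the runner parameter exists only so the logic can be tried without contacting any machine.

```python
import subprocess
import threading

def ssh_command(node, remote_command, key="key"):
    """Argument list for running remote_command on node over ssh."""
    return ["ssh", "-i", key, "-o", "StrictHostKeyChecking=no",
            node, remote_command]

def run_on_all(nodes, remote_command, runner=subprocess.call):
    """Run remote_command on every node in parallel; return {node: exit_code}."""
    results = {}
    lock = threading.Lock()

    def worker(node):
        code = runner(ssh_command(node, remote_command))
        with lock:
            results[node] = code

    threads = [threading.Thread(target=worker, args=(n,)) for n in nodes]
    for t in threads:
        t.start()
    for t in threads:
        t.join()   # wait for every node to finish
    return results

# Dry run with a stand-in runner, so nothing is executed remotely.
demo = run_on_all(["10.0.0.1", "10.0.0.2"], "mkdir /my_data",
                  runner=lambda argv: 0)
print(demo)
```

With the default runner, a non-zero value in the returned dictionary points at the nodes where the command failed.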

2.2 HadoopDB Catalog

The HadoopDB Catalog needs to reflect the location of all chunks. In the current implementation the catalog is an XML file stored in HDFS.

The SimpleCatalogGenerator tool assumes uniformly distributed chunks across all nodes, with each node having the exact same number of chunks. It associates each relation's chunks with a partition id and allows the existence of chunked and complete relations per node.

Run the following command to generate the HadoopDB.xml catalog:

java -cp hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator path_to_Catalog.properties

The Catalog.properties file contains parameters necessary for Catalog generation:

#Properties for Catalog Generation
##################################
nodes_file=machines.txt
relations_unchunked=grep, EntireRankings
relations_chunked=Rankings, UserVisits
catalog_file=HadoopDB.xml
##
#DB Connection Parameters
##
port=5432
username=user
password=password
driver=org.postgresql.Driver
url_prefix=jdbc\:postgresql\://
##
#Chunking properties
##
chunks_per_node=20
unchunked_db_prefix=udb
chunked_db_prefix=cdb
##
#Replication Properties
##
dump_script_prefix=/root/dump_
replication_script_prefix=/root/load_replica_
dump_file_u_prefix=/mnt/dump_udb
dump_file_c_prefix=/mnt/dump_cdb
##
#Cluster Connection
##
ssh_key=id_rsa-gsg-keypair

Modify the above file to match your cluster setup, relations, chunking properties and database connection parameters.
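
Before running the generator, it can help to read the properties back and check what they imply. The sketch below parses a few of the keys shown above and computes nodes times chunks_per_node, which follows from the generator's assumption that every node holds the same number of chunks. The parser is a simplified stand-in for Java's properties format, and the cluster size of 10 is hypothetical.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        # Java properties files escape ':' as '\:', as in url_prefix above
        props[key.strip()] = value.strip().replace("\\:", ":")
    return props

# A subset of the Catalog.properties file shown above
SAMPLE = """
#Properties for Catalog Generation
relations_unchunked=grep, EntireRankings
relations_chunked=Rankings, UserVisits
port=5432
url_prefix=jdbc\\:postgresql\\://
chunks_per_node=20
"""

props = parse_properties(SAMPLE)
chunked = [r.strip() for r in props["relations_chunked"].split(",")]
num_nodes = 10  # hypothetical: the number of hosts listed in nodes_file
total_chunks = num_nodes * int(props["chunks_per_node"])
print(props["url_prefix"], chunked, total_chunks)
```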

After generating the catalog, upload it to HDFS:

hadoop dfs -put HadoopDB.xml HadoopDB.xml

If you choose a different catalog name, you will need to change the HADOOP_HOME/conf/hadoop-site.xml to point to the right file.

2.3 Replication

Replication requires creating a dump of each chunk database and restoring it on another node in the cluster. A simple tool creates the necessary scripts for dumping and replica loading as well as Catalog creation. This tool only generates the scripts; it does not execute them. Scripts need to be pushed to the appropriate nodes and executed in parallel.

java -cp hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleRandomReplicationFactorTwo path_to_Catalog.properties [-dump false/true]

Passing "-dump false" suppresses generation of the dump scripts; the default setting is true. The tool generates a new HadoopDB.xml catalog in which chunks are randomly replicated across all nodes. The tool supports only a replication factor of two.
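
The placement idea behind random factor-two replication can be illustrated as follows. This is a sketch of the general technique and not the algorithm used by SimpleRandomReplicationFactorTwo: the function name, the node names and the chunk ids are all hypothetical. Each chunk keeps its original copy and receives exactly one replica on a randomly chosen different node.

```python
import random

def place_replicas(primary, nodes, rng=random):
    """For each chunk, pick one replica node different from its primary node.

    primary maps chunk id -> node holding the original copy.
    Returns chunk id -> (primary node, replica node).
    """
    if len(nodes) < 2:
        raise ValueError("factor-two replication needs at least two nodes")
    placement = {}
    for chunk, home in sorted(primary.items()):
        candidates = [n for n in nodes if n != home]
        placement[chunk] = (home, rng.choice(candidates))
    return placement

nodes = ["node0", "node1", "node2"]
primary = {0: "node0", 1: "node0", 2: "node1", 3: "node2"}
# A seeded generator makes the example repeatable.
placement = place_replicas(primary, nodes, random.Random(42))
for chunk, (home, replica) in sorted(placement.items()):
    print(chunk, home, replica)
```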

After replication, upload the catalog file into HDFS, removing the old catalog first.

hadoop dfs -rmr HadoopDB.xml
hadoop dfs -put HadoopDB.xml HadoopDB.xml

Then, update HADOOP_HOME/conf/hadoop-site.xml to enable replication for HadoopDB by setting hadoopdb.config.replication to true (see Section 1.3.1).

2.4 SMS MetaStore

The Hive core of SMS uses an internal catalog: the MetaStore. Each relation needs to be created in the MetaStore to allow SMS to modify query plans involving database relations.

Start the Hive command line interface:

cd HIVE_HOME
bin/hive

For each relation, issue the following command:

CREATE EXTERNAL TABLE relation_name (field_name_1 field_type_1, ... field_name_n field_type_n)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat' OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'dummy_path_in_HDFS/relation_name';

Note that the location path must have the relation name as its final directory.
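
With many relations, the statement can be generated from a list of fields. The sketch below reproduces the template above and always appends the relation name as the final directory of the location. The function name is hypothetical, and the Rankings field names and types are example values only.

```python
def create_external_table(relation, fields, hdfs_base="dummy_path_in_HDFS"):
    """Build the CREATE EXTERNAL TABLE statement for one relation.

    fields is a list of (name, hive_type) pairs.
    """
    columns = ", ".join("%s %s" % (name, ftype) for name, ftype in fields)
    return (
        "CREATE EXTERNAL TABLE %s (%s)\n"
        "ROW FORMAT DELIMITED\n"
        "FIELDS TERMINATED BY '|'\n"
        "STORED AS INPUTFORMAT "
        "'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat' OUTPUTFORMAT\n"
        "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\n"
        # the relation name must be the final directory of the location path
        "LOCATION '%s/%s';" % (relation, columns, hdfs_base.rstrip("/"), relation)
    )

# Hypothetical field list for illustration
ddl = create_external_table("Rankings", [("pageURL", "string"), ("pageRank", "int")])
print(ddl)
```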

3 HadoopDB jobs

Writing HadoopDB jobs is very similar to writing regular Hadoop jobs. HadoopDB's API contains an abstract class, DBJobBase, that should be extended to define job-specific settings and to provide the implementation of the Map and Reduce functions. In addition, a class that implements the DBWritable interface needs to be written. Instances of this class read in records returned from a database and serve as the value for a Map function. The HadoopDB Connector leverages Hadoop's ability to accept a custom InputFormat, which allows reading from an arbitrary data source. The HadoopDB Catalog provides the Hadoop scheduler with all the information needed to assign Map tasks to the local databases and thus avoid unnecessary network traffic.

For example, assuming the table definition below:

CREATE TABLE table_name (
field_a varchar(20),
field_b int,
field_c date);

We want to compute the sum of field_b grouped by field_a, but only for records in which field_c is later than Jan 1, 2009. The computation needs to be performed across the entire dataset spread across the cluster.

The SQL that can be pushed into each chunk database is as follows:

SELECT field_a, SUM(field_b) AS sum_b
FROM table_name
WHERE field_c > '2009-01-01'
GROUP BY field_a;

The results are read into the objects of the class given below.

class AggTableRecord implements DBWritable {
	private String field_a;
	private long sum_b;

	public String getFieldA() {
		return field_a;
	}

	public long getSumB() {
		return sum_b;
	}

	@Override
	public void readFields(ResultSet resultSet) throws SQLException {
		field_a = resultSet.getString("field_a");
		sum_b = resultSet.getLong("sum_b");
	}

	@Override
	public void write(PreparedStatement statement) throws SQLException {
		throw new UnsupportedOperationException("No write() impl.");
	}
}

The Map function simply outputs records returned from the chunk database. Please note that pre-aggregation (Combine) is done at the SQL level.

class Map extends MapReduceBase implements
		Mapper<LongWritable, AggTableRecord, Text, LongWritable> {

	protected Text outputKey = new Text();
	protected LongWritable outputValue = new LongWritable();

	public void map(LongWritable key, AggTableRecord value,
			OutputCollector<Text, LongWritable> output, Reporter reporter)
			throws IOException {

		outputKey.set(value.getFieldA());
		outputValue.set(value.getSumB());
		output.collect(outputKey, outputValue);
	}
}

The Reduce function performs the final aggregation after repartitioning on the key (field_a).

class Reduce extends MapReduceBase implements
		Reducer<Text, LongWritable, Text, LongWritable> {

	protected LongWritable outputValue = new LongWritable();

	public void reduce(Text key, Iterator<LongWritable> values,
			OutputCollector<Text, LongWritable> output, Reporter reporter)
			throws IOException {

		long sum = 0;
		while (values.hasNext()) {
			sum += values.next().get();
		}

		outputValue.set(sum);
		output.collect(key, outputValue);
	}
}
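
The data flow of this job can be tried out on a single machine with the Python sketch below. It is a simulation and not HadoopDB code: in-memory SQLite databases stand in for the PostgreSQL chunk databases, and the helper names and sample rows are made up. Each chunk runs the pushed-down SQL (the Map and Combine work), and the partial sums are then added up per key (the Reduce work).

```python
import sqlite3

PUSHED_SQL = """SELECT field_a, SUM(field_b) AS sum_b
FROM table_name
WHERE field_c > '2009-01-01'
GROUP BY field_a"""

def make_chunk(rows):
    """Create an in-memory database standing in for one chunk database."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE table_name (field_a varchar(20), field_b int, field_c date)")
    conn.executemany("INSERT INTO table_name VALUES (?, ?, ?)", rows)
    return conn

def run_job(chunks):
    """Run the pushed-down SQL per chunk, then sum the partial results by key."""
    totals = {}
    for conn in chunks:
        # Map: each chunk returns one pre-aggregated row per field_a value
        for field_a, sum_b in conn.execute(PUSHED_SQL):
            # Reduce: final aggregation across chunks
            totals[field_a] = totals.get(field_a, 0) + sum_b
    return totals

chunks = [
    make_chunk([("x", 1, "2009-02-01"), ("y", 2, "2009-03-01"),
                ("x", 4, "2008-12-31")]),
    make_chunk([("x", 10, "2009-05-01"), ("y", 20, "2008-01-01")]),
]
result = run_job(chunks)
print(result)
```

Rows dated on or before 2009-01-01 are filtered inside each chunk, so only the small pre-aggregated results cross the chunk boundary.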

More examples are given in the edu.yale.cs.hadoopdb.benchmark package.

Results of HadoopDB jobs are stored in HDFS. If there is a need to put them into databases, bulk-loading should be employed rather than individual INSERT statements.

4 SMS jobs

Executing queries via SMS is straightforward once relations are registered in the MetaStore. To get the same results as in the example above, execute this query:

INSERT OVERWRITE DIRECTORY 'output-directory'
SELECT field_a, SUM(field_b) AS sum_b
FROM table_name
WHERE field_c > '2009-01-01'
GROUP BY field_a;

The parallelization of the query (converting it into a HadoopDB job) is done inside SMS. The MapReduce plan generated by Hive is traversed by the SMS Planner in order to replace as many of Hive's operators as possible with a SQL query that can be executed independently against each chunk database. The Hive execution engine takes care of running the job, which uses parameterized SMS Connector classes.

Please note that the SMS Planner is still a work in progress. The benchmark queries described in the VLDB paper (Grep, Selection, and both Aggregations) have been tested and work correctly. Other queries may not be fully supported by SMS, or may revert to Hive query processing, with SMS only reading data from the databases.

5 Performance settings

Shown below are Hadoop settings we use to improve performance when dealing with large data (please also consult http://hadoop.apache.org/core/docs/current/cluster_setup.html).

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx1024m -server</value>
</property>

<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>-1</value>
</property>

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>

<property>
  <name>io.sort.factor</name>
  <value>100</value>
</property>

<property>
  <name>io.sort.mb</name>
  <value>200</value>
</property>

<property>
  <name>io.file.buffer.size</name>
  <value>131072</value>
</property>

<property>
  <name>mapred.reduce.parallel.copies</name>
  <value>50</value>
</property>

<property>
  <name>tasktracker.http.threads</name>
  <value>50</value>
</property>

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>1</value>
</property>

Note that the number of map tasks per tasktracker should be picked based on the available memory. If each database has 1GB of memory and the relations accessed fit within that space, then increasing the number of map tasks per tasktracker will improve performance. Partitioning data across multiple disks will also improve performance. Setting the number of map tasks per tasktracker to one is suitable for a single disk and limited main memory.
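
The memory reasoning above can be turned into a back-of-the-envelope estimate. The rule of thumb below is an assumption made for illustration, not a formula from Hadoop or HadoopDB: it subtracts the database's memory and a reserve for the operating system and daemons from the node's RAM, then divides by the child JVM heap (1024 MB with the -Xmx1024m setting above). The reserve size and the example RAM figures are hypothetical.

```python
def max_map_tasks(node_ram_mb, db_mem_mb, child_heap_mb=1024, reserved_mb=1024):
    """Rough upper bound on map tasks per tasktracker from memory alone.

    reserved_mb (OS, datanode, tasktracker) is a guess; adjust it for your nodes.
    """
    usable = node_ram_mb - db_mem_mb - reserved_mb
    return max(1, usable // child_heap_mb)

# Hypothetical node with 8 GB of RAM and 1 GB given to the database
print(max_map_tasks(8192, 1024))
# Hypothetical small node: memory allows only the minimum of one task
print(max_map_tasks(2048, 1024))
```

This bounds the setting from the memory side only. The number of disks and cores may call for a lower value.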

6 HadoopDB source code

The source can be checked out using:

svn co https://hadoopdb.svn.sourceforge.net/svnroot/hadoopdb hadoopdb

A simple ant build file is included. It expects the Hadoop and (patched) Hive jars under /lib.

6.1 The Eclipse Environment

First, disable automatic builds in Eclipse.

Check out the latest Hive as explained here. Then set up Eclipse as described here. You now have a Hive project under Eclipse.

In the same workspace, check out the HadoopDB source code through Eclipse's SVN plugin:

  1. Navigate to SVN Repository Exploring Perspective
  2. Create the following SVN location: https://hadoopdb.svn.sourceforge.net/svnroot/hadoopdb
  3. Checkout using New Project Wizard and follow the GUI instructions.

Right-click the Hive project and navigate to Team:Apply Patch. Select the hive-sms.patch file located at WORKSPACE_PATH/HadoopDB_project/Patches/hive-sms.patch. This should modify the following classes in the Hive project:

ExecDriver.java
HiveInputFormat.java
SemanticAnalyzer.java

and add the following class:

Pathable.java

Modify the Hive build path to link to the HadoopDB project and HadoopDB's build path to include both the Hive project and jar files located in HADOOP_HOME.

Refresh both projects and build in Eclipse.

Finally, to create an SMS distribution:

  1. Export the HadoopDB package into a hadoopdb.jar file
  2. Place the hadoopdb.jar file under HIVE_PROJECT_ROOT/build/ql

Then, under HIVE_PROJECT_ROOT, run the following ant command:

ant -Dhadoop.version="<your-hadoop-version>" package



HadoopDB Team - Yale University 2009 Last update: 09-07-20