The purpose of this document is to help you get HadoopDB (http://hadoopdb.sourceforge.net) running. The overview of the design is presented in Section 5 of our VLDB paper available at http://db.cs.yale.edu/hadoopdb/hadoopdb.html. The paper also contains the results of the experiments we ran to evaluate Hadoop (with HDFS), HadoopDB, and commercial parallel databases.
The basic idea behind HadoopDB is to give Hadoop access to multiple single-node DBMS servers (e.g. PostgreSQL or MySQL) deployed across the cluster. HadoopDB pushes as much data processing as possible into the database engine by issuing SQL queries (usually most of the Map/Combine phase logic is expressible in SQL). The result is a system that resembles a shared-nothing parallel database. Applying techniques taken from the database world leads to a performance boost, especially for more complex data analysis. At the same time, because HadoopDB relies on the MapReduce framework, it retains scalability and fault/heterogeneity tolerance similar to Hadoop's. Again, the details are described in our VLDB paper.
Last but not least, HadoopDB was built completely from open source components, including Hive, which provides a SQL interface to our system. The regular API for writing custom Hadoop jobs is also available. All code we release is free and open source.
Supported Platforms
GNU/Linux is supported as a development and production platform. HadoopDB has been demonstrated on clusters with 100 nodes and should scale as long as Hadoop scales.
Required Software
We experimented with PostgreSQL and MySQL, although any JDBC-compliant database should work. For details on PostgreSQL or MySQL installation please go to their websites.
For example, PostgreSQL installation on Linux (Fedora 8) is very simple:
yum -y install postgresql postgresql-server
HadoopDB
HadoopDB's binary version (hadoopdb.jar) needs to be placed under HADOOP_HOME/lib (on each node). The JDBC driver jar is also required and needs to be placed in the same lib directory.
To copy the jar file to all nodes, you could use the following Python script with HADOOP_HOME/lib/hadoopdb.jar or HADOOP_HOME/lib/jdbc_driver.jar as an input argument:
#!/usr/bin/python
# Usage: ./propagate.py file_path
import sys, os

file = sys.argv[1]
hostfile = "nodes.txt"
nodes = open(hostfile, 'r').readlines()
for i in nodes:
    node = i.strip()
    os.system("scp -i key %s %s:%s" % (file, node, file))
print "File Propagation Completed"
For Hadoop to recognize these changes, a restart may be required. Issue the following commands:
HADOOP_HOME/bin/stop-all.sh
HADOOP_HOME/bin/start-all.sh
Our image, with PostgreSQL 8.2.5 preinstalled, can be found under the S3 bucket yale-vldb with the AMI ID ami-0111f768.
To list public images under our bucket, please execute:
ec2-describe-images -a | grep yale-vldb
Assuming you are in the HADOOP_HOME/src/contrib/ec2/ directory, the usual steps are:
# starts a new 10-node cluster
bin/hadoop-ec2 launch-cluster MY_CLUSTER 10
# copies necessary files (e.g. the latest hadoopdb.jar and SMS Planner)
bin/hadoop-ec2 push MY_CLUSTER file
# logs in to the master node to run your jobs
bin/hadoop-ec2 login MY_CLUSTER
# terminates the cluster
bin/hadoop-ec2 terminate-cluster MY_CLUSTER
To get a list of EC2 internal IP addresses of nodes in the cluster, issue the following:
ec2-describe-instances | java -cp hadoopdb.jar edu.yale.cs.hadoopdb.util.ec2.InstanceProperties -private -group MY_CLUSTER > nodes.txt
To get the public IP addresses:
ec2-describe-instances | java -cp hadoopdb.jar edu.yale.cs.hadoopdb.util.ec2.InstanceProperties -public -group MY_CLUSTER > nodes-public.txt
Add the following HadoopDB properties to HADOOP_HOME/conf/hadoop-site.xml:

<!-- mandatory -->
<property>
  <name>hadoopdb.config.file</name>
  <value>HadoopDB.xml</value>
  <description>The name of the HadoopDB cluster configuration file</description>
</property>

<!-- optional, default 1000 -->
<!-- MySQL needs -2147483648 (Integer.MIN_VALUE) for large results -->
<property>
  <name>hadoopdb.fetch.size</name>
  <value>1000</value>
  <description>The number of records fetched from JDBC ResultSet at once</description>
</property>

<!-- optional, default false -->
<property>
  <name>hadoopdb.config.replication</name>
  <value>false</value>
  <description>Tells HadoopDB Catalog whether replication is enabled. Replica locations need to be specified in the catalog. False causes replica information to be ignored.</description>
</property>
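Inside a custom HadoopDB job, these properties can be read back through the standard Hadoop configuration API. Below is a minimal sketch (the class name is hypothetical; the defaults shown match those documented above):

import org.apache.hadoop.mapred.JobConf;

public class HadoopDBSettings {
    public static void main(String[] args) {
        // Picks up hadoop-site.xml from the classpath.
        JobConf conf = new JobConf();
        // Fall back to the documented defaults if a property is not set.
        String catalogFile = conf.get("hadoopdb.config.file", "HadoopDB.xml");
        int fetchSize = conf.getInt("hadoopdb.fetch.size", 1000);
        boolean replication = conf.getBoolean("hadoopdb.config.replication", false);
        System.out.println(catalogFile + " " + fetchSize + " " + replication);
    }
}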
To propagate the configuration file to all nodes, use the script above with HADOOP_HOME/conf/hadoop-site.xml as the input argument.
Edit pg_hba.conf to be as follows:
local   all   all                   trust
host    all   all   127.0.0.1/0     password
host    all   all   ::1/128         trust
Edit postgresql.conf to be as follows:
listen_addresses = '*'    # what IP address(es) to listen on
port = 5432               # (change requires restart)
shared_buffers = 512MB
work_mem = 1024MB
Then, restart Postgres using pg_ctl restart.
If PostgreSQL refuses to start with such large memory settings, you may need to run:
sysctl -w kernel.shmmax=554663936

This setting should also be added to /etc/sysctl.conf.
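After PostgreSQL is configured and restarted, it is worth verifying that it accepts JDBC connections from Java, since this is exactly how HadoopDB will talk to it. A minimal sketch, assuming the PostgreSQL JDBC driver is on the classpath and using a hypothetical database name and credentials:

import java.sql.Connection;
import java.sql.DriverManager;

public class CheckConnection {
    public static void main(String[] args) throws Exception {
        // Load the PostgreSQL JDBC driver (the same jar placed under HADOOP_HOME/lib).
        Class.forName("org.postgresql.Driver");
        // Hypothetical database name, user, and password; adjust to your setup.
        Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "user", "password");
        System.out.println("Connected: " + !conn.isClosed());
        conn.close();
    }
}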
Should you need to upgrade an existing Hive/SMS deployment to a new HadoopDB version, it is usually sufficient to place the new hadoopdb.jar under HIVE_HOME/lib. The Hadoop installation should contain the same version of hadoopdb.jar under HADOOP_HOME/lib. Make sure you propagate HADOOP_HOME/lib/hadoopdb.jar to all nodes in the cluster using the propagation script above.
Before executing any HadoopDB jobs, you need to prepare and load your data as described in the next section.
Although it is perfectly OK to load your data into a HadoopDB database cluster without much preparation (the bulk-loading SQL command COPY is recommended), a careful analysis of the data and the expected queries results in a significant performance boost, at the expense of a longer loading process. Two important things to consider are data partitioning and DB tuning (e.g. creating indices). Again, details of these techniques are covered in textbooks. For some queries, hash-partitioning and clustered indices improve HadoopDB's performance by a factor of 10; see Section 6 of the VLDB paper.
When data do not need hash-partitioning, it is sufficient to split them into chunks (blocks). We recommend chunks of 1GB, because they can typically be processed efficiently by a database server. The chunk size, however, may depend on your hardware configuration. Partitioning data into small chunks (separate databases), as opposed to keeping just one database per node (as is usually done in parallel database installations), gives HadoopDB fault/heterogeneity tolerance similar to that achieved by Hadoop with HDFS.
For example, the following command splits the data into 10 partitions by hashing on the first field of each line, where fields are separated by the | character.
hadoop jar hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.GlobalHasher input_path output_path 10 \| 0
Each partition should be downloaded into a node's local file system and partitioned by LocalHasher into the desired number of files.
For instance, the following commands download a partition from HDFS and split the file into 5 chunks.
hadoop fs -get /output_path/part-00000 local_file_name
java -cp hadoopdb.jar edu.yale.cs.hadoopdb.dataloader.LocalHasher local_file_name 5 \| 0
Each chunked file is then bulk-loaded into a separate database within a node and indexed appropriately.
Let's assume that the databases and desired tables are already created. Then the following SQL command loads data into a table; each line is treated as a new record, with fields delimited by the | character.
COPY your_table FROM 'your_data_text_file' WITH DELIMITER '|';
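For the indexing step mentioned above, any SQL client will do; below is a minimal JDBC sketch, assuming a hypothetical chunk database cdb0, a table your_table, and an index on field_a (names and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class IndexChunk {
    public static void main(String[] args) throws Exception {
        Class.forName("org.postgresql.Driver");
        // Placeholder chunk database name and credentials; adjust to your setup.
        Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/cdb0", "user", "password");
        Statement stmt = conn.createStatement();
        // Index the column used for selection/grouping in your queries.
        stmt.execute("CREATE INDEX your_table_field_a_idx ON your_table (field_a)");
        // Cluster the table on the index (pre-8.3 syntax; newer versions use CLUSTER table USING index).
        stmt.execute("CLUSTER your_table_field_a_idx ON your_table");
        // Refresh planner statistics after loading and clustering.
        stmt.execute("ANALYZE your_table");
        stmt.close();
        conn.close();
    }
}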
To run the same commands on each node in the cluster in parallel, we used Python scripts that create threads which connect to each machine and execute shell or SQL commands.
For example, the following script reads the list of nodes from nodes.txt and then creates a directory /my_data on each of them.
#!/usr/bin/python
import sys, os, thread, commands

completed = {}

def executeThread(node, *args):
    # Make sure key is accessible and is the correct key name.
    os.system("ssh -i key -o 'StrictHostKeyChecking=no' %s 'mkdir /my_data'" % (node))
    # You could replace the mkdir command with another command line or add more command lines,
    # as long as you prefix the command with the ssh connection.
    completed[node] = "true"

hostfile = "nodes.txt"
internalips = open(hostfile, 'r').readlines()
for i in internalips:
    os.system('sleep 5')
    node_n = i.strip()
    thread.start_new_thread(executeThread, (node_n,))
while len(completed.keys()) < len(internalips):
    os.system('sleep 10')
print "Execution Completed"
The SimpleCatalogGenerator tool assumes chunks are uniformly distributed across all nodes, with each node holding exactly the same number of chunks. It associates each relation's chunks with a partition id and allows both chunked and complete (unchunked) relations on each node.
Run the following command to generate the HadoopDB.xml catalog:
java -cp hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleCatalogGenerator path_to_Catalog.properties
The Catalog.properties file contains parameters necessary for Catalog generation:
#Properties for Catalog Generation
##################################
nodes_file=machines.txt
relations_unchunked=grep, EntireRankings
relations_chunked=Rankings, UserVisits
catalog_file=HadoopDB.xml
##
#DB Connection Parameters
##
port=5432
username=user
password=password
driver=org.postgresql.Driver
url_prefix=jdbc\:postgresql\://
##
#Chunking properties
##
chunks_per_node=20
unchunked_db_prefix=udb
chunked_db_prefix=cdb
##
#Replication Properties
##
dump_script_prefix=/root/dump_
replication_script_prefix=/root/load_replica_
dump_file_u_prefix=/mnt/dump_udb
dump_file_c_prefix=/mnt/dump_cdb
##
#Cluster Connection
##
ssh_key=id_rsa-gsg-keypair
Modify the above file to suit your cluster setup: the chunking properties, the database connection parameters, and the relations.
After generating the catalog, upload it to HDFS:
hadoop dfs -put HadoopDB.xml HadoopDB.xml
If you choose a different catalog name, you will need to change the HADOOP_HOME/conf/hadoop-site.xml to point to the right file.
To replicate chunks across the cluster, run the SimpleRandomReplicationFactorTwo tool:

java -cp hadoopdb.jar edu.yale.cs.hadoopdb.catalog.SimpleRandomReplicationFactorTwo path_to_Catalog.properties [-dump false/true]
Passing "-dump false" skips the generation of dump scripts; the default setting is true. The tool generates a new HadoopDB.xml catalog in which chunks are randomly replicated across all nodes. Only a replication factor of two is supported by the tool.
After replication, upload the catalog file into HDFS, removing the old catalog first.
hadoop dfs -rmr HadoopDB.xml
hadoop dfs -put HadoopDB.xml HadoopDB.xml
Then, update the HADOOP_HOME/conf/hadoop-site.xml to allow replication for HadoopDB (see Configuration:HadoopDB).
Start the Hive command line interface:
cd HIVE_HOME
bin/hive
For each relation, issue the following command:
CREATE EXTERNAL TABLE relation_name (field_name_1 field_type_1, ... field_name_n field_type_n)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS
  INPUTFORMAT 'edu.yale.cs.hadoopdb.sms.connector.SMSInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'dummy_path_in_HDFS/relation_name';
Note that the location path must end with the relation name as the final directory in the path.
For example, assuming the table definition below:
CREATE TABLE table_name ( field_a varchar(20), field_b int, field_c date);
We want to compute the sum of field_b grouped by field_a, but only for records in which field_c is greater than Jan 1, 2009. The computation needs to be performed over the entire dataset spread across the cluster.
The SQL that can be pushed into each chunk database is as follows:
SELECT field_a, SUM(field_b) AS sum_b FROM table_name WHERE field_c > '2009-01-01' GROUP BY field_a;
The results are read into objects of the class given below.
class AggTableRecord implements DBWritable {

    private String field_a;
    private long sum_b;

    public String getFieldA() {
        return field_a;
    }

    public long getSumB() {
        return sum_b;
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        field_a = resultSet.getString("field_a");
        sum_b = resultSet.getLong("sum_b");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        throw new UnsupportedOperationException("No write() impl.");
    }
}
The Map function simply outputs records returned from the chunk database. Please note that pre-aggregation (Combine) is done at the SQL level.
class Map extends MapReduceBase implements Mapper<LongWritable, AggTableRecord, Text, LongWritable> {

    protected Text outputKey = new Text();
    protected LongWritable outputValue = new LongWritable();

    public void map(LongWritable key, AggTableRecord value,
            OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
        outputKey.set(value.getFieldA());
        outputValue.set(value.getSumB());
        output.collect(outputKey, outputValue);
    }
}
The Reduce function performs the final aggregation after repartitioning on the key (field_a).
class Reduce extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {

    protected LongWritable outputValue = new LongWritable();

    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
        long sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        outputValue.set(sum);
        output.collect(key, outputValue);
    }
}
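The Map and Reduce classes above still need the usual Hadoop job wiring. Below is a minimal, hedged driver sketch; the HadoopDB-specific part (choosing the connector InputFormat, the record class, and the SQL pushed into each chunk database) is only indicated by a comment, since complete, working configurations are given in the edu.yale.cs.hadoopdb.benchmark package:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class AggJobDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(AggJobDriver.class);
        conf.setJobName("hadoopdb-aggregation-example");

        // Key/value types emitted by the Map and Reduce classes above.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        // HadoopDB-specific input configuration goes here: the connector
        // InputFormat, the AggTableRecord class, and the SQL query pushed
        // into each chunk database (see edu.yale.cs.hadoopdb.benchmark).

        FileOutputFormat.setOutputPath(conf, new Path(args[0]));
        JobClient.runJob(conf);
    }
}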
More examples are given in the edu.yale.cs.hadoopdb.benchmark package.
Results of HadoopDB jobs are stored in HDFS. If there is a need to put them into databases, bulk-loading should be employed rather than individual INSERT statements.
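For example, here is a hedged sketch of bulk-loading a result file (copied out of HDFS first) into a PostgreSQL table from Java, using the CopyManager API of a reasonably recent PostgreSQL JDBC driver; the database, table, and file names are placeholders:

import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;

import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

public class BulkLoadResults {
    public static void main(String[] args) throws Exception {
        Class.forName("org.postgresql.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/resultsdb", "user", "password");
        try {
            // COPY streams the whole file in one round trip instead of row-by-row INSERTs.
            // The default COPY text format is tab-delimited, which matches Hadoop's TextOutputFormat.
            CopyManager copy = ((PGConnection) conn).getCopyAPI();
            copy.copyIn("COPY result_table FROM STDIN", new FileReader("part-00000"));
        } finally {
            conn.close();
        }
    }
}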
The aggregation from the example above can be submitted to SMS as a single Hive query:

INSERT OVERWRITE DIRECTORY 'output-directory'
SELECT field_a, SUM(field_b) AS sum_b
FROM table_name
WHERE field_c > '2009-01-01'
GROUP BY field_a;

The parallelization of the query (converting it into a HadoopDB job) is done inside SMS. The MapReduce plan generated by Hive is traversed by the SMS Planner, which replaces as many of Hive's operators as possible with a SQL query that can be executed independently against each chunk database. The Hive execution engine takes care of running the job, which uses parametrized SMS Connector classes.
Please note that the SMS Planner is still a work in progress. The benchmark queries described in the VLDB paper (Grep, Selection, and both Aggregations) have been tested and work correctly. Other queries may not be completely supported by SMS, or may revert to Hive query processing with SMS only reading data off the databases.
Example performance-related settings for HADOOP_HOME/conf/hadoop-site.xml:

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx1024m -server</value>
</property>
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>-1</value>
</property>
<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>io.sort.factor</name>
  <value>100</value>
</property>
<property>
  <name>io.sort.mb</name>
  <value>200</value>
</property>
<property>
  <name>io.file.buffer.size</name>
  <value>131072</value>
</property>
<property>
  <name>mapred.reduce.parallel.copies</name>
  <value>50</value>
</property>
<property>
  <name>tasktracker.http.threads</name>
  <value>50</value>
</property>
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>1</value>
</property>
Note that the maximum number of map tasks per tasktracker should be chosen based on the available memory. If each database has 1GB of memory and the relations accessed fit within that space, then increasing the number of concurrent map tasks per tasktracker will improve performance. Partitioning data across multiple disks will also improve performance. A maximum of one map task per tasktracker is suitable for a single disk and limited main memory.
To check out the HadoopDB source code:

svn co https://hadoopdb.svn.sourceforge.net/svnroot/hadoopdb hadoopdb

A simple ant build file is included. It expects the Hadoop and Hive (patched) jars under /lib.
Check out the latest Hive as explained here. Then set up Eclipse as described here. You now have a Hive project under Eclipse.
In the same workspace, check out the HadoopDB source code through Eclipse's SVN plugin:
Right-click the Hive project and navigate to Team > Apply Patch. Find the hive-sms.patch file located at WORKSPACE_PATH/HadoopDB_project/Patches/hive-sms.patch. This should modify the following classes in the Hive project:
ExecDriver.java
HiveInputFormat.java
SemanticAnalyzer.java

and add the following class:
Pathable.java
Modify the Hive build path to link to the HadoopDB project, and modify HadoopDB's build path to include both the Hive project and the jar files located in HADOOP_HOME.
Refresh both projects and build in Eclipse.
Finally, to create an SMS distribution:
ant -Dhadoop.version="<your-hadoop-version>" package