
MongoDB And Hadoop: Connect Your Data

Data processing platforms do not exist in silos. That is why it is increasingly important for Rackspace to provide interoperability between datastores, so customers can choose the best technology for their data processing needs without being restricted to a single tool.

For this post, we will look at two of our most popular data platforms: our ObjectRocket MongoDB service and our Hadoop-based Cloud Big Data Platform service.

The mongo-hadoop connector is an open source library for Hadoop that allows you to use MongoDB as an input source or output destination for MapReduce jobs and Hive and Pig queries. You can also export MongoDB backup files in .bson format to a local HDFS filesystem or our Cloud Files offering.

Our partner Hortonworks recently certified the mongo-hadoop connector for HDP 2.1. This tutorial will help you set up a Cloud Big Data Platform cluster with the mongo-hadoop connector and connect it to an ObjectRocket MongoDB instance.

Note: You should be able to follow this example using any HDP 2.1 cluster and a MongoDB instance.

First, we need to get the mongo-hadoop jars. If you want to compile them from source, follow the steps below. If not, you can use the precompiled jars for connector version 1.2.1 from http://6ad82e5f32a41fccb9dd-2834903aadc19353cae1c339ccf0854e.r14.cf2.rackcdn.com/mongo-connector.sh

You will need JDK 7 and Gradle to build from source.

git clone https://github.com/mongodb/mongo-hadoop.git
cd mongo-hadoop
./gradlew -Phadoop_version='2.4' jar

Gather the jars from the build directories and create a postinitscript that copies them to /usr/lib/hadoop/lib on the Hadoop cluster.
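
A minimal sketch of such a post-init script, assuming you publish the jars you just built somewhere the cluster nodes can reach (the hosting URL and jar names below are placeholders; the provided mongo-connector.sh plays the same role for the precompiled 1.2.1 jars):

#!/bin/bash
# Hypothetical post-init script: fetch the mongo-hadoop jars (and the MongoDB Java
# driver they depend on) and place them on the Hadoop classpath on this node.
# Replace the placeholder URL and jar names with your own build artifacts.
set -e
for jar in mongo-hadoop-core-1.2.1.jar mongo-hadoop-hive-1.2.1.jar mongo-hadoop-pig-1.2.1.jar mongo-java-driver-2.12.3.jar; do
    wget -q -O /usr/lib/hadoop/lib/$jar http://example.com/jars/$jar
done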

Then follow the steps below:

Create a MongoDB Instance
Follow the Getting Started Guide to create an ObjectRocket MongoDB instance and a database with a username and password. For this example we will use a database named “ocean.”
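
Before wiring up Hadoop, a quick (optional) way to confirm the instance is reachable and the credentials work:

# Optional sanity check: prints basic stats for the "ocean" database if the connection succeeds.
mongo --username=<DB Username> --password=<DB Password> <MongoDB Instance IP>:27017/ocean --eval "printjson(db.stats())"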

Create an HDP Cluster with the mongo-hadoop connector

  • Create a postinitscript using the compiled jars or use the provided script.
  • Next, create a Cloud Big Data HDP 2.1 cluster using the postinitscript to install the mongo-hadoop connector jars. If you have an existing cluster, just run the script on all the nodes.
  • If this is your first time trying out Cloud Big Data, follow the Getting Started Guide to get set up.
  • The command below creates a three-node HDP 2.1 small flavor cluster, which automatically installs the mongo-hadoop connector on all nodes after the cluster becomes active.
lava boot --type=HADOOP_HDP2_1 --flavor=hadoop1-7 --count=3 --postinitscript=http://6ad82e5f32a41fccb9dd-2834903aadc19353cae1c339ccf0854e.r14.cf2.rackcdn.com/mongo-connector.sh mongo-hadoop
  • That’s it. You now have the required libraries installed, a working Hadoop cluster and a MongoDB instance. (A quick way to verify the connector jars is shown below.)
  • To run Hive queries, you will need to upgrade to the latest patches of Hive 0.13.
  • The helper script below updates Hive on the Gateway node. If you are using another HDP 2.1 cluster, run it on the node where Hive is running.
lava ssh mongo-hadoop
wget https://raw.githubusercontent.com/rnirmal/mongodb-hadoop-examples/master/cbd/hive_upgrade.sh
sh hive_upgrade.sh
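  • Before moving on, it can be worth a quick check that the connector jars actually landed where Hadoop expects them. For example, from the Gateway node (repeat on the other nodes if you want to be thorough):
lava ssh mongo-hadoop
# The postinitscript should have copied the mongo-hadoop jars into the Hadoop lib directory.
ls /usr/lib/hadoop/lib | grep -i mongo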

Example

  • Let’s work through an example that makes use of the installed mongo-hadoop connector. We will use Ocean to load some NOAA data (air temperature, water temperature, etc.) into MongoDB and then use Hive to calculate minimum, maximum and average values.
  • Load data from any node or from the MongoDB instance. Note: you need a database with a username and password; in this case let’s call the database “ocean.”
pip install pymongo
git clone https://github.com/kgorman/ocean.git
cd ocean
mongo --username=<DB Username> --password=<DB Password> <MongoDB Instance IP>:27017/ocean < stations.json
python fetcher.py --hostname=<MongoDB Instance IP> --port=27017 --db=ocean --username=<DB Username> --password=<DB Password>
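  • fetcher.py captures a single time snapshot per run, so to accumulate a few readings you can simply rerun it on a loop, for example (the sample count and ten-minute interval here are arbitrary):
for i in $(seq 1 6); do
    # Take six samples, ten minutes apart.
    python fetcher.py --hostname=<MongoDB Instance IP> --port=27017 --db=ocean --username=<DB Username> --password=<DB Password>
    sleep 600
done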
  • Loading stations.json creates a collection called “stations” in the “ocean” db to store station info.
  • Each run of fetcher.py pulls data for the current time snapshot into a collection called “ocean_data”, so running it at several intervals, as in the loop above, gives us multiple values over a period of time from which to calculate the minimum, maximum and average values.
  • Next let’s map the collection as a Hive table, perform some transformations and output the results back into MongoDB. For this we will need an additional helper jar.
  • We will be using Klout’s Brickhouse for some custom Hive UDFs.
  • Log into the Gateway node of the Hadoop cluster, download the Brickhouse jar file and invoke the Hive prompt.
lava ssh mongo-hadoop
wget http://6ad82e5f32a41fccb9dd-2834903aadc19353cae1c339ccf0854e.r14.cf2.rackcdn.com/brickhouse-0.7.0.jar
hive
  • Set the input and output sources (from and to MongoDB) and register the custom UDF.
hive> set MONGO_INPUT=mongodb://[user:password@]<MongoDB Instance IP>:27017/ocean.ocean_data;
hive> set MONGO_OUTPUT=mongodb://[user:password@]<MongoDB Instance IP>:27017/ocean.results;
hive> add JAR brickhouse-0.7.0.jar;
hive> create temporary function collect as 'brickhouse.udf.collect.CollectUDAF';
  • Create an external table in Hive backed by the Ocean data stored in MongoDB.
hive> CREATE EXTERNAL TABLE ocean_data
(
city STRING,
products ARRAY<STRUCT<name:STRING, data:ARRAY<STRUCT<v:STRING>>>>
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"city":"name", "products.name":"products.name", "products.data.v": "products.data.v"}')
TBLPROPERTIES('mongo.uri'='${hiveconf:MONGO_INPUT}');
  • Create a temporary table to extract the individual product values.
hive> CREATE TABLE product_values(city STRING, product STRING, value FLOAT);
  • Insert values into the temporary table.
hive> INSERT OVERWRITE TABLE product_values 
SELECT city, prod_value.name as product, values.v as value from ocean_data LATERAL VIEW explode(products) t1 as prod_value LATERAL VIEW explode(prod_value.data) t2 as values;
  • Create the final output table where the processed results will be stored.
hive> CREATE TABLE results
(
city STRING,
products ARRAY<STRUCT<name:STRING, min:FLOAT, max:FLOAT, avg:DOUBLE>>
) 
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler' 
TBLPROPERTIES('mongo.uri'='${hiveconf:MONGO_OUTPUT}');
  • Finally, compute the minimum, maximum and average and insert the computed values into the results table.
hive> INSERT INTO table results select city, collect(prod) as products from 
(
SELECT city, named_struct('name', product, 'min', min(value), 'max', max(value), 'avg', avg(value)) as prod from product_values group by city, product
) 
p group by city;
  • The last step writes the results back into MongoDB. Now log on to the MongoDB instance and query the results collection for the computed values.
mongo --username=<DB username> --password=<DB password> <MongoDB Instance IP>:27017/ocean
> db.results.find().limit(5).pretty()

With this example as a starting point, you can apply the same approach to any cross-platform operation you can dream up. You can combine the real-time performance characteristics of a MongoDB database with the powerful analytical insights that Hadoop is able to yield.

About the Author

This post was written and contributed by Nirmal Ranganathan.

Nirmal is a Senior Software Engineer at Rackspace working on the Cloud Big Data Platform offering, which is built on top of OpenStack. Nirmal was one of the founding members of Trove (OpenStack’s Database as a Service), has served as a core committer on the Cinder block storage service, and contributes to various OpenStack initiatives. He previously contributed to Apache Cassandra and Apache Thrift. He is always striving for ease of use in provisioning Big Data tools and applications, removing complexity from the end user. In his view, data scientists and users should focus on the value of their applications and not have to worry about infrastructure and service management.

