Tuesday, December 15, 2015

Apache Spark with MongoDB

Update (2015-12-17): There are two ways for Apache Spark to access MongoDB data: mongo-hadoop and pymongo-spark.  This post covers mongo-hadoop; there is a separate post on pymongo-spark.

Here is an example of running an analytic task on Apache Spark using data from MongoDB.  Note that the problem itself is trivial and would be solved much more efficiently within MongoDB (or via a PostgreSQL foreign data wrapper), but it is a good exercise for trying out the mongo-hadoop connector.

My Apache Spark cluster consists of one UDOO and one Raspberry Pi, with the UDOO acting as both master and slave.  The CPU and memory resources on these devices are limited:

UDOO: 192.168.1.10. 1GB memory and 4x 1GHz cores
RPi: 192.168.1.7. 512MB memory and 1x 700MHz core

MongoDB is installed on an x86 machine (192.168.1.69).  Some Yelp sample data is loaded into the test database.  In particular, we are interested in the "business" and "review" collections:

> db.yelp_business.findOne()
{
        "_id" : ObjectId("566bb192563714b25d604a94"),
        "business_id" : "UsFtqoBl7naz8AVUBZMjQQ",
        "full_address" : "202 McClure St\nDravosburg, PA 15034",
        "hours" : {

        },
        "open" : true,
        "categories" : [
                "Nightlife"
        ],
        "city" : "Dravosburg",
        "review_count" : 4,
        "name" : "Clancy's Pub",
        "neighborhoods" : [ ],
        "longitude" : -79.88693,
        "state" : "PA",
        "stars" : 3.5,
        "latitude" : 40.350519,
        "attributes" : {
                "Happy Hour" : true,
                "Accepts Credit Cards" : true,
                "Good For Groups" : true,
                "Outdoor Seating" : false,
                "Price Range" : 1
        },
        "type" : "business"
}
> db.yelp_business.count()
61184
> db.yelp_review.findOne()
{
        "_id" : ObjectId("566bb1eb563714b25d61ea02"),
        "votes" : {
                "funny" : 0,
                "useful" : 2,
                "cool" : 0
        },
        "user_id" : "H1kH6QZV7Le4zqTRNxoZow",
        "review_id" : "RF6UnRTtG7tWMcrO2GEoAg",
        "stars" : 2,
        "date" : "2010-03-22",
        "text" : "Unfortunately, the frustration of being Dr. Goldberg's patient is a repeat of the experience I've had with so many other doctors in NYC -- good doctor, terrible staff.  It seems that his staff simply never answers the phone.  It usually takes 2 hours of repeated calling to get an answer.  Who has time for that or wants to deal with it?  I have run into this problem with many other doctors and I just don't get it.  You have office workers, you have patients with medical needs, why isn't anyone answering the phone?  It's incomprehensible and not work the aggravation.  It's with regret that I feel that I have to give Dr. Goldberg 2 stars.",
        "type" : "review",
        "business_id" : "vcNAWiLM4dR7D2nwwJ7nCA"
}
> db.yelp_review.count()
1569264
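
Before wiring Spark to MongoDB, it is worth confirming that the collections are reachable from the Spark nodes.  A quick check with pymongo (assuming pymongo is installed on the node; it is not otherwise needed for this post) might look like this:

# Quick connectivity check from a Spark node (requires pymongo).
from pymongo import MongoClient

client = MongoClient('192.168.1.69', 27017)
print(client.test.yelp_business.count())   # expect 61184
print(client.test.yelp_review.count())     # expect 1569264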


We are going to find, for each business, the number of reviews whose star rating is lower than the business's overall star rating.
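
To make the goal concrete, here is a toy illustration of the same computation in plain Python (made-up records, not from the dataset):

# Toy illustration of the target computation (hypothetical records).
business_stars = {'b1': 3.5, 'b2': 4.0}
reviews = [('b1', 2), ('b1', 5), ('b2', 3), ('b2', 4)]

poor_count = {}
for business_id, stars in reviews:
    if stars < business_stars[business_id]:
        poor_count[business_id] = poor_count.get(business_id, 0) + 1

print(poor_count)   # {'b1': 1, 'b2': 1}

The Spark version below does the same thing, except that the reviews live in an RDD backed by MongoDB and the counting is distributed across the workers.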

First, on the Spark machines, we need to download the MongoDB Java driver and the mongo-hadoop core jar:

wget http://central.maven.org/maven2/org/mongodb/mongo-java-driver/3.0.4/mongo-java-driver-3.0.4.jar
wget https://github.com/mongodb/mongo-hadoop/releases/download/r1.4.0/mongo-hadoop-core-1.4.0.jar


I saved them under the lib directory of Spark.

The logic is written in Python:

from operator import add
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":

    conf = SparkConf() \
        .setAppName("SparkMongoDB") \
        .set('spark.executor.extraClassPath', '/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-core-1.4.0.jar:/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar')
    sc = SparkContext(conf=conf)

    keyClassName = 'org.apache.hadoop.io.Text'
    valueClassName = 'org.apache.hadoop.io.MapWritable'

    # two collections from MongoDB
    configReview = {'mongo.input.uri': 'mongodb://192.168.1.69:27017/test.yelp_review'}
    configBusiness = {'mongo.input.uri': 'mongodb://192.168.1.69:27017/test.yelp_business'}
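    # (mongo-hadoop also supports e.g. a 'mongo.input.query' key to push a query
    # filter down to MongoDB before the data reaches Spark; not used here)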

    # we know there are far fewer businesses than reviews, so
    # build a dictionary of the star rating of each business
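    # note: collectAsMap() returns a plain Python dict on the driver; it is
    # later captured in the filter closure below and shipped with each task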
    businessStar = sc.newAPIHadoopRDD(
            inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
            keyClass=keyClassName,
            valueClass=valueClassName,
            conf=configBusiness) \
        .map(lambda b: (b[1]['business_id'], b[1]['stars'])).collectAsMap()

    # create and process review RDD
    poorReview = sc.newAPIHadoopRDD(
            inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
            keyClass=keyClassName,
            valueClass=valueClassName,
            conf=configReview) \
        .filter(lambda r: r[1]['stars'] < businessStar[r[1]['business_id']]) \
        .map(lambda r: (r[1]['business_id'], 1)) \
        .reduceByKey(add)

    ''' This alternative (a join instead of a collected dictionary) is more elegant but requires more memory
    businessStar = sc.newAPIHadoopRDD(
            inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
            keyClass=keyClassName,
            valueClass=valueClassName,
            conf=configBusiness) \
        .map(lambda b: (b[1]['business_id'], b[1]['stars']))

    poorReview = sc.newAPIHadoopRDD(
            inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
            keyClass=keyClassName,
            valueClass=valueClassName,
            conf=configReview) \
        .map(lambda r: (r[1]['business_id'], r[1]['stars'])) \
        .join(businessStar) \
        .filter(lambda (business_id, (rStar, bStar)): bStar > rStar) \
        .map(lambda (business_id, (rStar, bStar)): (business_id, 1)) \
        .reduceByKey(add)
    '''

    # save output back to MongoDB
    outConfig = {'mongo.output.uri': 'mongodb://192.168.1.69:27017/test.poor_count'}
    poorReview.saveAsNewAPIHadoopFile(
            path='file:///not-used',
            outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
            keyClass=keyClassName,
            valueClass=valueClassName,
            conf=outConfig)

    sc.stop()


Start the cluster ("sbin/start-all.sh") and submit the job, e.g.:

bin/spark-submit  --master spark://192.168.1.10:7077 --conf "spark.eventLog.enabled=true" --driver-class-path="/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-core-1.4.0.jar:/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar" --executor-memory=256m ~/test/spark_mongodb/yelp_poor.py

It took 20 minutes to complete.  Swapping was a big issue due to lack of memory on the ARM devices.
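
One tweak I have not benchmarked that might reduce the memory pressure: wrap the collected business-star dictionary in a Spark broadcast variable, so each executor receives it once instead of having it serialized into every task closure.  A minimal sketch, using the same names as the script above:

    # Sketch (untested on this cluster): broadcast the lookup dict once per
    # executor rather than shipping it inside every task closure.
    businessStarBC = sc.broadcast(businessStar)

    poorReview = sc.newAPIHadoopRDD(
            inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
            keyClass=keyClassName,
            valueClass=valueClassName,
            conf=configReview) \
        .filter(lambda r: r[1]['stars'] < businessStarBC.value[r[1]['business_id']]) \
        .map(lambda r: (r[1]['business_id'], 1)) \
        .reduceByKey(add)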

Next, I will try pymongo-spark to see whether skipping the Hadoop layer improves performance (although, as of version 0.1, pymongo-spark still uses mongo-hadoop as its underlying logic).


8 comments:

Unknown said...

Sir, I have followed all the steps but got a few errors. Could you help me out? I am also using a Spark cluster of 3 nodes, 1 master and 2 slaves, but I am using the YARN scheduler. I can also show you the errors.

Clarence said...

@hemanta what was the error?

Unknown said...

It first shows a warning, "The first process fail to initialize plz check u have enough resources...", and the warning continues. I have 3 nodes, 1 master and 2 workers. All 3 nodes have 16 GB RAM, 8 cores and a 1 TB HD each. It takes almost 3 hours; the process is still running and does not give me the output. In my Spark UI it also shows the process is active. What is the problem, Sir? I am new to distributed platforms and I am using your sample program with your sample data, i.e. yelp_business and yelp_review, but did not get the result back.

Unknown said...

And one more thing: I am using the YARN scheduler...

Unknown said...
This comment has been removed by the author.
Unknown said...
This comment has been removed by the author.
Clarence said...

hi hemanta. my suggestion to you is to troubleshoot your spark and mongodb setup separately first. make sure your spark is working fine before adding mongodb to the mix.

Unknown said...

Thank you, Sir. The problem has been resolved and now I am able to run your program on my cluster; there were some Spark and MongoDB configuration issues which I have since fixed. Sir, can you tell me how to import data stored in a Mongo server located outside the cluster into HDFS? I have tried using my_rdd.saveAsNewAPIHadoopFile(), but it shows an error like

"py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile."

: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.lang.IndexOutOfBoundsException: Index: 113, Size: 31