Wednesday, December 16, 2015

Apache Spark with MongoDB (using pymongo-spark)

This is a follow-up to my previous post about using Apache Spark to work with MongoDB data.  Please refer to that post for details on the setup.

This post is about using the "unstable" pymongo-spark library to create a MongoDB-backed RDD.

First, get the mongo-hadoop source tree from GitHub:

git clone https://github.com/mongodb/mongo-hadoop.git


We need to build a jar and copy a Python script from the source.

To build the jar, run the following from the root of the source tree:

./gradlew jar

When the build is done, the only jar we need is the following (the version number depends on the source you check out):

spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar


Copy it to the lib directory under Spark.  You also need to add it to the classpath when using pyspark or spark-submit.

The Python file we need from the source tree is located at:

mongo-hadoop/spark/src/main/python/pymongo_spark.py


Save it in the same folder as the application script and remember to use the "--py-files" parameter to deploy it.

Also make sure you have pymongo installed, e.g. on Debian machines:

sudo apt-get install python-pymongo
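
Before submitting a job, it can help to confirm that both Python modules are visible to the interpreter on the driver machine. The following is a minimal sketch, not part of pymongo-spark itself; the helper name missing_modules is made up for illustration. It only checks the local Python path, so the executors still rely on "--py-files" to receive pymongo_spark.py.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found on the Python path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # pymongo comes from the system package; pymongo_spark.py is the file
    # copied from the mongo-hadoop source tree.
    missing = missing_modules(["pymongo", "pymongo_spark"])
    if missing:
        print("Not importable from here: " + ", ".join(missing))
    else:
        print("pymongo and pymongo_spark are both importable")
```

Run it from the folder that holds the application script, since that is where pymongo_spark.py was saved.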


Finally, the application script. pymongo-spark needs to be activated before loading or writing RDDs.  Also, the structure of the loaded RDD is slightly different from that of an RDD loaded using mongo-hadoop directly.

from operator import add
from pyspark import SparkContext, SparkConf
import pymongo_spark

if __name__ == "__main__":

    # initiate pymongo-spark
    pymongo_spark.activate()

    conf = SparkConf() \
        .setAppName("SparkMongoDB2") \
        .set('spark.executor.extraClassPath', '/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar')
    sc = SparkContext(conf=conf)

    # we know the number of businesses is much smaller than the number of reviews,
    # so create a dictionary of the stars given to each business
    businessStar = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_business') \
        .map(lambda b: (b['business_id'], b['stars'])).collectAsMap()

    # create and process review RDD
    poorReview = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_review') \
        .filter(lambda r: r['stars'] < businessStar[r['business_id']]) \
        .map(lambda r: (r['business_id'], 1)) \
        .reduceByKey(add)

    # save output back to MongoDB
    poorReview.saveToMongoDB('mongodb://192.168.1.69:27017/test.poor_count')

    sc.stop()
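
To see what the script computes without a cluster, here is a rough plain-Python sketch of the same logic. The sample records are made up for illustration and only carry the two fields the script reads (business_id and stars); it assumes, as the script above does, that each loaded record can be indexed like a dictionary.

```python
from collections import Counter

# Made-up stand-ins for documents in test.yelp_business and test.yelp_review
businesses = [
    {"business_id": "b1", "stars": 4.0},
    {"business_id": "b2", "stars": 3.5},
]
reviews = [
    {"business_id": "b1", "stars": 3},
    {"business_id": "b1", "stars": 5},
    {"business_id": "b1", "stars": 2},
    {"business_id": "b2", "stars": 4},
    {"business_id": "b2", "stars": 1},
]

# Same role as .map(...).collectAsMap(): business_id -> overall stars
business_star = {b["business_id"]: b["stars"] for b in businesses}

# Same role as filter + map + reduceByKey(add): count the reviews whose
# rating is below the overall rating of the business being reviewed
poor_count = Counter(
    r["business_id"]
    for r in reviews
    if r["stars"] < business_star[r["business_id"]]
)

print(dict(poor_count))
```

In the Spark version the dictionary is built on the driver and shipped to the executors inside the filter closure, which is why it matters that there are far fewer businesses than reviews.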


Here is an example of submitting the application:


bin/spark-submit  --master spark://192.168.1.10:7077 --conf "spark.eventLog.enabled=true" --py-files "/home/spark/test/spark_mongodb/pymongo_spark.py" --driver-class-path="/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" --executor-memory=256m ~/test/spark_mongodb/yelp_poor2.py
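
If the job is submitted repeatedly, the long command line can be assembled in a small script instead. This is only a sketch: build_submit_command is a hypothetical helper, the paths are the ones from the command above, and it assumes the script is run from the Spark home directory.

```python
import os
import shlex


def build_submit_command(master, app_script, py_file, jar, executor_memory="256m"):
    """Assemble the spark-submit argument list (hypothetical helper)."""
    return [
        "bin/spark-submit",
        "--master", master,
        "--conf", "spark.eventLog.enabled=true",
        "--py-files", py_file,              # ships pymongo_spark.py with the job
        "--driver-class-path", jar,         # mongo-hadoop-spark jar for the driver
        "--executor-memory", executor_memory,
        os.path.expanduser(app_script),     # "~" is not expanded without a shell
    ]


cmd = build_submit_command(
    master="spark://192.168.1.10:7077",
    app_script="~/test/spark_mongodb/yelp_poor2.py",
    py_file="/home/spark/test/spark_mongodb/pymongo_spark.py",
    jar="/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar",
)

# Print the command for review; to launch it, pass cmd to subprocess.run(cmd, check=True)
print(" ".join(shlex.quote(arg) for arg in cmd))
```

Keeping the arguments in a list avoids shell quoting problems and makes it easy to change the master URL or the jar version in one place.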







18 comments:

Shyam Kumar said...

Hello Clarence

I am trying to import the mongo-connector into spark-1.6.1. I don't see a lib directory present in SPARK_HOME. Any idea where I need to put the jars to get the import of org.mongodb working?

Thanks
Shyam

Clarence said...

Hi Shyam. I haven't used mongo-connector. But if you are using spark-submit, you can use the "--packages" option to pull in a Maven dependency. Or use "--jars" to specify the location of your library.

rex said...

Hi Clarence,

I have done all the setup described in your blog, but I could not solve the following issue:
ImportError: No module named pymongo_spark.

What could be the cause in this case?

Clarence said...

@rex have you built the pymongo_spark code? Also, when you submit your Spark application, do you use the "--py-files" parameter to distribute the pymongo_spark files?

Rex said...

I have built pymongo_spark using the command ./gradlew jar
Yes, I have used the same command you mentioned in the post:
bin/spark-submit --master spark://:7077 --conf "spark.eventLog.enabled=true" --py-files "my_loc/pymongo_spark.py" --driver-class-path="my_loc/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" --executor-memory=256m my_loc/yelp_poor2.py

My configuration
I have two worker nodes and one master node.
Worker nodes communicate with the mongodb server but during this command:
poorReview.saveToMongoDB('mongodb://master_ip:27017/test.poor_count')

I got this error.
Lost task 2.0 in stage 8.0 (TID 450, worker_ip, executor 1): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 2571 ms

So the result is not saved to poor_count.

Clarence said...

hi rex. when you run the job, do you see the workers connected to mongodb and retrieving data?

Rex said...

Yes it can retrieve data from mongodb.

Anonymous said...

Log details: worker node1 ip: x.x.x.199 worker node2 ip: x.x.x.200

17/07/03 10:05:14 INFO TaskSetManager: Finished task 426.0 in stage 3.0 (TID 445) in 10069 ms on x.x.x.199 (executor 1) (427/427)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ShuffleMapStage 3 (reduceByKey at /home/hduser/try/test.py:25) finished in 298.476 s
17/07/03 10:05:14 INFO DAGScheduler: looking for newly runnable stages
17/07/03 10:05:14 INFO DAGScheduler: running: Set()
17/07/03 10:05:14 INFO DAGScheduler: waiting: Set(ResultStage 4)
17/07/03 10:05:14 INFO DAGScheduler: failed: Set()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 6.3 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:996
17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 446, x.x.x.200, executor 0, partition 0, NODE_LOCAL, 5783 bytes)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.200:43469 (size: 3.9 KB, free: 362.0 MB)
17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.200:40972
17/07/03 10:05:14 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 46256 bytes
17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 446) in 137 ms on x.x.x.200 (executor 0) (1/1)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:441) finished in 0.138 s
17/07/03 10:05:14 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:441, took 298.676021 s
17/07/03 10:05:14 INFO SparkContext: Starting job: take at SerDeUtil.scala:233
17/07/03 10:05:14 INFO DAGScheduler: Got job 4 (take at SerDeUtil.scala:233) with 1 output partitions
17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 6 (take at SerDeUtil.scala:233)
17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 6.4 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:996

Anonymous said...

17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 447, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5865 bytes)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.199:43308 (size: 3.9 KB, free: 362.0 MB)
17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.199:58144
17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 447) in 105 ms on x.x.x.199 (executor 1) (1/1)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ResultStage 6 (take at SerDeUtil.scala:233) finished in 0.106 s
17/07/03 10:05:14 INFO DAGScheduler: Job 4 finished: take at SerDeUtil.scala:233, took 0.125786 s
17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
17/07/03 10:05:14 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
17/07/03 10:05:14 INFO MongoOutputCommitter: Setting up job.
17/07/03 10:05:14 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at PythonRDD.scala:834
17/07/03 10:05:14 INFO DAGScheduler: Got job 5 (saveAsNewAPIHadoopFile at PythonRDD.scala:834) with 427 output partitions
17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 8 (saveAsNewAPIHadoopFile at PythonRDD.scala:834)
17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 67.0 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 23.2 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on x.x.x.198:35614 (size: 23.2 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:996

Rex said...


17/07/03 10:05:14 INFO DAGScheduler: Submitting 427 missing tasks from ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 8.0 with 427 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 448, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:14 INFO TaskSetManager: Starting task 1.0 in stage 8.0 (TID 449, x.x.x.200, executor 0, partition 1, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:14 INFO TaskSetManager: Starting task 2.0 in stage 8.0 (TID 450, x.x.x.199, executor 1, partition 2, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:25 INFO TaskSetManager: Starting task 18.0 in stage 8.0 (TID 466, x.x.x.200, executor 0, partition 18, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:25 WARN TaskSetManager: Lost task 3.0 in stage 8.0 (TID 451, x.x.x.200, executor 0): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 3046 ms
at com.mongodb.BaseCluster.getServer(BaseCluster.java:87)
at com.mongodb.DBTCPConnector.getServer(DBTCPConnector.java:654)
at com.mongodb.DBTCPConnector.access$300(DBTCPConnector.java:39)
at com.mongodb.DBTCPConnector$MyPort.getConnection(DBTCPConnector.java:503)
at com.mongodb.DBTCPConnector$MyPort.get(DBTCPConnector.java:451)
at com.mongodb.DBTCPConnector.getPrimaryPort(DBTCPConnector.java:409)
at com.mongodb.DBCollectionImpl.executeBulkWriteOperation(DBCollectionImpl.java:142)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1663)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1659)
at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:90)
at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:167)
at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:70)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1132)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Rex said...

Everything up to the mapreduce part is working fine.

Clarence said...

what is your script for saving the result to mongodb? is it pointing to the correct server?

Rex said...

The same save call as in this blog post.

i.e. poorReview.saveToMongoDB('mongodb://mymongodbserver:27017/test.poor_count')

I can retrieve data from mymongodbserver, but I cannot save the mapreduce result.

Rex said...

Now it's working. It was due to multiple SparkSubmit instances.

Rex said...

Thanks a lot Clarence.

If I want to save the result to HDFS instead of using saveToMongoDB, what command should I use?

Currently I am using pymongo_spark with __version__ = '0.1'

Clarence said...

Hi Rex. It is just a pyspark RDD. Save it as you would normally do.