Wednesday, December 16, 2015

Apache Spark with MongoDB (using pymongo-spark)

Here is a follow-up to the previous post about using Apache Spark to work with MongoDB data.  Please refer to that post for details on the setup.

This post is about using the "unstable" pymongo-spark library to create MongoDB-backed RDDs.

First, get the mongo-hadoop source tree from GitHub:

git clone https://github.com/mongodb/mongo-hadoop.git


We need to build a jar and copy a Python script from the source tree.

To build the jar:

./gradlew jar

When done, the only jar we need is (the version number depends on the source you check out):

spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar


Copy it to the lib directory under Spark.  You will also need to add it to the classpath when using pyspark or spark-submit.

And the Python file we need from the source tree is located at:

mongo-hadoop/spark/src/main/python/pymongo_spark.py


Save it in the same folder as the application script and remember to use the "--py-files" parameter to deploy it.

Also make sure you have pymongo installed, e.g. for Debian machines:

sudo apt-get install python-pymongo
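
For a quick sanity check that the driver is importable, you can print its version from a Python shell:

import pymongo
print(pymongo.version)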


Finally, the application script.  pymongo-spark needs to be activated before loading or writing RDDs.  Also, the structure of the loaded RDD is slightly different from the mongo-hadoop RDD in the previous post: each element is the document itself (a Python dict), rather than a (key, document) pair.

from operator import add
from pyspark import SparkContext, SparkConf
import pymongo_spark

if __name__ == "__main__":

    # activate pymongo-spark: adds mongoRDD() / saveToMongoDB() to SparkContext and RDD
    pymongo_spark.activate()

    conf = SparkConf() \
        .setAppName("SparkMongoDB2") \
        .set('spark.executor.extraClassPath', '/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar')
    sc = SparkContext(conf=conf)

    # we know the number of businesses is much smaller than the number of reviews,
    # so build a dictionary mapping each business to its star rating
    businessStar = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_business') \
        .map(lambda b: (b['business_id'], b['stars'])).collectAsMap()

    # create and process review RDD
    poorReview = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_review') \
        .filter(lambda r: r['stars'] < businessStar[r['business_id']]) \
        .map(lambda r: (r['business_id'], 1)) \
        .reduceByKey(add)

    # save output back to MongoDB
    poorReview.saveToMongoDB('mongodb://192.168.1.69:27017/test.poor_count')

    sc.stop()
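
For reference, each element returned by mongoRDD is just a Python dict, so an individual document can be inspected directly.  A minimal sketch, assuming the same yelp_business collection as above:

# each element is a plain dict keyed by field name
sample = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_business').first()
print(sample['business_id'], sample['stars'])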


Here is an example of submitting the application:


bin/spark-submit  --master spark://192.168.1.10:7077 --conf "spark.eventLog.enabled=true" --py-files "/home/spark/test/spark_mongodb/pymongo_spark.py" --driver-class-path="/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" --executor-memory=256m ~/test/spark_mongodb/yelp_poor2.py







19 comments:

  1. Hello Clarence

    I am trying to import the mongo-connector into spark-1.6.1. I don't see a lib directory present in SPARK_HOME. Any idea where I will need to put the jars to get the import org.mongodb working?

    Thanks
    Shyam

  2. Hi Shyam. I haven't used mongo-connector. But if you are using spark-submit, you can use the "--packages" option to pull in a Maven dependency. Or, use "--jars" to specify the location of your library jar.

  3. Hi Clarence,

    I have done all the settings described in your blog, but I could not resolve the following issue:
    ImportError: No module named pymongo_spark.

    What could be the cause of this?

  4. @rex have you built the pymongo_spark code? Also, when you submit your Spark application, do you use the "--py-files" parameter to distribute the pymongo_spark files?

  5. This comment has been removed by the author.

  6. I have built pymongo_spark using the command ./gradlew jar
    Yes, I have used the same command you have mentioned in the post:
    bin/spark-submit --master spark://:7077 --conf "spark.eventLog.enabled=true" --py-files "my_loc/pymongo_spark.py" --driver-class-path="my_loc/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" --executor-memory=256m my_loc/yelp_poor2.py

    My configuration
    I have two worker nodes and one master node.
    The worker nodes can communicate with the MongoDB server, but during this command:
    poorReview.saveToMongoDB('mongodb://master_ip:27017/test.poor_count')

    I got this error.
    Lost task 2.0 in stage 8.0 (TID 450, worker_ip, executor 1): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 2571 ms

    so the result is not saved to poor_count.

  7. hi rex. when you run the job, do you see the workers connected to mongodb and retrieving data?

  8. Yes it can retrieve data from mongodb.

  9. Log details: worker node1 ip: x.x.x.199 worker node2 ip: x.x.x.200

    17/07/03 10:05:14 INFO TaskSetManager: Finished task 426.0 in stage 3.0 (TID 445) in 10069 ms on x.x.x.199 (executor 1) (427/427)
    17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
    17/07/03 10:05:14 INFO DAGScheduler: ShuffleMapStage 3 (reduceByKey at /home/hduser/try/test.py:25) finished in 298.476 s
    17/07/03 10:05:14 INFO DAGScheduler: looking for newly runnable stages
    17/07/03 10:05:14 INFO DAGScheduler: running: Set()
    17/07/03 10:05:14 INFO DAGScheduler: waiting: Set(ResultStage 4)
    17/07/03 10:05:14 INFO DAGScheduler: failed: Set()
    17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48), which has no missing parents
    17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 6.3 KB, free 2.5 GB)
    17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
    17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
    17/07/03 10:05:14 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:996
    17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48)
    17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
    17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 446, x.x.x.200, executor 0, partition 0, NODE_LOCAL, 5783 bytes)
    17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.200:43469 (size: 3.9 KB, free: 362.0 MB)
    17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.200:40972
    17/07/03 10:05:14 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 46256 bytes
    17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 446) in 137 ms on x.x.x.200 (executor 0) (1/1)
    17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
    17/07/03 10:05:14 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:441) finished in 0.138 s
    17/07/03 10:05:14 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:441, took 298.676021 s
    17/07/03 10:05:14 INFO SparkContext: Starting job: take at SerDeUtil.scala:233
    17/07/03 10:05:14 INFO DAGScheduler: Got job 4 (take at SerDeUtil.scala:233) with 1 output partitions
    17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 6 (take at SerDeUtil.scala:233)
    17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
    17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
    17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148), which has no missing parents
    17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 6.4 KB, free 2.5 GB)
    17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
    17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
    17/07/03 10:05:14 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:996

  10. 17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148)
    17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
    17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 447, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5865 bytes)
    17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.199:43308 (size: 3.9 KB, free: 362.0 MB)
    17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.199:58144
    17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 447) in 105 ms on x.x.x.199 (executor 1) (1/1)
    17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
    17/07/03 10:05:14 INFO DAGScheduler: ResultStage 6 (take at SerDeUtil.scala:233) finished in 0.106 s
    17/07/03 10:05:14 INFO DAGScheduler: Job 4 finished: take at SerDeUtil.scala:233, took 0.125786 s
    17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
    17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
    17/07/03 10:05:14 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
    17/07/03 10:05:14 INFO MongoOutputCommitter: Setting up job.
    17/07/03 10:05:14 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at PythonRDD.scala:834
    17/07/03 10:05:14 INFO DAGScheduler: Got job 5 (saveAsNewAPIHadoopFile at PythonRDD.scala:834) with 427 output partitions
    17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 8 (saveAsNewAPIHadoopFile at PythonRDD.scala:834)
    17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
    17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
    17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181), which has no missing parents
    17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 67.0 KB, free 2.5 GB)
    17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 23.2 KB, free 2.5 GB)
    17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on x.x.x.198:35614 (size: 23.2 KB, free: 2.5 GB)
    17/07/03 10:05:14 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:996


  11. 17/07/03 10:05:14 INFO DAGScheduler: Submitting 427 missing tasks from ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181)
    17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 8.0 with 427 tasks
    17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 448, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5883 bytes)
    17/07/03 10:05:14 INFO TaskSetManager: Starting task 1.0 in stage 8.0 (TID 449, x.x.x.200, executor 0, partition 1, NODE_LOCAL, 5883 bytes)
    17/07/03 10:05:14 INFO TaskSetManager: Starting task 2.0 in stage 8.0 (TID 450, x.x.x.199, executor 1, partition 2, NODE_LOCAL, 5883 bytes)
    17/07/03 10:05:25 INFO TaskSetManager: Starting task 18.0 in stage 8.0 (TID 466, x.x.x.200, executor 0, partition 18, NODE_LOCAL, 5883 bytes)
    17/07/03 10:05:25 WARN TaskSetManager: Lost task 3.0 in stage 8.0 (TID 451, x.x.x.200, executor 0): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 3046 ms
    at com.mongodb.BaseCluster.getServer(BaseCluster.java:87)
    at com.mongodb.DBTCPConnector.getServer(DBTCPConnector.java:654)
    at com.mongodb.DBTCPConnector.access$300(DBTCPConnector.java:39)
    at com.mongodb.DBTCPConnector$MyPort.getConnection(DBTCPConnector.java:503)
    at com.mongodb.DBTCPConnector$MyPort.get(DBTCPConnector.java:451)
    at com.mongodb.DBTCPConnector.getPrimaryPort(DBTCPConnector.java:409)
    at com.mongodb.DBCollectionImpl.executeBulkWriteOperation(DBCollectionImpl.java:142)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1663)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1659)
    at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:90)
    at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:167)
    at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:70)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1132)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

  12. Up to the map-reduce part, it is working fine.

  13. What is your script for saving the result to MongoDB? Is it pointing to the correct server?

  14. This comment has been removed by the author.

  15. The same saving script that is in this blog,

    i.e. poorReview.saveToMongoDB('mongodb://mymongodbserver:27017/test.poor_count')

    I can retrieve data from mymongodbserver, but I cannot save the map-reduce result.

  16. Now it's working. It was due to multiple SparkSubmit instances.

  17. Thanks a lot Clarence.

    If I want to save the result in HDFS instead of saveToMongoDB, what should be the command for it?

    Currently I am using pymongo_spark __version__ = '0.1'

  18. Hi Rex. It is just a pyspark RDD. Save it as you would normally do.
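
    For example, something like this would do (the HDFS path is just an illustration):

    poorReview.saveAsTextFile('hdfs://namenode:9000/user/spark/poor_count')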

  19. Hi Clarence,
    I have followed your instructions and my program runs fine, but it's not saving any data to MongoDB. It's not even throwing any errors.
