This post is about using the (still unstable) pymongo-spark library to create MongoDB-backed RDDs.
First, get the mongo-hadoop source tree from GitHub:
git clone https://github.com/mongodb/mongo-hadoop.git
We need to build a jar and copy a Python script from the source.
To build the jar, use the Gradle wrapper bundled with the repository (the exact task may vary between revisions, but the following should work):
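./gradlew jar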
When done, the only jar we need is the Spark connector jar (the version number depends on the revision you check out). Gradle should leave it under the spark module's build output:
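spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar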
Copy it to the lib directory under Spark. You also need to add it to the classpath when using pyspark or spark-submit, as shown in the submit command at the end of this post.
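For example, with the Spark home used later in this post (adjust the paths to your own setup):
cp spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar /home/spark/spark-1.4.1-bin-hadoop2.6/lib/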
And the Python file we need from the source tree is located at:
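spark/src/main/python/pymongo_spark.py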
Save it in the same folder as the application script and remember to use the "--py-files" parameter so it gets deployed to the executors.
Also make sure you have pymongo installed, e.g. on Debian machines:
sudo apt-get install python-pymongo
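Or install it with pip, which works on any distribution:
pip install pymongo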
Finally, the application script. pymongo-spark needs to be activated before loading or writing RDDs. Also, the structure of the loaded RDD is slightly different from an RDD created directly through mongo-hadoop: mongoRDD() yields each document as a plain Python dict, rather than a (key, document) pair.
from operator import add

from pyspark import SparkContext, SparkConf
import pymongo_spark

if __name__ == "__main__":
    # initiate pymongo-spark: this patches SparkContext with mongoRDD()
    # and RDD with saveToMongoDB()
    pymongo_spark.activate()
    conf = SparkConf().setAppName('yelp_poor2')  # app name is arbitrary
    sc = SparkContext(conf=conf)
    # the number of businesses is much smaller than the number of reviews,
    # so collect each business's average stars into a local dictionary
    businessStar = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_business') \
        .map(lambda b: (b['business_id'], b['stars'])).collectAsMap()
    # for each business, count the reviews rated below its average
    poorReview = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_review') \
        .filter(lambda r: r['stars'] < businessStar[r['business_id']]) \
        .map(lambda r: (r['business_id'], 1)) \
        .reduceByKey(add)
    # save output back to MongoDB (the output collection name is arbitrary)
    poorReview.map(lambda kv: {'business_id': kv[0], 'poor_reviews': kv[1]}) \
        .saveToMongoDB('mongodb://192.168.1.69:27017/test.yelp_poor')
Submit the script with the connector jar on the classpath and pymongo_spark.py shipped via --py-files:

bin/spark-submit --master spark://192.168.1.10:7077 \
    --conf "spark.eventLog.enabled=true" \
    --py-files "/home/spark/test/spark_mongodb/pymongo_spark.py" \
    --driver-class-path="/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" \
    --executor-memory=256m \
    ~/test/spark_mongodb/yelp_poor2.py
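Once the job finishes, the result can be inspected from the mongo shell, querying whichever output collection the script wrote to (test.yelp_poor in the script above):

mongo 192.168.1.69/test --eval "printjson(db.yelp_poor.findOne())"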