Comments on Clarence's Wicked Mind: Apache Spark with MongoDB (using pymongo-spark)

Rahul (2018-05-02 02:45):
Hi Clarence,
I have followed your instructions and my program runs fine, but it is not saving any data to MongoDB. It is not even throwing any errors.
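[Note: a save that writes nothing and raises nothing is often just an empty RDD, since Spark evaluates lazily. A minimal sanity check, assuming the post's setup and an RDD named poorReview; the URI is a placeholder:]

    # Assumes pymongo_spark.activate() has already been called, as in the post,
    # and that poorReview is the RDD holding the map-reduce result.
    n = poorReview.count()   # count() forces the lazy lineage to actually run
    print('records to save: %d' % n)
    if n:
        poorReview.saveToMongoDB('mongodb://mymongodbserver:27017/test.poor_count')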
Clarence (2017-07-09 00:15):
Hi Rex. It is just a pyspark RDD. Save it as you would normally do.

Rex (2017-07-07 23:31):
Thanks a lot, Clarence.

If I want to save the result to HDFS instead of calling saveToMongoDB, what should the command be?

I am currently using pymongo_spark __version__ = '0.1'.
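[Note: as Clarence says, the result is an ordinary pyspark RDD, so the standard RDD save actions apply. A sketch, assuming the RDD is named poorReview as in the post; the HDFS host and paths are placeholders:]

    # Plain-text output: each element of the RDD is written as its str() form.
    # 'namenode' and the directories are placeholders for your HDFS setup.
    poorReview.saveAsTextFile('hdfs://namenode:9000/user/hduser/poor_count')

    # Alternatively, keep the records pickled if pyspark will read them back:
    poorReview.saveAsPickleFile('hdfs://namenode:9000/user/hduser/poor_count_pickle')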
Rex (2017-07-07 23:26):
Now it's working. It was due to multiple SparkSubmit processes.

Rex (2017-07-07 10:32):
The same saving script that is in this blog, i.e.:

    poorReview.saveToMongoDB('mongodb://mymongodbserver:27017/test.poor_count')

I can retrieve data from mymongodbserver, but I cannot save the map-reduce result.

Clarence (2017-07-03 09:00):
What is your script for saving the result to MongoDB? Is it pointing to the correct server?
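[Note: for reference, a sketch of the end-to-end shape the post's save script assumes, using the pymongo_spark API (activate(), sc.mongoRDD, saveToMongoDB). The host, database, and field names below are illustrative placeholders, not the post's actual values:]

    import pymongo_spark
    pymongo_spark.activate()   # adds mongoRDD/saveToMongoDB to pyspark classes

    from pyspark import SparkContext
    sc = SparkContext(appName='poor-reviews')

    # Read one collection, reduce, write the result to another collection.
    # 'stars' and 'business_id' are illustrative field names.
    reviews = sc.mongoRDD('mongodb://mymongodbserver:27017/test.review')
    poorReview = (reviews.filter(lambda doc: doc.get('stars', 0) <= 2)
                         .map(lambda doc: (doc['business_id'], 1))
                         .reduceByKey(lambda a, b: a + b))
    poorReview.saveToMongoDB('mongodb://mymongodbserver:27017/test.poor_count')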
Rex (2017-07-02 22:03):
Up to the map-reduce part, everything is working fine.
Rex (2017-07-02 22:01):
17/07/03 10:05:14 INFO DAGScheduler: Submitting 427 missing tasks from ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 8.0 with 427 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 448, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:14 INFO TaskSetManager: Starting task 1.0 in stage 8.0 (TID 449, x.x.x.200, executor 0, partition 1, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:14 INFO TaskSetManager: Starting task 2.0 in stage 8.0 (TID 450, x.x.x.199, executor 1, partition 2, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:25 INFO TaskSetManager: Starting task 18.0 in stage 8.0 (TID 466, x.x.x.200, executor 0, partition 18, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:25 WARN TaskSetManager: Lost task 3.0 in stage 8.0 (TID 451, x.x.x.200, executor 0): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 3046 ms
    at com.mongodb.BaseCluster.getServer(BaseCluster.java:87)
    at com.mongodb.DBTCPConnector.getServer(DBTCPConnector.java:654)
    at com.mongodb.DBTCPConnector.access$300(DBTCPConnector.java:39)
    at com.mongodb.DBTCPConnector$MyPort.getConnection(DBTCPConnector.java:503)
    at com.mongodb.DBTCPConnector$MyPort.get(DBTCPConnector.java:451)
    at com.mongodb.DBTCPConnector.getPrimaryPort(DBTCPConnector.java:409)
    at com.mongodb.DBCollectionImpl.executeBulkWriteOperation(DBCollectionImpl.java:142)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1663)
    at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1659)
    at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:90)
    at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:167)
    at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:70)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1132)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
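[Note: a MongoTimeoutException with AnyServerSelector{} means the MongoDB client inside the executor could not reach any mongod at all, which usually points at mongod's bind_ip setting or a firewall rather than at Spark itself. A quick check to run on each worker node, assuming pymongo 3.x and a placeholder host:]

    from pymongo import MongoClient
    from pymongo.errors import ServerSelectionTimeoutError

    # Run this on each worker node. If it fails here but works on the master,
    # mongod is probably bound to 127.0.0.1 (bind_ip) or blocked by a firewall.
    client = MongoClient('mongodb://mymongodbserver:27017/',
                         serverSelectionTimeoutMS=3000)
    try:
        client.admin.command('ping')
        print('this host can reach mongod')
    except ServerSelectionTimeoutError as exc:
        print('cannot reach mongod: %s' % exc)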
Anonymous (2017-07-02 22:00):
17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 447, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5865 bytes)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.199:43308 (size: 3.9 KB, free: 362.0 MB)
17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.199:58144
17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 447) in 105 ms on x.x.x.199 (executor 1) (1/1)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ResultStage 6 (take at SerDeUtil.scala:233) finished in 0.106 s
17/07/03 10:05:14 INFO DAGScheduler: Job 4 finished: take at SerDeUtil.scala:233, took 0.125786 s
17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
17/07/03 10:05:14 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
17/07/03 10:05:14 INFO MongoOutputCommitter: Setting up job.
17/07/03 10:05:14 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at PythonRDD.scala:834
17/07/03 10:05:14 INFO DAGScheduler: Got job 5 (saveAsNewAPIHadoopFile at PythonRDD.scala:834) with 427 output partitions
17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 8 (saveAsNewAPIHadoopFile at PythonRDD.scala:834)
17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 67.0 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 23.2 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on x.x.x.198:35614 (size: 23.2 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:996
Anonymous (2017-07-02 21:58):
Log details: worker node1 IP: x.x.x.199, worker node2 IP: x.x.x.200

17/07/03 10:05:14 INFO TaskSetManager: Finished task 426.0 in stage 3.0 (TID 445) in 10069 ms on x.x.x.199 (executor 1) (427/427)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ShuffleMapStage 3 (reduceByKey at /home/hduser/try/test.py:25) finished in 298.476 s
17/07/03 10:05:14 INFO DAGScheduler: looking for newly runnable stages
17/07/03 10:05:14 INFO DAGScheduler: running: Set()
17/07/03 10:05:14 INFO DAGScheduler: waiting: Set(ResultStage 4)
17/07/03 10:05:14 INFO DAGScheduler: failed: Set()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 6.3 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:996
17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 446, x.x.x.200, executor 0, partition 0, NODE_LOCAL, 5783 bytes)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.200:43469 (size: 3.9 KB, free: 362.0 MB)
17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.200:40972
17/07/03 10:05:14 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 46256 bytes
17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 446) in 137 ms on x.x.x.200 (executor 0) (1/1)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:441) finished in 0.138 s
17/07/03 10:05:14 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:441, took 298.676021 s
17/07/03 10:05:14 INFO SparkContext: Starting job: take at SerDeUtil.scala:233
17/07/03 10:05:14 INFO DAGScheduler: Got job 4 (take at SerDeUtil.scala:233) with 1 output partitions
17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 6 (take at SerDeUtil.scala:233)
17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 6.4 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:996

Rex (2017-07-02 21:02):
Yes, it can retrieve data from MongoDB.
Clarence (2017-07-02 15:59):
Hi Rex. When you run the job, do you see the workers connect to MongoDB and retrieve data?

Rex (2017-07-02 10:14):
I have built pymongo_spark using the command ./gradlew jar.
Yes, I have used the same command you mention in the post:

    bin/spark-submit --master spark://:7077 --conf "spark.eventLog.enabled=true" --py-files "my_loc/pymongo_spark.py" --driver-class-path="my_loc/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" --executor-memory=256m my_loc/yelp_poor2.py

My configuration: I have two worker nodes and one master node. The worker nodes communicate with the MongoDB server, but during this command:

    poorReview.saveToMongoDB('mongodb://master_ip:27017/test.poor_count')

I get this error:

    Lost task 2.0 in stage 8.0 (TID 450, worker_ip, executor 1): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 2571 ms

so the result is not saved to poor_count.

Clarence (2017-07-01 10:35):
@rex Have you built the pymongo_spark code? Also, when you submit your Spark application, do you use the "--py-files" parameter to distribute the pymongo_spark files?
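[Note: "ImportError: No module named pymongo_spark" is typically raised when the module is neither on the driver's PYTHONPATH nor shipped to the executors. Besides the "--py-files" flag mentioned above, SparkContext.addPyFile can ship it at runtime. A sketch; the module path is a placeholder:]

    import sys
    from pyspark import SparkContext

    # Placeholder for wherever the built pymongo_spark.py lives on the driver.
    MODULE_DIR = '/home/hduser/mongo-hadoop/spark/src/main/python'

    sc = SparkContext(appName='poor-reviews')
    sc.addPyFile(MODULE_DIR + '/pymongo_spark.py')  # ship a copy to executors
    sys.path.insert(0, MODULE_DIR)                  # make it importable here

    import pymongo_spark
    pymongo_spark.activate()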
Rex (2017-06-30 23:21):
Hi Clarence,

I have done all the setup described in your blog, but I could not solve the following issue:

    ImportError: No module named pymongo_spark

What could be causing this?

Clarence (2016-07-30 18:03):
Hi Shyam. I haven't used mongo-connector. But if you are using spark-submit, you can use the "--packages" option to pull in a Maven dependency, or use "--jars" to specify the location of your lib.
Shyam Kumar (2016-07-29 09:03):
Hello Clarence,

I am trying to import the mongo-connector into spark-1.6.1. I don't see a lib directory present in SPARK_HOME. Any idea where I will need to put the jars to get "import org.mongodb" working?

Thanks,
Shyam