This post is about using the "unstable" pymongo-spark library to create MongoDB-backed RDDs.
First, get the mongo-hadoop source tree from GitHub:
git clone https://github.com/mongodb/mongo-hadoop.git
We need to build a jar and copy a Python script from the source tree.
To build the jar:
./gradlew jar
When the build finishes, the only jar we need is the following (the version number depends on the source you check out):
spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar
Copy it to the lib directory under the Spark installation. It also needs to be added to the classpath when using PySpark or spark-submit.
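Because the jar's version suffix changes with the checkout, it can help to locate the file with a pattern instead of hard-coding its name. A minimal sketch, assuming the repository was cloned to `repo_dir` and Spark lives at `spark_home`; `find_spark_jar` and `copy_jar_to_spark` are hypothetical helper names, not part of mongo-hadoop.

```python
import glob
import os
import shutil


def find_spark_jar(repo_dir):
    """Return the mongo-hadoop-spark jar produced by ./gradlew jar.

    Hypothetical helper: the version suffix varies, so match it with a glob.
    """
    pattern = os.path.join(repo_dir, "spark", "build", "libs",
                           "mongo-hadoop-spark-*.jar")
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise FileNotFoundError("no jar matching " + pattern)
    # If several builds are present, take the last one in lexical order.
    return matches[-1]


def copy_jar_to_spark(repo_dir, spark_home):
    """Copy the jar into <spark_home>/lib (created if missing) and return the new path."""
    jar = find_spark_jar(repo_dir)
    lib_dir = os.path.join(spark_home, "lib")
    os.makedirs(lib_dir, exist_ok=True)
    return shutil.copy(jar, lib_dir)
```

The returned path is the one to pass to `--driver-class-path` and `spark.executor.extraClassPath`.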
And the Python file we need from the source tree is located at:
mongo-hadoop/spark/src/main/python/pymongo_spark.py
Save it in the same folder as the application script, and remember to use the "--py-files" parameter to distribute it.
Also make sure pymongo is installed, e.g. on Debian machines:
sudo apt-get install python-pymongo
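A quick way to confirm that both modules are visible before submitting is to ask the interpreter whether it can find them. A minimal sketch using only the standard library; `missing_modules` is a hypothetical helper. It only checks the Python it runs in, so run it with the same interpreter Spark uses, from the application folder, on each machine you care about; on executors, pymongo_spark.py arrives through --py-files rather than being installed.

```python
import importlib.util


def missing_modules(names):
    """Return the names this interpreter cannot locate (hypothetical helper).

    find_spec looks the module up on sys.path without importing it.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(["pymongo", "pymongo_spark"])
    if missing:
        print("Missing modules: " + ", ".join(missing))
    else:
        print("pymongo and pymongo_spark are importable")
```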
Finally, the application script. pymongo-spark needs to be activated before loading or writing RDDs. Also, the structure of the loaded RDD is slightly different from an RDD created with plain mongo-hadoop: each element is the document itself, so fields are read directly by name (e.g. b['business_id']).
from operator import add
from pyspark import SparkContext, SparkConf
import pymongo_spark

if __name__ == "__main__":
    # initiate pymongo-spark (adds mongoRDD / saveToMongoDB)
    pymongo_spark.activate()
    conf = SparkConf() \
        .setAppName("SparkMongoDB2") \
        .set('spark.executor.extraClassPath', '/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar')
    sc = SparkContext(conf=conf)
    # the number of businesses is much smaller than the number of reviews,
    # so collect a dictionary of the stars given to each business on the driver
    businessStar = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_business') \
        .map(lambda b: (b['business_id'], b['stars'])).collectAsMap()
    # create and process the review RDD:
    # count reviews rated below their business's stars
    poorReview = sc.mongoRDD('mongodb://192.168.1.69:27017/test.yelp_review') \
        .filter(lambda r: r['stars'] < businessStar[r['business_id']]) \
        .map(lambda r: (r['business_id'], 1)) \
        .reduceByKey(add)
    # save the output back to MongoDB
    poorReview.saveToMongoDB('mongodb://192.168.1.69:27017/test.poor_count')
    sc.stop()
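The filter / map / reduceByKey logic can be checked locally without a cluster. The sketch below is a plain-Python equivalent, not Spark code, run on a few hypothetical documents shaped like the fields the script reads; as with pymongo-spark, each element is a plain dict. Like the Spark job, it raises KeyError for a review whose business_id is missing from the business map.

```python
from collections import Counter

# Hypothetical sample documents (not real Yelp data).
businesses = [
    {"business_id": "b1", "stars": 4.0},
    {"business_id": "b2", "stars": 3.5},
]
reviews = [
    {"business_id": "b1", "stars": 2},
    {"business_id": "b1", "stars": 5},
    {"business_id": "b1", "stars": 3},
    {"business_id": "b2", "stars": 4},
    {"business_id": "b2", "stars": 1},
]


def count_poor_reviews(businesses, reviews):
    """Count, per business, the reviews rated below the business's stars."""
    # collectAsMap() step: business_id -> stars
    business_star = {b["business_id"]: b["stars"] for b in businesses}
    # filter + map(..., 1) + reduceByKey(add) step
    counts = Counter(
        r["business_id"] for r in reviews
        if r["stars"] < business_star[r["business_id"]]
    )
    return dict(counts)


print(count_poor_reviews(businesses, reviews))  # → {'b1': 2, 'b2': 1}
```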
Then submit the application with spark-submit:
bin/spark-submit --master spark://192.168.1.10:7077 --conf "spark.eventLog.enabled=true" --py-files "/home/spark/test/spark_mongodb/pymongo_spark.py" --driver-class-path="/home/spark/spark-1.4.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" --executor-memory=256m ~/test/spark_mongodb/yelp_poor2.py
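If the job is launched from Python (for example with subprocess.run), the same invocation can be built as an argument list, which sidesteps shell quoting of the paths. A sketch; `build_submit_command` is a hypothetical helper, and the paths are the ones from the command above. spark-submit accepts both the `--flag value` and `--flag=value` forms.

```python
import os
import shlex


def build_submit_command(master, app, py_files, jar,
                         executor_memory="256m",
                         spark_submit="bin/spark-submit"):
    """Assemble a spark-submit argument list like the command above (hypothetical helper)."""
    return [
        spark_submit,
        "--master", master,
        "--conf", "spark.eventLog.enabled=true",
        # several Python dependencies are passed as one comma-separated list
        "--py-files", ",".join(py_files),
        # the driver needs the mongo-hadoop-spark jar on its classpath too
        "--driver-class-path", jar,
        "--executor-memory", executor_memory,
        app,
    ]


cmd = build_submit_command(
    master="spark://192.168.1.10:7077",
    # "~" is only expanded by a shell, so expand it explicitly here
    app=os.path.expanduser("~/test/spark_mongodb/yelp_poor2.py"),
    py_files=["/home/spark/test/spark_mongodb/pymongo_spark.py"],
    jar="/home/spark/spark-1.4.1-bin-hadoop2.6/lib/"
        "mongo-hadoop-spark-1.5.0-SNAPSHOT.jar",
)
# Print a shell-safe version for copy-pasting; pass cmd itself to subprocess.run.
print(" ".join(shlex.quote(part) for part in cmd))
```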
Hello Clarence,
I am trying to import the mongo-connector into spark-1.6.1. I don't see a lib directory in SPARK_HOME. Any idea where I need to put the jars to get "import org.mongodb" working?
Thanks,
Shyam
Hi Shyam. I haven't used mongo-connector. But if you are using spark-submit, you can use the "--packages" option to pull in a Maven dependency, or use "--jars" to specify the location of your jars.
Hi Clarence,
I have done all the setup described in your blog, but I could not solve the following issue:
ImportError: No module named pymongo_spark
What could be causing this?
@rex have you built the pymongo_spark code? Also, when you submit your Spark application, do you use the "--py-files" parameter to distribute the pymongo_spark files?
I have built pymongo_spark using the command ./gradlew jar.
Yes, I have used the same command you mention in the post:
bin/spark-submit --master spark://:7077 --conf "spark.eventLog.enabled=true" --py-files "my_loc/pymongo_spark.py" --driver-class-path="my_loc/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar" --executor-memory=256m my_loc/yelp_poor2.py
My configuration:
I have two worker nodes and one master node.
The worker nodes can communicate with the MongoDB server, but during this command:
poorReview.saveToMongoDB('mongodb://master_ip:27017/test.poor_count')
I got this error:
Lost task 2.0 in stage 8.0 (TID 450, worker_ip, executor 1): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 2571 ms
so the result is not saved to poor_count.
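A MongoTimeoutException like this means the MongoDB Java driver on that executor could not select a usable server before the timeout. One thing worth ruling out is plain network reachability from each worker node to the MongoDB host and port. A minimal standard-library sketch; `can_reach` is a hypothetical helper, and a successful TCP connection only shows the port is open, not that mongod is healthy.

```python
import socket


def can_reach(host, port=27017, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts and DNS failures
        return False
```

For example, run print(can_reach("master_ip")) on each worker, with master_ip replaced by the host in the saveToMongoDB connection string.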
Hi Rex. When you run the job, do you see the workers connecting to MongoDB and retrieving data?
Yes, it can retrieve data from MongoDB.
Log details (worker node 1 IP: x.x.x.199, worker node 2 IP: x.x.x.200):
17/07/03 10:05:14 INFO TaskSetManager: Finished task 426.0 in stage 3.0 (TID 445) in 10069 ms on x.x.x.199 (executor 1) (427/427)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ShuffleMapStage 3 (reduceByKey at /home/hduser/try/test.py:25) finished in 298.476 s
17/07/03 10:05:14 INFO DAGScheduler: looking for newly runnable stages
17/07/03 10:05:14 INFO DAGScheduler: running: Set()
17/07/03 10:05:14 INFO DAGScheduler: waiting: Set(ResultStage 4)
17/07/03 10:05:14 INFO DAGScheduler: failed: Set()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 6.3 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:996
17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 446, x.x.x.200, executor 0, partition 0, NODE_LOCAL, 5783 bytes)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on x.x.x.200:43469 (size: 3.9 KB, free: 362.0 MB)
17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.200:40972
17/07/03 10:05:14 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 46256 bytes
17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 446) in 137 ms on x.x.x.200 (executor 0) (1/1)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:441) finished in 0.138 s
17/07/03 10:05:14 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:441, took 298.676021 s
17/07/03 10:05:14 INFO SparkContext: Starting job: take at SerDeUtil.scala:233
17/07/03 10:05:14 INFO DAGScheduler: Got job 4 (take at SerDeUtil.scala:233) with 1 output partitions
17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 6 (take at SerDeUtil.scala:233)
17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 6.4 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 3.9 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.198:35614 (size: 3.9 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:996
17/07/03 10:05:14 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (MapPartitionsRDD[13] at mapPartitions at SerDeUtil.scala:148)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 447, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5865 bytes)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on x.x.x.199:43308 (size: 3.9 KB, free: 362.0 MB)
17/07/03 10:05:14 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to x.x.x.199:58144
17/07/03 10:05:14 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 447) in 105 ms on x.x.x.199 (executor 1) (1/1)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
17/07/03 10:05:14 INFO DAGScheduler: ResultStage 6 (take at SerDeUtil.scala:233) finished in 0.106 s
17/07/03 10:05:14 INFO DAGScheduler: Job 4 finished: take at SerDeUtil.scala:233, took 0.125786 s
17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
17/07/03 10:05:14 INFO Converter: Loaded converter: com.mongodb.spark.pickle.NoopConverter
17/07/03 10:05:14 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
17/07/03 10:05:14 INFO MongoOutputCommitter: Setting up job.
17/07/03 10:05:14 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at PythonRDD.scala:834
17/07/03 10:05:14 INFO DAGScheduler: Got job 5 (saveAsNewAPIHadoopFile at PythonRDD.scala:834) with 427 output partitions
17/07/03 10:05:14 INFO DAGScheduler: Final stage: ResultStage 8 (saveAsNewAPIHadoopFile at PythonRDD.scala:834)
17/07/03 10:05:14 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
17/07/03 10:05:14 INFO DAGScheduler: Missing parents: List()
17/07/03 10:05:14 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181), which has no missing parents
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 67.0 KB, free 2.5 GB)
17/07/03 10:05:14 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 23.2 KB, free 2.5 GB)
17/07/03 10:05:14 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on x.x.x.198:35614 (size: 23.2 KB, free: 2.5 GB)
17/07/03 10:05:14 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:996
17/07/03 10:05:14 INFO DAGScheduler: Submitting 427 missing tasks from ResultStage 8 (MapPartitionsRDD[15] at map at PythonHadoopUtil.scala:181)
17/07/03 10:05:14 INFO TaskSchedulerImpl: Adding task set 8.0 with 427 tasks
17/07/03 10:05:14 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 448, x.x.x.199, executor 1, partition 0, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:14 INFO TaskSetManager: Starting task 1.0 in stage 8.0 (TID 449, x.x.x.200, executor 0, partition 1, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:14 INFO TaskSetManager: Starting task 2.0 in stage 8.0 (TID 450, x.x.x.199, executor 1, partition 2, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:25 INFO TaskSetManager: Starting task 18.0 in stage 8.0 (TID 466, x.x.x.200, executor 0, partition 18, NODE_LOCAL, 5883 bytes)
17/07/03 10:05:25 WARN TaskSetManager: Lost task 3.0 in stage 8.0 (TID 451, x.x.x.200, executor 0): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches AnyServerSelector{} after 3046 ms
at com.mongodb.BaseCluster.getServer(BaseCluster.java:87)
at com.mongodb.DBTCPConnector.getServer(DBTCPConnector.java:654)
at com.mongodb.DBTCPConnector.access$300(DBTCPConnector.java:39)
at com.mongodb.DBTCPConnector$MyPort.getConnection(DBTCPConnector.java:503)
at com.mongodb.DBTCPConnector$MyPort.get(DBTCPConnector.java:451)
at com.mongodb.DBTCPConnector.getPrimaryPort(DBTCPConnector.java:409)
at com.mongodb.DBCollectionImpl.executeBulkWriteOperation(DBCollectionImpl.java:142)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1663)
at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:1659)
at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:90)
at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:167)
at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:70)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1132)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Up to the MapReduce part, everything works fine.
What is your script for saving the result to MongoDB? Is it pointing to the correct server?
The same saving script as in this blog,
i.e. poorReview.saveToMongoDB('mongodb://mymongodbserver:27017/test.poor_count')
I can retrieve data from mymongodbserver, but I cannot save the MapReduce result.
Now it's working. It was caused by multiple SparkSubmit instances.
Thanks a lot, Clarence.
If I want to save the result in HDFS instead of using saveToMongoDB, what should the command be?
Currently I am using pymongo_spark __version__ = '0.1'.
Hi Rex. It is just a PySpark RDD, so save it as you normally would.
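For example, a pair RDD such as poorReview can be written to HDFS with the standard RDD.saveAsTextFile method. A minimal sketch; `format_count` and `save_counts_as_text` are hypothetical helpers, and the tab-separated line format is an arbitrary choice.

```python
def format_count(pair):
    """Turn a (business_id, count) pair into one tab-separated text line."""
    business_id, count = pair
    return "{}\t{}".format(business_id, count)


def save_counts_as_text(rdd, path):
    """Write the pair RDD as text files under path (e.g. an hdfs:// URI)."""
    rdd.map(format_count).saveAsTextFile(path)
```

Usage, with a placeholder namenode address: save_counts_as_text(poorReview, "hdfs://namenode:8020/user/rex/poor_count"). By default Spark refuses to write into an output directory that already exists, so pick a fresh path for each run.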
Hi Clarence,
I have followed your instructions and my program runs fine, but it's not saving any data to MongoDB. It's not even throwing any errors.