When MongoDB Meets Spark

The traditional Spark ecosystem

[Figure 1: the traditional Spark ecosystem]

So what role can MongoDB, as a database, play here? It takes over the data-storage layer, i.e. the HDFS part shown as the black circle in the figure above, as illustrated below.

The Spark ecosystem after replacing HDFS with MongoDB

[Figure 2: the Spark ecosystem with MongoDB in place of HDFS]

Why replace HDFS with MongoDB

  • In terms of storage, HDFS works at the file level, with files of roughly 64 MB to 128 MB each, whereas MongoDB, as a document database, is far more fine-grained
  • MongoDB supports indexes, which HDFS has no notion of, so reads are much faster
  • MongoDB supports inserts, updates, and deletes, so data remains easy to modify after it has been written, unlike HDFS
  • HDFS response times are on the order of minutes, while MongoDB typically responds in milliseconds
  • If your data already lives in MongoDB, there is no need to export an extra copy into HDFS
  • MongoDB's powerful aggregation framework can be used to filter or pre-process the data before it ever reaches Spark (see the sketch below)
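
That last point deserves a quick illustration (the full example later in this article does the same thing). The connector can push an aggregation pipeline down to MongoDB, so filtering happens inside the database and only matching documents are shipped to Spark. A minimal sketch, assuming a collection with a type field and a reachable local mongod; the names used here are placeholders:

  // Minimal sketch: the $match stage runs inside MongoDB, so Spark only
  // receives documents whose type field equals "1".
  import com.mongodb.spark._
  import org.apache.spark.{SparkConf, SparkContext}
  import org.bson.Document

  val sc = new SparkContext(new SparkConf()
    .setMaster("local")
    .setAppName("pushdown-sketch")
    .set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/inputDB.collectionName"))
  val filtered = MongoSpark.load(sc)
    .withPipeline(Seq(Document.parse("{ $match: { type: \"1\" } }")))
  println(filtered.count())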

Introduction to the MongoDB Spark Connector

  • Supports both reads and writes, so computed results can be written back into MongoDB
  • Splits a query into n sub-tasks; for example, the connector will split a single match into several sub-tasks handed to Spark, avoiding a full read of the collection (see the configuration sketch below)
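
The splitting is driven by a pluggable partitioner. A configuration sketch for the connector 2.x input options follows; verify the exact keys and partitioner names against the connector documentation for your version:

  // Sketch: selecting and tuning the input partitioner (connector 2.x options).
  // MongoSamplePartitioner samples the collection and cuts it into partitions of
  // roughly partitionSizeMB each; every partition becomes one Spark task.
  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
    .set("spark.mongodb.input.partitionerOptions.partitionSizeMB", "64")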

MongoDB Spark sample code

Goal: compute the number of characters in messages with type = 1, grouped by userid.

Maven dependency configuration

This example uses mongo-spark-connector_2.11 version 2.0.0 and spark-core_2.11 version 2.0.2.

  <dependency>
      <groupId>org.mongodb.spark</groupId>
      <artifactId>mongo-spark-connector_2.11</artifactId>
      <version>2.0.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.2</version>
  </dependency>
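
If you build with sbt instead of Maven, the equivalent coordinates would be roughly:

  // build.sbt equivalent of the Maven dependencies above
  libraryDependencies ++= Seq(
    "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0",
    "org.apache.spark"  %% "spark-core"            % "2.0.2"
  )
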
Sample code
  import com.mongodb.spark._
  import org.apache.spark.{SparkConf, SparkContext}
  import org.bson._

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("Mingdao-Score")
    // The mongo driver's readPreference setting is also supported, e.g. to read only from secondaries
    .set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")
    .set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")
  val sc = new SparkContext(conf)

  // Create the RDD
  val originRDD = MongoSpark.load(sc)

  // Build the query; start and end are assumed here to be java.util.Date values
  // marking the time window (dateQuery is not used in the pipeline below, but
  // could be combined with matchQuery as an extra condition if needed)
  val start = new java.util.Date(System.currentTimeMillis() - 24L * 60 * 60 * 1000)
  val end = new java.util.Date()
  val dateQuery = new BsonDocument()
    .append("$gte", new BsonDateTime(start.getTime))
    .append("$lt", new BsonDateTime(end.getTime))
  val matchQuery = new Document("$match", BsonDocument.parse("{\"type\":\"1\"}"))

  // Build the projection
  val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"userid\":\"$userid\",\"message\":\"$message\"}"))
  val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))

  // Compute each user's total message character count
  val rdd1 = aggregatedRDD.keyBy(x => {
    Map(
      "userid" -> x.get("userid")
    )
  })
  val rdd2 = rdd1.groupByKey.map(t => {
    (t._1, t._2.map(x => {
      x.getString("message").length
    }).sum)
  })
  rdd2.collect().foreach(x => {
    println(x)
  })

  // Save the results to the database specified by spark.mongodb.output.uri;
  // MongoSpark.save expects documents, so convert each (key, count) pair first
  val resultDocs = rdd2.map(t => new Document("userid", t._1("userid")).append("chars", Int.box(t._2)))
  MongoSpark.save(resultDocs)
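
For comparison, the connector also exposes a DataFrame/Spark SQL API via the same com.mongodb.spark.sql data source used in the Python example referenced below. A rough sketch of the same per-user character count, assuming the collection has userid, message, and type fields as above; names such as Mingdao-Score-SQL and messages are just placeholders:

  // Sketch: the same aggregation through the DataFrame / Spark SQL API.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local")
    .appName("Mingdao-Score-SQL")
    .config("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017/inputDB.collectionName")
    .config("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017/outputDB.collectionName")
    .getOrCreate()

  // Schema is inferred by sampling the collection
  val df = spark.read.format("com.mongodb.spark.sql").load()
  df.createOrReplaceTempView("messages")
  val perUser = spark.sql(
    "SELECT userid, SUM(LENGTH(message)) AS chars FROM messages WHERE type = '1' GROUP BY userid")
  perUser.show()
  perUser.write.format("com.mongodb.spark.sql").mode("overwrite").save()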

Summary

The MongoDB Connector documentation only provides basic sample code; for the details you need to look at the examples and some of the source code on GitHub.

References

  1. mongo-spark/examples/src/test/python/introduction.py
  # -*- coding: UTF-8 -*-
  #
  # Copyright 2016 MongoDB, Inc.
  #
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements. See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License. You may obtain a copy of the License at
  #
  #    http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.

  # To run this example use:
  # ./bin/spark-submit --master "local[4]" \
  #     --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.coll?readPreference=primaryPreferred" \
  #     --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.coll" \
  #     --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
  #     introduction.py

  from pyspark.sql import SparkSession

  if __name__ == "__main__":
      spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()
      logger = spark._jvm.org.apache.log4j
      logger.LogManager.getRootLogger().setLevel(logger.Level.FATAL)

      # Save some data
      characters = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"])
      characters.write.format("com.mongodb.spark.sql").mode("overwrite").save()

      # print the schema
      print("Schema:")
      characters.printSchema()

      # read from MongoDB collection
      df = spark.read.format("com.mongodb.spark.sql").load()

      # SQL
      df.registerTempTable("temp")
      centenarians = spark.sql("SELECT name, age FROM temp WHERE age >= 100")
      print("Centenarians:")
      centenarians.show()