大数据系列之Spark和MongoDB集成
在Spark生态系统中,HDFS作为存储可以使用MongoDB来替代,构建成Spark+MongoDB生态系统。MongoDB作为文档存储型数据库,支持HDFS没有的索引概念,响应时间为毫秒级别,同时可以利用强大的aggregate函数做数据的筛选和预处理。
在Spark生态系统中,HDFS作为存储可以使用MongoDB来替代,构建成Spark+MongoDB生态系统。MongoDB作为文档存储型数据库,支持HDFS没有的索引概念,响应时间为毫秒级别,同时可以利用强大的aggregate函数做数据的筛选和预处理。
1、Spark和MongoDB集成
MongoDB是一种文档型数据库,作为一个适用于敏捷开发的数据库,MongoDB的数据模式可以随着应用程序的发展而灵活地更新。但是MongoDB适合一次查询的需求,对于统计、分析(尤其是在需要跨表、跨库的情况下)并不是太方便,我们可以用spark来处理MongoDB数据。Spark和MongoDB部署的一个典型架构如下:
Spark任务一般由Spark的driver节点发起,经过Spark Master进行资源调度分发。比如这里我们有4个Spark worker节点,这些节点上的几个executor 计算进程就会同时开始工作。一般一个core就对应一个executor。每个executor会独立的去MongoDB取来原始数据,直接套用Spark提供的分析算法或者使用自定义流程来处理数据,计算完后把相应结果写回到MongoDB。在这里,所有和MongoDB的交互都是通过一个叫做Mongo-Spark的连接器来完成的,Mongo-Spark连接器有官方版本和第三方开发的版本,本次主要使用官方的版本。
1.1 MongoDB-Spark-Connector选择
在获取数据时,官方连接器的性能似乎比第三方连接器的好一点,官方连接器有一个条件下推的原则。我们知道spark的算子分为两种:Transformation和Action,只有遇到Action算子才会触发作业的提交。比如在后续的一些Transformation算子中对数据有一定的数据过滤条件,官方连接器会把过滤条件下推到MongoDB去执行,这样可以保证从MongoDB取出来、经过网络传输到Spark计算节点的数据确实都是用得着的。在写数据到mongodb时,第三方连接器的功能比官方连接器要优,支持在原有表的基础上做更新。
- 官方connector的github地址:https://github.com/mongodb/mongo-spark
- 第三方connector的github地址:https://github.com/Stratio/Spark-MongoDB
1.2 MongoDB-Spark-Connector配置
MongoDB-Spark-Connector的配置可以通过使用SparkConf使用–conf或者$SPARK_HOME/conf/spark-default.conf文件进行指定。
1.2.1 Input Configuration
如果这些input configuration通过SparkConf设置,需加上spark.mongodb.input前缀
参数名 | 描述 |
---|---|
uri | 连接MongoDB的uri地址IP或hostname, mongodb://host:port/ |
database | 从MongoDB中读取数据的database名称 |
collection | 从MongoDB中读取数据的collection名称 |
示例如下:
a)uri为:
spark.mongodb.input.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred
b)对应的configuration file如下:
spark.mongodb.input.uri=mongodb://127.0.0.1/
spark.mongodb.input.database=databaseName
spark.mongodb.input.collection=collectionName
spark.mongodb.input.readPreference.name=primaryPreferred
如果同时设置了uri和单独配置文件,uri设置会将单独设置覆盖掉。
1.2.2 Output Configuration
如果这些output configuration通过SparkConf设置,需加上spark.mongodb.output前缀
参数名 | 描述 |
---|---|
uri | 连接MongoDB的uri地址IP或hostname, mongodb://host:port/ |
database | 数据写入MongoDB的database名称 |
collection | 数据写入MongoDB的collection名称 |
a) uri为:
spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection
b)对应的configuration file如下
spark.mongodb.output.uri=mongodb://127.0.0.1/
spark.mongodb.output.database=test
spark.mongodb.output.collection=myCollection
如果同时设置了uri和单独配置文件,uri设置会将单独设置覆盖掉。
1.2.3 Cache Configuration
参数名 | 描述 | 默认值 |
---|---|---|
spark.mongodb.keep_alive_ms | The length of time to keep a MongoClient available for sharing. | 5000 |
1.3 MongoDB连接器使用
1.3.1 spark-submit提交可执行程序
首先要安装spark(如果不需要把数据保存到hdfs、不需要使用yarn,可以不安装hadoop),在spark目录下的bin目录下会有一个spark-submit可执行文件。
例如把代码保存在test.py中,如果使用官方连接器,运行以下命令,在联网环境下如果没有找到连接会从官网自动下载并保存在路径/root/.ivy2/jars中。
注:在内网环境下将cache、jar目录拷贝到~/.ivy2目录下即可
[root@tango-spark01 spark-2.3.0]# spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 test-rdd/test-conn.py
test-conn.py程序如下:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext()
ctx = SQLContext(sc)
test_collection = ctx.read.format("com.mongodb.spark.sql").options(uri="mongodb://192.168.112.102:27017", database="syslogdb", collection="syslog_tango_01").load()
test_collection.printSchema()
print(test_collection.first())
print("helloworld!!")
1.3.2 spark shell方式操作
如果通过pyspark连接MongoDB,运行:
./bin/pyspark
--conf "spark.mongodb.input.uri=mongodb://192.168.112.102/testdb.myCollection?readPreference=primaryPreferred"
--conf "spark.mongodb.output.uri=mongodb://192.168.112.102/testdb.myCollection"
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2
1)写入MongoDB
people = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"])
people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
people.show()
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fJTNLllT-1619583783215)(https://note.youdao.com/favicon.ico)]
People DataFrame写入到MongoDB输出参数的database和collections中,如下:
2)读MongoDB
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()
输出如下:
输出如下:
>>> df.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
1.4 PySpark-MongoDB数据加载
1.4.1 环境准备
1)Spark集群环境准备
参考“大数据系统之Spark集群环境部署”
2)MongoDB集群环境准备
参考“数据库系列之MongoDB集群环境搭建”
1.4.2 PySpark操作MongoDB
1)启动MongoDB集群环境
[root@tango-centos01 mongodb-linux-x86_64-rhel70-3.6.3]./bin/mongod -f ./config/master.conf
[root@tango-centos02 mongodb-linux-x86_64-rhel70-3.6.3]# ./bin/mongod -f ./config/slave.conf
[root@tango-centos03 mongodb-linux-x86_64-rhel70-3.6.3]# ./bin/mongod -f ./config/slave.conf
2)指定connector package执行代码
[root@tango-spark01 spark-2.3.0]# spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 /usr/local/spark/ipynotebook/01-mongodb-01.py
3)建立MongoDB的连接
test_collection = ctx.read.format("com.mongodb.spark.sql").\
options(uri=urlAddr, database=dbName, collection=collID).load()
- uri:MongoDB的URL地址,“mongodb://192.168.112.102:27017”
- database:DB名称
- collection:collection名称
4)访问数据Filter过滤
从mongodb中读取数据时指定数据分区字段,分区大小提高读取效率, 当需要过滤部分数据集的情况下使用Dataset/SQL的方式filter,Mongo Connector会创建aggregation pipeline在mongodb端进行过滤,然后再传回给spark进行优化处理。
a)使用Spark SQL中指定谓词filter数据
sqlContext.sql("select * from df_tb1 where timeStart < date_sub(current_date(),1)").show()
b)使用Data Frame对数据进行filter
originDf = df.filter(df["@timestamp"] < currentTimestamp & df["@timestamp"] >= currentTimestamp -1440*60*1000) .select("_id", "host", "message").toDF("id", "hostname", "message")
5)使用SQL时间函数处理时间戳字段
time1 = time()
swimmersJSON.registerTempTable("df_tb2")
df2=sqlContext.sql("select age,name,timeStart,unix_timestamp(timeStart) time_stamp from df_tb2")
df2.show()
df2.registerTempTable("df_tb3")
sqlContext.sql("select * from df_tb3 where time_stamp < "+str(time1)+"-24*3600").show()
6)完整代码如下:
# -*- coding: UTF-8 -*-
import sys
from time import time
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import numpy as np
import math
#settings mongodb configuration infomation
urlAddr = "mongodb://192.168.112.102:27017"
dbName = "syslogdb"
collID ="syslog_tango_01"
#Configure Spark AppName and Master
def CreateSparkContext():
sparkConf = SparkConf().setAppName("PyMongo") \
.set("spark.ui.showConsoleProgress", "false") \
.setMaster("local[*]")
sc = SparkContext(conf = sparkConf)
print ("master="+sc.master)
return (sc)
# Connect to mongodb
# URL address and database and collection name
def ConnectMongo(ctx):
#ctx = SQLContext(sc)
test_collection = ctx.read.format("com.mongodb.spark.sql").\
options(uri=urlAddr, database=dbName, collection=collID).load()
return test_collection
# Connect to MongoDB with filter schema
def ConnectMongoFilter(ctx):
# ctx = SQLContext(sc)
fields_list = "@timestamp,@version,_id,host,message"
fields = [StructField(field_name, StringType(), True) for field_name in fields_list.split(',')]
schema = StructType(fields)
test_collection = ctx.read.schema(schema).format("com.mongodb.spark.sql"). \
options(uri=urlAddr, database=dbName, collection=collID).load()
return test_collection
if __name__ == "__main__":
print("PySpark-MongoDB")
sc=CreateSparkContext()
ctx = SQLContext(sc)
retCollid = ConnectMongo(ctx)
# print schema and content
retCollid.printSchema()
retCollid.first()
#use DataFrames
currentTimestamp=time()
print(currentTimestamp)
df = retCollid
print("########test information:########")
print(type(df))
print(df["@timestamp"])
df.select("@timestamp").show(5)
originDf = df.filter( \
df["@timestamp"] < currentTimestamp & df["@timestamp"] >= currentTimestamp - 1440*60*1000) \
.select("_id", "host", "message").toDF("id", "hostname", "message")
#originDf = df.filter(df["@timestamp"]>1533028658.78).select("_id", "host", "message").toDF("id", "hostname", "message")
originDf.show()
#use Filter Schema
filterColl = ConnectMongoFilter(ctx)
filterColl.show(5)
print("######################################")
#use Spark SQL
df.registerTempTable("tempSYSLOG")
sql = "select * from tempSYSLOG where host like 'tango-01%'"
result = ctx.sql(sql)
result.show(5)
参考资料
转载请注明原文地址:https://blog.csdn.net/solihawk/article/details/116230946
文章会同步在公众号“牧羊人的方向”更新,感兴趣的可以关注公众号,谢谢!
更多推荐
所有评论(0)