大数据实战-微博热搜变化
摘要:使用 Python 爬取微博热搜数据,经 Flume 转发到 Spark Streaming 计算各热词的热度变化,结果写入 MySQL,最后用 Flask + ECharts 进行可视化展示。
架构设计
这个是实时监测的,所以采用了SparkStreaming
python爬取
爬取逻辑和大数据实战-全网热词相类似,只处理微博的即可。
flume转发
Flume 将数据下沉(sink)到 Spark 有 push 和 poll 两种方式,我这里使用的是较为简单的 push。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = syslogudp
a1.sources.r1.bind = node102
a1.sources.r1.port = 7777
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.128.1
a1.sinks.k1.port = 8888
#相互关联 source--channel, sink--channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
spark计算
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
case class WordDelta(word:String, delta:Int)
object HotWordsPredict {
def main(args: Array[String]): Unit = {
//必须有一个接收器在常驻,另一个线程执行
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello Streaming")
// val sparkContext = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sparkConf,Seconds(300))
//5s处理为一个批次
val flumeDStream = FlumeUtils.createStream(streamingContext, "192.168.128.1", 8888)
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
flumeDStream.map(line => new String(line.event.getBody.array()).trim)
.map(line=>
{
val strs = line.split("\t")
(strs(2), (strs(3), strs(4)))
})
.groupByKey()
.map{case (k, v)=>
(k, v.maxBy(_._2)._1.toInt - v.minBy(_._2)._1.toInt)}
.foreachRDD(rdd=>
if(!rdd.isEmpty()){
rdd.map{case (k,v)=>WordDelta(k,v)}
.toDS().write
.format("jdbc")
.option("url", "jdbc:mysql://node103:3306/db_hotwords")
.option("dbtable", "tb_word_delta")
.option("user", "root")
.option("password", "root")
.mode(SaveMode.Overwrite)
.save()
})
streamingContext.start()
streamingContext.awaitTermination()
}
}
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
lib/spark-examples*.jar \
10
mysql存储
CREATE TABLE `tb_word_delta` (
`word` text,
`delta` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
flask展示
import pymysql
from flask import Flask
from jinja2 import Markup, Environment, FileSystemLoader
from pyecharts.globals import CurrentConfig, SymbolType
# 关于 CurrentConfig,可参考 [基本使用-全局变量]
CurrentConfig.GLOBAL_ENV = Environment(loader=FileSystemLoader("./templates"))
from pyecharts import options as opts
from pyecharts.charts import Bar, WordCloud
app = Flask(__name__, static_folder="templates")
@app.route("/")
def index():
return """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>jiajunbernoulli的展示</title>
</head>
<body>
<div id="page1">
<iframe width="100%" height="400" src="wordcloud" frameborder="no" border="0" marginwidth="0" marginheight="0" scrolling="no"></iframe>
</div>
<div id="page2">
<iframe align="center" width="100%" height="400" src="worddelta" frameborder="no" border="0" marginwidth="0" marginheight="0" scrolling="no"></iframe>
</div>
</body>
</html>
"""
@app.route("/worddelta")
def worddelta():
# 打开数据库连接
db = pymysql.connect("localhost", "root", "root", "db_hotwords", charset='utf8')
cursor = db.cursor()
cursor.execute("SELECT word,delta FROM tb_word_delta ORDER BY delta LIMIT 5")
low5 = cursor.fetchall()
cursor.execute("SELECT word,delta FROM tb_word_delta ORDER BY delta DESC LIMIT 5")
up5 = cursor.fetchall()
db.close()
d = dict(low5 + up5)
lst = sorted(d.items(), key=lambda x: abs(x[1]), reverse=True)
keys = []
values = []
for item in lst:
keys.append(item[0])
if item[1] >= 0:
values.append(item[1])
else:
values.append({"value": item[1], "label": "labelRight"})
return """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>ECharts</title>
<!-- 引入 echarts.js -->
<script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
</head>
<body>
<!-- 为ECharts准备一个具备大小(宽高)的Dom -->
<div id="main" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'));
var option;
var labelRight = {
normal: {
position: 'right'
}
};
option = {
title: {
text: '微博热搜增长top5-下跌top5',
subtext: '数据来源:微博热搜指数',
sublink: 'http://e.weibo.com/1341556070/AjwF2AgQm'
},
tooltip: {
trigger: 'axis',
axisPointer: { // 坐标轴指示器,坐标轴触发有效
type: 'shadow' // 默认为直线,可选为:'line' | 'shadow'
}
},
grid: {
top: 80,
bottom: 30
},
xAxis: {
type: 'value',
position: 'top',
splitLine: {
lineStyle: {
type: 'dashed'
}
},
},
yAxis: {
type: 'category',
axisLine: {
show: false
},
axisLabel: {
show: false
},
axisTick: {
show: false
},
splitLine: {
show: false
},
data: """+str(keys)+"""
},
series: [{
name: '变化量',
type: 'bar',
stack: '总量',
label: {
normal: {
show: true,
formatter: '{b}'
}
},
data: """+str(values)+"""
}]
};
myChart.setOption(option);
</script>
</body>
</html>
"""
if __name__ == "__main__":
app.run(host="0.0.0.0")
更多推荐
所有评论(0)