Architecture design

This is a real-time monitoring task, so Spark Streaming is used.
(Architecture diagram: Python crawler → Flume → Spark Streaming → MySQL → Flask)

Python crawling

The crawling logic is similar to the earlier post "Big Data in Action: Site-wide Hot Words"; here only the Weibo data needs to be handled.
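
The crawler itself is not repeated here. As a rough sketch only (the endpoint, JSON fields, record layout and polling interval below are illustrative assumptions, not the original code), the hot-search list can be fetched periodically and each record sent as one UDP datagram to the Flume syslogudp source configured in the next section; the tab-separated fields have to line up with the indices (2, 3, 4) that the Spark job splits out later.

import socket
import time

import requests

FLUME_HOST, FLUME_PORT = "node102", 7777          # the syslogudp source configured below

# assumed endpoint; the real crawler follows the earlier "site-wide hot words" post
HOT_SEARCH_URL = "https://weibo.com/ajax/side/hotSearch"

def fetch_hot_words():
    """Return the current hot-search list as [(word, heat), ...]."""
    data = requests.get(HOT_SEARCH_URL, timeout=10).json()
    return [(item["word"], item["num"]) for item in data["data"]["realtime"]]

def push_to_flume(records):
    """Send each record as one UDP datagram; Flume turns every datagram into one event."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    for rec in records:
        sock.sendto(rec.encode("utf-8"), (FLUME_HOST, FLUME_PORT))
    sock.close()

if __name__ == "__main__":
    while True:
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        # assumed layout: source \t category \t word \t heat \t timestamp
        # (the Spark job reads fields 2, 3 and 4 of each line)
        push_to_flume(["weibo\thot\t{}\t{}\t{}".format(w, h, now) for w, h in fetch_hot_words()])
        time.sleep(60)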

Flume forwarding

A Flume sink can deliver data to Spark in two ways, push and poll; here I use the simpler push mode.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source
a1.sources.r1.type = syslogudp
a1.sources.r1.bind = node102
a1.sources.r1.port = 7777

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000


#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.128.1
a1.sinks.k1.port = 8888

#wire up: source--channel, sink--channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
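
Save the configuration (the file name conf/weibo-push.conf below is just an assumption) and start the agent only after the Spark receiver is already listening on 192.168.128.1:8888, otherwise the avro sink has nothing to connect to:

$ bin/flume-ng agent --conf conf --conf-file conf/weibo-push.conf --name a1 -Dflume.root.logger=INFO,console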

Spark computation

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class WordDelta(word:String, delta:Int)
object HotWordsPredict {
  def main(args: Array[String]): Unit = {
    // the receiver occupies one thread permanently, so local mode needs at least one more thread for processing
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello Streaming")
    // process one batch every 300 seconds (5 minutes)
    val streamingContext = new StreamingContext(sparkConf, Seconds(300))

    // push mode: Spark listens on this host:port and the Flume avro sink pushes events to it
    val flumeDStream = FlumeUtils.createStream(streamingContext, "192.168.128.1", 8888)
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import sparkSession.implicits._

    // each Flume event body is one tab-separated record; field 2 is the hot word,
    // field 3 a numeric heat value and field 4 a timestamp used for ordering
    // (record layout assumed from the crawler output)
    flumeDStream.map(event => new String(event.event.getBody.array()).trim)
      .map { line =>
        val strs = line.split("\t")
        (strs(2), (strs(3), strs(4)))
      }
      .groupByKey()
      // delta per word = heat at the latest timestamp minus heat at the earliest, within one batch
      .map { case (k, v) =>
        (k, v.maxBy(_._2)._1.toInt - v.minBy(_._2)._1.toInt) }
      // write every non-empty batch to MySQL, overwriting the table each time
      .foreachRDD(rdd =>
        if (!rdd.isEmpty()) {
          rdd.map { case (k, v) => WordDelta(k, v) }
            .toDS().write
            .format("jdbc")
            .option("url", "jdbc:mysql://node103:3306/db_hotwords")
            .option("dbtable", "tb_word_delta")
            .option("user", "root")
            .option("password", "root")
            .mode(SaveMode.Overwrite)
            .save()
        })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
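
Package the job and submit it with spark-submit. The command below is the generic YARN example from the Spark docs; for this job, substitute the HotWordsPredict main class and your own jar, and drop the hard-coded setMaster("local[*]") in the code so that the --master flag actually takes effect (settings made on the SparkConf in code override spark-submit options).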

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    lib/spark-examples*.jar \
    10

MySQL storage

CREATE TABLE `tb_word_delta` (  
  `word` text,  
  `delta` int(11) DEFAULT NULL  
) ENGINE=InnoDB DEFAULT CHARSET=utf8;  

Flask display

import pymysql
from flask import Flask
from jinja2 import Environment, FileSystemLoader
from pyecharts.globals import CurrentConfig, SymbolType

# For CurrentConfig, see the pyecharts docs: Basic Usage - Global Variables
CurrentConfig.GLOBAL_ENV = Environment(loader=FileSystemLoader("./templates"))

from pyecharts import options as opts
from pyecharts.charts import Bar, WordCloud

app = Flask(__name__, static_folder="templates")

@app.route("/")
def index():
    return """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>jiajunbernoulli's dashboard</title>
</head>
<body>
   <div id="page1">
        <iframe  width="100%" height="400" src="wordcloud"  frameborder="no" border="0" marginwidth="0" marginheight="0" scrolling="no"></iframe>
   </div>
   <div id="page2">
        <iframe align="center" width="100%" height="400" src="worddelta"  frameborder="no" border="0" marginwidth="0" marginheight="0" scrolling="no"></iframe>
   </div>
</body>
</html>
    """


@app.route("/worddelta")
def worddelta():
    # open a database connection
    db = pymysql.connect(host="localhost", user="root", password="root", database="db_hotwords", charset="utf8")
    cursor = db.cursor()
    cursor.execute("SELECT word,delta FROM tb_word_delta ORDER BY delta LIMIT 5")
    low5 = cursor.fetchall()
    cursor.execute("SELECT word,delta FROM tb_word_delta ORDER BY delta DESC LIMIT 5")
    up5 = cursor.fetchall()
    db.close()
    d = dict(low5 + up5)
    lst = sorted(d.items(), key=lambda x: abs(x[1]), reverse=True)
    keys = []
    values = []
    for word, delta in lst:
        keys.append(word)
        if delta >= 0:
            values.append(str(delta))
        else:
            # negative bars get their label on the right via the labelRight variable defined in the JS below
            values.append("{value: %d, label: labelRight}" % delta)
    return """
    <!DOCTYPE html>
<html>

	<head>
		<meta charset="utf-8">
		<title>ECharts</title>
		<!-- load echarts.js -->
		<script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
	</head>

	<body>
		<!-- a DOM node with explicit width/height for ECharts to render into -->
		<div id="main" style="width: 600px;height:400px;"></div>
		<script type="text/javascript">
			// initialize the echarts instance on the prepared DOM node
			var myChart = echarts.init(document.getElementById('main'));
			var option;

			var labelRight = {
				normal: {
					position: 'right'
				}
			};
			option = {
				title: {
					text: 'Weibo hot search: top 5 gainers vs top 5 decliners',
					subtext: 'Data source: Weibo hot search index',
					sublink: 'http://e.weibo.com/1341556070/AjwF2AgQm'
				},
				tooltip: {
					trigger: 'axis',
					axisPointer: { // axis pointer, triggered on the whole axis
						type: 'shadow' // default is 'line'; 'line' | 'shadow' are supported
					}
				},
				grid: {
					top: 80,
					bottom: 30
				},
				xAxis: {
					type: 'value',
					position: 'top',
					splitLine: {
						lineStyle: {
							type: 'dashed'
						}
					},
				},
				yAxis: {
					type: 'category',
					axisLine: {
						show: false
					},
					axisLabel: {
						show: false
					},
					axisTick: {
						show: false
					},
					splitLine: {
						show: false
					},
					data: """+str(keys)+"""
				},
				series: [{
					name: 'Delta',
					type: 'bar',
					stack: 'Total',
					label: {
						normal: {
							show: true,
							formatter: '{b}'
						}
					},
					data: """+str(values)+"""
				}]
			};

			myChart.setOption(option);
		</script>
	</body>

</html>
"""

if __name__ == "__main__":
    app.run(host="0.0.0.0")

