A Big Data Analysis and Statistics Platform for an E-commerce Website
Set up the virtual machines
Install and configure a Hadoop high-availability cluster, ZooKeeper, Spark, HBase, MySQL, and Kafka.
Dataset
Link: https://pan.baidu.com/s/12U7GMtxOHdUbsIUCNBcT_g
Extraction code: 1234
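For reference, the analysis programs later in this article read the following fields from each JSON line of user_session.txt: category_id, product_id, event_type (with values view, cart and purchase) and address_name; the conversion dataset user_conversion.json is read with the fields userid, actionTime and pageid. An illustrative record of user_session.txt might therefore look like {"category_id":"12","product_id":"1003","event_type":"view","address_name":"beijing"} (field values made up here; the real files may contain additional fields).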
Add the following dependencies to the pom.xml file:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.18.Final</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- JSON dependency -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- HBase dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.1</version>
</dependency>
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<!-- Hadoop dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<!-- Spark Streaming dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<!-- Spark Streaming Kafka integration dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- Specify the class containing the main() entry point here; change it before packaging each program -->
<mainClass>cn.itcast.top3.AreaProductTop3</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Connecting to HBase
Place the following classes in the same package, cn.itcast.hbase.
package cn.itcast.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.io.IOException;
public class HbaseConnect {
public static Configuration conf;
public static Connection conn;
public static HBaseAdmin hbaseAdmin;
static {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
conf.set("hbase.zookeeper.property.clientPort", "2181");
try {
conn = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
public static HBaseAdmin getHBaseAdmin() throws IOException{
try {
hbaseAdmin = (HBaseAdmin)(conn.getAdmin());
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
}
return hbaseAdmin;
}
public static Connection getConnection(){
return conn;
}
public static synchronized void closeConnection(){
if(conn!=null){
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.itcast.hbase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class HbaseUtils {
public static void createTable(String tableName,
String... columFamilys)
throws IOException {
HBaseAdmin admin = HbaseConnect.getHBaseAdmin();
//Check whether the table already exists
if (admin.tableExists(tableName)){
//Disable the table
admin.disableTable(tableName);
//Delete the table
admin.deleteTable(tableName);
}
HTableDescriptor hd
= new HTableDescriptor(TableName.valueOf(tableName));
for (String cf : columFamilys) {
hd.addFamily(new HColumnDescriptor(cf));
}
admin.createTable(hd);
admin.close();
}
public static void putsToHBase(String tableName,
String rowkey,
String cf,
String[] column,
String[] value)
throws Exception {
Table table = HbaseConnect
.getConnection()
.getTable(TableName.valueOf(tableName));
Put puts = new Put(rowkey.getBytes());
for (int i = 0;i<column.length;i++){
puts.addColumn(
Bytes.toBytes(cf),
Bytes.toBytes(column[i]),
Bytes.toBytes(value[i]));
}
table.put(puts);
table.close();
}
public static ResultScanner scan(String tableName)
throws IOException {
//Get a Table object for the specified HBase table
Table table = HbaseConnect
.getConnection()
.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
return table.getScanner(scan);
}
public static void putsOneToHBase(String tableName,
String rowkey,
String cf,
String column,
String value)
throws IOException {
Table table = HbaseConnect
.getConnection()
.getTable(TableName.valueOf(tableName));
Put puts = new Put(rowkey.getBytes());
puts.addColumn(
Bytes.toBytes(cf),
Bytes.toBytes(column),
Bytes.toBytes(value));
table.put(puts);
table.close();
}
}
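As a quick sanity check of the two helper classes above, the following minimal sketch (a throwaway class; the table name demo_table and the cell values are placeholders, not part of the project) creates a table, writes one row, and scans it back:
package cn.itcast.hbase;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
public class HbaseUtilsDemo {
    public static void main(String[] args) throws Exception {
        //Create a table named demo_table with one column family cf (it is dropped and recreated if it already exists)
        HbaseUtils.createTable("demo_table", "cf");
        //Write a single row with two columns
        HbaseUtils.putsToHBase("demo_table", "row1", "cf",
                new String[]{"name", "count"}, new String[]{"test", "1"});
        //Scan the table and print each row
        ResultScanner scanner = HbaseUtils.scan("demo_table");
        for (Result result : scanner) {
            System.out.println(result);
        }
        //Release the shared connection
        HbaseConnect.closeConnection();
    }
}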
Top 10 Popular Categories Analysis
Place the following code in the same package, cn.itcast.top10.
package cn.itcast.top10;
import java.io.Serializable;
public class CategorySortKey implements Comparable<CategorySortKey>
,Serializable {
private int viewCount;
private int cartCount;
private int purchaseCount;
public CategorySortKey(
int viewcount,
int cartCount,
int purchaseCount) {
this.viewCount = viewcount;
this.cartCount = cartCount;
this.purchaseCount = purchaseCount;
}
public int getViewCount() {
return viewCount;
}
public void setViewCount(int viewCount) {
this.viewCount = viewCount;
}
public int getCartCount() {
return cartCount;
}
public void setCartCount(int cartCount) {
this.cartCount = cartCount;
}
public int getPurchaseCount() {
return purchaseCount;
}
public void setPurchaseCount(int purchaseCount) {
this.purchaseCount = purchaseCount;
}
@Override
public int compareTo(CategorySortKey other) {
//Compare by view count first, then cart count, then purchase count
if (viewCount != other.getViewCount()) {
return Integer.compare(viewCount, other.getViewCount());
} else if (cartCount != other.getCartCount()) {
return Integer.compare(cartCount, other.getCartCount());
}
return Integer.compare(purchaseCount, other.getPurchaseCount());
}
}
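Since the driver later sorts with sortByKey(false) (descending natural order), this key ranks categories by view count first and only uses cart and purchase counts to break ties. A tiny illustration with made-up numbers:
//Illustrative only: A has more views than B, so A outranks B even though B has more cart and purchase events
CategorySortKey a = new CategorySortKey(1200, 10, 5);
CategorySortKey b = new CategorySortKey(900, 400, 300);
System.out.println(a.compareTo(b) > 0); //prints true, so A comes before B in the descending sort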
package cn.itcast.top10;
import cn.itcast.hbase.HbaseConnect;
import cn.itcast.hbase.HbaseUtils;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.List;
public class CategoryTop10 {
public static void main(String[] arg) {
SparkConf conf = new SparkConf();
conf.setAppName("CategoryTop10");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFileRDD = sc.textFile("/spark_data/user_session.txt"); //path of the dataset in HDFS
JavaPairRDD<Tuple2<String, String>, Integer> transformRDD = textFileRDD
.mapToPair(
new PairFunction<
String,
Tuple2<String, String>, Integer>() {
@Override
public Tuple2<Tuple2<String, String>, Integer> call(String s)
throws Exception {
JSONObject json = JSONObject.parseObject(s);
String category_id = json.getString("category_id");
String event_type = json.getString("event_type");
return new Tuple2<>(
new Tuple2<>(category_id, event_type),
new Integer(1));
}
});
JavaPairRDD<Tuple2<String, String>, Integer> aggregationRDD =
transformRDD.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer1, Integer integer2)
throws Exception {
return integer1 + integer2;
}
});
JavaPairRDD<String,Integer> getViewCategoryRDD =aggregationRDD
.filter(new Function<Tuple2<Tuple2<String, String>, Integer>
, Boolean>() {
@Override
public Boolean call(Tuple2<Tuple2<String, String>
, Integer> tuple2) throws Exception {
String action = tuple2._1._2;
return action.equals("view");
}
}).mapToPair(
new PairFunction<Tuple2<Tuple2<String, String>
, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer>
call(Tuple2<Tuple2<String, String>, Integer> tuple2)
throws Exception {
return new Tuple2<>(tuple2._1._1,tuple2._2);
}
});
JavaPairRDD<String,Integer> getCartCategoryRDD = aggregationRDD
.filter(new Function<Tuple2<Tuple2<String, String>, Integer>
, Boolean>() {
@Override
public Boolean call(Tuple2<Tuple2<String, String>
, Integer> tuple2) throws Exception {
String action = tuple2._1._2;
return action.equals("cart");
}
}).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>
, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer>
call(Tuple2<Tuple2<String, String>, Integer> tuple2)
throws Exception {
return new Tuple2<>(tuple2._1._1,tuple2._2);
}
});
JavaPairRDD<String,Integer> getPurchaseCategoryRDD = aggregationRDD
.filter(new Function<Tuple2<Tuple2<String, String>, Integer>
, Boolean>() {
@Override
public Boolean call(Tuple2<Tuple2<String, String>
, Integer> tuple2) throws Exception {
String action = tuple2._1._2;
return action.equals("purchase");
}
}).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>
, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer>
call(Tuple2<Tuple2<String, String>, Integer> tuple2)
throws Exception {
return new Tuple2<>(tuple2._1._1,tuple2._2);
}
});
JavaPairRDD<String,Tuple2<Integer, Optional<Integer>>>
tmpJoinCategoryRDD =getViewCategoryRDD
.leftOuterJoin(getCartCategoryRDD);
JavaPairRDD<String,
Tuple2<Tuple2<Integer, Optional<Integer>>,
Optional<Integer>>> joinCategoryRDD =
tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD);
JavaPairRDD<CategorySortKey,String> transCategoryRDD = joinCategoryRDD
.mapToPair(new PairFunction<Tuple2<String,
Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>>,
CategorySortKey,String>() {
@Override
public Tuple2<CategorySortKey,String> call(Tuple2<String,
Tuple2<Tuple2<Integer, Optional<Integer>>,
Optional<Integer>>> tuple2) throws Exception {
String category_id = tuple2._1;
int viewcount = tuple2._2._1._1;
int cartcount = 0;
int purchasecount = 0;
if (tuple2._2._1._2.isPresent()){
cartcount = tuple2._2._1._2.get().intValue();
}
if (tuple2._2._2.isPresent()){
purchasecount = tuple2._2._2.get().intValue();
}
CategorySortKey sortKey =
new CategorySortKey(viewcount, cartcount, purchasecount);
return new Tuple2<>(sortKey,category_id);
}
});
JavaPairRDD<CategorySortKey,String> sortedCategoryRDD =
transCategoryRDD.sortByKey(false);
List<Tuple2<CategorySortKey, String>> top10CategoryList =
sortedCategoryRDD.take(10);
//Call the top10ToHbase method
try {
top10ToHbase(top10CategoryList);
} catch (Exception e) {
e.printStackTrace();
}
//Close the HBase connection
HbaseConnect.closeConnection();
sc.close();
}
public static void top10ToHbase(List<Tuple2<CategorySortKey, String>>
top10CategoryList) throws Exception
{
HbaseUtils.createTable("top10","top10_category");
String[] column =
{"category_id","viewcount","cartcount","purchasecount"};
String viewcount = "";
String cartcount = "";
String purchasecount = "";
String category_id = "";
int count = 0;
for (Tuple2<CategorySortKey, String> top10: top10CategoryList) {
count++;
viewcount = String.valueOf(top10._1.getViewCount());
cartcount = String.valueOf(top10._1.getCartCount());
purchasecount = String.valueOf(top10._1.getPurchaseCount());
category_id = top10._2;
String[] value =
{category_id,viewcount,cartcount,purchasecount};
HbaseUtils.putsToHBase("top10",
"rowkey_top"+count,
"top10_category",
column,
value);
}
}
}
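To make the pipeline above concrete with made-up numbers: every record is first mapped to ((category_id, event_type), 1) and summed by key, so a category with 1200 view events, 300 cart events and 80 purchase events ends up, after the two leftOuterJoin calls, as the pair (CategorySortKey(1200, 300, 80), category_id); an event type that never occurred for a category simply contributes 0 through the absent Optional. Sorting the keys in descending order and taking the first ten yields the Top 10 categories that are written to the top10 table.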
Packaging the jar
In the Maven tool window, expand the Lifecycle node and double-click the "package" option; IntelliJ IDEA then packages the program into a jar. When packaging finishes, the "BUILD SUCCESS" message confirms that the Top 10 popular categories analysis program has been packaged successfully.
The file SparkProject-1.0-SNAPSHOT.jar is generated in the target directory of the SparkProject project. To distinguish it from the jars of the later programs, rename it to CategoryTop10.jar.
Uploading the jar to the cluster
Use the remote connection tool Xftp to connect to the virtual machine Spark01 and, in the directory that holds the jar files, /export/SparkJar/ (create it beforehand), run the "rz" command to upload CategoryTop10.jar, the jar of the Top 10 popular categories analysis program.
Uploading the dataset to the local file system
Use the remote connection tool Xftp to connect to the virtual machine Spark01 and, in the directory that holds the data files, /export/data/SparkData/ (create it beforehand), run the "rz" command to upload the dataset user_session.txt to the local file system.
Creating an HDFS directory for the dataset
Before uploading the dataset to HDFS, create the directory spark_data under the HDFS root to hold the dataset user_session.txt.
hdfs dfs -mkdir /spark_data
Uploading the dataset to HDFS
Upload the dataset user_session.txt from the local directory /export/data/SparkData/ to the spark_data directory on HDFS.
hdfs dfs -put /export/data/SparkData/user_session.txt /spark_data
Submitting the Top 10 popular categories analysis program to the YARN cluster
Use the spark-submit shell script in the bin directory of the Spark installation to submit the Top 10 popular categories analysis program to run on YARN in the Hadoop cluster. (Note that the input path is hard-coded in the program source, so the trailing path argument is informational only.)
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 3 \
--executor-memory 2G \
--class cn.itcast.top10.CategoryTop10 \
/export/SparkJar/CategoryTop10.jar /spark_data/user_session.txt
Checking the program's running status
While the program runs, an "Application ID" (the unique ID of the run) is printed to the console. Open "192.168.121.132:8088" in a browser (replace this with your own virtual machine address) to reach the YARN web UI, then locate the corresponding Application ID to check the program's status. When the program has finished, State shows FINISHED and FinalStatus shows SUCCEEDED.
Viewing the program's results
On the virtual machine Spark01, run the "hbase shell" command to enter the HBase shell.
In the HBase shell, run the "list" command to show all tables in the HBase database.
In the HBase shell, run the "scan 'top10'" command to query the data in the top10 table.
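Besides the HBase shell, the result can be read back programmatically with the HbaseUtils.scan helper defined earlier; a minimal sketch (a throwaway class, run from a machine that can reach the HBase cluster):
package cn.itcast.top10;
import cn.itcast.hbase.HbaseConnect;
import cn.itcast.hbase.HbaseUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadTop10 {
    public static void main(String[] args) throws Exception {
        //Scan every row of the top10 table and print its cells as qualifier=value pairs
        for (Result result : HbaseUtils.scan("top10")) {
            StringBuilder line = new StringBuilder(Bytes.toString(result.getRow()) + ": ");
            for (Cell cell : result.rawCells()) {
                line.append(Bytes.toString(CellUtil.cloneQualifier(cell)))
                    .append("=")
                    .append(Bytes.toString(CellUtil.cloneValue(cell)))
                    .append(" ");
            }
            System.out.println(line);
        }
        HbaseConnect.closeConnection();
    }
}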
Top 3 Popular Products per Region Analysis
package cn.itcast.top3;
import cn.itcast.hbase.HbaseConnect;
import cn.itcast.hbase.HbaseUtils;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
public class AreaProductTop3 {
public static void main(String[] arg){
SparkConf conf = new SparkConf();
conf.setAppName("AreaProductTop3");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFileRDD = sc.textFile("/spark_data/user_session.txt"); //path of the dataset in HDFS
JavaPairRDD<Tuple2<String, String>, String> transProductRDD =
textFileRDD.mapToPair(new PairFunction<String,
Tuple2<String, String>,
String>() {
@Override
public Tuple2<Tuple2<String, String>, String> call(String s)
throws Exception {
JSONObject json = JSONObject.parseObject(s);
String address_name =
json.getString("address_name").replaceAll("\\u00A0+", "");//replaceAll("\\u00A0+","") strips non-breaking space characters
String product_id = json.getString("product_id");
String event_type = json.getString("event_type");
Tuple2<Tuple2<String, String>, String> tuple2 =
new Tuple2<>(
new Tuple2<>(address_name, product_id),
event_type);
return tuple2;
}
});
JavaPairRDD<Tuple2<String, String>, String> getViewRDD =
transProductRDD.filter(new Function<Tuple2<
Tuple2<String, String>, String>, Boolean>() {
@Override
public Boolean call(
Tuple2<Tuple2<String, String>, String> tuple2)
throws Exception {
String event_type = tuple2._2;
return event_type.equals("view");
}
});
JavaPairRDD<Tuple2<String,String>,Integer> productByAreaRDD =
getViewRDD.mapToPair(
new PairFunction<Tuple2<Tuple2<String, String>, String>,
Tuple2<String, String>,
Integer>() {
@Override
public Tuple2<Tuple2<String, String>, Integer> call(
Tuple2<Tuple2<String, String>, String> tuple2)
throws Exception {
return new Tuple2<>(tuple2._1,new Integer(1));
}
});
JavaPairRDD<Tuple2<String,String>,Integer> productCountByAreaRDD =
productByAreaRDD.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2)
throws Exception {
return integer+integer2;
}
});
JavaPairRDD<String,Tuple2<String,Integer>> transProductCountByAreaRDD =
productCountByAreaRDD.mapToPair(
new PairFunction<Tuple2<
Tuple2<String, String>, Integer>,
String,
Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Tuple2<String, Integer>> call(
Tuple2<Tuple2<String, String>, Integer> tuple2)
throws Exception {
return new Tuple2<>(tuple2._1._1,
new Tuple2<>(tuple2._1._2,tuple2._2));
}
});
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>>
productGroupByAreaRDD = transProductCountByAreaRDD.groupByKey();
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>>
productSortByAreaRDD =productGroupByAreaRDD.mapToPair(
new PairFunction<
Tuple2<String, Iterable<Tuple2<String, Integer>>>,
String,
Iterable<Tuple2<String, Integer>>>() {
@Override
public Tuple2<String, Iterable<Tuple2<String, Integer>>>
call(Tuple2<String, Iterable<Tuple2<String, Integer>>> tuple2)
throws Exception {
List<Tuple2<String,Integer>> list = new ArrayList<>();
Iterator<Tuple2<String,Integer>> iter =
tuple2._2.iterator();
while (iter.hasNext()){
list.add(iter.next());
}
list.sort(new Comparator<Tuple2<String, Integer>>() {
@Override
public int compare(
Tuple2<String, Integer> o1,
Tuple2<String, Integer> o2) {
return o2._2 - o1._2;
}
});
return new Tuple2<>(tuple2._1,list);
}
});
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> top3AreaProductRDD
=productSortByAreaRDD.mapToPair(new PairFunction<
Tuple2<String, Iterable<Tuple2<String, Integer>>>,
String,
Iterable<Tuple2<String, Integer>>>() {
@Override
public Tuple2<String, Iterable<Tuple2<String, Integer>>>
call(Tuple2<String, Iterable<Tuple2<String, Integer>>> tuple2)
throws Exception {
List<Tuple2<String,Integer>> list = new ArrayList<>();
Iterator<Tuple2<String,Integer>> iter
= tuple2._2.iterator();
int i = 0;
while (iter.hasNext()){
list.add(iter.next());
i++;
if (i == 3){
break;
}
}
return new Tuple2<>(tuple2._1,list);
}
});
try {
top3ToHbase(top3AreaProductRDD);
} catch (IOException e) {
e.printStackTrace();
}
HbaseConnect.closeConnection();
sc.close();
}
public static void top3ToHbase(
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> rdd)
throws IOException {
HbaseUtils.createTable("top3","top3_area_product");
String[] column =
{"area","product_id","viewcount"};
rdd.foreach(
new VoidFunction<
Tuple2<String,
Iterable<Tuple2<String, Integer>>>>()
{
@Override
public void call(
Tuple2<String, Iterable<Tuple2<String, Integer>>>
tuple2) throws Exception {
String area = tuple2._1;
String product_id = "";
String viewcount = "";
Iterator<Tuple2<String,Integer>> iter =
tuple2._2.iterator();
List<Tuple2<String,Integer>> myList =
Lists.newArrayList(iter);
for (Tuple2<String,Integer> tuple : myList) {
product_id = tuple._1;
viewcount = String.valueOf(tuple._2);
String [] value = {area,product_id,viewcount};
try {
HbaseUtils.putsToHBase(
"top3",
area+product_id,
"top3_area_product",
column,
value);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
}
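For example, with made-up numbers: if the aggregated view counts for Shanghai are product 101 → 120, product 205 → 98, product 330 → 75 and product 412 → 40, the per-area sort puts them in that order and the subsequent loop keeps only the first three entries, so only the rows shanghai101, shanghai205 and shanghai330 (row key area + product_id) are written to the top3 table.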
Packaging the jar
Because the jar of the Top 10 popular categories analysis program set the main class to "cn.itcast.top10.CategoryTop10", change the main class in pom.xml to "cn.itcast.top3.AreaProductTop3" here, then package the Top 3 popular products per region analysis program the same way as the Top 10 program. Rename the finished jar to "AreaProductTop3.jar" and upload AreaProductTop3.jar to the /export/SparkJar/ directory on the virtual machine Spark01 with the remote connection tool Xftp.
Submitting the Top 3 popular products per region analysis program to the YARN cluster
Use the spark-submit shell script in the bin directory of the Spark installation to submit the Top 3 popular products per region analysis program to run on the YARN cluster.
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 3 \
--executor-memory 2G \
--class cn.itcast.top3.AreaProductTop3 \
/export/SparkJar/AreaProductTop3.jar /spark_data/user_session.txt
Viewing the program's results
On the virtual machine Spark01, run the "hbase shell" command to enter the HBase shell.
In the HBase shell, run the "list" command to show all tables in the HBase database.
In the HBase shell, run the "scan 'top3'" command to view all data in the top3 table.
Website Conversion Rate Statistics
package cn.itcast.conversion;
import cn.itcast.hbase.HbaseConnect;
import cn.itcast.hbase.HbaseUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class PageConversion {
public static void main(String[] arg){
SparkSession spark = SparkSession
.builder()
.appName("PageConversion")
.getOrCreate();
Dataset<Row> userConversionDS = spark.read().json("/page_conversion/user_conversion.json"); //HDFS location of the dataset
userConversionDS.createOrReplaceTempView("conversion_table");
Dataset<Row> pageIdPvDS = spark
.sql("select pageid,count(*) as pageid_count " +
"from conversion_table " +
"group by pageid");
Dataset<Row> useridGroupSortDS = spark
.sql("select userid,actionTime,pageid " +
"from conversion_table " +
"order by userid,actionTime");
useridGroupSortDS.createOrReplaceTempView("conversion_group_sort_table");
JavaRDD<Row> pageConversionRDD = spark.sql("select userid," +
"concat_ws(',',collect_list(pageid)) as column2s " +
"from conversion_group_sort_table " +
"group by userid").toJavaRDD();
JavaRDD<Row> rowRDD = pageConversionRDD
.flatMap(new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
List<Row> list = new ArrayList<>();
String[] page = row.get(1).toString().split(",");
String pageConversionStr = "";
for (int i = 0;i<page.length-1;i++){
if (!page[i].equals(page[i+1])){
pageConversionStr = page[i]+"_"+page[i+1];
list.add(RowFactory.create(pageConversionStr));
}
}
return list.iterator();
}
});
StructType schema = DataTypes
.createStructType(
new StructField[]{
DataTypes.createStructField(
"page_conversion",
DataTypes.StringType,
true)});
spark.createDataFrame(rowRDD, schema)
.createOrReplaceTempView("page_conversion_table");
spark.sql(
"select page_conversion," +
"count(*) as page_conversion_count " +
"from page_conversion_table " +
"group by page_conversion")
.createOrReplaceTempView("page_conversion_count_table");
Dataset<Row> pageConversionCountDS = spark
.sql("select page_conversion_count," +
"split(page_conversion,'_')[0] as start_page," +
"split(page_conversion,'_')[1] as last_page " +
"from page_conversion_count_table");
pageConversionCountDS
.join(
pageIdPvDS,
new Column("start_page").equalTo(new Column("pageid")),
"left")
.createOrReplaceTempView("page_conversion_join");
Dataset<Row> resultDS = spark
.sql("select " +
"concat(pageid,'_',last_page) as conversion," +
"round(" +
"CAST(page_conversion_count AS DOUBLE)/CAST(pageid_count AS DOUBLE)" +
",3) as rage " +
"from page_conversion_join");
try {
conversionToHBase(resultDS);
} catch (IOException e) {
e.printStackTrace();
}
HbaseConnect.closeConnection();
spark.close();
}
public static void conversionToHBase(Dataset<Row> dataset)
throws IOException {
HbaseUtils.createTable("conversion","page_conversion");
String[] column = {"convert_page","convert_rage"};
dataset.foreach(new ForeachFunction<Row>() {
@Override
public void call(Row row) throws Exception {
String conversion = row.get(0).toString();
String rate = row.get(1).toString();
String[] value ={conversion,rate};
HbaseUtils.putsToHBase("conversion",
conversion+rate,
"page_conversion",
column,
value);
}
});
}
}
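The single-jump conversion rate computed by the final SQL statement is the number of A_B jumps found in the per-user, time-ordered page sequences divided by the total number of records for page A, rounded to three decimal places. With made-up numbers: if page 1 appears 200 times in conversion_table and the jump 1_2 occurs 50 times, the row written to the conversion table for 1_2 carries the rate 50 / 200 = 0.250.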
Packaging the jar
Because the jar of the Top 3 popular products per region analysis program set the main class to "cn.itcast.top3.AreaProductTop3", change the main class in pom.xml to "cn.itcast.conversion.PageConversion" here, then package the page single-jump conversion rate statistics program the same way as the Top 10 program. Rename the finished jar to "PageConversion.jar" and upload PageConversion.jar to the /export/SparkJar/ directory on the virtual machine Spark01 with the remote connection tool SecureCRT.
Uploading the dataset to the local file system
Use the remote connection tool Xftp to connect to the virtual machine Spark01 and, in the directory that holds the data files, /export/data/SparkData/ (create it beforehand), run the "rz" command to upload the dataset user_conversion.json to the local file system.
Creating an HDFS directory for the dataset
Before uploading the dataset to HDFS, create the directory page_conversion under the HDFS root to hold the dataset user_conversion.json.
hdfs dfs -mkdir /page_conversion
Uploading the dataset to HDFS
Upload the dataset user_conversion.json from the directory /export/data/SparkData/ to the page_conversion directory on HDFS with the following command.
hdfs dfs -put /export/data/SparkData/user_conversion.json /page_conversion
Submitting the page single-jump conversion rate statistics program to the YARN cluster
Use the spark-submit shell script in the bin directory of the Spark installation to submit the page single-jump conversion rate statistics program to run on the YARN cluster.
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 3 \
--executor-memory 2G \
--class cn.itcast.conversion.PageConversion \
/export/SparkJar/PageConversion.jar /page_conversion/user_conversion.json
Viewing the program's results
On the virtual machine Spark01, run the "hbase shell" command to enter the HBase shell.
In the HBase shell, run the "list" command to show all tables in the HBase database.
In the HBase shell, run the "scan 'conversion'" command to view all data in the conversion table.
Real-Time Ad Click Stream Statistics
Opening the HBase shell
Start the virtual machines and bring up the big data cluster environment (Spark does not need to be started at this point). Use the remote connection tool SecureCRT to connect to the virtual machine Spark01 and run the "hbase shell" command to enter the HBase shell.
Creating the blacklist table
In the HBase shell, create the table blacklist with the column family black_user, which stores blacklisted users.
create 'blacklist','black_user'
Inserting blacklisted users
In the HBase shell, insert blacklisted users under the column family black_user of the blacklist table, marking the users whose userid is 33, 44 and 55 as blacklisted.
put 'blacklist', 'user33', 'black_user:userid', '33'
put 'blacklist', 'user44', 'black_user:userid', '44'
put 'blacklist', 'user55', 'black_user:userid', '55'
Creating the adstream table
In the HBase shell, create the table adstream with the column family area_ads_count, which stores the real-time statistics of user ad clicks.
create 'adstream','area_ads_count'
Implementing the Kafka producer
In the java directory of the SparkProject project, create a new package "cn.itcast.streaming" to hold the Java files for the real-time ad click stream statistics, and create a Java class MockRealTime in that package to implement a Kafka producer that generates the user ad-click stream data.
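Before producing to the topic ad, make sure the topic exists (unless the brokers are configured to auto-create topics). With the Kafka 2.11-2.0.0 installation used below, a typical creation command looks like the following (the ZooKeeper host names are assumed to match the cluster used in the consumer command; adjust them and the partition/replication settings to your environment):
bin/kafka-topics.sh --create \
--zookeeper spark01:2181,spark02:2181,spark03:2181 \
--replication-factor 2 \
--partitions 3 \
--topic ad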
Starting a Kafka consumer
Start the virtual machines and the big data cluster environment (including Kafka), use the remote connection tool Xshell to connect to the virtual machine Spark01, go to the Kafka installation directory (/export/servers/kafka_2.11-2.0.0) and start a Kafka console consumer.
bin/kafka-console-consumer.sh \
--bootstrap-server spark01:9092,spark02:9092,spark03:9092 \
--topic ad
Checking the Kafka consumer
In the Kafka consumer window on the virtual machine Spark01, check whether the data is received successfully.
Stopping the Kafka consumer
In the Kafka consumer window on the virtual machine Spark01, press "Ctrl+C" to stop the current consumer.
Stopping the Kafka producer
In the IntelliJ IDEA console, click the red stop button to stop the Kafka producer program.
Place the following programs in the same package, cn.itcast.streaming.
package cn.itcast.streaming;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.*;
public class MockRealTime {
public static void main(String[] arg) throws InterruptedException {
String topic = "ad";
//Create the Kafka producer
KafkaProducer<String,String> producer = createKafkaProducer();
//Send one user ad-click record to the topic every second
while (true){
producer.send(new ProducerRecord<>(topic,mockRealTimeData()));
Thread.sleep(1000);
}
}
public static KafkaProducer<String,String> createKafkaProducer(){
//Create a Properties object to configure Kafka
Properties props = new Properties();
//Kafka broker addresses
props.put("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");//replace hadoop1/hadoop2/hadoop3 with your own host names
//Key serializer
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//Value serializer
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//With acks=all the leader waits until the in-sync followers have also received the message (the default is 1)
props.put("acks", "all");
//Create the KafkaProducer instance
KafkaProducer<String,String> kafkaProducer
= new KafkaProducer<String, String>(props);
return kafkaProducer;
}
public static String mockRealTimeData() throws InterruptedException {
Random random = new Random();
String[] cityArray = {
"beijing",
"tianjing",
"shanghai",
"chongqing",
"shenzhen",
"guangzhou",
"nanjing",
"chengdu",
"zhengzhou",
"hefei",
"wuhan"
};
long time = System.currentTimeMillis();
String city = cityArray[random.nextInt(cityArray.length)];
String userid = String.valueOf(random.nextInt(100));
String adid = String.valueOf(random.nextInt(10));
String userAd = time + "," + userid + "," + adid + "," + city;
Thread.sleep(10);
return userAd;
}
}
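Each call to mockRealTimeData() returns a comma-separated string of the form timestamp,userid,adid,city, for example 1658000000000,42,7,beijing (the numeric values are drawn at random), which is exactly the format that the consumer program below splits on "," to build its (userid, (adid, city)) pairs.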
package cn.itcast.streaming;
import cn.itcast.hbase.HbaseConnect;
import cn.itcast.hbase.HbaseUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
public class AdsRealTime {
public static void main(String[] arg) throws IOException,
InterruptedException {
System.setProperty("HADOOP_USER_NAME","root");
SparkConf conf = new SparkConf()
//Run the Spark Streaming program in local mode
.setMaster("local[2]")
.setAppName("stream_ad");
//Consume the user ad-click data produced to Kafka in 5-second batches
JavaStreamingContext jsc =
new JavaStreamingContext(conf, Durations.seconds(5));
jsc.checkpoint("hdfs://192.168.253.101:9000/checkpoint");//replace the address before port 9000 with your own NameNode address
//Topic that the Kafka consumer subscribes to
final Collection<String> topics = Arrays.asList("ad");
Map<String, Object> kafkaParams = new HashMap<>();
//Kafka broker addresses
kafkaParams.put("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");//replace hadoop1/hadoop2/hadoop3 with your own host names
//Deserializer for the record key (strings, UTF-8 encoded)
kafkaParams.put("key.deserializer", StringDeserializer.class);
//Deserializer for the record value (strings, UTF-8 encoded)
kafkaParams.put("value.deserializer", StringDeserializer.class);
//Consumer group name; any name will do
kafkaParams.put("group.id", "adstream");
//If committed offsets exist, resume from them; otherwise start from the latest data
//(offsets record how far consumption has progressed)
kafkaParams.put("auto.offset.reset", "latest");
//The consumer periodically auto-commits its offsets
kafkaParams.put("enable.auto.commit", true);
JavaInputDStream<ConsumerRecord<String, String>> userAdStream =
KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String,Tuple2<String,String>> userClickAdsStream =
userAdStream.mapToPair(
(PairFunction<
ConsumerRecord<String, String>,
String,
Tuple2<String, String>
>)
record -> {
//Split each incoming record on the "," separator into the array value
String[] value = record.value().split(",");
String userid =value[1];
String adid =value[2];
String city =value[3];
return new Tuple2<>(userid,new Tuple2<>(adid,city));
});
JavaPairRDD<String,String> blackUserRDD =
jsc.sparkContext().parallelizePairs(getBlackUser());
JavaPairDStream<String,Tuple2<String,String>> checkUserClickAdsStream
= userClickAdsStream.transformToPair(
(Function<
JavaPairRDD<String, Tuple2<String, String>>,
JavaPairRDD<String, Tuple2<String, String>>>)
userClickAdsRDD -> {
JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>>
joinBlackAdsUserRDD = userClickAdsRDD.leftOuterJoin(blackUserRDD);
JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>>
filterBlackAdsUserRDD = joinBlackAdsUserRDD.filter(
(Function<Tuple2<String, Tuple2<Tuple2<String, String>,
Optional<String>>>,
Boolean>)
joinBlackAdsUserTuple2 ->
!joinBlackAdsUserTuple2._2._2.isPresent());
JavaPairRDD<String, Tuple2<String, String>> mapBlackAdsUserRDD
= filterBlackAdsUserRDD.mapToPair((PairFunction<Tuple2<String,
Tuple2<Tuple2<String, String>,
Optional<String>>>,
String,
Tuple2<String, String>>)
filterBlackAdsUserTuple2 ->
new Tuple2<>(
filterBlackAdsUserTuple2._1,
new Tuple2<>(
filterBlackAdsUserTuple2._2._1._1,
filterBlackAdsUserTuple2._2._1._2)));
return mapBlackAdsUserRDD;
});
JavaPairDStream<Tuple2<String,String>, Integer> areaAdsStream
= checkUserClickAdsStream.mapToPair(
(PairFunction<
Tuple2<String, Tuple2<String, String>>,
Tuple2<String, String>,
Integer>) checkUserClickAdsTuple2 -> {
String adid = checkUserClickAdsTuple2._2._1;
String city = checkUserClickAdsTuple2._2._2;
return new Tuple2<>(new Tuple2<>(city,adid),new Integer(1));
});
JavaPairDStream<Tuple2<String,String>, Integer> countAreaAdsStream
= areaAdsStream.updateStateByKey(
(Function2<
List<Integer>,
Optional<Integer>,
Optional<Integer>
>)
(valueList, oldState) -> {
Integer newState = 0;
if (oldState.isPresent()){
newState = oldState.get();
}
for (Integer value : valueList){
newState += value;
}
return Optional.of(newState);
});
JavaPairDStream<String,Integer> userStream = checkUserClickAdsStream
.mapToPair(
(PairFunction<Tuple2<String, Tuple2<String, String>>,
String, Integer>) checkUserClickAdsTuple2 ->
new Tuple2<>(
checkUserClickAdsTuple2._1,
new Integer(1)));
JavaPairDStream<String, Integer> countUserStream
= userStream.updateStateByKey(
(Function2<
List<Integer>,
Optional<Integer>,
Optional<Integer>
>)
(valueList, oldState) -> {
Integer newState = 0;
if (oldState.isPresent()){
newState = oldState.get();
}
for (Integer value : valueList){
newState += value;
}
return Optional.of(newState);
});
countUserStream.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>)
countUserRDD ->
countUserRDD.foreach((VoidFunction<Tuple2<String, Integer>>)
countUserTuple2 -> {
if (countUserTuple2._2>100){
HbaseUtils.putsOneToHBase(
"blacklist",
"user"+countUserTuple2._1,
"black_user",
"userid",
countUserTuple2._1);
}
}));
countAreaAdsStream.foreachRDD((
VoidFunction<
JavaPairRDD<
Tuple2<String, String>,
Integer>>
) countAreaAdsRDD ->
countAreaAdsRDD
.foreach(
(VoidFunction<
Tuple2<
Tuple2<String, String>
, Integer>>)
countAreaAdsTuple2 -> {
//Ad ID
String adid = countAreaAdsTuple2._1._2;
//City name
String city = countAreaAdsTuple2._1._1;
//Number of clicks on this ad in this city
int count = countAreaAdsTuple2._2;
HbaseUtils.putsOneToHBase("adstream",
city+"_"+adid,
"area_ads_count",
"area",
city);
HbaseUtils.putsOneToHBase("adstream",
city+"_"+adid,
"area_ads_count",
"ad",
adid);
HbaseUtils.putsOneToHBase("adstream",
city+"_"+adid,
"area_ads_count",
"count",
String.valueOf(count));
}));
jsc.start();
jsc.awaitTermination();
HbaseConnect.closeConnection();
jsc.close();
}
public static ArrayList<Tuple2<String, String>> getBlackUser() throws IOException {
//Fetch the blacklisted users from HBase
ResultScanner blackResult = HbaseUtils.scan("blacklist");
Iterator<Result> blackIterator = blackResult.iterator();
ArrayList<Tuple2<String,String>> blackList = new ArrayList<>();
//Iterate over blackIterator and add each blacklisted user to the blackList collection
while (blackIterator.hasNext()){
String blackUserId = new String(blackIterator.next().value());
blackList.add(new Tuple2<>(blackUserId,"black"));
}
return blackList;
}
}
Start the cluster environment
Start the Kafka producer program
Start the real-time user ad click stream statistics program
Once the Kafka producer program and the real-time user ad click stream statistics program have started successfully, their running status can be checked in the IDEA console.
Use the remote connection tool Xshell to connect to the virtual machine Spark01, run the "hbase shell" command to enter the HBase shell, and in the HBase shell run the "scan 'adstream'" command to view the statistics stored in the adstream table of the HBase database.
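To look up a single row programmatically instead of scanning the whole table, the plain HBase client Get API can also be used; a minimal sketch (a throwaway class; the row key "beijing_1" is only an example of the city_adid key format written by the streaming program):
package cn.itcast.streaming;
import cn.itcast.hbase.HbaseConnect;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadAdStream {
    public static void main(String[] args) throws Exception {
        Table table = HbaseConnect.getConnection()
                .getTable(TableName.valueOf("adstream"));
        //Row keys have the form city_adid, e.g. "beijing_1" (illustrative)
        Result result = table.get(new Get(Bytes.toBytes("beijing_1")));
        String count = Bytes.toString(
                result.getValue(Bytes.toBytes("area_ads_count"), Bytes.toBytes("count")));
        System.out.println("clicks = " + count);
        table.close();
        HbaseConnect.closeConnection();
    }
}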