MapReduce 数据分析报告(小案例分享)
智慧数据人的小作业,仅作参考
-
任务介绍
本 MapReduce 程序旨在对豆瓣 Top250 电影数据进行分析,主要实现以下功能:
- 统计导演出现次数:计算每个导演在数据集中出现的频率,以便了解哪些导演的作品在榜单中较为常见。
- 统计国家出现次数:分析各个国家电影在豆瓣 Top250 榜单中的分布情况,确定哪些国家的电影更受关注。
- 统计年份出现次数:展示不同年份电影在榜单中的数量分布,反映电影产业在不同时期的发展态势。
通过对这些数据的分析,可以深入挖掘豆瓣 Top250 电影数据中的信息,为电影行业研究、文化传播等提供有价值的参考。
- 数据文件介绍
- 数据文件格式
数据文件采用 CSV(逗号分隔值)格式,文件编码为 UTF - 8,文件内容为中文和数字。每一行代表一部电影的相关信息,包含电影名称、导演、电影类型、国家、上映年份、评分、评论人数等字段,字段之间使用逗号分隔。
- 数据文件内容概述
该数据文件是使用python在豆瓣网进行爬取获得的豆瓣top250数据,数据文件包含了豆瓣 Top250 电影的丰富信息,涵盖了多种类型、不同国家和不同年代的电影作品。这些电影在评分和评论人数方面具有较高的代表性,能够反映大众对电影的评价和喜好。
- 数据文件截图(示例)
以下是数据文件的部分截图(使用excel编辑器打开数据文件的示例):
-
MapReduce 程序基本结构介绍及解释
-
程序整体结构
本程序主要由以下几个部分组成:
- Mapper 类:负责将输入数据进行初步处理,提取关键信息并转换为键值对形式输出。
- Reducer 类:对 Mapper 输出的具有相同键的值进行合并和处理,计算最终的统计结果。
- main 方法:用于配置和启动 MapReduce 作业,设置输入输出路径、Mapper 和 Reducer 类等参数。
-
Mapper 类(DirectorAndCountryMapper)
- 功能:
- 从输入的 CSV 格式的电影数据行中,根据逗号分隔符提取导演、国家和年份信息。
- 将导演信息格式化为director:导演姓名,国家信息格式化为country:国家名称,年份信息格式化为year:年份数字作为键,值固定为 1,表示出现一次,然后输出键值对。
- 代码示例及解释:
public static class DirectorAndCountryMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text field = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length >= 2) {
// 统计导演出现次数
field.set("director:" + fields[1].trim());
context.write(field, one);
}
if (fields.length >= 4) {
// 统计国家出现次数
field.set("country:" + fields[3].trim());
context.write(field, one);
}
if (fields.length >= 5) {
// 统计年份出现次数
field.set("year:" + fields[4].trim());
context.write(field, one);
}
}
}
在map方法中,首先将输入的文本行(代表一部电影的信息)按逗号分割成字符串数组fields。然后通过条件判断分别提取导演、国家和年份信息,并将其与相应的前缀(如director:、country:、year:)组合成新的键,值为 1,通过context.write方法输出,以便后续 Reducer 处理。
-
Reducer 类(DirectorAndCountryReducer)
- 功能:
- 接收 Mapper 输出的键值对,其中键为格式化后的导演、国家或年份信息,值为 1 的迭代器。
- 对相同键的值进行求和,得到每个导演、国家或年份的出现总次数。
- 将最终结果以相同的键和计算得到的出现次数作为值输出。
- 代码示例及解释:
public static class DirectorAndCountryReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
在reduce方法中,对于每个唯一的键(如特定导演、国家或年份),遍历其对应的值(都是 1),将这些值累加得到总和sum。然后将总和设置到result变量中,并通过context.write方法输出最终的键值对,其中键为原始的格式化信息,值为出现次数总和。
-
main 方法
- 功能:
- 配置 Hadoop 作业,包括创建Configuration对象和Job对象。
- 设置作业名称、指定程序入口类(setJarByClass)、设置 Mapper 和 Reducer 类(setMapperClass和setReducerClass)。
- 设置输出键值类型(setOutputKeyClass和setOutputValueClass),并指定输入输出路径(addInputPath和setOutputPath)。
- 提交作业并等待其完成,根据作业完成状态返回相应的退出码。
- 代码示例及解释:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job = Job.getInstance(conf, "douban analysis");
job.setJarByClass(DoubanAnalysis.class);
job.setMapperClass(DirectorAndCountryMapper.class);
job.setReducerClass(DirectorAndCountryReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
if (success) {
// 后续读取结果并输出的逻辑
//...
System.exit(0);
} else {
System.exit(1);
}
}
在main方法中,首先创建Configuration对象用于获取和设置 Hadoop 配置信息,然后创建Job对象来代表一个 MapReduce 作业。通过一系列set方法配置作业的各种参数,如类路径、Mapper 和 Reducer 类、输出键值类型等。使用addInputPath和setOutputPath方法指定输入和输出路径,这些路径通常是 HDFS 中的目录。最后通过job.waitForCompletion方法提交作业并等待其完成,根据作业执行成功与否返回相应的退出码(0 表示成功,1 表示失败)。
-
程序完整代码
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DoubanAnalysis {
public static class DirectorAndCountryMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text field = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
if (fields.length >= 2) {
// 统计导演出现次数
field.set("director:" + fields[1].trim());
context.write(field, one);
}
if (fields.length >= 4) {
// 统计国家出现次数
field.set("country:" + fields[3].trim());
context.write(field, one);
}
if (fields.length >= 5) {
// 统计年份出现次数
field.set("year:" + fields[4].trim());
context.write(field, one);
}
}
}
public static class DirectorAndCountryReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "douban analysis");
job.setJarByClass(DoubanAnalysis.class);
job.setMapperClass(DirectorAndCountryMapper.class);
job.setReducerClass(DirectorAndCountryReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
if (success) {
// 读取导演统计结果并输出前五名
Path directorOutputPath = new Path(args[1] + "/part-r-00000");
FileSystem fs = FileSystem.get(conf);
FSDataInputStream is = fs.open(directorOutputPath);
BufferedReader br = new BufferedReader(new InputStreamReader(is));
Map<String, Integer> directorCountMap = new HashMap<>();
String line;
while ((line = br.readLine()) != null) {
String[] parts = line.split("\t");
if (parts.length == 2 && parts[0].startsWith("director:")) {
directorCountMap.put(parts[0].substring(9), Integer.parseInt(parts[1]));
}
}
br.close();
Map<String, Integer> sortedDirectorCountMap = directorCountMap.entrySet().stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.limit(5)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(oldValue, newValue) -> oldValue, LinkedHashMap::new
));
System.out.println("导演出现次数前五行:");
sortedDirectorCountMap.forEach((director, count) -> System.out.println(director + ": " + count));
// 读取国家统计结果并输出前五名
Path countryOutputPath = new Path(args[1] + "/part-r-00000");
is = fs.open(countryOutputPath);
br = new BufferedReader(new InputStreamReader(is));
Map<String, Integer> countryCountMap = new HashMap<>();
while ((line = br.readLine()) != null) {
String[] parts = line.split("\t");
if (parts.length == 2 && parts[0].startsWith("country:")) {
countryCountMap.put(parts[0].substring(8), Integer.parseInt(parts[1]));
}
}
br.close();
Map<String, Integer> sortedCountryCountMap = countryCountMap.entrySet().stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.limit(5)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(oldValue, newValue) -> oldValue, LinkedHashMap::new
));
System.out.println("国家出现次数前五行:");
sortedCountryCountMap.forEach((country, count) -> System.out.println(country + ": " + count));
// 读取年份统计结果并输出前五名
Path yearOutputPath = new Path(args[1] + "/part-r-00000");
is = fs.open(yearOutputPath);
br = new BufferedReader(new InputStreamReader(is));
Map<String, Integer> yearCountMap = new HashMap<>();
while ((line = br.readLine()) != null) {
String[] parts = line.split("\t");
if (parts.length == 2 && parts[0].startsWith("year:")) {
yearCountMap.put(parts[0].substring(5), Integer.parseInt(parts[1]));
}
}
br.close();
Map<String, Integer> sortedYearCountMap = yearCountMap.entrySet().stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.limit(5)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(oldValue, newValue) -> oldValue, LinkedHashMap::new
));
System.out.println("年份出现次数前五行:");
sortedYearCountMap.forEach((year, count) -> System.out.println(year + ": " + count));
System.exit(0);
} else {
System.exit(1);
}
}
}
-
运行结果
可以看到mapreduce的运行逻辑
-
分析结果及分析
-
分析结果
- 导演出现次数前五名:
- 克里斯托弗・诺兰(Christopher Nolan): 5 次
- 史蒂文・斯皮尔伯格(Steven Spielberg): 5 次
- 宫崎骏(Hayao Miyazaki): 5 次
- 李安(Ang Lee): 5 次
- 詹姆斯・卡梅隆(James Cameron): 2 次
- 国家出现次数前五名:
- USA(美国): 112 次
- Japan(日本): 20 次
- China (Mainland)/Hong Kong (China)(中国大陆 / 中国香港): 14 次
- Italy(意大利): 7 次
- France(法国): 7 次
- 年份出现次数前五名:
- 2009: 7 次
- 1994: 6 次
- 2001: 4 次
- 1997: 4 次
- 1993: 3 次
-
结果分析
- 导演方面:
- 克里斯托弗・诺兰的多部作品在豆瓣 Top250 中名列前茅,这表明他的电影风格和创作才华受到了广泛认可。他擅长执导科幻、悬疑等类型电影,如《星际穿越》《盗梦空间》等,其作品在剧情构思、视觉效果等方面具有独特魅力。
- 史蒂文・斯皮尔伯格作为好莱坞著名导演,其作品涵盖多种题材,且在电影叙事和情感表达上具有深厚功力,5 次入选体现了他在电影界的重要地位。
- 宫崎骏的动画作品以深刻的主题、精美的画面和动人的音乐深受观众喜爱,4 次入选说明日本动画电影在全球范围内具有较高影响力。
- 李安的电影作品在文化融合和情感细腻度上表现出色,他能够驾驭多种类型电影,东西方文化元素在他的作品中得到了很好的呈现。
- 国家方面:
- 美国电影在榜单中占据主导地位,超过半数的电影来自美国。这反映了美国电影产业在全球的强大影响力,其在电影制作技术、商业运作和文化输出方面具有显著优势。
- 日本电影以其独特的艺术风格和文化内涵,如动画电影、剧情片等,在国际上也拥有广泛的受众。
- 中国(包括中国大陆和中国香港)电影有 14 部入选,显示了中国电影在国际上的一定影响力,尤其是一些经典作品在剧情、表演等方面得到了认可。
- 意大利和法国电影也有一定数量入选,这两个国家在电影艺术领域具有深厚的历史底蕴和独特的艺术风格,如意大利的新现实主义电影、法国的新浪潮电影等,其电影作品在艺术品质上往往具有较高水准。
- 年份方面:
- 2009 年有较多电影入选,这可能与当时电影行业的发展趋势、技术进步以及电影市场的需求变化有关。例如,这一年可能有更多类型丰富、制作精良的电影问世,满足了观众多样化的观影需求。
- 1994 年和 1997 年等年份也有较多优秀电影出现,这些电影经过时间的沉淀,依然在观众心中保持较高的评价,可能是因为它们在剧情、表演、主题等方面具有经典性和时代意义。
-
总结与展望
通过本次 MapReduce 数据分析,我们对豆瓣 Top250 电影数据有了更深入的了解。从导演、国家和年份等维度揭示了电影行业的一些特点和趋势。未来,可以进一步扩展分析维度,如电影类型的细分分析、演员出现次数统计等,以获取更全面的电影数据洞察,为电影研究、产业发展等提供更有价值的参考。
-
版本环境说明
-
各软件包版本
环境:window11家庭中文版 23H2
VMware16.1.2
镜像:[CentOS-7-x86_64-DVD-2009.iso](https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/isos/x86_64/CentOS-7-x86_64-DVD-2009.iso)
jdk:jdk-8u202-linux-x64.tar.gz
hadoop:hadoop-3.3.5.tar.gz
-
关于如何运行整个程序首先将java程序打包好成jar
首先将依赖的库打包创建,在idea工具栏里的文件里项目结构,选择工件(artifact),新建一个工件,命名为java类名,然后点击确定保存即可。
选择来自具有依赖的模块 下一步去到工具栏里的构建,选择构建工件,再在继续弹出的窗口选择构建他就会开始打包为jar包了,完成构建后会在项目目录下生成一个out文件夹,打包好的jar包就在最深的目录里。
- 将数据文件和jar包通过xshell里xftp链接到虚拟机将文件传入
这里我先放到一个文件夹里
根据作业要求在hadoop根目录下创建一个如下路径的文件夹input作为输入文件夹,注意不用创建ouput文件夹,运行程序会自动生成。
hadoop fs -mkdir -p /user/root/input #创建输入文件夹
hadoop fs -put /opt/douban.csv /user/root/input #将虚拟机目录下的数据文件放入hadoop的输入文件夹
hadoop jar /opt/douban_analysis.jar input output #运行jar包
#在启动 Hadoop 的命令行终端中,使用hadoop jar命令运行 MapReduce 程序,命令格式如下:
hadoop jar <jar包路径> <输入路径> <输出路径>
#程序运行完成后,可以使用以下命令查看输出结果:
hadoop fs -ls <输出路径>
hadoop fs -cat <输出路径>/part-r-00000
更多推荐
所有评论(0)