• 任务介绍

本 MapReduce 程序旨在对豆瓣 Top250 电影数据进行分析,主要实现以下功能:

  1. 统计导演出现次数:计算每个导演在数据集中出现的频率,以便了解哪些导演的作品在榜单中较为常见。
  2. 统计国家出现次数:分析各个国家电影在豆瓣 Top250 榜单中的分布情况,确定哪些国家的电影更受关注。
  3. 统计年份出现次数:展示不同年份电影在榜单中的数量分布,反映电影产业在不同时期的发展态势。

通过对这些数据的分析,可以深入挖掘豆瓣 Top250 电影数据中的信息,为电影行业研究、文化传播等提供有价值的参考。

  • 数据文件介绍
  • 数据文件格式

数据文件采用 CSV(逗号分隔值)格式,文件编码为 UTF - 8,文件内容为中文和数字。每一行代表一部电影的相关信息,包含电影名称、导演、电影类型、国家、上映年份、评分、评论人数等字段,字段之间使用逗号分隔。

  • 数据文件内容概述

该数据文件是使用python在豆瓣网进行爬取获得的豆瓣top250数据,数据文件包含了豆瓣 Top250 电影的丰富信息,涵盖了多种类型、不同国家和不同年代的电影作品。这些电影在评分和评论人数方面具有较高的代表性,能够反映大众对电影的评价和喜好。

  • 数据文件截图(示例)

以下是数据文件的部分截图(使用excel编辑器打开数据文件的示例):

  • MapReduce 程序基本结构介绍及解释

  • 程序整体结构

本程序主要由以下几个部分组成:

  1. Mapper :负责将输入数据进行初步处理,提取关键信息并转换为键值对形式输出。
  2. Reducer :对 Mapper 输出的具有相同键的值进行合并和处理,计算最终的统计结果。
  3. main 方法:用于配置和启动 MapReduce 作业,设置输入输出路径、Mapper 和 Reducer 类等参数。
  • Mapper 类(DirectorAndCountryMapper)

  1. 功能
    • 从输入的 CSV 格式的电影数据行中,根据逗号分隔符提取导演、国家和年份信息。
    • 将导演信息格式化为director:导演姓名,国家信息格式化为country:国家名称,年份信息格式化为year:年份数字作为键,值固定为 1,表示出现一次,然后输出键值对。
  2. 代码示例及解释
public static class DirectorAndCountryMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    private Text field = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String[] fields = value.toString().split(",");

        if (fields.length >= 2) {

            // 统计导演出现次数

            field.set("director:" + fields[1].trim());

            context.write(field, one);

        }

        if (fields.length >= 4) {

            // 统计国家出现次数

            field.set("country:" + fields[3].trim());

            context.write(field, one);

        }

        if (fields.length >= 5) {

            // 统计年份出现次数

            field.set("year:" + fields[4].trim());

            context.write(field, one);

        }

    }

}

在map方法中,首先将输入的文本行(代表一部电影的信息)按逗号分割成字符串数组fields。然后通过条件判断分别提取导演、国家和年份信息,并将其与相应的前缀(如director:、country:、year:)组合成新的键,值为 1,通过context.write方法输出,以便后续 Reducer 处理。

  • Reducer 类(DirectorAndCountryReducer)

  1. 功能
    • 接收 Mapper 输出的键值对,其中键为格式化后的导演、国家或年份信息,值为 1 的迭代器。
    • 对相同键的值进行求和,得到每个导演、国家或年份的出现总次数。
    • 将最终结果以相同的键和计算得到的出现次数作为值输出。
  2. 代码示例及解释
public static class DirectorAndCountryReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();



    public void reduce(Text key, Iterable<IntWritable> values, Context context)

            throws IOException, InterruptedException {

        int sum = 0;

        for (IntWritable val : values) {

            sum += val.get();

        }

        result.set(sum);

        context.write(key, result);

    }

}

在reduce方法中,对于每个唯一的键(如特定导演、国家或年份),遍历其对应的值(都是 1),将这些值累加得到总和sum。然后将总和设置到result变量中,并通过context.write方法输出最终的键值对,其中键为原始的格式化信息,值为出现次数总和。

  • main 方法

  1. 功能
    • 配置 Hadoop 作业,包括创建Configuration对象和Job对象。
    • 设置作业名称、指定程序入口类(setJarByClass)、设置 Mapper 和 Reducer 类(setMapperClass和setReducerClass)。
    • 设置输出键值类型(setOutputKeyClass和setOutputValueClass),并指定输入输出路径(addInputPath和setOutputPath)。
    • 提交作业并等待其完成,根据作业完成状态返回相应的退出码。
  2. 代码示例及解释
public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job = Job.getInstance(conf, "douban analysis");

    job.setJarByClass(DoubanAnalysis.class);

    job.setMapperClass(DirectorAndCountryMapper.class);

    job.setReducerClass(DirectorAndCountryReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    boolean success = job.waitForCompletion(true);

    if (success) {

        // 后续读取结果并输出的逻辑

        //...

        System.exit(0);

    } else {

        System.exit(1);

    }

}

在main方法中,首先创建Configuration对象用于获取和设置 Hadoop 配置信息,然后创建Job对象来代表一个 MapReduce 作业。通过一系列set方法配置作业的各种参数,如类路径、Mapper 和 Reducer 类、输出键值类型等。使用addInputPath和setOutputPath方法指定输入和输出路径,这些路径通常是 HDFS 中的目录。最后通过job.waitForCompletion方法提交作业并等待其完成,根据作业执行成功与否返回相应的退出码(0 表示成功,1 表示失败)。

  • 程序完整代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DoubanAnalysis {

    public static class DirectorAndCountryMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text field = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length >= 2) {
                // 统计导演出现次数
                field.set("director:" + fields[1].trim());
                context.write(field, one);
            }
            if (fields.length >= 4) {
                // 统计国家出现次数
                field.set("country:" + fields[3].trim());
                context.write(field, one);
            }
            if (fields.length >= 5) {
                // 统计年份出现次数
                field.set("year:" + fields[4].trim());
                context.write(field, one);
            }
        }
    }

    public static class DirectorAndCountryReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "douban analysis");
        job.setJarByClass(DoubanAnalysis.class);
        job.setMapperClass(DirectorAndCountryMapper.class);
        job.setReducerClass(DirectorAndCountryReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);

        if (success) {
            // 读取导演统计结果并输出前五名
            Path directorOutputPath = new Path(args[1] + "/part-r-00000");
            FileSystem fs = FileSystem.get(conf);
            FSDataInputStream is = fs.open(directorOutputPath);
            BufferedReader br = new BufferedReader(new InputStreamReader(is));

            Map<String, Integer> directorCountMap = new HashMap<>();
            String line;
            while ((line = br.readLine()) != null) {
                String[] parts = line.split("\t");
                if (parts.length == 2 && parts[0].startsWith("director:")) {
                    directorCountMap.put(parts[0].substring(9), Integer.parseInt(parts[1]));
                }
            }
            br.close();

            Map<String, Integer> sortedDirectorCountMap = directorCountMap.entrySet().stream()
                    .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                    .limit(5)
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            Map.Entry::getValue,
                            (oldValue, newValue) -> oldValue, LinkedHashMap::new
                    ));

            System.out.println("导演出现次数前五行:");
            sortedDirectorCountMap.forEach((director, count) -> System.out.println(director + ": " + count));

            // 读取国家统计结果并输出前五名
            Path countryOutputPath = new Path(args[1] + "/part-r-00000");
            is = fs.open(countryOutputPath);
            br = new BufferedReader(new InputStreamReader(is));

            Map<String, Integer> countryCountMap = new HashMap<>();
            while ((line = br.readLine()) != null) {
                String[] parts = line.split("\t");
                if (parts.length == 2 && parts[0].startsWith("country:")) {
                    countryCountMap.put(parts[0].substring(8), Integer.parseInt(parts[1]));
                }
            }
            br.close();

            Map<String, Integer> sortedCountryCountMap = countryCountMap.entrySet().stream()
                    .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                    .limit(5)
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            Map.Entry::getValue,
                            (oldValue, newValue) -> oldValue, LinkedHashMap::new
                    ));

            System.out.println("国家出现次数前五行:");
            sortedCountryCountMap.forEach((country, count) -> System.out.println(country + ": " + count));

            // 读取年份统计结果并输出前五名
            Path yearOutputPath = new Path(args[1] + "/part-r-00000");
            is = fs.open(yearOutputPath);
            br = new BufferedReader(new InputStreamReader(is));

            Map<String, Integer> yearCountMap = new HashMap<>();
            while ((line = br.readLine()) != null) {
                String[] parts = line.split("\t");
                if (parts.length == 2 && parts[0].startsWith("year:")) {
                    yearCountMap.put(parts[0].substring(5), Integer.parseInt(parts[1]));
                }
            }
            br.close();

            Map<String, Integer> sortedYearCountMap = yearCountMap.entrySet().stream()
                    .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                    .limit(5)
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            Map.Entry::getValue,
                            (oldValue, newValue) -> oldValue, LinkedHashMap::new
                    ));

            System.out.println("年份出现次数前五行:");
            sortedYearCountMap.forEach((year, count) -> System.out.println(year + ": " + count));

            System.exit(0);
        } else {
            System.exit(1);
        }
    }
}
  • 运行结果

可以看到mapreduce的运行逻辑

  • 分析结果及分析

  • 分析结果

  1. 导演出现次数前五名
    • 克里斯托弗・诺兰(Christopher Nolan): 5 次
    • 史蒂文・斯皮尔伯格(Steven Spielberg): 5 次
    • 宫崎骏(Hayao Miyazaki): 5 次
    • 李安(Ang Lee): 5 次
    • 詹姆斯・卡梅隆(James Cameron): 2 次
  2. 国家出现次数前五名
    • USA(美国): 112 次
    • Japan(日本): 20 次
    • China (Mainland)/Hong Kong (China)(中国大陆 / 中国香港): 14 次
    • Italy(意大利): 7 次
    • France(法国): 7 次
  3. 年份出现次数前五名
    • 2009: 7 次
    • 1994: 6 次
    • 2001: 4 次
    • 1997: 4 次
    • 1993: 3 次
  • 结果分析

  1. 导演方面
    • 克里斯托弗・诺兰的多部作品在豆瓣 Top250 中名列前茅,这表明他的电影风格和创作才华受到了广泛认可。他擅长执导科幻、悬疑等类型电影,如《星际穿越》《盗梦空间》等,其作品在剧情构思、视觉效果等方面具有独特魅力。
    • 史蒂文・斯皮尔伯格作为好莱坞著名导演,其作品涵盖多种题材,且在电影叙事和情感表达上具有深厚功力,5 次入选体现了他在电影界的重要地位。
    • 宫崎骏的动画作品以深刻的主题、精美的画面和动人的音乐深受观众喜爱,4 次入选说明日本动画电影在全球范围内具有较高影响力。
    • 李安的电影作品在文化融合和情感细腻度上表现出色,他能够驾驭多种类型电影,东西方文化元素在他的作品中得到了很好的呈现。
  2. 国家方面
    • 美国电影在榜单中占据主导地位,超过半数的电影来自美国。这反映了美国电影产业在全球的强大影响力,其在电影制作技术、商业运作和文化输出方面具有显著优势。
    • 日本电影以其独特的艺术风格和文化内涵,如动画电影、剧情片等,在国际上也拥有广泛的受众。
    • 中国(包括中国大陆和中国香港)电影有 14 部入选,显示了中国电影在国际上的一定影响力,尤其是一些经典作品在剧情、表演等方面得到了认可。
    • 意大利和法国电影也有一定数量入选,这两个国家在电影艺术领域具有深厚的历史底蕴和独特的艺术风格,如意大利的新现实主义电影、法国的新浪潮电影等,其电影作品在艺术品质上往往具有较高水准。
  3. 年份方面
    • 2009 年有较多电影入选,这可能与当时电影行业的发展趋势、技术进步以及电影市场的需求变化有关。例如,这一年可能有更多类型丰富、制作精良的电影问世,满足了观众多样化的观影需求。
    • 1994 年和 1997 年等年份也有较多优秀电影出现,这些电影经过时间的沉淀,依然在观众心中保持较高的评价,可能是因为它们在剧情、表演、主题等方面具有经典性和时代意义。
  • 总结与展望

通过本次 MapReduce 数据分析,我们对豆瓣 Top250 电影数据有了更深入的了解。从导演、国家和年份等维度揭示了电影行业的一些特点和趋势。未来,可以进一步扩展分析维度,如电影类型的细分分析、演员出现次数统计等,以获取更全面的电影数据洞察,为电影研究、产业发展等提供更有价值的参考。

  • 版本环境说明

  • 各软件包版本

环境:window11家庭中文版 23H2

VMware16.1.2

镜像:[CentOS-7-x86_64-DVD-2009.iso](https://mirrors.tuna.tsinghua.edu.cn/centos-vault/7.9.2009/isos/x86_64/CentOS-7-x86_64-DVD-2009.iso)

jdk:jdk-8u202-linux-x64.tar.gz

hadoop:hadoop-3.3.5.tar.gz

  1. 关于如何运行整个程序首先将java程序打包好成jar

首先将依赖的库打包创建,在idea工具栏里的文件里项目结构,选择工件(artifact),新建一个工件,命名为java类名,然后点击确定保存即可。

选择来自具有依赖的模块 下一步去到工具栏里的构建,选择构建工件,再在继续弹出的窗口选择构建他就会开始打包为jar包了,完成构建后会在项目目录下生成一个out文件夹,打包好的jar包就在最深的目录里。

  1. 将数据文件和jar包通过xshell里xftp链接到虚拟机将文件传入


这里我先放到一个文件夹里

根据作业要求在hadoop根目录下创建一个如下路径的文件夹input作为输入文件夹,注意不用创建ouput文件夹,运行程序会自动生成。

hadoop fs -mkdir -p /user/root/input #创建输入文件夹

hadoop fs -put /opt/douban.csv /user/root/input #将虚拟机目录下的数据文件放入hadoop的输入文件夹

hadoop jar /opt/douban_analysis.jar input output #运行jar包

#在启动 Hadoop 的命令行终端中,使用hadoop jar命令运行 MapReduce 程序,命令格式如下:

hadoop jar <jar包路径> <输入路径> <输出路径>

#程序运行完成后,可以使用以下命令查看输出结果:

hadoop fs -ls <输出路径>

hadoop fs -cat <输出路径>/part-r-00000

Logo

永洪科技,连续七届荣获BI第一名的数据技术厂商,提供数据/智能分析、数据资产及治理、实施等能力。

更多推荐