实验五总共有三个小实验,其中第三个是其中逻辑最复杂的一个,我今天结合官方的实现代码来讲解其中的执行过程。

首先是以如下形式表示的表明亲子关系的输入文件,左列为孩子名字,右列为父母名字,中间以空格分隔。其中第一行是表头,第二行开始才是真实的数据,所以在处理数据时不考虑第一行。

child-parent.txt

child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma

经过MapReduce之前的InputFormat,InputSplit和RecordReader对输入文件所进行的预处理,该child-parent.txt文件已经被转换为如下的键值对形式:

<1,"child parent">
<2,"Steven Lucy">
<3,"Steven Jack">
<4,"Jone Lucy">
<5,"Jone Jack">
<6,"Lucy Mary">
<7,"Lucy Frank">
<8,"Jack Alice">
<9,"Jack Jesse">
<10,"David Alice">
<11,"David Jesse">
<12,"Philip David">
<13,"Philip Alma">
<14,"Mark David">
<15,"Mark Alma">

其中key是文档中某行的行号,value是该行的内容,这就是Map的输入。以下是实现程序Map流程的代码:

public static class Map extends Mapper<Object, Text, Text, Text>{
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
            String child_name = new String();
            String parent_name = new String();
            String relation_type = new String();
            String line = value.toString();
            int i = 0;
            while(line.charAt(i) != ' '){
                i++;
            }
            String[] values = {line.substring(0,i),line.substring(i+1)};
            if(values[0].compareTo("child") != 0){
                child_name = values[0];
                parent_name = values[1];
                relation_type = "1";//左右表区分标志
                context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //左表
                relation_type = "2";
                context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //右表
            }
        }
    }

从它继承的Mapper类的泛型中可以看出它是以<Object,Text>的形式来接收以上输入的键值对的,其中Object是Java原生的最高级的对象类型,而Text则是Hadoop内置的文本类型,之所以这样定义是因为在Map函数的处理过程中并不需要用到key值,所以对于key值的具体类型并不敏感。而要求Map函数处理后的输出类型为<Text,Text>,它内部主要做了如下操作:

String child_name = new String();
String parent_name = new String();
String relation_type = new String();
String line = value.toString();//将输入的Text类型转为String类型
/*
下面5行代码实现了将每行数据中的子女名与父母名以空格分开,并分别存放于数组values中*/
int i = 0;
while(line.charAt(i) != ' '){
    i++;
}
String[] values = {line.substring(0,i),line.substring(i+1)};
/*
以下代码实现的功能是将数组中的子女名和父母名两项提取出来进行处理最后输出成
左右两个表的形式,即将输入时的一行<child_name,parent_name>,经过处理后输
出时会变成两行<parent_name,1+child_name+parent_name>和
<child_name,2+child_name+parent_name>,
其中的1和2是标志位,1标志当前的键在值所标示的关系中是父母的地位,2标志当前
的键在值所标示的关系中是孩子的地位。最外面的判断条件确保了表头不会被处理。
*/
if(values[0].compareTo("child") != 0){
child_name = values[0];
parent_name = values[1];
relation_type = "1";//左右表区分标志
context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //左表
relation_type = "2";
context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //右表
}

最后,Map函数的输出是这样的:

<"Lucy","1+Steven+Lucy">
<"Steven","2+Steven+Lucy">
<"Jack","1+Steven+Jack">
<"Steven","2+Steven+Jack">
<"Lucy","1+Jone+Lucy">
<"Jone","2+Jone+Lucy">
<"Jack","1+Jone+Jack">
<"Jone","2+Jone+Jack">
<"Mary","1+Lucy+Mary">
<"Lucy","2+Lucy+Mary">
<"Frank","1+Lucy+Frank">
<"Lucy","2+Lucy+Frank">
<"Alice","1+Jack+Alice">
<"Jack","2+Jack+Alice">
<"Jesse","1+Jack+Jesse">
<"Jack","2+Jack+Jesse">
<"Alice","1+David+Alice">
<"David","2+David+Alice">
<"Jesse","1+David+Jesse">
<"David","2+David+Jesse">
<"David","1+Philip+David">
<"Philip","2+Philip+David">
<"Alma","1+Philip+Alma">
<"Philip","2+Philip+Alma">
<"David","1+Mark+David">
<"Mark","2+Mark+David">
<"Alma","1+Mark+Alma">
<"Mark","2+Mark+Alma">

而在之后的shuffle过程中,Hadoop会根据主键进行自动的分区、排序和归并等操作,相同主键的键值对会被归并到一起,最后Reduce函数接收的输入其实是这样的:

<"Lucy",<"1+Steven+Lucy","1+Jone+Lucy","2+Lucy+Mary","2+Lucy+Frank">>
<"Steven",<"2+Steven+Lucy","2+Steven+Jack">>
<"Jack",<"1+Steven+Lucy","1+Jone+Lucy","2+Jack+Alice","2+Jack+Jesse">
<"Jone",<"2+Jone+Lucy","2+Jone+Jack">>
<"Mary",<"1+Lucy+Mary">>
<"Frank",<"1+Lucy+Frank">>
<"Allice",<"1+Jack+Alice","1+David+Alice">>
<"David",<"1+Philip+David","1+Mark+David","2+David+Alice","2+David+Jesse">>
<"Jesse",<"1+Jack+Jesse","1+David+Jesse">>
<"Philip",<"2+Philip+David","2+Philip+Alma">>
<"Alma",<"1+Philip+Alma","1+Mark+Alma">>
<"Mark",<"2+Mark+David","2+Mark+Alma">>

以下是实验的Reduce部分,可看出其要求的输入是<Text,Text>类型,输出同样是<Text,Text>类型,但实际上传入参数的值是以Text为泛型的可迭代对象

public static class Reduce extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
            if(time == 0){   //输出表头
                context.write(new Text("grand_child"), new Text("grand_parent"));
                time++;
            }
            int grand_child_num = 0;
            String grand_child[] = new String[10];
            int grand_parent_num = 0;
            String grand_parent[]= new String[10];
            Iterator ite = values.iterator();
            while(ite.hasNext()){
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if(len == 0) continue;
                char relation_type = record.charAt(0);
                String child_name = new String();
                String parent_name = new String();
                //获取value-list中value的child
 
                while(record.charAt(i) != '+'){
                    child_name = child_name + record.charAt(i);
                    i++;
                }
                i=i+1;
                //获取value-list中value的parent
                while(i<len){
                    parent_name = parent_name+record.charAt(i);
                    i++;
                }
                //左表,取出child放入grand_child
                if(relation_type == '1'){
                    grand_child[grand_child_num] = child_name;
                    grand_child_num++;
                }
                else{//右表,取出parent放入grand_parent
                    grand_parent[grand_parent_num] = parent_name;
                    grand_parent_num++;
                }
            }
 
            if(grand_parent_num != 0 && grand_child_num != 0 ){
                for(int m = 0;m<grand_child_num;m++){
                    for(int n=0;n<grand_parent_num;n++){
                        context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
                        //输出结果
                    }
                }
            }
        }
}

其内部代码主要做了如下功能:

if(time == 0){   //输出表头
    context.write(new Text("grand_child"), new Text("grand_parent"));
    time++;//已输出过的表头便不再输出
}
/*以下四句定义了孙辈名数组和祖辈名数组,并定义了各自的元素初始数量*/
int grand_child_num = 0;
String grand_child[] = new String[10];
int grand_parent_num = 0;
String grand_parent[]= new String[10];
/*
以下的代码遍历值列表,根据relation_type分别将不同的名字填入孙辈数组或祖辈数组中,以
<"Lucy",<"1+Steven+Lucy","1+Jone+Lucy","2+Lucy+Mary","2+Lucy+Frank">>
为例,最后的数组填充结果为grand_child = {Steven,Jone},grand_parent = {Mary,Frank},
即取出了当前主键所代表的人的所有子女和他/她的父母,以此来表示祖孙关系
*/
Iterator ite = values.iterator();
while(ite.hasNext()){
    String record = ite.next().toString();
    int len = record.length();
    int i = 2;
    if(len == 0) continue;
    char relation_type = record.charAt(0);
    String child_name = new String();
    String parent_name = new String();
    //获取value-list中value的child
    while(record.charAt(i) != '+'){
        child_name = child_name + record.charAt(i);
        i++;
    }
    i=i+1;
    //获取value-list中value的parent
    while(i<len){
        parent_name = parent_name+record.charAt(i);
        i++;
    }
    //左表,取出child放入grand_child
    if(relation_type == '1'){
        grand_child[grand_child_num] = child_name;
        grand_child_num++;
    }
    else{//右表,取出parent放入grand_parent
        grand_parent[grand_parent_num] = parent_name;
        grand_parent_num++;
    }
}
/*
这最后的代码通过将grand_child数组和grand_parent数组作笛卡尔乘积得出祖孙关
系的键值对,其外的判断条件限制了只有同时能找到子女和父母的人所代表的键值对
才会有输出,因为给定的原始输入文件不是所有人都能找到子女或父母,即只有形如
<"Lucy",<"1+Steven+Lucy","1+Jone+Lucy","2+Lucy+Mary","2+Lucy+Frank">>
这样值列表中同时有relation_type为1和2的项的才会执行这段代码,才会有输出,
而一个如上这样的键值对会对应4个Reduce输出,因为最后有三个输入的键值对满足
该条件,所以最后有12个新的键值对输出
*/
if(grand_parent_num != 0 && grand_child_num != 0 ){
    for(int m = 0;m<grand_child_num;m++){
        for(int n=0;n<grand_parent_num;n++){
            context.write(new Text(grand_child[m]), new 
Text(grand_parent[n]));

所以Reduce的输出结果是这样的:

<"grand_child","grand_parent">
<"Mark","Jesse">
<"Mark","Alice">
<"Philip","Jesse">
<"Philip","Alice">
<"Jone","Jesse">
<"Jone","Alice">
<"Steven","Jesse">
<"Steven","Alice">
<"Steven","Frank">
<"Steven","Mary">
<"Jone","Frank">
<"Jone","Mary">

最后经过OutputFormat的格式化处理,将键值对转换成字符流输出到文件,打印文件得出以下结果:
最后输出结果
以下是官方完整代码,来自于http://dblab.xmu.edu.cn/blog/2808-2/#more-2808

package com.simple_data_mining;
 
import java.io.IOException;
import java.util.*;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class simple_data_mining {
    public static int time = 0;
 
    /**
     * @param args
     * 输入一个child-parent的表格
     * 输出一个体现grandchild-grandparent关系的表格
     */
    //Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志
    public static class Map extends Mapper<Object, Text, Text, Text>{
        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
            String child_name = new String();
            String parent_name = new String();
            String relation_type = new String();
            String line = value.toString();
            int i = 0;
            while(line.charAt(i) != ' '){
                i++;
            }
            String[] values = {line.substring(0,i),line.substring(i+1)};
            if(values[0].compareTo("child") != 0){
                child_name = values[0];
                parent_name = values[1];
                relation_type = "1";//左右表区分标志
                context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //左表
                relation_type = "2";
                context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));
                //右表
            }
        }
    }
 
    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
            if(time == 0){   //输出表头
                context.write(new Text("grand_child"), new Text("grand_parent"));
                time++;
            }
            int grand_child_num = 0;
            String grand_child[] = new String[10];
            int grand_parent_num = 0;
            String grand_parent[]= new String[10];
            Iterator ite = values.iterator();
            while(ite.hasNext()){
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if(len == 0) continue;
                char relation_type = record.charAt(0);
                String child_name = new String();
                String parent_name = new String();
                //获取value-list中value的child
 
                while(record.charAt(i) != '+'){
                    child_name = child_name + record.charAt(i);
                    i++;
                }
                i=i+1;
                //获取value-list中value的parent
                while(i<len){
                    parent_name = parent_name+record.charAt(i);
                    i++;
                }
                //左表,取出child放入grand_child
                if(relation_type == '1'){
                    grand_child[grand_child_num] = child_name;
                    grand_child_num++;
                }
                else{//右表,取出parent放入grand_parent
                    grand_parent[grand_parent_num] = parent_name;
                    grand_parent_num++;
                }
            }
 
            if(grand_parent_num != 0 && grand_child_num != 0 ){
                for(int m = 0;m<grand_child_num;m++){
                    for(int n=0;n<grand_parent_num;n++){
                        context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
                        //输出结果
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
    conf.set("fs.default.name","hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input","output"}; /* 直接设置输入参数 */
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
            }
    Job job = Job.getInstance(conf,"Single table join");
        job.setJarByClass(simple_data_mining.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
 
    }
}
Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐