编程要求

完成矩阵乘法的 Map 函数和 Reduce 函数
1、设计两个矩阵(3050,50100),在每个单元格中填入一个 0-99 的随机数,并写入
两个文件中,作为 Map 函数的输入
2、测试运行矩阵乘法的 MapReduce 框架,并将结果输出到新的结果文件中

思路

  在矩阵乘法的过程中,很容易发现可以并行运算前一矩阵的行与后一矩阵的列的乘法和加法运算,且这个过程相互独立,互不影响。因此对于矩阵乘法可以考虑利用 mapreduce 的框架进行运算,来提高工作效率。
  根据我们平时计算矩阵乘法的过程,每次都是先算乘法,再算加法,但这个过程并不适应 mapreduce 的框架。通过对得到矩阵乘法结果的全部过程分析,会发现两个矩阵中的每个元素计算的次数都是有限的、有规律的,如前一矩阵中的第一个元素只需计算后一矩阵的列数的次数。
  有了上述的发现,又由于 mapreduce 的计算特点,就可以利用 mapreduce 来计算矩阵乘法,提高效率。
  除了以上的思路,而对于矩阵的存储进行一定的设计。当矩阵的维数较小时,将矩阵存在一个文件中,是没有任何问题的。但当矩阵的维数为几十万时,且矩阵为稀疏矩阵时,可以选择(i,j,A[I,j])来存储,更为方便。

步骤

  1. 添加执行权限:在终端输入“chmod+x ./genMatrix.sh”命令添加执行权限;
  2. 生成矩阵文档:输入“./genMatrix.sh 30 50 100”命令生成一个 3050 和 50100 的两
    个矩阵文件,并将其放入到 hdfs 文件系统上。
  3. 初始化工作:在执行 Map 任务前,用 setup 方法进行相关变量或者资源的集中初始化
    工作,获取 columnM 和 rowM 的值。
  4. map 读取矩阵文件:map 先获取文件名,然后从矩阵文件中读取一行内容,格式为:
    i,j Mij。利用 split()方法将获取到的内容从“,”分割,得到一个 String 类型的数组 tuple,数
    组包含元素的信息。
       再获取元素详细内容时,先匹配该元素是来自哪个矩阵,这是因为 map 读取的内容是
    被分为切片的块,而输出的内容中要有区别两个矩阵的标识。匹配后,tuple[0]为元素所在
    的行号 i,而 tuple[1]包含列号和元素的值,因此再次用 split()从“\t”处分割,分别得到列
    号 j 和元素的值 Mij或 Njk。
       由于每个元素只需计算前一矩阵的行数或后一矩阵的列数,因此,循环 columnN 或
    rowM 次,将输出的键值对的 key 设为(I,k),value 设为(M/N,j,Mij/Njk)。
  5. MatrixReducer 获取元素,计算乘法和加法:在 reduce 中,先获取同一个 key=(I,k)下,
    Mij 和 Njk,并分别放入一维数组 M[]和 N[]中。然后对有相同下标 j(0<=j<column)的 M[j]和
    N[j]相乘累加。
  6. 编译执行:用相应的命令编译执行,最后用“hdfs dfs -cat output/*”命令将计算结果显
    示。

代码

1、全局变量
/** mapper和reducer需要的三个必要变量,由conf.get()方法得到 **/
  public static int rowM = 0;
  public static int columnM = 0;
  public static int columnN = 0;
2、Mapper方法
public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

    /**
     * 执行map()函数前先由conf.get()得到main函数中提供的必要变量, 这也是MapReduce中共享变量的一种方式
     */
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      columnN = Integer.parseInt(conf.get("columnN"));
      rowM = Integer.parseInt(conf.get("rowM"));
    }

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      /** 得到输入文件名,从而区分输入矩阵M和N **/
      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      String fileName = fileSplit.getPath().getName();

      if (fileName.contains("M")) {
        //TODO:行号i,列号j,数字Mij,根据矩阵N的任意列号k,输出(i,k)->(M,j,Mij)
        //输出k次
	String[] t = value.toString().split("\t");
	for(int i=1; i<=columnN; i++){
	    mappedKey.set( t[0].substring(0, t[0]. indexOf(","))+","+k);
	    mappedValue.set("M"+ ","+ t[0].substring(t[0].indexOf(",")+1)+ "," + t[1]);
	    context.write(mappedKey,mappedValue);
	}
      }

      else if (fileName.contains("N")) {
        //TODO:行号j,列号k,数字Njk,根据矩阵M的任意行号i,输出(i,k)->(N,j,Njk)
        //
	String[] t = value.toString().split("\t");
	for(int i=1; i<=rowM; i++){
	    mappedKey.set( i+ "," +t[0].substring(0, t[0]. indexOf(",")+1));
	    mappedValue.set("N"+ ","+ t[0].substring(t[0].indexOf(","))+ "," + t[1]);
	    context.write(mappedKey,mappedValue);
	}
      }
    }
  }

3、Reducer
public static class MatrixReducer extends Reducer<Text, Text, Text, Text> {
    private int sum = 0;

    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      columnM = Integer.parseInt(conf.get("columnM"));
    }

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      int[] M = new int[columnM + 1];
      int[] N = new int[columnM + 1];

      //TODO:获取同一个key=(i,k)下,Mij=M[j]和Njk=N[j]


      /** 根据j值,对M[j]和N[j]进行相乘累加得到乘积矩阵的数据 **/
      for (int j = 1; j < columnM + 1; j++) {
        sum += M[j] * N[j];
      }
      context.write(key, new Text(Integer.toString(sum)));
      sum = 0;
    }
  }
4、main
 public static void main(String[] args) throws Exception {

  if (args.length != 3) {
    System.err
        .println("Usage: MatrixMultiply <inputPathM> <inputPathN> <outputPath>");
    System.exit(2);
  } else {
    String[] infoTupleM = args[0].split("_");
    rowM = Integer.parseInt(infoTupleM[1]);
    columnM = Integer.parseInt(infoTupleM[2]);
    String[] infoTupleN = args[1].split("_");
    columnN = Integer.parseInt(infoTupleN[2]);
  }

  Configuration conf = new Configuration();
  /** 设置三个全局共享变量 **/
  conf.setInt("rowM", rowM);
  conf.setInt("columnM", columnM);
  conf.setInt("columnN", columnN);

  Job job = new Job(conf, "MatrixMultiply");
  job.setJarByClass(MatrixMultiply.class);
  job.setMapperClass(MatrixMapper.class);
  job.setReducerClass(MatrixReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1]));
  FileOutputFormat.setOutputPath(job, new Path(args[2]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

运行截图

在这里插入图片描述

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐