大数据——Spark RDD算子(四)创建键值对RDD mapToPair、flatMapToPair
大数据——Spark RDD算子(四)创建键值对RDD mapToPair、flatMapToPair示例文件mapToPairScala版本Java版本flatMapToPairScala版本Java版本,Spark2.0以上示例文件在同级目录下有一个文件夹in,文件夹in下有一个sample.txt,内容如下aa bb cc aa aa aa dd dd ee ee ee eeff aa bb
·
Spark RDD算子(四)创建键值对RDD mapToPair、flatMapToPair
示例文件
- 在同级目录下有一个文件夹in,文件夹in下有一个sample.txt,内容如下
aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks
mapToPair
Scala版本
- Scala中是没有mapToPair函数的,只需要用map就行了
- 将每一行的第一个单词作为键,1作为value创建pairRDD
package nj.zb.sparkstu
import org.apache.spark.{SparkConf, SparkContext}
object MapToPairScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("maptopair")
val sc = new SparkContext(conf)
//map
val lines = sc.textFile("in/sample.txt")
val pairs = lines.map(x=>(x.split(" ")(0),1))
pairs.collect.foreach(println)
}
}
结果展示:
Java版本
- 将每一行的第一个单词作为键,1作为value创建pairRDD
package nj.zb.sparkstu;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Int;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class MapToPairJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("maptopair");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("in/sample.txt");
//mapToPair
JavaPairRDD<String, Integer> pairRdd = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
String[] split = s.split(" ");
String key = split[0];
return new Tuple2<>(key, 1);
//return new Tuple2<>(s.split(" ")[0],1);
}
});
List<Tuple2<String, Integer>> collect = pairRdd.collect();
for (Tuple2 str :
collect) {
System.out.println(str
);
}
}
}
结果展示:
flatMapToPair
- 类似于xxx连接mapToPair是一对一,一个元素返回一个元素,而flatMapToPair可以一个元素返回多个,相当于先flatMap,再mapToPair
Scala版本
- 将每一个单词都分成键值对
package nj.zb.sparkstu
import org.apache.spark.{SparkConf, SparkContext}
object MapToPairScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("maptopair")
val sc = new SparkContext(conf)
//flatMap
val pairRdd = sc.textFile("in/sample.txt").flatMap(x=>x.split(" ")).map(x=>(x,1))
pairRdd.collect.foreach(println)
//pairRdd collect() foreach println
}
}
结果展示:
Java版本,Spark2.0以上
- 主要是iterator和iteratable的一些区别
- 将每一个单词都分成键值对
package nj.zb.sparkstu;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Int;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class MapToPairJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("maptopair");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("in/sample.txt");
//flatMapToPair
JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
List<Tuple2<String, Integer>> list = new ArrayList<>();
String[] split = s.split(" ");
for (int i = 0; i < split.length; i++) {
String key = split[i];
Tuple2<String, Integer> tup2 = new Tuple2<>(key, 1);
list.add(tup2);
}
return list.iterator();
}
});
List<Tuple2<String, Integer>> collect = stringIntegerJavaPairRDD.collect();
for (Tuple2<String, Integer> tup2 :
collect) {
System.out.println("key:"+tup2._1+" value:"+tup2._2);
}
}
}
结果展示:
更多推荐
所有评论(0)