Hadoop Cluster Big Data Solutions: MapReduce in Practice, Advanced (QQ/Weibo Friend Recommendation) (Part 7)
Preparation
Anyone who spends time on QQ or Weibo has run into screens like the one in Figure 1: a list of "people you may know", with a prompt asking whether you want to add them as friends. Some of them you really do know, others you have never met. So how is this QQ/Weibo friend-recommendation kung fu trained? Let's take a peek at the secret manual.
Figure 1: QQ friend recommendation interface
Requirements
Put concretely: given that a and b are friends and b and c are friends, we conclude that a and c may know each other, so we recommend that a and c add each other as friends. This is the classic two-degree relationship algorithm. By the same logic you could extend it to three, four, five or six degrees, but this post sticks to two degrees; anything deeper is more than the author is willing to attempt here!
A Side Story
In 1967 the American social psychologist Stanley Milgram proposed the famous "six degrees of separation" theory: in the best case, you need to go through no more than six people to meet anyone you want to meet. Milgram ran a chain-letter experiment that year, and its results supported the idea. You may not know Bill Gates, but through just six people you could be introduced to him. "Six degrees of separation" shows that society is full of "weak ties", and these weak ties can be surprisingly powerful; many people feel their strength when looking for a job or asking acquaintances for a favor. Six degrees of separation used to be only a theory, but the arrival and rapid growth of the Internet has made putting it into practice possible!
Algorithm Analysis
For test data, a pile of raw QQ numbers would look too abstract, so a handful of words will stand in for them. Treat them as QQ nicknames for now; a real computation would of course use the QQ number, which is unique, rather than the nickname. The goal here is only to illustrate the idea. The test data is as follows:
hadoop hello
hdfs word
tom cat
cat dog
hello word
hdfs myname
yangfeng lau
lau lili
myname yangfeng
Algorithm: two-degree relationships. If a and b are connected and b and c are connected, then a may know c.
Map output:
key: the first column of the line  value: the second column
Then swap the two and emit again:
key: the second column of the line  value: the first column
After the Shuffle, records with the same key end up together: the key is a QQ number and the values are all of that QQ number's friends. Take the Cartesian product of the values and drop the pairs where both sides are the same person; every remaining pair is a pair of people who may know each other.
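To make this concrete, take the user hello from the test data above. The mapper emits (hadoop, hello), (hello, hadoop), (hello, word) and (word, hello), so after the Shuffle the reducer sees the group key=hello, values=[hadoop, word]. The Cartesian product of the values, minus the self-pairs, gives (hadoop, word) and (word, hadoop): hadoop and word may know each other through their common friend hello, and that is exactly the pair that appears in the final output further below.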
Implementation
The Mapper implementation is as follows:
package qq_friends_recmd;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class QFRMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Override map: each call receives one input line of the form "userA<TAB>userB".
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] ss = line.split("\t");
        context.write(new Text(ss[0]), new Text(ss[1]));
        // Remember to emit the pair in the reverse direction as well.
        context.write(new Text(ss[1]), new Text(ss[0]));
    }
}
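The mapper above assumes every line holds exactly two names separated by a tab; a line separated by spaces, or a blank line, would make ss[1] throw an ArrayIndexOutOfBoundsException. Below is a slightly more defensive sketch of my own (the class name QFRSafeMapper is hypothetical and not part of the original project) that splits on any whitespace and skips malformed lines:

package qq_friends_recmd;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical defensive variant of QFRMapper; not part of the original post.
public class QFRSafeMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split on any run of whitespace so both tabs and spaces work.
        String[] ss = value.toString().trim().split("\\s+");
        if (ss.length < 2) {
            return; // skip blank or malformed lines
        }
        context.write(new Text(ss[0]), new Text(ss[1]));
        context.write(new Text(ss[1]), new Text(ss[0])); // reverse direction as well
    }
}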
The Reducer implementation is as follows:
package qq_friends_recmd;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class QFRReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Copy the values iterator (all friends of this user) into a set of our own.
        Set<String> myset = new HashSet<String>();
        for (Text t : values) {
            myset.add(t.toString());
        }
        // Only compute two-degree relationships if the user has at least two friends; otherwise there is nothing to do.
        if (myset.size() > 1) {
            // Cartesian product of the set with itself: each resulting pair is a potential two-degree relationship.
            for (Iterator<String> j = myset.iterator(); j.hasNext(); ) {
                String name = j.next();
                for (Iterator<String> i = myset.iterator(); i.hasNext(); ) {
                    String other = i.next();
                    if (!name.equals(other)) { // skip pairing a person with himself
                        context.write(new Text(name), new Text(other));
                    }
                }
            }
        }
    }
}
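Because the Cartesian product runs in both orders, every recommendation is written twice, once as (a, b) and once as (b, a), which is exactly what the result preview further below shows. If each pair should appear only once, a minimal tweak of my own (not part of the original post) is to keep only the lexicographically ordered orientation inside the inner loop:

// Inside the inner loop of QFRReducer.reduce(), replacing the plain inequality check:
if (name.compareTo(other) < 0) { // keep only one orientation of each pair
    context.write(new Text(name), new Text(other));
}

Note that a pair with more than one common friend would still be emitted once per common friend; collapsing those duplicates, or counting them as a measure of recommendation strength, would take a second MapReduce pass.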
The Shuffle is left at its defaults. The class containing the main function is implemented as follows:
package qq_friends_recmd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class QFRRunJob {
    public static void main(String[] args) {
        // Load the configuration used to submit this job.
        Configuration conf = new Configuration();
        // mapreduce.job.tracker can be set here to match the cluster's mapred-site.xml,
        // but the line can also be left out entirely.
        // conf.set("mapreduce.job.tracker","dw-cluster-master:9001");
        try {
            // MapReduce creates the output folder automatically, but the job fails if the
            // target output folder already exists. Deleting it first makes the job rerunnable.
            Path outputPath = new Path(args[2]);
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
                System.out.println("outputPath is exist,but has deleted!");
            }
            Job myjob = Job.getInstance(conf);
            myjob.setJarByClass(QFRRunJob.class);     // the driver class packaged into the jar that gets submitted
            myjob.setMapperClass(QFRMapper.class);    // the Mapper class
            myjob.setReducerClass(QFRReducer.class);  // the Reducer class
            myjob.setMapOutputKeyClass(Text.class);   // key type of the map output
            myjob.setMapOutputValueClass(Text.class); // value type of the map output
            myjob.setNumReduceTasks(1);               // one reducer is enough for this small job
            // Why args[1]? args[0] is taken by the class name that follows the jar on the command line,
            // so args[1], the second argument after the jar, is the job's input path.
            FileInputFormat.addInputPath(myjob, new Path(args[1]));
            // args[2], the third argument after the jar, is the job's output path.
            FileOutputFormat.setOutputPath(myjob, new Path(args[2]));
            System.exit(myjob.waitForCompletion(true) ? 0 : 1); // wait for the job; exit 0 on success
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
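The execution log further below prints a JobResourceUploader warning suggesting that the driver implement the Tool interface and be launched through ToolRunner, so that generic options such as -D are parsed automatically. A minimal sketch of such a driver is shown here; the class name QFRToolRunJob is mine, and the argument indexes simply mirror the original driver's assumptions about the command line.

package qq_friends_recmd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical ToolRunner-based driver; not part of the original project.
public class QFRToolRunJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Same rerun-friendly cleanup as the original driver.
        Path outputPath = new Path(args[2]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        Job job = Job.getInstance(conf);
        job.setJarByClass(QFRToolRunJob.class);
        job.setMapperClass(QFRMapper.class);
        job.setReducerClass(QFRReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, outputPath);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips generic options (-D, -files, ...) before calling run().
        System.exit(ToolRunner.run(new Configuration(), new QFRToolRunJob(), args));
    }
}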
Deployment and Invocation
For packaging and deployment, refer to the packaging and deployment section of Hadoop Cluster Big Data Solutions: Configuring Maven in the IDE for MapReduce in Practice (Part 5).
Upload the jar to the cluster and invoke it along these lines:
[liuxiaowei@dw-cluster-master qq_friends]$ hadoop jar ./qq_friends_recommend.jar QFRRunJob /tmp/input/mydata.txt /tmp/output/qq_friends/
The execution log looks like this:
outputPath is exist,but has deleted!
20/02/04 11:54:01 INFO client.RMProxy: Connecting to ResourceManager at dw-cluster-master/10.216.10.141:8032
20/02/04 11:54:01 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/02/04 11:54:02 INFO input.FileInputFormat: Total input files to process : 1
20/02/04 11:54:02 INFO mapreduce.JobSubmitter: number of splits:1
20/02/04 11:54:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1578394893972_0058
20/02/04 11:54:02 INFO impl.YarnClientImpl: Submitted application application_1578394893972_0058
20/02/04 11:54:02 INFO mapreduce.Job: The url to track the job: http://dw-cluster-master:8088/proxy/application_1578394893972_0058/
20/02/04 11:54:02 INFO mapreduce.Job: Running job: job_1578394893972_0058
20/02/04 11:54:07 INFO mapreduce.Job: Job job_1578394893972_0058 running in uber mode : false
20/02/04 11:54:07 INFO mapreduce.Job: map 0% reduce 0%
20/02/04 11:54:13 INFO mapreduce.Job: map 100% reduce 0%
20/02/04 11:54:18 INFO mapreduce.Job: map 100% reduce 100%
20/02/04 11:54:18 INFO mapreduce.Job: Job job_1578394893972_0058 completed successfully
20/02/04 11:54:18 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=242
FILE: Number of bytes written=321593
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=212
HDFS: Number of bytes written=164
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3038
Total time spent by all reduces in occupied slots (ms)=2298
Total time spent by all map tasks (ms)=3038
Total time spent by all reduce tasks (ms)=2298
Total vcore-milliseconds taken by all map tasks=3038
Total vcore-milliseconds taken by all reduce tasks=2298
Total megabyte-milliseconds taken by all map tasks=3110912
Total megabyte-milliseconds taken by all reduce tasks=2353152
Map-Reduce Framework
Map input records=9
Map output records=18
Map output bytes=200
Map output materialized bytes=242
Input split bytes=112
Combine input records=0
Combine output records=0
Reduce input groups=11
Reduce shuffle bytes=242
Reduce input records=18
Reduce output records=14
Spilled Records=36
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=100
CPU time spent (ms)=1670
Physical memory (bytes) snapshot=1479749632
Virtual memory (bytes) snapshot=12751675392
Total committed heap usage (bytes)=1515716608
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=100
File Output Format Counters
Bytes Written=164
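A quick sanity check of the counters against the test data: Map input records=9 matches the nine input lines; Map output records=18 because every line is emitted twice (original plus reversed); Reduce input groups=11 equals the number of distinct names; and Reduce output records=14 comes from the seven names that have two friends, each contributing two ordered pairs.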
A preview of the final result:
[liuxiaowei@dw-cluster-master qq_friends]$ hadoop fs -cat /tmp/output/qq_friends/*
tom dog
dog tom
myname word
word myname
hadoop word
word hadoop
yangfeng lili
lili yangfeng
yangfeng hdfs
hdfs yangfeng
hdfs hello
hello hdfs
lau myname
myname lau
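Checking one line against the input: tom and cat are friends, and cat and dog are friends, so the pair (tom, dog) is a correct two-degree recommendation. Note that this simple job does not check whether a recommended pair are already direct friends; with this tiny data set the situation never arises, but a production version would need to filter such pairs out.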
Closing Remarks on the MapReduce Series
This is where the MapReduce part of the series ends. With Hive, Impala, Spark and the rest of the big data ecosystem in full bloom, MapReduce's weaknesses, namely slow computation and a high development cost, leave it little room in real-world work. Still, MapReduce remains the most fundamental and most stable distributed computing framework in big data, and a journey of a thousand miles begins with a single step: once you understand MapReduce, picking up the other tools goes much faster. It is like the C language among programming languages, or the Nine Yang Manual among martial-arts classics: with it backing you up, everything else comes quickly.
Hive series portal: Hive from Getting Started to Giving Up (Table of Contents)