MapReduce Hands-On Project: Analyzing Chicago Crime Data
The data comes from https://www.kaggle.com/currie32/crimes-in-chicago
Below is a preview of the dataset.
The meaning of each field is as follows:
//ID --> a unique identifier for the record
Case Number --> the case number
Date --> the date the incident occurred
Block --> the partially redacted address where the incident occurred, placed on the same block as the actual address
IUCR --> the Illinois Uniform Crime Reporting code
//Primary Type --> the primary description of the IUCR code
//Description --> the secondary description of the IUCR code, a subcategory of the primary description
Location Description --> a description of the location where the incident occurred
Arrest --> indicates whether an arrest was made
Domestic --> indicates whether the incident was domestic-related as defined by the Illinois Domestic Violence Act
Beat --> the police beat where the incident occurred; a beat is the smallest police geographic area and each beat has a dedicated police beat car; three to five beats make up a police sector, and three sectors make up a police district; the Chicago Police Department has 22 police districts
District --> the police district where the incident occurred
//Ward --> the ward (City Council district) where the incident occurred
Community Area --> the community area where the incident occurred; Chicago has 77 community areas
FBI Code --> the crime classification in the FBI's National Incident-Based Reporting System (NIBRS)
//X Coordinate --> the x coordinate of the incident location, in the State Plane Illinois East NAD 1983 projection; the location is shifted from the actual location for partial redaction but falls on the same block
//Y Coordinate --> the y coordinate of the incident location, in the same projection and with the same redaction
Year --> the year the incident occurred
//Updated On --> the date and time the record was last updated
//Location --> the location of the incident, in a format that allows maps and other geographic operations on the data portal
Fields prefixed with // are the ones I decided to discard.
The code for each analysis is shown below.
1. Group the records by police district and count the number of cases in each group. This shows how crime counts differ across districts: districts with few cases have better public safety, while districts with many cases have problems with governance and deserve extra attention.
// This code groups records by District and counts the Case Number entries in each group
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Crime {

    // Mapper: emits (District, Case Number) for every record.
    // Column indices follow the field order listed above: index 1 is Case Number, index 11 is District.
    private static class CMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text dis = new Text();
        private final Text cnumber = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] l = value.toString().split(",");
            if (l.length <= 11) {
                return; // skip malformed or short lines
            }
            dis.set(l[11]);     // District
            cnumber.set(l[1]);  // Case Number
            context.write(dis, cnumber);
        }
    }

    // Reducer: counts the case numbers received for each district.
    public static class CReduce extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // sum must be reset for every key; a class-level field would keep
            // accumulating across districts.
            int sum = 0;
            for (Text t : values) {
                sum++;
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "crime1");
        job.setJarByClass(Crime.class);
        job.setMapperClass(CMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(CReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
The output here is sorted by district in ascending order. From the results, the district with the fewest crimes is District 1 and the one with the most is District 934, which suggests that District 1 has relatively good public safety.
2. Group the records by police district and count the number of arrests in each group. The arrest count is the number of times the police successfully apprehended an offender after a case occurred. A higher arrest count suggests that the district's police are more successful at catching offenders.
// This code groups records by District and counts the number of arrests
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Crime {

    // Mapper: emits (District, 1) only when the Arrest field (index 8) indicates an arrest.
    private static class CMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text dis = new Text();
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] l = value.toString().split(",");
            if (l.length <= 11) {
                return; // skip malformed or short lines
            }
            dis.set(l[11]);                      // District
            if (l[8].equalsIgnoreCase("true")) { // Arrest flag: count only actual arrests, whatever the capitalization
                context.write(dis, one);
            }
        }
    }

    // Reducer: sums the 1s emitted for each district to get its arrest count.
    public static class CReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable val : values) {
                count += val.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "crime1");
        job.setJarByClass(Crime.class);
        job.setMapperClass(CMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(CReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
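Since this reducer just sums integer counts, which is an associative and commutative operation, it could also be registered as a combiner to pre-aggregate on the map side and cut down the data shuffled to the reducers. This is optional and not part of the original code; the sketch below is the single extra line it would add to main():

// Optional (not in the original post): reuse the summing reducer as a combiner.
job.setCombinerClass(CReduce.class);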
3. Merge the two result files above into a single file; the merged result is named ave. The mapper below tells the two inputs apart by file name, so the outputs of the first two jobs should be placed in one input directory under names containing "cnumber" (case counts) and "arr" (arrest counts) respectively.
import java.io.IOException;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Crime {

    // Mapper: tags each value with the file it came from, so the reducer can
    // tell case counts ("cnumber...") apart from arrest counts ("arr...").
    private static class CMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String filename = inputSplit.getPath().getName();
            String[] split = value.toString().split("\t");   // "<district>\t<count>"
            Text dis = new Text(split[0]);
            if (filename.contains("cnumber")) {
                context.write(dis, new Text("cnumber" + split[1]));
            } else if (filename.contains("arr")) {
                context.write(dis, new Text("arr" + split[1]));
            }
        }
    }

    // Reducer: for each district, joins the case count and the arrest count
    // into a single line "<district>\t<case count>\t<arrest count>".
    public static class CReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Vector<String> a = new Vector<String>();   // case counts
            Vector<String> b = new Vector<String>();   // arrest counts
            for (Text value : values) {
                String line = value.toString();
                if (line.startsWith("cnumber")) {
                    a.add(line.substring("cnumber".length()));
                } else if (line.startsWith("arr")) {
                    b.add(line.substring("arr".length()));
                }
            }
            for (String w1 : a) {
                for (String w2 : b) {
                    context.write(new Text(key + "\t" + w1), new Text(w2));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "crime1");
        job.setJarByClass(Crime.class);
        job.setMapperClass(CMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(CReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
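A note on the design: instead of branching on the input file name inside one mapper, Hadoop's MultipleInputs can assign a dedicated mapper to each input path. This is not what the post above does; the following is only a sketch of how the same join could be wired up that way, and the mapper class names and argument positions are assumptions.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Hypothetical mapper for the case-count output: prefixes each count with "cnumber".
class CaseCountMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");   // "<district>\t<case count>"
        context.write(new Text(split[0]), new Text("cnumber" + split[1]));
    }
}

// Hypothetical mapper for the arrest-count output: prefixes each count with "arr".
class ArrestCountMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");   // "<district>\t<arrest count>"
        context.write(new Text(split[0]), new Text("arr" + split[1]));
    }
}

// In main(), each input path would then get its own mapper instead of a single
// FileInputFormat.addInputPath call (argument positions are assumptions):
//   MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CaseCountMapper.class);
//   MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ArrestCountMapper.class);

The reducer from the job above can stay exactly as it is, since the tagged values it receives are identical either way.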
4. Compute the arrest rate for each police district. The arrest rate reflects how efficiently each district resolves its cases: a higher arrest rate means cases are resolved faster, so the district's public safety is likely to be better.
An ave class is used here to hold a district's case count and arrest count; its code was not included in the original post, so a possible sketch is shown first.
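Based on how the mapper and reducer below use it (setnum/setarr/getnum/getarr, and its use as the map output value class), ave needs to implement Hadoop's Writable interface. A minimal sketch under those assumptions:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Sketch of the ave value class (not shown in the original post): it carries a
// district's case count and arrest count between the mapper and the reducer.
// As a map output value it must implement Writable and keep a no-arg constructor.
public class ave implements Writable {
    private int num;   // number of cases
    private int arr;   // number of arrests

    public ave() {
    }

    public void setnum(int num) {
        this.num = num;
    }

    public void setarr(int arr) {
        this.arr = arr;
    }

    public int getnum() {
        return num;
    }

    public int getarr() {
        return arr;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // The serialization order must match the read order in readFields().
        out.writeInt(num);
        out.writeInt(arr);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        num = in.readInt();
        arr = in.readInt();
    }
}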
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Crime {

    // Mapper: parses a merged line "<district>\t<case count>\t<arrest count>"
    // and wraps the two counts in an ave object keyed by district.
    private static class CMapper extends Mapper<LongWritable, Text, Text, ave> {
        private final ave a = new ave();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            String dis = s[0];
            a.setnum(Integer.parseInt(s[1]));   // case count
            a.setarr(Integer.parseInt(s[2]));   // arrest count
            context.write(new Text(dis), a);
        }
    }

    // Reducer: arrest rate = arrests / cases, rounded to two decimal places.
    public static class CReduce extends Reducer<Text, ave, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<ave> values, Context context)
                throws IOException, InterruptedException {
            int ave_num = 0;
            int ave_arr = 0;
            // Each district has exactly one merged record, so this loop simply
            // reads that record's two counts.
            for (ave bean : values) {
                ave_num = bean.getnum();
                ave_arr = bean.getarr();
            }
            double ave_a = new BigDecimal((double) ave_arr / ave_num)
                    .setScale(2, RoundingMode.HALF_UP).doubleValue();
            context.write(key, new DoubleWritable(ave_a));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "crime1");
        job.setJarByClass(Crime.class);
        job.setMapperClass(CMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ave.class);
        job.setReducerClass(CReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
The screenshot above comes from print statements I added in the reduce task to inspect the variables of one reduce call. It shows that District 1 has an arrest count of 10373 and a case count of 32200, so the arrest rate, arrests divided by cases, works out to 0.32.
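As a quick sanity check of that number (this snippet is not part of the original post), the same BigDecimal rounding used in the reducer can be applied to the reported counts:

import java.math.BigDecimal;
import java.math.RoundingMode;

public class ArrestRateCheck {
    public static void main(String[] args) {
        int cases = 32200;    // case count reported for District 1
        int arrests = 10373;  // arrest count reported for District 1
        // Same rounding as the reducer: 10373 / 32200 = 0.3221..., which rounds to 0.32.
        double rate = new BigDecimal((double) arrests / cases)
                .setScale(2, RoundingMode.HALF_UP).doubleValue();
        System.out.println(rate);   // prints 0.32
    }
}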
This is a partial screenshot of the results produced by processing the merged file ave from the previous step.