大数据课设(hbase和mapreduce)
hbase
·
hbase部分
main
Main.java
package shixun.part1.main;
import shixun.part1.select.SelectByCombine;
import shixun.part1.select.SelectBySituation;
import java.io.IOException;
import java.util.Scanner;
/**
 * Interactive console entry point for the SogouQ query service.
 *
 * <p>Prints a menu, reads an operation code from standard input and dispatches
 * to the matching query in {@code SelectBySituation} or {@code SelectByCombine}.
 * The loop ends when the user enters {@code 0} or when standard input is
 * exhausted.
 */
public class Main {
    /** Shown whenever the operation code is not one of the menu entries. */
    private static final String INVALID_OP_MESSAGE = "输入操作指令有误,清重新输入!";

    /**
     * Runs the menu loop.
     *
     * @param args unused
     * @throws Exception if one of the dispatched queries fails
     */
    public static void main(String[] args) throws Exception {
        Scanner scan = new Scanner(System.in);
        try {
            boolean running = true;
            while (running) {
                printMenu();

                // End of input (e.g. piped stdin ran dry): leave instead of
                // letting nextInt() throw NoSuchElementException.
                if (!scan.hasNext()) {
                    break;
                }
                // A non-numeric token would make nextInt() throw
                // InputMismatchException; discard it and ask again.
                if (!scan.hasNextInt()) {
                    scan.next();
                    System.out.println(INVALID_OP_MESSAGE);
                    continue;
                }

                int op = scan.nextInt();
                switch (op) {
                    case 1: {
                        // Query by time range.
                        System.out.print("请输入时间范围(00:00:00-23:59:59)[起始时间与结束时间以\"|\"分隔]:");
                        String time = scan.next();
                        SelectBySituation.SelectByTime(time);
                        break;
                    }
                    case 2: {
                        // Query by user id.
                        System.out.print("请输入uid[多个uid以\"|\"分隔]:");
                        String uid = scan.next();
                        SelectBySituation.SelectByUid(uid);
                        break;
                    }
                    case 3: {
                        // Query by search keyword.
                        System.out.print("请输入关键字[多个关键字以\"|\"分隔]:");
                        String key = scan.next();
                        SelectBySituation.SelectByKey(key);
                        break;
                    }
                    case 4: {
                        // Query by clicked URL.
                        System.out.print("请输入url(例:baidu)[多个url以\"|\"分隔]:");
                        String url = scan.next();
                        SelectBySituation.SelectByKeyForURL(url);
                        break;
                    }
                    case 5: {
                        // Combined query: the four conditions are joined with "+"
                        // in the fixed order time, uid, keyword, url.
                        System.out.print("请输入时间范围(00:00:00-23:59:59)[起始时间与结束时间以\"|\"分隔][不填写清填写\"|\"]:");
                        String time = scan.next();
                        System.out.print("请输入user_id[多个uid以\"|\"分隔][不填写清填写\"|\"]:");
                        String uid = scan.next();
                        System.out.print("请输入关键字[多个关键字以\"|\"分隔][不填写清填写\"|\"]:");
                        String key = scan.next();
                        System.out.print("请输入url(例:baidu)[多个url以\"|\"分隔][不填写清填写\"|\"]:");
                        String url = scan.next();
                        String situation = time + "+" + uid + "+" + key + "+" + url;
                        SelectByCombine.SelectCombine(situation);
                        break;
                    }
                    case 0:
                        running = false;
                        break;
                    default:
                        System.out.println(INVALID_OP_MESSAGE);
                        break;
                }
            }
        } finally {
            // Close the scanner even when a query throws.
            scan.close();
        }
    }

    /** Prints the operation menu. */
    private static void printMenu() {
        System.out.println("---------------------欢迎使用SogouQ数据服务-------------------");
        System.out.println("\t\t1.按照时间查询");
        System.out.println("\t\t2.按照user_id查询");
        System.out.println("\t\t3.按照关键词查询");
        System.out.println("\t\t4.按照访问url查询");
        System.out.println("\t\t5.组合查询");
        System.out.println("\t\t0.退出系统");
        System.out.println("请输入操作码(0-5):");
    }
}
DataImport.java
package shixun.part1.main;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Creates the HBase table and loads the SogouQ log file into it.
 *
 * <p>All state is static: {@link #init()} opens the shared connection and
 * {@link #close()} releases it. {@link #createTable} and {@link #addRecord}
 * open and close the connection themselves.
 */
public class DataImport {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;

    /** Log file that {@link #addRecord} reads. */
    private static final String SOURCE_FILE = "/home/zsl/Downloads/SougouQ.txt";

    /**
     * Lines up to and including this 1-based line number are read but not
     * written to HBase.
     * NOTE(review): presumably a resume offset left over from an earlier
     * partial import - confirm before reusing on a fresh table.
     */
    private static final int LAST_SKIPPED_LINE = 1670122;

    /** Charset for row keys, column names and values; {@code Utils.showResult} decodes values as UTF-8. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * (Re)creates a table with the given column families. An existing table
     * with the same name is disabled (if still enabled) and deleted first.
     *
     * @param tableName name of the table to create
     * @param fields    column family names
     * @throws IOException if the connection could not be opened or an admin call fails
     */
    public static void createTable(String tableName, String[] fields) throws IOException {
        init();
        try {
            if (admin == null) {
                throw new IOException("No HBase connection; cannot create table " + tableName);
            }
            TableName tablename = TableName.valueOf(tableName);
            boolean exists = admin.tableExists(tablename);
            System.out.println(exists);
            if (exists) {
                System.out.println("table is exists!");
                // disableTable fails on a table that is already disabled.
                if (admin.isTableEnabled(tablename)) {
                    admin.disableTable(tablename);
                }
                admin.deleteTable(tablename);
            }
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tablename);
            for (String family : fields) {
                hTableDescriptor.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(hTableDescriptor);
        } finally {
            // Release the connection even when an admin call throws.
            close();
        }
    }

    /**
     * Opens the shared connection and admin handle. An already open connection
     * is reused. Failures are printed, leaving {@link #connection} and/or
     * {@link #admin} unset.
     */
    public static void init() {
        if (connection != null && !connection.isClosed()) {
            if (admin != null) {
                return;
            }
            // Half-initialised state from an earlier failed init(): start over.
            close();
        }
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","172.18.0.2");
        configuration.set("hbase.zookeeper.property.clientPort","2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the admin handle and the connection. Each is closed in its own
     * try block so a failure closing the admin cannot leak the connection.
     */
    public static void close() {
        try {
            if (admin != null) {
                admin.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Imports the log file into table {@code sougouq}. The table must already
     * exist; {@link #createTable} can create it with the {@code info} family.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String[] fields = {"info:time", "info:user_id", "info:search_word","info:url_return_rank","info:click_num","info:url_click"};
        try {
            addRecord("sougouq", fields);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the log file line by line and writes one row per line. The row key
     * is the 1-based line number; the j-th whitespace-separated field of the
     * line is stored in the j-th column of {@code fields}.
     *
     * <p>Errors while reading or writing are printed and end the import; they
     * are not rethrown.
     *
     * @param tableName target table
     * @param fields    target columns, each in {@code family:qualifier} form
     * @throws IOException if the connection could not be opened, or opening or
     *                     closing the table fails
     */
    public static void addRecord(String tableName, String[] fields) throws IOException {
        init();
        try {
            if (connection == null) {
                throw new IOException("No HBase connection; cannot import into " + tableName);
            }
            Table table = connection.getTable(TableName.valueOf(tableName));
            try {
                importLines(table, fields);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("读取" + SOURCE_FILE + "出错!");
            } finally {
                table.close();
            }
        } finally {
            close();
        }
    }

    /** Streams {@link #SOURCE_FILE} into {@code table}; the reader is always closed. */
    private static void importLines(Table table, String[] fields) throws IOException {
        // The column coordinates do not change per line, so split them once.
        byte[][] families = new byte[fields.length][];
        byte[][] qualifiers = new byte[fields.length][];
        for (int j = 0; j < fields.length; j++) {
            String[] cols = fields[j].split(":");
            families[j] = cols[0].getBytes(UTF_8);
            qualifiers[j] = cols[1].getBytes(UTF_8);
        }

        int i = 1;
        int skipped = 0;
        try (BufferedReader bufReader = new BufferedReader(
                new InputStreamReader(new FileInputStream(SOURCE_FILE), "gbk"))) {
            String line;
            while ((line = bufReader.readLine()) != null) {
                if (i > LAST_SKIPPED_LINE) {
                    String[] splited = line.split("\\s+");
                    if (splited.length >= fields.length) {
                        Put put = new Put(Integer.toString(i).getBytes(UTF_8));
                        for (int j = 0; j < fields.length; j++) {
                            put.addColumn(families[j], qualifiers[j], splited[j].getBytes(UTF_8));
                        }
                        table.put(put);
                    } else {
                        // A line with too few fields is skipped instead of
                        // aborting the rest of the import.
                        skipped++;
                    }
                }
                i++;
            }
        }
        System.out.println("hanshu:" + i);
        if (skipped > 0) {
            System.out.println("跳过字段不足的行数:" + skipped);
        }
    }
}
utils
Utils.java
package shixun.part1.utils;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
/**
 * Shared HBase helpers for the query classes: connection handling, table
 * lookup, and printing of scan results to standard output.
 */
public class Utils {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;

    /**
     * Opens the shared connection and admin handle. An already open connection
     * is reused, so repeated {@link #getTable} calls do not each open a new one.
     * Failures are printed, leaving {@link #connection} and/or {@link #admin}
     * unset.
     */
    public static void init() {
        if (connection != null && !connection.isClosed()) {
            if (admin != null) {
                return;
            }
            // Half-initialised state from an earlier failed init(): start over.
            close();
        }
        // HBaseConfiguration.create() is the documented way to build a client
        // configuration; a bare Configuration lacks the HBase resources.
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","172.18.0.2");
        configuration.set("hbase.zookeeper.property.clientPort","2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the admin handle and the connection. Each is closed in its own
     * try block so a failure closing the admin cannot leak the connection.
     */
    public static void close() {
        try {
            if (admin != null) {
                admin.close();
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the default table {@code sougouq}.
     *
     * @return the table, or {@code null} if it could not be obtained
     */
    public static Table getTable() {
        return getTable("sougouq");
    }

    /**
     * Returns a handle for the named table, opening the connection if needed.
     *
     * @param tableName table name
     * @return the table, or {@code null} when the name is empty, the connection
     *         could not be opened, or the lookup fails
     */
    public static Table getTable(String tableName) {
        // Validate first so an empty name does not open a connection for nothing.
        if (StringUtils.isEmpty(tableName)) {
            return null;
        }
        init();
        if (connection == null) {
            return null;
        }
        try {
            return connection.getTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Prints every cell of a row as {@code family:qualifier=value}, one per
     * line, followed by a blank line. Bytes are decoded as UTF-8.
     *
     * @param result the row to print
     */
    public static void showResult(Result result) {
        CellScanner cellScanner = result.cellScanner();
        try {
            while (cellScanner.advance()) {
                Cell cell = cellScanner.current();
                String family = new String(CellUtil.cloneFamily(cell), "utf-8");
                String qualifier = new String(CellUtil.cloneQualifier(cell), "utf-8");
                String value = new String(CellUtil.cloneValue(cell), "utf-8");
                System.out.println(family + ":" + qualifier + "=" + value);
            }
            System.out.println();
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the scan against the table and prints every returned row with
     * {@link #showResult}. The scanner is always closed; errors are printed.
     *
     * @param table table to scan
     * @param scan  scan definition
     */
    public static void showScan(Table table, Scan scan) {
        if (table == null || scan == null) {
            System.err.println("showScan: no table or scan to run");
            return;
        }
        // try-with-resources releases the scanner even if printing fails.
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                showResult(result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the default table with the given filter and prints the matches.
     * The table handle is closed afterwards; the connection stays open until
     * {@link #close()} is called.
     *
     * @param filter filter to apply to the scan
     */
    public static void showFilter(Filter filter) {
        Scan scan = new Scan();
        scan.setFilter(filter);
        Table table = getTable();
        if (table == null) {
            return;
        }
        try {
            showScan(table, scan);
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
select
SelectBySituation.java
package shixun.part1.select;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import shixun.part1.utils.Utils;
/**
 * Single-condition queries against the {@code info} column family. Each method
 * parses a {@code |}-separated condition string, builds a filter and prints the
 * matching rows through {@code Utils.showFilter}.
 */
public class SelectBySituation {
    /** Tokens of the most recently parsed condition string. */
    static String[] situations = {};

    private static final byte[] FAMILY = Bytes.toBytes("info");

    /**
     * Prints rows whose {@code time} column lies between a start and an end
     * time, both inclusive. Values are compared as bytes.
     *
     * @param situation {@code start|end}; either bound may be left empty to
     *                  leave that side open
     */
    public static void SelectByTime(String situation) {
        // Limit -1 keeps a trailing empty bound, so "08:00:00|" has two parts.
        situations = situation == null ? new String[0] : situation.split("\\|", -1);
        boolean bothEmpty = situations.length >= 2
                && situations[0].isEmpty() && situations[1].isEmpty();
        if (situations.length < 2 || bothEmpty) {
            System.out.println("时间范围格式有误,请以\"|\"分隔起始时间与结束时间!");
            return;
        }
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (!situations[0].isEmpty()) {
            filterList.addFilter(timeBound(CompareOp.GREATER_OR_EQUAL, situations[0]));
        }
        if (!situations[1].isEmpty()) {
            filterList.addFilter(timeBound(CompareOp.LESS_OR_EQUAL, situations[1]));
        }
        show(filterList);
    }

    /**
     * Prints rows whose {@code user_id} equals any of the given ids.
     *
     * @param situation one or more ids separated by {@code |}
     */
    public static void SelectByUid(String situation) {
        showAnyMatch(situation, "user_id", false);
    }

    /**
     * Prints rows whose {@code search_word} contains any of the given keywords.
     *
     * @param situation one or more keywords separated by {@code |}
     */
    public static void SelectByKey(String situation) {
        showAnyMatch(situation, "search_word", true);
    }

    /**
     * Prints rows whose {@code url_click} contains any of the given fragments.
     *
     * @param situation one or more URL fragments separated by {@code |}
     */
    public static void SelectByKeyForURL(String situation) {
        showAnyMatch(situation, "url_click", true);
    }

    /** Builds one bound of the time range on {@code info:time}. */
    private static SingleColumnValueFilter timeBound(CompareOp op, String value) {
        SingleColumnValueFilter filter =
                new SingleColumnValueFilter(FAMILY, Bytes.toBytes("time"), op, Bytes.toBytes(value));
        filter.setFilterIfMissing(true);
        return filter;
    }

    /**
     * Prints rows where the column matches at least one token, either by exact
     * value or by substring.
     */
    private static void showAnyMatch(String situation, String qualifier, boolean substring) {
        situations = situation == null ? new String[0] : situation.split("\\|");
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        int added = 0;
        for (String token : situations) {
            // An empty token carries no condition; as a substring it would
            // match every row.
            if (token.isEmpty()) {
                continue;
            }
            SingleColumnValueFilter filter = substring
                    ? new SingleColumnValueFilter(FAMILY, Bytes.toBytes(qualifier),
                            CompareOp.EQUAL, new SubstringComparator(token))
                    : new SingleColumnValueFilter(FAMILY, Bytes.toBytes(qualifier),
                            CompareOp.EQUAL, Bytes.toBytes(token));
            filter.setFilterIfMissing(true);
            filterList.addFilter(filter);
            added++;
        }
        if (added == 0) {
            // Do not scan the table with an empty filter list.
            System.out.println("查询条件为空,请重新输入!");
            return;
        }
        show(filterList);
    }

    /** Runs the query and always releases the shared connection afterwards. */
    private static void show(FilterList filterList) {
        try {
            Utils.showFilter(filterList);
        } finally {
            Utils.close();
        }
    }
}
SelectByCombine.java
package shixun.part1.select;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import shixun.part1.utils.Utils;
/**
 * Combined query: up to four conditions (time range, user id, keyword, clicked
 * URL) that must all hold for a row to be printed.
 */
public class SelectByCombine {
    /** The {@code +}-separated condition groups of the most recent call. */
    static String[] situations = {};
    /** The {@code |}-separated tokens of the most recently parsed group. */
    static String[] s_split = {};

    private static final byte[] FAMILY = Bytes.toBytes("info");

    /**
     * Parses the combined condition string and prints the matching rows.
     *
     * <p>Groups are positional: 0 = time range ({@code start|end}), 1 = user
     * ids, 2 = keywords, 3 = URL fragments. Inside a group the alternatives are
     * separated by {@code |}. A group that is empty or consists only of
     * {@code |} separators is treated as "not specified" and adds no condition.
     *
     * @param situation the four groups joined with {@code +}
     */
    public static void SelectCombine(String situation) {
        situations = situation == null ? new String[0] : situation.split("\\+");
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        int groups = 0;
        for (int i = 0; i < situations.length; i++) {
            if (StringUtils.isEmpty(situations[i])) {
                continue;
            }
            FilterList group;
            switch (i) {
                case 0:
                    // Limit -1 keeps empty bounds, so "|" yields two empty
                    // parts instead of an empty array.
                    s_split = situations[i].split("\\|", -1);
                    if (s_split.length < 2) {
                        System.out.println("时间范围格式有误,请以\"|\"分隔起始时间与结束时间!");
                        return;
                    }
                    group = timeRange(s_split[0], s_split[1]);
                    break;
                case 1:
                    group = anyOf("user_id", situations[i], false);
                    break;
                case 2:
                    group = anyOf("search_word", situations[i], true);
                    break;
                case 3:
                    group = anyOf("url_click", situations[i], true);
                    break;
                default:
                    // Groups beyond the fourth carry no condition.
                    group = null;
                    break;
            }
            if (group != null) {
                filterList.addFilter(group);
                groups++;
            }
        }
        if (groups == 0) {
            // Do not scan the table with an empty filter list.
            System.out.println("未输入任何查询条件,请重新输入!");
            return;
        }
        try {
            Utils.showFilter(filterList);
        } finally {
            // Release the shared connection even when the query throws.
            Utils.close();
        }
    }

    /**
     * Builds the inclusive range on {@code info:time}; an empty bound leaves
     * that side open.
     *
     * @return the range filter, or {@code null} when both bounds are empty
     */
    private static FilterList timeRange(String start, String end) {
        if (start.isEmpty() && end.isEmpty()) {
            return null;
        }
        FilterList range = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (!start.isEmpty()) {
            SingleColumnValueFilter filterStart = new SingleColumnValueFilter(FAMILY,
                    Bytes.toBytes("time"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(start));
            filterStart.setFilterIfMissing(true);
            range.addFilter(filterStart);
        }
        if (!end.isEmpty()) {
            SingleColumnValueFilter filterEnd = new SingleColumnValueFilter(FAMILY,
                    Bytes.toBytes("time"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(end));
            filterEnd.setFilterIfMissing(true);
            range.addFilter(filterEnd);
        }
        return range;
    }

    /**
     * Builds a filter that passes when the column matches at least one token,
     * either by exact value or by substring.
     *
     * @return the filter, or {@code null} when the group has no non-empty token
     */
    private static FilterList anyOf(String qualifier, String group, boolean substring) {
        s_split = group.split("\\|");
        FilterList alternatives = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        int added = 0;
        for (String token : s_split) {
            if (token.isEmpty()) {
                continue;
            }
            SingleColumnValueFilter filter = substring
                    ? new SingleColumnValueFilter(FAMILY, Bytes.toBytes(qualifier),
                            CompareOp.EQUAL, new SubstringComparator(token))
                    : new SingleColumnValueFilter(FAMILY, Bytes.toBytes(qualifier),
                            CompareOp.EQUAL, Bytes.toBytes(token));
            filter.setFilterIfMissing(true);
            alternatives.addFilter(filter);
            added++;
        }
        return added == 0 ? null : alternatives;
    }
}
更多推荐
所有评论(0)