main

Main.java

package shixun.part1.main;

import shixun.part1.select.SelectByCombine;
import shixun.part1.select.SelectBySituation;

import java.io.IOException;
import java.util.Scanner;

/**
 * Interactive console front end for querying the SogouQ search-log table.
 *
 * <p>Shows a menu in a loop, reads an operation code from standard input and dispatches to
 * {@link SelectBySituation} (single-condition queries) or {@link SelectByCombine} (the four
 * conditions joined with {@code +}). Multiple values for one condition are separated by {@code |}.
 */
public class Main {

	/**
	 * Runs the menu until the user enters {@code 0} or standard input is exhausted.
	 *
	 * @param args unused
	 * @throws Exception declared for signature compatibility; query failures are reported and the
	 *                   menu keeps running
	 */
	public static void main(String[] args) throws Exception {
		Scanner scan = new Scanner(System.in);
		boolean running = true;
		while (running) {
			printMenu();
			if (!scan.hasNext()) {
				// End of input (e.g. piped stdin): exit instead of throwing NoSuchElementException.
				break;
			}
			if (!scan.hasNextInt()) {
				// Drop the non-numeric token; nextInt() would otherwise throw
				// InputMismatchException and terminate the program.
				scan.next();
				System.out.println("输入操作指令有误,请重新输入!");
				continue;
			}
			int op = scan.nextInt();
			try {
				running = dispatch(op, scan);
			} catch (RuntimeException e) {
				// One failing query (malformed input, backend error) should not end the session.
				System.out.println("查询出错: " + e);
			}
		}
		scan.close();
	}

	/** Prints the operation menu and the prompt for the operation code. */
	private static void printMenu() {
		System.out.println("---------------------欢迎使用SogouQ数据服务-------------------");
		System.out.println("\t\t1.按照时间查询");
		System.out.println("\t\t2.按照user_id查询");
		System.out.println("\t\t3.按照关键词查询");
		System.out.println("\t\t4.按照访问url查询");
		System.out.println("\t\t5.组合查询");
		System.out.println("\t\t0.退出系统");
		System.out.println("请输入操作码(0-5):");
	}

	/**
	 * Executes one menu operation.
	 *
	 * @param op   operation code entered by the user
	 * @param scan source of the follow-up condition input
	 * @return {@code false} when the user chose to exit, {@code true} otherwise
	 */
	private static boolean dispatch(int op, Scanner scan) {
		switch (op) {
		case 1:
			// Query by time range
			SelectBySituation.SelectByTime(
					ask(scan, "请输入时间范围(00:00:00-23:59:59)[起始时间与结束时间以\"|\"分隔]:"));
			return true;
		case 2:
			// Query by user id
			SelectBySituation.SelectByUid(ask(scan, "请输入uid[多个uid以\"|\"分隔]:"));
			return true;
		case 3:
			// Query by search keyword
			SelectBySituation.SelectByKey(ask(scan, "请输入关键字[多个关键字以\"|\"分隔]:"));
			return true;
		case 4:
			// Query by clicked URL
			SelectBySituation.SelectByKeyForURL(ask(scan, "请输入url(例:baidu)[多个url以\"|\"分隔]:"));
			return true;
		case 5:
			// Combined query: "|" marks a condition the user wants to leave out.
			String time = ask(scan, "请输入时间范围(00:00:00-23:59:59)[起始时间与结束时间以\"|\"分隔][不填写请填写\"|\"]:");
			String uid = ask(scan, "请输入user_id[多个uid以\"|\"分隔][不填写请填写\"|\"]:");
			String key = ask(scan, "请输入关键字[多个关键字以\"|\"分隔][不填写请填写\"|\"]:");
			String url = ask(scan, "请输入url(例:baidu)[多个url以\"|\"分隔][不填写请填写\"|\"]:");
			SelectByCombine.SelectCombine(time + "+" + uid + "+" + key + "+" + url);
			return true;
		case 0:
			return false;
		default:
			System.out.println("输入操作指令有误,请重新输入!");
			return true;
		}
	}

	/** Prints {@code message} without a newline and returns the next whitespace-delimited token. */
	private static String ask(Scanner scan, String message) {
		System.out.print(message);
		return scan.next();
	}
}

DataImport.java

package shixun.part1.main;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * One-off loader that (re)creates the SogouQ HBase table and imports the log file into it.
 *
 * <p>Connection state lives in static fields: each public operation calls {@link #init()} and
 * releases the connection with {@link #close()} before returning.
 */
public class DataImport {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;

    /** Data file read by {@link #addRecord(String, String[])}. */
    private static final String DEFAULT_DATA_FILE = "/home/zsl/Downloads/SougouQ.txt";
    /** Charset the data file is decoded with. */
    private static final String DATA_CHARSET = "gbk";
    /**
     * Leading lines skipped by {@link #addRecord(String, String[])}.
     * NOTE(review): looks like a resume point from an interrupted earlier import — confirm.
     */
    private static final int DEFAULT_SKIP_LINES = 1670122;
    /** Encoding for stored bytes; the query side (Utils.showResult) decodes values as UTF-8. */
    private static final java.nio.charset.Charset STORE_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Creates {@code tableName} with one column family per entry of {@code fields}.
     * An existing table with the same name is disabled and DELETED first.
     *
     * @param tableName table to (re)create
     * @param fields    column family names
     * @throws IOException if the connection could not be opened or an admin call fails
     */
    public static void createTable(String tableName, String[] fields) throws IOException {
        init();
        try {
            requireConnection();
            TableName tablename = TableName.valueOf(tableName);
            if (admin.tableExists(tablename)) {
                System.out.println("table is exists!");
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
            }
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tablename);
            for (String family : fields) {
                hTableDescriptor.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(hTableDescriptor);
        } finally {
            // Release the connection even when an admin call fails.
            close();
        }
    }

    /**
     * Opens a connection and admin handle using the hard-coded ZooKeeper quorum 172.18.0.2:2181.
     * On failure the exception is printed and the fields are not updated.
     */
    public static void init() {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","172.18.0.2");
        configuration.set("hbase.zookeeper.property.clientPort","2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the admin handle and the connection independently and clears both fields. */
    public static void close() {
        try {
            if (admin != null) {
                admin.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            admin = null;
        }
        // Closed separately so a failing admin.close() cannot leak the connection.
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            connection = null;
        }
    }

    /** Fails fast with a clear message instead of an NPE when {@link #init()} did not succeed. */
    private static void requireConnection() throws IOException {
        if (connection == null || admin == null) {
            throw new IOException("HBase connection is not available (see the error printed by init())");
        }
    }

    public static void main(String[] args) {
        // To (re)create the table first, run once — this DROPS an existing table of that name:
        //     createTable("sougouq", new String[] {"info"});

        // Import the data file; the i-th whitespace-separated token of a line goes to fields[i].
        String[] fields = {"info:time", "info:user_id", "info:search_word","info:url_return_rank","info:click_num","info:url_click"};
        try {
            addRecord("sougouq", fields);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Imports the default data file into {@code tableName}, skipping the first
     * {@value #DEFAULT_SKIP_LINES} lines.
     *
     * @param tableName target table
     * @param fields    {@code family:qualifier} column for each whitespace-separated token
     * @throws IOException if the connection or table cannot be opened
     */
    public static void addRecord(String tableName, String[] fields) throws IOException {
        addRecord(tableName, fields, DEFAULT_DATA_FILE, DEFAULT_SKIP_LINES);
    }

    /**
     * Imports {@code dataFile} (GBK encoded) into {@code tableName}, one row per line.
     *
     * <p>The row key is the 1-based line number. Each line is split on whitespace and token
     * {@code j} is stored in column {@code fields[j]}. Lines with fewer tokens than
     * {@code fields} are skipped and counted instead of aborting the import.
     *
     * @param tableName target table
     * @param fields    {@code family:qualifier} column for each token
     * @param dataFile  path of the input file
     * @param skipLines number of leading lines to skip (e.g. to resume an interrupted import)
     * @throws IOException if the connection or table cannot be opened; read/put failures are
     *                     reported on stdout as before
     * @throws IllegalArgumentException if an entry of {@code fields} has no {@code ':'}
     */
    public static void addRecord(String tableName, String[] fields, String dataFile, int skipLines)
            throws IOException {
        // Split the column specs once rather than for every line.
        byte[][] families = new byte[fields.length][];
        byte[][] qualifiers = new byte[fields.length][];
        for (int j = 0; j < fields.length; j++) {
            String[] cols = fields[j].split(":", 2);
            if (cols.length < 2) {
                throw new IllegalArgumentException("column must be family:qualifier, got " + fields[j]);
            }
            families[j] = cols[0].getBytes(STORE_CHARSET);
            qualifiers[j] = cols[1].getBytes(STORE_CHARSET);
        }

        init();
        try {
            requireConnection();
            try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                try (BufferedReader bufReader = new BufferedReader(
                        new InputStreamReader(new FileInputStream(dataFile), DATA_CHARSET))) {
                    int lineNo = 0;
                    int imported = 0;
                    int malformed = 0;
                    String line;
                    while ((line = bufReader.readLine()) != null) {
                        lineNo++;
                        if (lineNo <= skipLines) {
                            continue;
                        }
                        String[] splited = line.split("\\s+");
                        if (splited.length < fields.length) {
                            malformed++;
                            continue;
                        }
                        Put put = new Put(Integer.toString(lineNo).getBytes(STORE_CHARSET));
                        for (int j = 0; j < fields.length; j++) {
                            put.addColumn(families[j], qualifiers[j], splited[j].getBytes(STORE_CHARSET));
                        }
                        table.put(put);
                        imported++;
                    }
                    System.out.println("hanshu:" + lineNo + ", imported:" + imported + ", malformed:" + malformed);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("读取" + dataFile + "出错!");
                }
            }
        } finally {
            close();
        }
    }
}

utils

Utils.java

package shixun.part1.utils;

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;

/**
 * Shared HBase connection helpers and result printers used by the query classes.
 *
 * <p>Connection state is kept in static fields and is not synchronized. A connection is opened
 * implicitly by {@link #getTable(String)} and must be released with {@link #close()}.
 */
public class Utils {
	public static Configuration configuration;
	public static Connection connection;
	public static Admin admin;

	/** Table used by {@link #getTable()} and {@link #showFilter(Filter)}. */
	private static final String DEFAULT_TABLE = "sougouq";

	/**
	 * Opens a connection and admin handle using the hard-coded ZooKeeper quorum 172.18.0.2:2181.
	 * On failure the exception is printed and the fields are not updated.
	 */
	public static void init() {
		// HBaseConfiguration.create() also loads the HBase default/site resources,
		// matching the configuration DataImport used to load the data.
		configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum","172.18.0.2");
		configuration.set("hbase.zookeeper.property.clientPort","2181");
		try {
			connection = ConnectionFactory.createConnection(configuration);
			admin = connection.getAdmin();
		} catch(IOException e) {
			e.printStackTrace();
		}
	}

	/** Closes the admin handle and the connection independently and clears both fields. */
	public static void close() {
		try {
			if(admin != null) {
				admin.close();
			}
		} catch(IOException e) {
			e.printStackTrace();
		} finally {
			admin = null;
		}
		// Closed separately so a failing admin.close() cannot leak the connection.
		try {
			if(connection != null) {
				connection.close();
			}
		} catch(IOException e) {
			e.printStackTrace();
		} finally {
			connection = null;
		}
	}

	/** Opens a connection and returns the default SogouQ table, or {@code null} on failure. */
	public static Table getTable() {
		return getTable(DEFAULT_TABLE);
	}

	/**
	 * Opens a connection (via {@link #init()}) and returns a handle to {@code tableName}.
	 *
	 * @param tableName table to open
	 * @return the table, or {@code null} if the name is empty or the connection/table could not
	 *         be obtained; the caller must close the table and then call {@link #close()}
	 */
	public static Table getTable(String tableName) {
		// Validate before connecting so an empty name does not open an unused connection.
		if(StringUtils.isEmpty(tableName)) {
			return null;
		}
		init();
		if(connection == null) {
			return null;
		}
		try {
			return connection.getTable(TableName.valueOf(tableName));
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}

	/** Prints every cell of {@code result} as {@code family:qualifier=value} (UTF-8), then a blank line. */
	public static void showResult(Result result) {
		CellScanner cellScanner = result.cellScanner();
		try {
			while(cellScanner.advance()) {
				Cell cell = cellScanner.current();
				System.out.println(new String(CellUtil.cloneFamily(cell),"utf-8")+
						":"+new String(CellUtil.cloneQualifier(cell),"utf-8")+
						"="+new String(CellUtil.cloneValue(cell),"utf-8"));
			}
			System.out.println();
		}catch(Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Runs {@code scan} against {@code table} and prints every returned row.
	 * The scanner is always closed; the table is left open for the caller to close.
	 */
	public static void showScan(Table table, Scan scan) {
		if(table == null) {
			System.out.println("table is not available");
			return;
		}
		try (ResultScanner resultScanner = table.getScanner(scan)) {
			for(Result result : resultScanner) {
				showResult(result);
			}
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Scans the default table with {@code filter} and prints the matches. The table handle is
	 * closed afterwards; the connection is left open for the caller's {@link #close()}.
	 */
	public static void showFilter(Filter filter) {
		Scan scan = new Scan();
		scan.setFilter(filter);
		// try-with-resources skips close() when getTable() returned null.
		try (Table table = getTable()) {
			showScan(table,scan);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

select

SelectBySituation.java

package shixun.part1.select;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import shixun.part1.utils.Utils;


/**
 * Single-condition queries against the SogouQ table (column family {@code info}).
 *
 * <p>Each method takes the raw console input, splits it on {@code |}, builds an HBase filter,
 * prints the matching rows through {@link Utils#showFilter} and then closes the shared connection.
 * When the input contains no usable value, a message is printed and no scan is run.
 */
public class SelectBySituation {

	static String[] situations = {};

	/** Column family that holds every SogouQ column. */
	private static final byte[] FAMILY = Bytes.toBytes("info");

	/**
	 * Prints rows whose {@code info:time} lies in the inclusive range {@code start|end}.
	 *
	 * <p>Either bound may be omitted ({@code |23:00:00}, {@code 08:00:00|} or {@code 08:00:00})
	 * for an open-ended range. Values are compared as raw bytes, so input times need the same
	 * zero-padded {@code HH:mm:ss} layout as the stored values for chronological ordering.
	 */
	public static void SelectByTime(String situation) {
		situations = situation.split("\\|");
		FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
		if (situations.length > 0 && !situations[0].isEmpty()) {
			filterList.addFilter(valueFilter("time", CompareOp.GREATER_OR_EQUAL, situations[0]));
		}
		if (situations.length > 1 && !situations[1].isEmpty()) {
			filterList.addFilter(valueFilter("time", CompareOp.LESS_OR_EQUAL, situations[1]));
		}
		runQuery(filterList);
	}

	/** Prints rows whose {@code info:user_id} equals any of the {@code |}-separated ids. */
	public static void SelectByUid(String situation) {
		situations = situation.split("\\|");
		runQuery(anyOf(situations, "user_id", false));
	}

	/** Prints rows whose {@code info:search_word} contains any of the {@code |}-separated keywords. */
	public static void SelectByKey(String situation) {
		situations = situation.split("\\|");
		runQuery(anyOf(situations, "search_word", true));
	}

	/** Prints rows whose {@code info:url_click} contains any of the {@code |}-separated URL fragments. */
	public static void SelectByKeyForURL(String situation) {
		situations = situation.split("\\|");
		runQuery(anyOf(situations, "url_click", true));
	}

	/** Builds an {@code info:<qualifier> <op> value} filter that rejects rows lacking the column. */
	private static SingleColumnValueFilter valueFilter(String qualifier, CompareOp op, String value) {
		SingleColumnValueFilter filter =
				new SingleColumnValueFilter(FAMILY, Bytes.toBytes(qualifier), op, Bytes.toBytes(value));
		filter.setFilterIfMissing(true);
		return filter;
	}

	/** Builds an {@code info:<qualifier>} substring filter that rejects rows lacking the column. */
	private static SingleColumnValueFilter containsFilter(String qualifier, String part) {
		SingleColumnValueFilter filter = new SingleColumnValueFilter(
				FAMILY, Bytes.toBytes(qualifier), CompareOp.EQUAL, new SubstringComparator(part));
		filter.setFilterIfMissing(true);
		return filter;
	}

	/**
	 * ORs one condition per non-empty token; empty tokens (from {@code a||b} or a trailing
	 * {@code |}) are skipped because an empty substring would match every row.
	 */
	private static FilterList anyOf(String[] tokens, String qualifier, boolean substring) {
		FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
		for (String token : tokens) {
			if (token.isEmpty()) {
				continue;
			}
			filterList.addFilter(substring
					? containsFilter(qualifier, token)
					: valueFilter(qualifier, CompareOp.EQUAL, token));
		}
		return filterList;
	}

	/** Runs the scan and releases the connection, or reports that no usable condition was given. */
	private static void runQuery(FilterList filterList) {
		if (filterList.getFilters().isEmpty()) {
			// Avoid an unfiltered scan that would print the whole table.
			System.out.println("请输入有效的查询条件!");
			return;
		}
		Utils.showFilter(filterList);
		Utils.close();
	}
}

SelectByCombine.java

package shixun.part1.select;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import shixun.part1.utils.Utils;

/**
 * Combined (AND) query over time range, user id, keyword and clicked URL.
 *
 * <p>The input is {@code time+uid+key+url}. Each part holds {@code |}-separated values, and a
 * part consisting only of {@code |} (or empty) is ignored. Because parts are split on {@code +},
 * a value that itself contains {@code +} shifts the following conditions.
 */
public class SelectByCombine {

	static String[] situations = {};
	static String[] s_split = {};

	/** Column family that holds every SogouQ column. */
	private static final byte[] FAMILY = Bytes.toBytes("info");

	/**
	 * Prints rows matching all supplied conditions, then closes the shared connection.
	 *
	 * @param situation {@code time+uid+key+url}: time is {@code start|end} (either bound may be
	 *                  omitted); the other parts are OR-ed {@code |}-separated values
	 */
	public static void SelectCombine(String situation) {
		situations = situation.split("\\+");
		FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
		for(int i=0; i<situations.length; i++) {
			if(StringUtils.isEmpty(situations[i])) continue;
			s_split = situations[i].split("\\|");
			FilterList condition;
			switch(i) {
			case 0:
				condition = timeRange(s_split);
				break;
			case 1:
				condition = anyOf(s_split, "user_id", false);
				break;
			case 2:
				condition = anyOf(s_split, "search_word", true);
				break;
			case 3:
				condition = anyOf(s_split, "url_click", true);
				break;
			default:
				// Parts beyond the fourth were ignored before as well.
				condition = null;
				break;
			}
			// A "|" placeholder yields no values; adding an empty sub-list would be meaningless.
			if(condition != null && !condition.getFilters().isEmpty()) {
				filterList.addFilter(condition);
			}
		}
		if(filterList.getFilters().isEmpty()) {
			// Avoid an unfiltered scan that would print the whole table.
			System.out.println("请至少填写一个查询条件!");
			return;
		}
		Utils.showFilter(filterList);
		Utils.close();
	}

	/**
	 * Builds {@code start <= info:time <= end}, omitting a missing or empty bound (the old code
	 * indexed {@code bounds[0]} and {@code bounds[1]} unconditionally and crashed on {@code |}).
	 */
	private static FilterList timeRange(String[] bounds) {
		FilterList range = new FilterList(FilterList.Operator.MUST_PASS_ALL);
		if(bounds.length > 0 && !bounds[0].isEmpty()) {
			range.addFilter(valueFilter("time", CompareOp.GREATER_OR_EQUAL, bounds[0]));
		}
		if(bounds.length > 1 && !bounds[1].isEmpty()) {
			range.addFilter(valueFilter("time", CompareOp.LESS_OR_EQUAL, bounds[1]));
		}
		return range;
	}

	/** ORs one {@code info:<qualifier>} condition per non-empty value (exact or substring match). */
	private static FilterList anyOf(String[] values, String qualifier, boolean substring) {
		FilterList any = new FilterList(FilterList.Operator.MUST_PASS_ONE);
		for(String value : values) {
			if(value.isEmpty()) {
				// An empty substring would match every row.
				continue;
			}
			SingleColumnValueFilter filter = substring
					? new SingleColumnValueFilter(FAMILY, Bytes.toBytes(qualifier), CompareOp.EQUAL, new SubstringComparator(value))
					: valueFilter(qualifier, CompareOp.EQUAL, value);
			filter.setFilterIfMissing(true);
			any.addFilter(filter);
		}
		return any;
	}

	/** Builds an {@code info:<qualifier> <op> value} filter that rejects rows lacking the column. */
	private static SingleColumnValueFilter valueFilter(String qualifier, CompareOp op, String value) {
		SingleColumnValueFilter filter =
				new SingleColumnValueFilter(FAMILY, Bytes.toBytes(qualifier), op, Bytes.toBytes(value));
		filter.setFilterIfMissing(true);
		return filter;
	}
}

Logo

永洪科技，致力于打造全球领先的数据技术厂商，具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理到数据实施的端到端大数据价值服务能力。

更多推荐