大数据——把Kafka中的数据传输到HBase中
日志数据导入一级目录二级目录三级目录一级目录二级目录三级目录总体package nj.zb.kb09.kafkatohbase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;i
·
把Kafka中的数据传输到HBase中
查看topic中的消息
- 查看需要传输的topic中的消息数量
//event_attendees
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic event_attendees --time -1 --offsets 1
//events
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic events --time -1 --offsets 1
//train
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic train --time -1 --offsets 1
//user_friends
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic user_friends --time -1 --offsets 1
//users
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop100:9092 --topic users --time -1 --offsets 1
在HBase中创建topic所需要的表
- 查看命名空间
hbase(main):001:0> list_namespace
- 创建命名空间
hbase(main):001:0> create_namesppace 'events_db'
- 创建表
//events
hbase(main):002:0> create 'events_db:events','schedule','location','creator','remark'
//users
hbase(main):003:0> create 'events_db:users','profile','region','registration'
//event_attendee
hbase(main):004:0> create 'events_db:event_attendee','euat'
//train
hbase(main):005:0> create 'events_db:train','eu'
//user_friend
hbase(main):006:0> create 'events_db:user_friend','uf'
- 查看表
hbase(main):007:0> list_namespace_tables 'events_db'
编写Java程序传输数据
- event_attendees
package nj.zb.kb09.kafkatohbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
public class UserFriendTOHbase {
public static void main(String[] args) {
//kafka客户端属性配置
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.100:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
//会话窗口3秒
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,30000);
//不自动提交
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "userFriend");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("user_friends"));
//配置hbase信息 连接hbase数据库
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir","hdfs://192.168.136.100:9000/hbase");
conf.set("hbase.zookeeper.quorum","192.168.136.100");
conf.set("hbase.zookeeper.property.clientPort","2181");
try {
Connection connection= ConnectionFactory.createConnection(conf);
Table eventAttendTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
ArrayList<Put> datas = new ArrayList<Put>();
for (ConsumerRecord<String, String> p:
poll){
System.out.println(p.value());
String[] split = p.value().split(",");
Put put = new Put(Bytes.toBytes((split[0] + split[1] .hashCode())));
put.addColumn("uf".getBytes(),"userid".getBytes(),split[0].getBytes());
put.addColumn("uf".getBytes(),"friendid".getBytes(),split[1].getBytes());
datas.add(put);
}
eventAttendTable.put(datas);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
使用设计模式编写Java程序传输数据
- IWorker
package nj.zb.kb09.kafkahbaseUserfriendgj;
public interface IWorker {
public void fillData();
}
- IWriter
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.io.IOException;
public interface IWriter {
public int write(ConsumerRecords<String, String>records,String tableName) throws IOException;
}
- IParseRecord
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.List;
/*
* 组装者
* 将kafka消费的信息,通过加工转化,得到List<Put>对象,用于hbase存储使用
* */
public interface IParseRecord {
public List<Put> parse(ConsumerRecords<String,String> records);
}
- ParentWorker
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public abstract class ParentWorker implements IWorker {
protected Properties prop;
public ParentWorker(String groupName) {
prop=new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.100:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
//会话窗口3秒
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,30000);
//不自动提交
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
prop.put(ConsumerConfig.GROUP_ID_CONFIG,groupName);
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
}
- HbaseWorker
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class HbaseWorker extends ParentWorker {
private IWriter writer;
private String topic;
private String target;
public HbaseWorker(IWriter writer,String groupName,String topic,String targetTable) {
super(groupName);
this.topic=topic;
this.target=targetTable;
this.writer=writer;
}
@Override
public void fillData() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton(this.topic));
try {
while (true){
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
Integer rowNum= writer.write(poll,this.target);
System.out.println("行数:"+rowNum);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- HbaseWriter
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.io.IOException;
import java.util.List;
public class HbaseWriter implements IWriter {
private Connection connection;
private IParseRecord parseRecord;
/*public IParseRecord getParseRecord() {
return parseRecord;
}
public void setParseRecord(IParseRecord parseRecord) {
this.parseRecord = parseRecord;
}*/
public HbaseWriter(IParseRecord parseRecord) {
this.parseRecord=parseRecord;
//配置hbase信息 连接hbase数据库
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir","hdfs://192.168.136.100:9000/hbase");
conf.set("hbase.zookeeper.quorum","192.168.136.100");
conf.set("hbase.zookeeper.property.clientPort","2181");
try {
connection= ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public int write(ConsumerRecords<String, String> records, String tableName) throws IOException {
Table eventAttendTable = connection.getTable(TableName.valueOf(tableName));
List<Put> datas = parseRecord.parse(records);
eventAttendTable.put(datas);
return datas.size();
}
}
- UserFriendHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class UserFriendHandler implements IParseRecord{
List<Put> datas=new ArrayList<>();
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> p:
records){
System.out.println(p.value());
String[] split = p.value().split(",");
Put put = new Put(Bytes.toBytes((split[0] + split[1] .hashCode())));
put.addColumn("uf".getBytes(),"userid".getBytes(),split[0].getBytes());
put.addColumn("uf".getBytes(),"friendid".getBytes(),split[1].getBytes());
datas.add(put);
}
return datas;
}
}
- EventAttendHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;
import nj.zb.kb09.kafkahbaseUserfriendgj.IParseRecord;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class EventAttendHandler implements IParseRecord {
List<Put> datas=new ArrayList<>();
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> p:
records){
System.out.println(p.value());
String[] split = p.value().split(",");
Put put = new Put(Bytes.toBytes((split[0] + split[1] + split[2]).hashCode()));
put.addColumn("euat".getBytes(),"eventid".getBytes(),split[0].getBytes());
put.addColumn("euat".getBytes(),"userid".getBytes(),split[1].getBytes());
put.addColumn("euat".getBytes(),"state".getBytes(),split[2].getBytes());
datas.add(put);
}
return datas;
}
}
- EventsHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class EventsHandler implements IParseRecord{
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas=new ArrayList<>();
for (ConsumerRecord<String, String> record :
records) {
String[] split = record.value().split(",");
System.out.println(record);
Put put=new Put(Bytes.toBytes(split[0].hashCode()));
put.addColumn("schedule".getBytes(),"start_time".getBytes(),split[2].getBytes());
put.addColumn("location".getBytes(),"city".getBytes(),split[3].getBytes());
put.addColumn("location".getBytes(),"state".getBytes(),split[4].getBytes());
put.addColumn("location".getBytes(),"zip".getBytes(),split[5].getBytes());
put.addColumn("location".getBytes(),"country".getBytes(),split[6].getBytes());
put.addColumn("location".getBytes(),"lat".getBytes(),split[7].getBytes());
put.addColumn("location".getBytes(),"lng".getBytes(),split[8].getBytes());
put.addColumn("creator".getBytes(),"user_id".getBytes(),split[1].getBytes());
put.addColumn("remark".getBytes(),"common_words".getBytes(),split[9].getBytes());
datas.add(put);
}
return datas;
}
}
- TrainHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class TrainHandler implements IParseRecord {
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas=new ArrayList<>();
for (ConsumerRecord<String, String> record :
records) {
String[] split = record.value().split(",");
System.out.println(record);
Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
put.addColumn("eu".getBytes(),"user".getBytes(),split[0].getBytes());
put.addColumn("eu".getBytes(),"event".getBytes(),split[1].getBytes());
put.addColumn("eu".getBytes(),"invited".getBytes(),split[2].getBytes());
put.addColumn("eu".getBytes(),"timestamp".getBytes(),split[3].getBytes());
put.addColumn("eu".getBytes(),"interested".getBytes(),split[4].getBytes());
put.addColumn("eu".getBytes(),"not_interested".getBytes(),split[5].getBytes());
datas.add(put);
}
return datas;
}
}
- UsersHandler
package nj.zb.kb09.kafkahbaseUserfriendgj;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;
public class UsersHandler implements IParseRecord{
@Override
public List<Put> parse(ConsumerRecords<String, String> records) {
List<Put> datas=new ArrayList<>();
for (ConsumerRecord<String, String> record :
records) {
String[] split = record.value().split(",");
if(split[0].trim().length()==0){
continue;
}
System.out.println(record);
Put put = new Put(Bytes.toBytes(split[0].hashCode()));
put.addColumn("profile".getBytes(),"birthyear".getBytes(),split[2].getBytes());
put.addColumn("profile".getBytes(),"gender".getBytes(),split[3].getBytes());
put.addColumn("region".getBytes(),"locale".getBytes(),split[1].getBytes());
if(split.length>5){
put.addColumn("region".getBytes(),"location".getBytes(),split[5].getBytes());
}
if (split.length>6) {
put.addColumn("region".getBytes(), "timezone".getBytes(), split[6].getBytes());
}
if (split.length>4) {
put.addColumn("registration".getBytes(), "joinedAt".getBytes(), split[4].getBytes());
}
datas.add(put);
}
return datas;
}
}
- ALLTohbase2
package nj.zb.kb09.kafkahbaseUserfriendgj;
public class ALLTohbase2 {
public static void main(String[] args) {
//user_friend
/*IParseRecord record=new UserFriendHandler();
IWriter writer=new HbaseWriter(record);
IWorker worker=new HbaseWorker(writer,"userfriend","user_friends","events_db:user_friend");
*/
//等同于
IWorker worker=new HbaseWorker(new HbaseWriter(new UserFriendHandler()),"userfriend","user_friends","events_db:user_friend");
//worker.fillData();
//event_attendee"
//new HbaseWorker(new HbaseWriter(new EventAttendHandler()),"eventattend1","event_attendees","event_db:event_attendee").fillData();
//users
//new HbaseWorker(new HbaseWriter(new UsersHandler()),"users1","users","events_db:users").fillData();
//events
new HbaseWorker(new HbaseWriter(new EventsHandler()),"events1","events","events_db:events").fillData();
//train
//new HbaseWorker(new HbaseWriter(new TrainHandler()),"train1","train","events_db:train").fillData();
}
}
注意:这里程序运行,需要一个运行结束后,才能传输另一个topic中的数据到hbase中
- 查看表中的数据量
//events
hbase(main):010:0> count 'events_db:events',INTERVAL => 5000000,CACHE => 5000000
//event_attendee
hbase(main):011:0> count 'events_db:event_attendee',INTERVAL => 5000000,CACHE => 5000000
//train
hbase(main):012:0> count 'events_db:train',INTERVAL => 5000000,CACHE => 5000000
//user_friend
hbase(main):013:0> count 'events_db:user_friend',INTERVAL => 5000000,CACHE => 5000000
//users
hbase(main):013:0> count 'events_db:user_friend',INTERVAL => 5000000,CACHE => 5000000
更多推荐
所有评论(0)