大数据实验手册
2021-05-26 任务
hdfs:shell api
总结:
FileSystem,IOUtils,LocalFileSystem
FileSystem不能直接new,要用FileSystem.get或FileSystem.newInstance获取;找不到get方法时先检查导包是否正确(应为org.apache.hadoop.fs.FileSystem)
输入/输出流都通过FileSystem对象的方法(如open、create)获取;若流类型不匹配,尝试改用 对象.方法 的形式重新获取
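作为补充,下面给出获取FileSystem的最小示意(类名fs_demo为示例自拟,集群地址沿用下文实验环境,完整读写代码见"5.hdfs编写实践"):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
public class fs_demo {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
// fs.defaultFS要带hdfs://前缀,否则可能退回本地文件系统或直接报错
conf.set("fs.defaultFS", "hdfs://192.168.123.152:9000");
System.setProperty("HADOOP_USER_NAME", "bigdata");
// 不能new FileSystem:get返回共享实例,newInstance每次返回新实例,二者选其一即可
FileSystem fs = FileSystem.get(conf);
// 流从fs的方法获取:fs.open(path)返回FSDataInputStream,fs.create(path)返回FSDataOutputStream
fs.close();
}
}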
1.hdfs启动命令
start-dfs.sh
stop-dfs.sh
start-yarn.sh
stop-yarn.sh
2.hdfs的shell命令操作
[bigdata@bigdata02 ~]$ hdfs dfs -help
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...]
[-checksum <src> ...]
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-count [-q] [-h] <path> ...]
[-cp [-f] [-p | -p[topax]] <src> ... <dst>]
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] <path> ...]
[-expunge]
[-find <path> ... <expression> ...]
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] <src> <localdst>]
[-help [cmd ...]]
[-ls [-d] [-h] [-R] [<path> ...]]
[-mkdir [-p] <path> ...]
[-moveFromLocal <localsrc> ... <dst>]
[-moveToLocal <src> <localdst>]
[-mv <src> ... <dst>]
[-put [-f] [-p] [-l] <localsrc> ... <dst>]
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] <src> ...]
[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] <file>]
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...]
[-touchz <path> ...]
[-truncate [-w] <length> <path> ...]
[-usage [cmd ...]]
3.hdfs命令操作实践
[bigdata@bigdata02 ~]$ hdfs dfs -ls /hello.txt
-rw-r--r-- 3 bigdata supergroup 3 2021-05-26 14:56 /hello.txt
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -put ./a.txt /c.txt
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -get /a.txt ./c.txt
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -cat /hello.txt
123
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -mkdir /hello
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -rm -r /hello
21/05/26 16:45:40 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /hello
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -mv /hello.txt /hello
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -cp /hello/hello.txt /hello01.txt
[bigdata@bigdata02 ~]$
[bigdata@bigdata02 ~]$ hdfs dfs -rm -r /*.txt
Deleted /a.txt
Deleted /b.txt
Deleted /c.txt
[bigdata@bigdata02 ~]$
4.pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<!--生成doc jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!--生成源码jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
5.hdfs编写实践
1.使用流的方式上传文件
package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class url_connect_hdfs {
public static void main(String[] args) throws IOException {
Configuration conf= new Configuration();
System.setProperty("HADOOP_USER_NAME", "bigdata");
conf.set("fs.defaultFS","192.168.123.152:9000");
FileSystem fs=FileSystem.newInstance(conf);
FSDataOutputStream out = fs.create(new Path("hdfs://192.168.123.152:9000/hello1.txt"));
FileInputStream in = new FileInputStream(new File("D://hello.txt"));
IOUtils.copyBytes(in,out,4096);
in.close();
out.close();
fs.close();
}
}
2.使用流的方式下载文件
package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
public class url_connect_hdfs {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","192.168.123.152:9000");
System.setProperty("HADOOP_USER_NAME","bigdata");
FileSystem fs=FileSystem.newInstance(conf);
FSDataInputStream in = fs.open(new Path("/hello.txt"));
FileOutputStream out = new FileOutputStream(new File("D://tttt.txt"));
IOUtils.copyBytes(in,out,4096);
in.close();
out.close();
fs.close();
}
}
3.读取hdfs上的文件列表
package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
public class url_connect_hdfs {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","192.168.123.152:9000");
System.setProperty("HADOOP_USER_NAME","bigdata");
FileSystem fs=FileSystem.newInstance(conf);
RemoteIterator<LocatedFileStatus> its = fs.listFiles(new Path("/"),true);
while(its.hasNext()){
LocatedFileStatus it = its.next();
if(it.isFile()){
System.out.println(it.getPath().toString());
}
}
fs.close();
}
}
4.将hdfs上文件信息及内容打印到屏幕上
package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
public class url_connect_hdfs {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","192.168.123.152:9000");
System.setProperty("HADOOP_USER_NAME","bigdata");
FileSystem fs=FileSystem.newInstance(conf);
RemoteIterator<LocatedFileStatus> its = fs.listFiles(new Path("/"),true);
while(its.hasNext()){
LocatedFileStatus it = its.next();
if(it.isFile()){
System.out.println(it.getPath().toString());
FSDataInputStream in = fs.open(it.getPath());
IOUtils.copyBytes(in,System.out,4096);
in.close();
}
}
fs.close();
}
}
2021-05-27 任务
mapreduce:api
总结:
Partitioner的泛型要与mapper输出的key-value类型对应
WritableComparator要基于自定义的WritableComparable来写,且该WritableComparable类型要同时出现在mapper的输出和reducer的输入中
jar->inputformat/outputformat->inputpath/outputpath->mapper/reducer->mapoutput/output+key/value
Partitioner->Reducer(Combiner)->WritableComparable->WritableComparator+GroupingComparator
千万不要写setCombinerClass,否则会报错(本例reducer的输出类型是Text/NullWritable,与map输出的key-value类型不一致,不能直接当Combiner用)
记住一定要给自定义的WritableComparable写无参构造函数!!!(Hadoop反序列化时通过反射创建对象,缺少无参构造会报错)
1.pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<!--生成doc jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!--生成源码jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2.mapreduce编写实践
(1)mymapper
package hdfs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class mymapper extends Mapper<LongWritable, Text,mycomparable, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
for (String word : split) {
IntWritable intvalue = new IntWritable();
intvalue.set(Integer.parseInt(word));
mycomparable mc = new mycomparable();
mc.setNum(intvalue);
context.write(mc, intvalue);
}
}
}
(2)mycombinerreducer
package hdfs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class mycombinerreducer extends Reducer<mycomparable, IntWritable,Text,NullWritable> {
Text combiner = new Text();
@Override
protected void reduce(mycomparable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
Iterator<IntWritable> it = values.iterator();
int sum = 0;
while(it.hasNext()){
sum+=it.next().get();
}
combiner.set(Integer.toString(sum));
context.write(combiner,NullWritable.get());
}
}
(3)jobmain
package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class jobmain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(jobmain.class); // jar
job.setInputFormatClass(TextInputFormat.class); // inputformat
job.setOutputFormatClass(TextOutputFormat.class); // outputformat
TextInputFormat.addInputPath(job,new Path("hdfs://192.168.123.152:9000/in/c.txt")); // inputpath
TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.123.152:9000/out")); // outputpath,要求目录事先不存在
job.setMapperClass(mymapper.class); // mapper
job.setReducerClass(mycombinerreducer.class); // reducer
job.setMapOutputKeyClass(mycomparable.class); // map输出key
job.setOutputKeyClass(Text.class); // 最终输出key
job.setMapOutputValueClass(IntWritable.class); // map输出value
job.setOutputValueClass(NullWritable.class); // 最终输出value
job.setPartitionerClass(mypartitioner.class); // Partitioner
job.setGroupingComparatorClass(mycomparator.class); // GroupingComparator分组比较器
//job.setCombinerClass(mycombinerreducer.class); ///千万不要写这一行,否则会报错(reducer输出类型与map输出类型不一致)
job.waitForCompletion(true);
}
}
(4)mypartitioner
package hdfs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class mypartitioner extends Partitioner<mycomparable, IntWritable> {
@Override
public int getPartition(mycomparable myc, IntWritable intWritable, int i) {
if(myc.getNum().get()==1) return 1;
else return 0;
}
}
(5)mycomparable
package hdfs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class mycomparable implements WritableComparable<mycomparable> {
private IntWritable num;
// 无参构造函数必须保留:Hadoop反序列化时通过反射创建对象
public mycomparable(){
super();
}
public IntWritable getNum(){
return num;
}
public void setNum(IntWritable onum){
num = onum;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(num.get());
}
@Override
public void readFields(DataInput in) throws IOException {
this.num = new IntWritable(in.readInt());
}
public int compareTo(mycomparable o) {
// 用o与this反向比较,得到降序排序
return o.getNum().compareTo(this.getNum());
}
}
(6)mycomparator
package hdfs;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class mycomparator extends WritableComparator {
public mycomparator(){
// 第二个参数true表示为key创建实例,反序列化后用于分组比较
super(mycomparable.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
mycomparable a1 = (mycomparable)a;
mycomparable b1 = (mycomparable)b;
return a1.getNum().compareTo(b1.getNum());
}
}
3.提交运行
hadoop jar bigdata-1.0-SNAPSHOT.jar hdfs.jobmain
2021-05-28 任务
zookeeper:api
总结:
主要的类有Zookeeper,Watcher
回调函数里引用的外部变量(如ZooKeeper对象zk)要先声明并初始化(例如定义为static成员),否则匿名内部类中无法引用
setData/delete的version参数填-1表示不校验版本;getData/exists的Watcher参数可以传回调对象,也可以传null表示不注册监听
zk.create("/test","hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
1.zookeeper启动命令
#没有一键启动集群的启动脚本,需要每个服务节点各自单独启动
zkServer.sh start
zkServer.sh stop
zkServer.sh status
2.zookeeper的shell命令操作
[bigdata@bigdata02 ~]$ zkCli.sh -server bigdata02:2181
[zk: bigdata02:2181(CONNECTED) 1] help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
[zk: bigdata02:2181(CONNECTED) 2]
3.zookeeper命令操作实践
[zk: bigdata02:2181(CONNECTED) 2] create -e /testzk "111"
Created /testzk
[zk: bigdata02:2181(CONNECTED) 3] ls2 /
[zookeeper, testzk]
[zk: bigdata02:2181(CONNECTED) 4] get /testzk
111
[zk: bigdata02:2181(CONNECTED) 5] set /testzk "aaa"
[zk: bigdata02:2181(CONNECTED) 6] get /testzk
aaa
[zk: bigdata02:2181(CONNECTED) 7] rmr /testzk
[zk: bigdata02:2181(CONNECTED) 8] ls2 /
[zookeeper]
[zk: bigdata02:2181(CONNECTED) 9] quit
Quitting...
2021-05-28 15:45:52,176 [myid:] - INFO [main:ZooKeeper@693] - Session: 0x20004cc50380000 closed
2021-05-28 15:45:52,177 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@522] - EventThread shut down for session: 0x20004cc50380000
[bigdata@bigdata02 ~]$
4.pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<!--生成doc jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!--生成源码jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
5.zookeeper编写实践
package hdfs;
import org.apache.zookeeper.*;
import java.io.IOException;
public class myzookeeper {
static ZooKeeper zk = null;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
zk = new ZooKeeper("bigdata02:2181,bigdata03:2181", 4000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println(new String(zk.getData("/test",false,null)));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
zk.create("/test","hello111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/test",null,null)));
zk.setData("/test","world".getBytes(),-1);
System.out.println(new String(zk.getData("/test",null,null)));
System.out.println(zk.exists("/test",null) != null?"exists":"not exists");
zk.delete("/test",-1);
zk.close();
}
}
2021-05-29 任务
hbase:shell api
总结:
连接:HBaseConfiguration,ConnectionFactory
表与列族:HTableDescriptor(表),HColumnDescriptor(列族)
操作:Put,Get,Scan,ResultScanner
表的增删用Admin类对象进行操作
表的内容获取修改用Table类对象进行操作
Admin和Table都是从Connection中获取的
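对应上面的要点,获取Admin和Table的片段示意如下(导包与下文"5.hbase编写实践"相同,假设在抛出IOException的方法内,ZooKeeper地址沿用实验环境):
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "bigdata02:2181,bigdata03:2181,bigdata04:2181");
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin(); // 建表、禁用表、删表等表级操作用Admin
Table table = connection.getTable(TableName.valueOf("table01")); // 行数据的put/get/scan用Table
// 使用完毕后按顺序关闭
table.close();
admin.close();
connection.close();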
1.hbase启动命令
zkServer.sh start
start-dfs.sh
start-hbase.sh
2.hbase的shell命令操作
[bigdata@bigdata02 conf]$ hbase shell
hbase(main):001:0> help
3.hbase命令操作实践
hbase(main):002:0> create 'table1','info'
0 row(s) in 1.4590 seconds
=> Hbase::Table - table1
hbase(main):003:0> put 'table1','1001','info:age','18'
0 row(s) in 0.1980 seconds
hbase(main):004:0> put 'table1','1001','info:name','jack'
0 row(s) in 0.0150 seconds
hbase(main):005:0> scan 'table1'
ROW COLUMN+CELL
1001 column=info:age, timestamp=1622267507372, value=18
1001 column=info:name, timestamp=1622267563806, value=jack
1 row(s) in 0.0190 seconds
hbase(main):006:0> scan 'table1',{STARTROW => '1001',STOPROW => '1009'}
ROW COLUMN+CELL
1001 column=info:age, timestamp=1622267507372, value=18
1001 column=info:name, timestamp=1622267563806, value=jack
1 row(s) in 0.0110 seconds
hbase(main):007:0> get 'table1','1001','info:name'
COLUMN CELL
info:name timestamp=1622267563806, value=jack
1 row(s) in 0.0170 seconds
hbase(main):008:0> count 'table1'
1 row(s) in 0.0130 seconds
=> 1
hbase(main):009:0> delete 'table1','1001','info:name'
0 row(s) in 0.0150 seconds
hbase(main):010:0> truncate 'table1'
Truncating 'table1' table (it may take a while):
- Disabling table...
- Truncating table...
0 row(s) in 3.6530 seconds
hbase(main):011:0> disable 'table1'
0 row(s) in 2.2530 seconds
hbase(main):012:0> drop 'table1'
0 row(s) in 1.2620 seconds
hbase(main):013:0>
4.pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<!--生成doc jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!--生成源码jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
5.hbase编写实践
package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.Iterator;
public class myhbase {
public static void main(String[] args) throws IOException {
Configuration conf= HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","bigdata02:2181,bigdata02:2181,bigdata04:2181");
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
String tableName = "table01";
String[] cf = {"lie001","lie002"};
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
hTableDescriptor.addFamily(new HColumnDescriptor(cf[0]));
hTableDescriptor.addFamily(new HColumnDescriptor(cf[1]));
admin.createTable(hTableDescriptor);
Table table = connection.getTable(TableName.valueOf("table01"));
Put put = new Put("0001".getBytes());
put.addColumn(cf[0].getBytes(),"name".getBytes(),"lisi".getBytes());
table.put(put);
Get get = new Get("0001".getBytes());
Result result = table.get(get);
String values = new String(result.getValue(cf[0].getBytes(),"name".getBytes()));
System.out.println(values);
ResultScanner resultScanner = table.getScanner(new Scan());
Iterator<Result> it = resultScanner.iterator();
while(it.hasNext()){
String string = new String(it.next().getValue(cf[0].getBytes(),"name".getBytes()));
System.out.println("Scanner:"+string);
}
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
admin.close();
connection.close();
}
}
2021-05-30 任务
kafka:shell api
总结:
主要类有KafkaProducer,KafkaConsumer,Arrays
设置kafka服务端bootstrap.servers
设置acks应答级别(注意生产者属性名是acks,不是ack)
设置key,value的序列化key.serializer,value.serializer
1.kafka启动命令
./apps/kafka_2.11-0.10.0.0/bin/kafka-server-start.sh -daemon ./apps/kafka_2.11-0.10.0.0/config/server.properties
2.kafka命令操作实践
[bigdata@bigdata02 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --create --zookeeper bigdata02:2181 --replication-factor 2 --partitions 3 --topic test
Created topic "test".
[bigdata@bigdata02 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --list --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181
test
[bigdata@bigdata02 kafka_2.11-0.10.0.0]$ bin/kafka-console-producer.sh --broker-list bigdata02:9092,bigdata03:9092,bigdata04:9092 --topic test
111
hello
[bigdata@bigdata03 kafka_2.11-0.10.0.0]$ bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181
111
hello
[bigdata@bigdata02 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --describe --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --topic test
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
[bigdata@bigdata02 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --delete --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
3.pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<!--生成doc jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!--生成源码jar包-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.kafka编写实践
(1)kafka生产者producer
package hdfs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class mykafkaproducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers","bigdata02:9092");
properties.put("ack","all");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 1000; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("test","world"+i));
}
kafkaProducer.close(); // 关闭前会把缓冲区中尚未发送的消息发出去,不关闭的话程序退出时可能丢数据
}
}
(2)kafka消费者consumer
package hdfs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class mykafkaconsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers","bigdata03:9092");
properties.put("group.id","test");
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(properties);
kafkaConsumer.subscribe(Arrays.asList("test"));
while(true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
for(ConsumerRecord<String, String> consumerRecord:consumerRecords){
System.out.println(consumerRecord.value());
}
}
}
}
00其它备份
pom版本管理文件(父工程pom,统一管理依赖与插件版本)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast</groupId>
<artifactId>tags</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>tag-model</module>
<module>tag-web</module>
<module>tag_model_new</module>
</modules>
<properties>
<!-- Maven 配置 -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
<maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format>
<!-- Common 系列 -->
<commons-io.version>2.6</commons-io.version>
<commons-codec.version>1.11</commons-codec.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-lang3.version>3.7</commons-lang3.version>
<commons-upload.version>1.3.3</commons-upload.version>
<commons-collections.version>4.4</commons-collections.version>
<fastjson.version>1.2.47</fastjson.version>
<jackson.version>2.9.9</jackson.version>
<!-- Web -->
<javax.servlet-api.version>4.0.0</javax.servlet-api.version>
<jstl.version>1.2</jstl.version>
<javax.servlet.jsp.jstl-api.version>1.2.1</javax.servlet.jsp.jstl-api.version>
<httpmime.version>4.3.2</httpmime.version>
<spring.version>5.0.0.RELEASE</spring.version>
<shiro.version>1.3.2</shiro.version>
<shiro-freemarker-tags.version>0.1</shiro-freemarker-tags.version>
<aspectjweaver.version>1.9.0</aspectjweaver.version>
<freemarker.version>2.3.28</freemarker.version>
<!-- 数据 -->
<scala.version>2.11.12</scala.version>
<spark.version>2.4.0-cdh6.3.2</spark.version>
<hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
<!-- 数据库 -->
<mysql.version>8.0.17</mysql.version>
<jedis.version>2.9.0</jedis.version>
<ehcache.version>2.10.5</ehcache.version>
<mybatis.version>3.4.6</mybatis.version>
<mybatis.spring.version>1.3.2</mybatis.spring.version>
<mybatis-generator.version>1.3.5</mybatis-generator.version>
<druid.version>1.1.9</druid.version>
<hbase.version>2.1.0-cdh6.3.2</hbase.version>
<shc.version>1.1.3-2.4-s_2.11</shc.version>
<solr.version>7.4.0-cdh6.3.2</solr.version>
<!-- 调度 -->
<oozie.version>5.1.0-cdh6.3.2</oozie.version>
<quartz.version>2.2.1</quartz.version>
<jsch.version>0.1.53</jsch.version>
<!-- 配置 -->
<config.version>1.4.0</config.version>
<!-- 日志 -->
<log4j.version>2.11.0</log4j.version>
<slf4j.version>1.7.25</slf4j.version>
<!-- 测试 -->
<junit.version>4.12</junit.version>
<!-- maven plugins -->
<maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
<scala-compiler-plugin.version>4.3.1</scala-compiler-plugin.version>
<maven-war-plugin.version>3.2.1</maven-war-plugin.version>
<maven-shade-plugin.version>3.2.1</maven-shade-plugin.version>
<mybatis-generator-maven-plugin.version>1.3.5</mybatis-generator-maven-plugin.version>
<maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
<jetty-maven-plugin.version>9.4.10.v20180503</jetty-maven-plugin.version>
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Module -->
<dependency>
<groupId>cn.itcast</groupId>
<artifactId>tag-web</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.itcast</groupId>
<artifactId>tag-model</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.itcast</groupId>
<artifactId>tag-data</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- Common 系列 -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>${commons-upload.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${commons-collections.version}</version>
</dependency>
<!-- Web -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>${javax.servlet-api.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>${jstl.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp.jstl</groupId>
<artifactId>javax.servlet.jsp.jstl-api</artifactId>
<version>${javax.servlet.jsp.jstl-api.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>${httpmime.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-all</artifactId>
<version>${shiro.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-beanutils</artifactId>
<groupId>commons-beanutils</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.mingsoft</groupId>
<artifactId>shiro-freemarker-tags</artifactId>
<version>${shiro-freemarker-tags.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>${aspectjweaver.version}</version>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>${freemarker.version}</version>
</dependency>
<!-- 数据 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- 数据库 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>${ehcache.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>${mybatis.spring.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<version>${mybatis-generator.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>${shc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
<version>${solr.version}</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>${solr.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-math3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- 调度 -->
<dependency>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-core</artifactId>
<version>${oozie.version}</version>
<exclusions>
<exclusion>
<artifactId>jsr305</artifactId>
<groupId>com.google.code.findbugs</groupId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>guice</artifactId>
<groupId>com.google.inject</groupId>
</exclusion>
<exclusion>
<artifactId>guice-servlet</artifactId>
<groupId>com.google.inject.extensions</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-client</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>xml-apis</artifactId>
<groupId>xml-apis</groupId>
</exclusion>
<exclusion>
<artifactId>asm-commons</artifactId>
<groupId>org.ow2.asm</groupId>
</exclusion>
<exclusion>
<artifactId>jsp-api</artifactId>
<groupId>javax.servlet.jsp</groupId>
</exclusion>
<exclusion>
<artifactId>metrics-json</artifactId>
<groupId>io.dropwizard.metrics</groupId>
</exclusion>
<exclusion>
<artifactId>metrics-graphite</artifactId>
<groupId>io.dropwizard.metrics</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-json</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>commons-beanutils</artifactId>
<groupId>commons-beanutils</groupId>
</exclusion>
<exclusion>
<artifactId>metrics-core</artifactId>
<groupId>io.dropwizard.metrics</groupId>
</exclusion>
<exclusion>
<artifactId>joda-time</artifactId>
<groupId>joda-time</groupId>
</exclusion>
<exclusion>
<artifactId>htrace-core4</artifactId>
<groupId>org.apache.htrace</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-xc</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>commons-compress</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>metrics-ganglia</artifactId>
<groupId>io.dropwizard.metrics</groupId>
</exclusion>
<exclusion>
<artifactId>metrics-jvm</artifactId>
<groupId>io.dropwizard.metrics</groupId>
</exclusion>
<exclusion>
<artifactId>activation</artifactId>
<groupId>javax.activation</groupId>
</exclusion>
<exclusion>
<artifactId>antlr-runtime</artifactId>
<groupId>org.antlr</groupId>
</exclusion>
<exclusion>
<artifactId>calcite-core</artifactId>
<groupId>org.apache.calcite</groupId>
</exclusion>
<exclusion>
<artifactId>calcite-linq4j</artifactId>
<groupId>org.apache.calcite</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-server</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>commons-cli</artifactId>
<groupId>commons-cli</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>commons-pool</artifactId>
<groupId>commons-pool</groupId>
</exclusion>
<exclusion>
<artifactId>curator-recipes</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-api</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>libthrift</artifactId>
<groupId>org.apache.thrift</groupId>
</exclusion>
<exclusion>
<artifactId>xmlgraphics-commons</artifactId>
<groupId>org.apache.xmlgraphics</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-jaxrs</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jamon-runtime</artifactId>
<groupId>org.jamon</groupId>
</exclusion>
<exclusion>
<artifactId>asm</artifactId>
<groupId>org.ow2.asm</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-client</artifactId>
<version>${oozie.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-cli</artifactId>
<groupId>commons-cli</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>${quartz.version}</version>
</dependency>
<!-- 配置 -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>${config.version}</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>${mybatis-generator-maven-plugin.version}</version>
<configuration>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<argLine>-Dfile.encoding=UTF-8</argLine>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>${maven-war-plugin.version}</version>
<configuration>
<failOnMissingWebXml>false</failOnMissingWebXml>
</configuration>
</plugin>
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>${jetty-maven-plugin.version}</version>
<configuration>
<scanIntervalSeconds>10</scanIntervalSeconds>
<httpConnector>
<port>8090</port>
</httpConnector>
<webApp>
<contextPath>/${project.name}</contextPath>
</webApp>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.tld</exclude>
<exclude>META-INF/*.xml</exclude>
<exclude>META-INF/*.txt</exclude>
<exclude>META-INF/*.properties</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.DSA</exclude>
</excludes>
</filter>
</filters>
<createSourcesJar>false</createSourcesJar>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>false</shadedArtifactAttached>
</configuration>
<executions>
<execution>
<id>shade-jar</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${scala-compiler-plugin.version}</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>attach-scaladocs</id>
<phase>verify</phase>
<goals>
<goal>doc-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>YOUR_MAIN_CLASS</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
<repositories>
<repository>
<id>ali-repo</id>
<name>ali-repo</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<layout>default</layout>
</repository>
<repository>
<id>mvn-repo</id>
<name>mvn-repo</name>
<url>https://mvnrepository.com</url>
</repository>
<repository>
<id>cdh-repo</id>
<name>cdh-repo</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>hdp-repo</id>
<name>hdp-repo</name>
<url>http://repo.hortonworks.com/content/groups/public/</url>
</repository>
</repositories>
</project>
Hive2HBase
1.pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tags</artifactId>
<groupId>cn.itcast</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tag_model_new</artifactId>
<version>0.0.1</version>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<exclusions>
<exclusion>
<artifactId>jersey-container-servlet-core</artifactId>
<groupId>org.glassfish.jersey.containers</groupId>
</exclusion>
<exclusion>
<artifactId>guice-servlet</artifactId>
<groupId>com.google.inject.extensions</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.hive2hbase编写实践
package cn.itcast.model.utils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.{Dataset, Row, SparkSession}
object Hive2HBase {
val defaultHBaseCF = "default"
val hfilePath = "/user/admin/data_extra/hfile_cache"
val hbaseDefaultNameSpace = "default"
/**
* 要加载五张表, tbl_goods, tbl_goods_new, tbl_logs, tbl_orders, tbl_users
* 这五张表都需要处理, 所以, 可能要写五个程序去抽取
* 优化点1: 通过 args 传入表名, 和相应的表中的主键
*
* @param args db.table 1. 数据库名, 2. 表名, 3. 表中的主键列名
*/
def main(args: Array[String]): Unit = {
// 校验 参数
// spark-submit --class --master jarPath arg1 arg2 arg3
if (args.length < 3) {
return
}
val sourceDbName = args(0)
val sourceTableName = args(1)
val sourceKeyFieldName = args(2)
// 1. 创建配置
val hadoopConfig = HBaseConfiguration.create()
// 1.1. 配置: HBase 中落地的那张表的表名, Output TableName
hadoopConfig.set(TableOutputFormat.OUTPUT_TABLE, sourceTableName)
// 1.2. 配置: HFile 中写入时的表名
hadoopConfig.set("hbase.mapreduce.hfileoutputformat.table.name", sourceTableName)
// 无论是 Map 还是 Reduce 其实处理的都是 KV
// 1.3. Job配置: 任务执行的信息, Key 的类型
val job = Job.getInstance(hadoopConfig)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
// 1.4. Job配置: 配置 Value 的类型
job.setMapOutputValueClass(classOf[KeyValue])
val currentHFilePath = s"$hfilePath/$sourceTableName"
hive2HFile(sourceDbName, sourceTableName, sourceKeyFieldName, currentHFilePath, hadoopConfig)
hfile2HBase(job, sourceTableName, currentHFilePath)
}
/**
* 通过 Spark 读取 Hive 表加载数据
* 并且, 落地成为 HFile 格式
*
* 来源: Hive 表, sourceDbName.sourceTableName
* 落地: HDFS 当中的 HFile
*/
def hive2HFile(db: String, table: String, keyField: String, hfilePath: String, config: Configuration): Unit = {
// 创建 SparkSession
// 注意点: 不能指定 Master, spark-submit 提交的时候, 会指定 Master
val spark = SparkSession.builder()
.appName("extra data from hive to hfile")
.enableHiveSupport()
.getOrCreate()
// 1. 加载数据
// type DataFrame = Dataset[Row]
val source: Dataset[Row] = spark.read.table(s"$db.$table")
// 2. 处理数据, (ImmutableBytesWritable, KeyValue)
val transfer = source.rdd
// 因为 map 是对一整行数据进行转换, 但是最终的出口应该多个单元格的数据 KV
// 所以使用 flatMap, row -> convert -> multi cell KV
.filter(row => row.getAs(keyField) != null)
.sortBy(row => s"${row.getAs(keyField)}")
.flatMap(row => {
// HBase 中所有数据类型都是 Bytes
// 取得 RowKey 的值
val rowKeyBytes = Bytes.toBytes(s"${row.getAs(keyField)}")
// 把 row 拆开, 转换为 cells 形式, 多个单元格, 对每个单元格处理, 转成 KV 形式出去
val hbaseKV: Seq[(ImmutableBytesWritable, KeyValue)] = row.schema
.filter(field => row.getAs(field.name) != null)
.sortBy(field => field.name)
.map(field => {
// 取得列名, 将列名转为 KV 形式
// 1. 获取 row 的每一个单元格的 field name 和 value
val fieldNameBytes = Bytes.toBytes(field.name)
val valueBytes = Bytes.toBytes(s"${row.getAs(field.name)}")
// 2. 生成 KV 返回
val kv = new KeyValue(rowKeyBytes, Bytes.toBytes(defaultHBaseCF), fieldNameBytes, valueBytes)
(new ImmutableBytesWritable(rowKeyBytes), kv)
})
hbaseKV
})
// 3. 落地, HDFS -> HFile
// HBase 提供了表的访问模式, 但是 HBase 底层存储的数据结构是 KV, K = rowkey:cf:c V=value
// 所以, 我们要去将数据处理成 KV 的形式, 再落地成 HFile
// 所以, 最终需要的数据格式是 (K, KV) => (RowKey, (RowKey, Value)) => (ImmutableBytesWritable, KeyValue)
// (ImmutableBytesWritable, KeyValue)
transfer.saveAsNewAPIHadoopFile(
hfilePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
config
)
}
/**
* 通过 HBase 的 API, 将 HFile 数据加载到 HBase 中
*/
def hfile2HBase(job: Job, table: String, hfilePath: String): Unit = {
// 1. 配置, connection, admin
val connection = ConnectionFactory.createConnection(job.getConfiguration)
val admin = connection.getAdmin
// 2. 判断表是否存在
val tableName = TableName.valueOf(Bytes.toBytes(hbaseDefaultNameSpace), Bytes.toBytes(table))
if (!admin.tableExists(tableName)) {
admin.createTable(
TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(defaultHBaseCF))
.build()
)
.build()
)
}
// 3. 调用 API 进行 BulkLoad
val realTable = connection.getTable(tableName)
val regionLocator = new HRegionLocator(tableName, connection.asInstanceOf[ClusterConnection])
val loader = new LoadIncrementalHFiles(job.getConfiguration)
loader.doBulkLoad(new Path(hfilePath), admin, realTable, regionLocator)
}
}
SHC(Spark HBase Connector)
1.准备工作
1.spark必须是2.4及以上版本,否则无法使用SHC
2.添加电脑本地JAVA_HOME环境变量:略
3.添加电脑本地mvn环境变量:C:\Software\apache-maven-3.6.3\bin
4.在shell黑窗口执行命令(执行1个多小时):mvn install -DskipTests
5.把hdfs-site.xml,core-site.xml和hbase-site.xml放到项目的resources目录下
2.pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>tags</artifactId>
<groupId>cn.itcast</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tag_model_new</artifactId>
<version>0.0.1</version>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<exclusions>
<exclusion>
<artifactId>jersey-container-servlet-core</artifactId>
<groupId>org.glassfish.jersey.containers</groupId>
</exclusion>
<exclusion>
<artifactId>guice-servlet</artifactId>
<groupId>com.google.inject.extensions</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
</dependency>
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.SHC编写实践
package cn.itcast.model.utils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
object ShcTest {
def main(args: Array[String]): Unit = {
// 1. 读取 HBase 表
// 1.1. 配置 HBase 表结构, SHC 中配置使用 JSON 的形式来配置的
// table 指定的是要读取或者要写入的哪一张 HBase 表
// rowkey 指定 HBase 中 RowKey 所在的列
// 注意点: 指定了 RowKey 的话, 这个 RowKey 一定要出现在 DataFrame 中, 所以 Columns 中, 一定要配置 RowKey 列
// columns 指定了要出现在 DataFrame 中的所有列
// columns 是一个 JSON 对象, 属性名是出现在 DataFrame 中名字, 属性值就是 HBase 中的列
val catalog =
s"""
|{
| "table": {
| "namespace": "default", "name": "tbl_users"
| },
| "rowkey": "id",
| "columns": {
| "id": {"cf": "rowkey", "col": "id", "type": "string"},
| "userName": {"cf": "default", "col": "username", "type": "string"}
| }
|}
|""".stripMargin
// 1.2. 使用 Spark 连接 HBase
val spark = SparkSession.builder()
.appName("shc test")
.master("local[5]")
.getOrCreate()
val source: DataFrame = spark.read
.option(HBaseTableCatalog.tableCatalog, catalog)
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
source.show()
// 2. 写入 HBase 表
val catalogWrite =
s"""
|{
| "table": {
| "namespace": "default", "name": "tbl_test1"
| },
| "rowkey": "id",
| "columns": {
| "id": {"cf": "rowkey", "col": "id", "type": "string"},
| "userName": {"cf": "default", "col": "username", "type": "string"}
| }
|}
|""".stripMargin
source.write
.option(HBaseTableCatalog.tableCatalog, catalogWrite)
.option(HBaseTableCatalog.newTable, "5")
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
}
}