Building a Real-Time Data Warehouse for a Big Data Platform from 0 to 1 - 13: Canal
Overview
Today we'll look at real-time data capture from MySQL. The two popular tools are Alibaba's Canal and Maxwell. Judging by GitHub, Canal has more users, probably because it's an Alibaba open-source project. That said, a tool's pedigree doesn't matter: the one that fits your needs is the best one.
Since this is a first look at both tools, let's compare their similarities and differences in practice.
Maxwell website: http://maxwells-daemon.io/
Maxwell GitHub:https://github.com/zendesk/maxwell
Canal
According to its GitHub page, Canal's main purpose is parsing MySQL incremental logs (the binlog) and providing subscription and consumption of the incremental data.
Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
Canal GitHub: https://github.com/alibaba/canal
Docker QuickStart:https://github.com/alibaba/canal/wiki/Docker-QuickStart
Canal Kafka RocketMQ QuickStart:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
Configuration reference (AdminGuide): https://github.com/alibaba/canal/wiki/AdminGuide
Quickstart
Canal has a great many configuration options; see the AdminGuide for details.
Modifying the MariaDB Configuration
The database itself was already configured in the previous Maxwell article; Canal and Maxwell both emulate a MySQL replication slave, so the configuration is identical.
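For reference, the binlog-related settings in my.cnf look roughly like this (the server_id value here is illustrative; any id unique within the replication topology works):

```ini
[mysqld]
server_id     = 1           # must be unique among all replication participants
log-bin       = mysql-bin   # enable the binary log
binlog_format = ROW         # Canal requires row-based logging
```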
Create the canal user and grant it privileges:
[root@server111 ~]# mysql -uroot -proot
MariaDB [(none)]> CREATE USER canal IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.002 sec)
MariaDB [(none)]> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.001 sec)
MariaDB [(none)]> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.001 sec)
MariaDB [(none)]> show grants for 'canal' ;
+----------------------------------------------------------------------------------------------------------------------------------------------+
| Grants for canal@% |
+----------------------------------------------------------------------------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%` IDENTIFIED BY PASSWORD '*E3619321C1A937C46A0D8BD1DAC39F93B27D4458' |
+----------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.003 sec)
MariaDB [(none)]>
Pulling canal-server with Docker
[root@server113 ~]# docker pull canal/canal-server:v1.1.5
[root@server113 ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
zendesk/maxwell latest f46af4abbe00 4 months ago 888MB
canal/canal-server v1.1.5 0c7f1d62a7d8 5 months ago 874MB
GitHub provides a Docker launch script, but the wget URL was unreachable, so I copied the code directly from https://github.com/alibaba/canal/blob/master/docker/run.sh
Running run.sh with no arguments prints a Usage message. The -e flags accept any key/value that would normally go into canal.properties or instance.properties; on startup, canal-server reads the variables passed via -e with higher priority than the config files.
[root@server113 ~]# wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
--2021-10-07 10:02:04-- https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... ^C
[root@server113 ~]# vim run.sh
[root@server113 ~]# chmod +x run.sh
[root@server113 ~]# ./run.sh
Usage:
run.sh [CONFIG]
example 1 :
run.sh -e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=.*\\\..*
example 2 :
run.sh -e canal.admin.manager=127.0.0.1:8089 \
-e canal.admin.port=11110 \
-e canal.admin.user=admin \
-e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441
Starting canal-server
Start ZooKeeper and Kafka:
[root@server110 ~]# /opt/zk.sh start
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.5.8/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
----------server110 zookeeper start-----------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.5.8/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
----------server111 zookeeper start-----------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.5.8/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
----------server112 zookeeper start-----------------
[root@server110 ~]# /opt/kafka.sh start
--------kafka server110 start------------------
--------kafka server111 start------------------
--------kafka server112 start------------------
Start Canal
# Edit the end of the launch script: it defaults to the latest tag, so I appended ':v1.1.5'
[root@server113 ~]# vim run.sh
cmd="docker run -d -it -h $LOCALHOST $CONFIG --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.5"
[root@server113 ~]# ./run.sh \
-e canal.instance.master.address=192.168.1.111:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.serverMode=kafka \
-e canal.mq.servers=192.168.1.110:9092 \
-e canal.mq.topic=canal
[root@server113 ~]# docker ps -a    # list all containers
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6a9ee592f037 canal/canal-server:v1.1.5 "/alidata/bin/main.s…" 32 seconds ago Up 31 seconds canal-server
Client Output
The Flink client consumes the Kafka data directly.
At first the client output was garbled. Looking through the MQ settings, I found canal.mq.flatMessage, which defaults to false, in which case messages need extra decoding on the client side. Since my Flink client reads straight from Kafka and doesn't use the Canal client library, I set this option to true so the data arrives as plain JSON.
[root@server113 ~]# ./run.sh \
-e canal.instance.master.address=192.168.1.111:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.serverMode=kafka \
-e canal.mq.servers=192.168.1.110:9092 \
-e canal.mq.topic=canal \
-e canal.mq.flatMessage=true
### Output ###
docker run -d -it -h 192.168.1.113 -e canal.instance.master.address=192.168.1.111:3306 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -e canal.serverMode=kafka -e canal.mq.servers=192.168.1.110:9092 -e canal.mq.flatMessage=false -e canal.mq.topic=canal -e canal.mq.flatMessage=true --name=canal-server --net=host -m 4096m canal/canal-server:v1.1.5
5ecfcba21a707f4806d1a1e06c2c919034a423c9d5a04473e65851f5a971e56d
-- An INSERT statement produces the following JSON:
insert into test values (4,'canal');
{
"data": null,
"database": "",
"es": 1633574967000,
"id": 2,
"isDdl": false,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "insert into test values (4,'canal')",
"sqlType": null,
"table": "",
"ts": 1633574968133,
"type": "QUERY"
} {
"data": [{
"id": "4",
"name": "canal"
}],
"database": "test",
"es": 1633574967000,
"id": 2,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(255)"
},
"old": null,
"pkNames": null,
"sql": "",
"sqlType": {
"id": 4,
"name": 12
},
"table": "test",
"ts": 1633574968133,
"type": "INSERT"
}
-- An UPDATE statement produces the following JSON:
update test set name='canal-server' where id = 4
{
"data": null,
"database": "",
"es": 1633575442000,
"id": 5,
"isDdl": false,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "update test set name='canal-server' where id = 4",
"sqlType": null,
"table": "",
"ts": 1633575443450,
"type": "QUERY"
} {
"data": [{
"id": "4",
"name": "canal-server"
}],
"database": "test",
"es": 1633575442000,
"id": 5,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(255)"
},
"old": [{
"name": "canal"
}],
"pkNames": null,
"sql": "",
"sqlType": {
"id": 4,
"name": 12
},
"table": "test",
"ts": 1633575443450,
"type": "UPDATE"
}
Wrap-up
canal-server provides subscription and consumption of incremental MySQL data, exposes a rich set of configuration options, and is documented in great detail.
For every operation, Canal emits two JSON messages: one carrying the SQL statement (type QUERY) and one carrying the actual row data.
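Downstream consumers usually ignore the QUERY record and act only on the row-level record. A minimal Python sketch of that filtering, using the flatMessage fields shown above (the Kafka consumption itself is omitted; only the standard-library json module is used):

```python
import json

# Sample flatMessage records, abridged from the UPDATE example above.
messages = [
    '{"data": null, "database": "", "table": "", '
    '"sql": "update test set name=\'canal-server\' where id = 4", "type": "QUERY"}',
    '{"data": [{"id": "4", "name": "canal-server"}], "database": "test", '
    '"table": "test", "old": [{"name": "canal"}], "type": "UPDATE"}',
]

def handle(raw):
    """Return (db.table, event type, rows) for row events, None for QUERY echoes."""
    msg = json.loads(raw)
    if msg["type"] == "QUERY":  # the statement echo carries no row data
        return None
    return ("{}.{}".format(msg["database"], msg["table"]), msg["type"], msg["data"])

events = [e for e in (handle(m) for m in messages) if e is not None]
print(events)  # [('test.test', 'UPDATE', [{'id': '4', 'name': 'canal-server'}])]
```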
Appendix: Client Code
package com.z

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

/**
 * @author wenzheng.ma
 * @date 2021-10-07 10:40
 * @desc
 */
object CanalDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set parallelism to 1
    env.setParallelism(1)
    // Kafka topic
    val topic = "canal"
    // Kafka connection properties
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "server110:9092,server111:9092,server112:9092")
    prop.setProperty("group.id", "test-canal-group")
    // create the Kafka source
    val kafka = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    // add the Kafka source to the environment
    val inputStream = env.addSource(kafka)
    // print the results
    inputStream.print()
    // block and keep consuming
    env.execute()
  }
}