Canal监听指定库并发送到MQ

环境

  1. CentOS Linux release 7.5.1804 (Core)
  2. openjdk version "1.8.0_212"
  3. MySql 8.0.12
  4. canal.deployer-1.1.5
  5. kafka_2.13-2.4.0 [单机]

基本的安装

参考【官方文档

基本配置参考

投递数据到Kafka,要修改的配置如下

vim conf/canal.properties
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka # 改为Kafka
##################################################
#########            Kafka           #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092 # 配置Kafka的服务器地址【主要】,其它的配置项,可参考官方文档
kafka.acks = all 
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1 
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1 
kafka.retries = 0 

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

根据以上的配置,就可以启动 canal 了,这时候,默认是监听全部的库,包括Mysql系统库

重点

监听指定的库或表,或者投递数据到指定的Topic之类的配置,要进行如下配置

conf/example/instance.properties

此文件中主要的配置项

  1. canal.instance.filter.regex 配置监听的库或表信息,支持Perl正则
  2. canal.mq.topic 默认的Topic,如果没有匹配到其它的Topic,就会投递到此
  3. canal.mq.dynamicTopic 动态配置投递数据到相应的Topic里面【MQ最好开启自动创建Topic】

【canal.instance.filter.regex】监听库配置:【官方文档

1.  所有表:.*  or  .*\\..*
2.  db001 库下所有表: db001\\..*
3.  db001 库下的以 user 打头的表[正则匹配的表]:db001\\.user.*
4.  db001 库下的一张登陆日志表[单表]:db001\\.login_log
5.  多个规则组合【必需逗号分隔】使用,分号无效:admin_db\\..*,db_cargo\\..*,db_cabin\\.tbl_cabin_order (逗号分隔)

注意:正则中的一些特殊符号,如果做为普通符号使用时,要用转义符进行转义【如 . * 等】

【canal.mq.dynamicTopic】投递动态Topic【官方文档

配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

例子1:canal.mq.dynamicTopic=db_name\\.table_name
指定匹配的单表,发送到以【db_name_table_name】为名字的topic上

例子2:canal.mq.dynamicTopic=.*\\..*
匹配所有表,则每个表都会发送到各自表名的topic上

例子3:canal.mq.dynamicTopic=db_name
指定匹配对应的库,一个库的所有表都会发送到库名[ db_name ]的topic上

例子4:canal.mq.dynamicTopic=test\\..*
指定匹配的表达式,针对匹配的表会发送到各自[库名_表名]的topic上

例子5:canal.mq.dynamicTopic=test,test1\\.test1
指定多个表达式,会将test库的表都发送到test的topic上,
test1\\.test1的表发送到对应的test1_test1 topic上,
其余的表发送到默认的canal.mq.topic值

上面的例子都没有指定Topic,都是根据库名或者表名生成Topic,投递到相应的Topic中。

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字

配置格式:topicName:schema 或 topicName:schema.table

注意: 使用冒号分隔

例子1: canal.mq.dynamicTopic=topicName:db\\.table
指定匹配的单表,发送到以 topicName 为名字的topic上

例子2: canal.mq.dynamicTopic=topicName:.*\\..*
匹配所有表,因为有指定topic,则每个表都会发送到 topicName 的topic下

例子3: canal.mq.dynamicTopic=topicName:dbName
指定匹配对应的库,一个库[ dbName ]的所有表都会发送到[ topicName ]的topic下

例子4:canal.mq.dynamicTopic=topicName:dbName\\..*
指定匹配的表达式,针对匹配的表[ dbName\\..* ]会发送到[ topicName ]的topic下

例子5:canal.mq.dynamicTopic=test0:test,test1:test1\\.test1
指定多个表达式,使用逗号分隔。
会将test库的表都发送到test0的topic下
test1\\.test1的表发送到对应的test1的topic下
其余的表发送到默认的canal.mq.topic值

例子6:canal.mq.dynamicTopic=dj_.*\..*
指定1个表达式时,就只处理匹配的。
会将[dj_]开头的库的所有表都发送到[dj_库名.表名]下的topic
### 数据保存目录根据kafka的 *log.dirs* 路径配置 会创建如下格式的数据目录: 库名_表名_分区ID ![file](https://www.codelinux.cn/wp-content/uploads/2022/01/image-1684206478288.png)