Background
The requirement: parse the MySQL binlog, then sync the data to Elasticsearch.
This is a simple demo that uses Canal to receive MySQL binlog events and write them to a Kafka topic. Recent versions of Canal natively support writing to Kafka and RocketMQ, so it only takes configuration; there is no need to develop a Canal producer. Installing Kafka, ZooKeeper, and Canal is not covered here.
Configuring MySQL
Enable binlog under [mysqld] and set binlog_format to ROW.
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = ROW
With statement or mixed format, the binlog only records SQL statements, without the corresponding row data. Restart MySQL after changing the configuration:
sudo systemctl restart mysql.service
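You can verify the settings from a MySQL session before moving on (standard MySQL commands):
-- confirm binlog is on and row-based
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- show the current binlog file and offset (what canal will dump from)
SHOW MASTER STATUS;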
Creating a MySQL account for Canal
Heads-up: I dug myself a hole here. After CREATE USER I ran GRANT ALL PRIVILEGES, which caused an error; see the analysis below.
(This is the correct version.)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
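To double-check what the account actually ended up with:
SHOW GRANTS FOR 'canal'@'%';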
Configuring Canal
conf/canal.properties is the Canal server configuration file.
Only a few important settings need to change:
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.zkServers = 127.0.0.1
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
The configuration under conf/example is for the Canal instance.
Open instance.properties:
# position info
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=user\\.t_person
# mq config
canal.mq.topic=binlog-test
This covers the MySQL server address, the account Canal uses to connect to MySQL, the tables to sync, the Kafka topic, and so on.
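The table filter is worth spelling out: canal.instance.filter.regex takes comma-separated Perl regular expressions matched against schema.table. A few illustrative values (t_order is a made-up table name):
# all tables in all schemas
canal.instance.filter.regex=.*\\..*
# every table under the user schema
canal.instance.filter.regex=user\\..*
# explicit tables, comma separated
canal.instance.filter.regex=user.t_person,user.t_order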
See Canal Kafka RocketMQ QuickStart for more configuration options.
Starting Canal
bin/startup.sh
The Canal server log is under logs/canal/, and the instance log is under logs/example/.
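To follow them live (canal.log and example.log are an assumption of Canal's default file names):
tail -f logs/canal/canal.log
tail -f logs/example/example.log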
Watching the logs, the instance log reports an error:
2019-11-14 16:29:57.529 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /127.0.0.1:3306 has an error, retrying. caused by
com.alibaba.otter.canal.parse.exception.CanalParseException: command : 'show master status' has an error!
Caused by: java.io.IOException: ErrorPacket [errorNumber=1227, fieldCount=-1, message=Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation, sqlState=42000, sqlStateMarker=#]
with command: show master status
at com.alibaba.otter.canal.parse.driver.mysql.MysqlQueryExecutor.query(MysqlQueryExecutor.java:61) ~[canal.parse.driver-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.query(MysqlConnection.java:106) ~[canal.parse-1.1.4.jar:na]
The Canal server masquerades as a MySQL slave and reads the binary binlog stream to parse it, hence the replication privileges are required.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
After re-granting, the log reads:
2019-11-14 16:32:48.276 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-11-14 16:32:48.277 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2019-11-14 16:32:49.299 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1573720110000] cost : 1014ms , the next step is binlog dump
Insert a row:
INSERT INTO `user`.`t_person` (`id`, `name`, `age`) VALUES ('1', '22', '3');
Since Canal is configured to write to a Kafka topic, consume it directly:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic binlog-test --from-beginning
{"data":null,"database":"","es":1573720366000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED WITH 'mysql_native_password' AS '*E3619321C1A937C46A0D8BD1DAC39F93B27D4458'","sqlType":null,"table":"","ts":1573720369373,"type":"QUERY"}
{"data":[{"id":"1","name":"22","age":"3"}],"database":"user","es":1573721530000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"t_person","ts":1573721530702,"type":"INSERT"}
The first message is the GRANT record, a DCL statement.
The second is the row we just inserted, a DML statement.
Both passed through because canal.properties is still on its defaults, which let everything through. Filters can be added later under # binlog filter config.
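For example, reading the flag names and the defaults shown above, setting a flag to true should drop that statement type (my interpretation; worth verifying against the Canal docs):
# drop DCL statements such as GRANT from the stream
canal.instance.filter.query.dcl = true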
Trying a DELETE
# mysql
INSERT INTO `user`.`t_person` (`id`, `name`, `age`) VALUES ('2', '33', '44');
DELETE FROM `user`.`t_person` WHERE ID > -1;
# kafka topic
{"data":[{"id":"1","name":"22","age":"3"},{"id":"2","name":"33","age":"44"}],"database":"user","es":1573736190000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"t_person","ts":1573736190235,"type":"DELETE"}
For the bulk delete, every affected row shows up in a single message. Be careful here: if a bulk delete touches many rows, the message body becomes very large.
Consider adding a LIMIT at the application layer and deleting in several passes, as sketched below.
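A sketch of the batched variant; the application repeats the statement until it reports zero affected rows (1000 is an arbitrary batch size):
DELETE FROM `user`.`t_person` WHERE id > -1 LIMIT 1000;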
DCL, DML, DDL
Canal can filter binlog events by SQL statement type:
- DML (Data Manipulation Language): operations on data, e.g. CRUD.
- DDL (Data Definition Language): table-level changes, e.g. CREATE, DROP, ALTER.
- DCL (Data Control Language): privileges and roles, e.g. GRANT.
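To close the loop on the Elasticsearch requirement from the background section, here is a minimal sketch of a downstream consumer in Python. It assumes the kafka-python and elasticsearch client packages, an Elasticsearch node at 127.0.0.1:9200, and an index named t_person; none of these come from the setup above.
import json

from elasticsearch import Elasticsearch
from kafka import KafkaConsumer

# Assumed addresses; adjust to your environment.
es = Elasticsearch(['http://127.0.0.1:9200'])
consumer = KafkaConsumer(
    'binlog-test',
    bootstrap_servers='127.0.0.1:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)

for msg in consumer:
    event = msg.value
    # Only row events carry data; QUERY (DDL/DCL) messages have data=null.
    if event['type'] not in ('INSERT', 'UPDATE', 'DELETE') or not event['data']:
        continue
    pk = event['pkNames'][0]  # 'id' for t_person
    for row in event['data']:
        if event['type'] == 'DELETE':
            # Remove the document; ignore it if it is already gone.
            es.delete(index='t_person', id=row[pk], ignore=[404])
        else:
            # INSERT and UPDATE both become an upsert of the full row.
            es.index(index='t_person', id=row[pk], body=row)
Note how a bulk DELETE arrives as one message with many rows in data, which is exactly why the message-size caveat above matters for the consumer as well.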