侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Canal-开源数据同步工具

再见理想
2022-09-07 / 0 评论 / 0 点赞 / 975 阅读 / 1,307 字

一,前言

如今大型的 IT 系统中,都会使用分布式的方式,同时会有非常多的中间件,如 redis、消息队列、大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将 mysql 的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢 ?
如果有这样的一个需求,数据修改后,需要及时将 mysql 中的数据更新到 MongoDB,我们会怎么进行实呢?


二,Canal 是什么

canal 是阿里巴巴的一个开源项目,基于java实现,整体已经在很多大型的互联网项目生产环境中使用,包括阿里、美团等都有广泛的应用,是一个非常成熟的数据库同步方案,基础的使用只需要进行简单的配置即可。Canal的部署也是支持集群的,需要配合ZooKeeper进行集群管理。

官网的介绍:

canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

canal 的工作原理就是把自己伪装成 MySQL slave,模拟 MySQL slave 的交互协议向 MySQL Mater 发送 dump 协议,MySQL mater 收到 canal 发送过来的 dump 请求,开始推送 binary log 给 canal,然后 canal 解析 binary log,再发送到存储目的地,比如 MySQL,Kafka,Elastic Search 等等。
实际项目我们是配置 MQ 模式,canal 会把数据发送到 MQ 的 topic 中,由对应消费者进行处理。
以下是 canal 原理图:


三,canal 实战

3.1,场景

MySQL 部分表的数据变动需要实时同步 MongoDB 中,达到数据的一致性。


3.2,搭建

3.2.1 MySQL 设置

目的:开启 Binlog,生成 Canal 账号。

编辑 MySQL 配置文件:

$ vim /etc/my.cnf

修改以下配置:

log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 #配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

重启 MySQL:

$ service mysqld restart

MySQL 检查 Binlog 开启状态:

$ show variables like 'log_%';

看到 log_bin on表示已开启。

查看 Binlog 当前日志:

$ show master status;

生成 Canal 账号:

$ CREATE USER canal的账号 IDENTIFIED BY 'canal的密码';  
$ GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
$ FLUSH PRIVILEGES;

3.2.2,Canal 设置

1,下载:

下载 deployer,选择合适版本,如 canal.deployer-1.1.6.tar.gz。

2,上传到服务器:

将压缩包上传的服务器 /usr/local/canal 目录下,并解压:

$ tar zxvf canal.deployer-1.1.6.tar.gz

3,设置实例参数:

$ vim  conf/example/instance.properties

修改以下配置:

# position info,需要改成自己的数据库信息
canal.instance.master.address = mysql的ip:mysql端口
# username/password,需要改成自己的数据库信息
canal.instance.dbUsername = 在MySQL中创建的Canal账号 
canal.instance.dbPassword = 在MySQL中创建的Canal密码
canal.instance.filter.regex=要监听的表,如 database.table
canal.instance.filter.field=要监听表的字段,如 database.table:userId/niceName/phone
# MQ主题
canal.mq.topic=canal_routing_key

4,设置过滤日志参数:

$ vim conf/canal.properties

修改以下配置:

#需要过滤掉ddl、dcl日志,否则都会传输到应用服务中(非常多日志)
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = true
canal.instance.filter.query.ddl = true
canal.instance.filter.table.error = true
canal.instance.filter.transaction.entry = true

#不过滤增删改
canal.instance.filter.rows = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

5,设置MQ参数:

$ vim conf/canal.properties

修改以下配置:

canal.serverMode = rabbitMQ
rabbitmq.host = 自己MQ的ip
rabbitmq.virtual.host = 自己MQ的虚拟主机
rabbitmq.exchange = canal.exchange
rabbitmq.username = 自己MQ的用户名
rabbitmq.password = 自己MQ密码

6,启动:

$ sh bin/startup.sh

7,查看日志

$ tail -f logs/canal/canal.log
$ tail -f logs/example/example.log

3.2.3,业务处理

通过 MQ 接收消息并消费:

    @RabbitListener(bindings = {
    	@QueueBinding(
            value = @Queue(value = AmqpExchange.CANAL_QUEUE, durable = "true"),
            exchange = @Exchange(value = AmqpExchange.CANAL_EXCHANGE),
            key = AmqpExchange.CANAL_ROUTING_KEY
        )
    } )
    public void handleCanalDataChange(byte[] message) {
        String realMsg = null;
        try {
            realMsg = new String(message, "utf-8");
            JSONObject msg = JSON.parseObject(realMsg);
            String database = msg.getString("database");
            if(SynEnums.foreachEnum(database) == null){
                return;
            }
            log.info("[canal] 接收消息: {}", realMsg);
            //表名
            String  table=msg.getString("table");
            //操作类型
            String  type=msg.getString("type");
           // 业务处理...
            
        } catch (Exception e) {
            log.error("接收canal异常,数据: {}", realMsg);
        }
    }

四,总结

canal 的好处在于对业务代码没有侵入,因为是基于监听 binlog 日志去进行同步数据的,实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。
实际项目我们是配置 MQ 模式,canal 会把数据发送到 MQ 的 topic 中,由消费者进行处理。

0

评论区