springboot集成canal

avatar
作者
猴君
阅读量:0

目录

项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求

原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。

具体步骤

一、打开mysql的binlog

1.1 打开 MySQL 配置文件 my.cnf(通常位于 /etc/mysql/my.cnf/etc/my.cnf)并添加或修改以下设置:
[mysqld] server-id=1 log-bin=mysql-bin binlog-format=row 

注意 :确保binlog-format是 row模式

1.2 重启mysql服务

具体命令根据你的服务器类型决定

1.3 验证是否生效
SHOW MASTER STATUS; 

二、 部署canal 服务端(docker)

2.1 下载启动脚本(可能需要梯子)
# 下载脚本 wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh  
2.2 启动服务
# 构建一个destination name为test的队列 sh run.sh -e canal.auto.scan=false \ 		  -e canal.destinations=test \ 		  -e canal.instance.master.address=127.0.0.1:3306  \ 		  -e canal.instance.dbUsername=canal  \ 		  -e canal.instance.dbPassword=canal  \ 		  -e canal.instance.connectionCharset=UTF-8 \ 		  -e canal.instance.tsdb.enable=true \ 		  -e canal.instance.gtidon=false  \       -e canal.instance.filter.regex=.*\\..*  

参数解释:

-e canal.auto.scan=false:  关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。 -e canal.destinations=test:  设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。 -e canal.instance.master.address=127.0.0.1:3306:  指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。 -e canal.instance.dbUsername=canal:  设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。 -e canal.instance.dbPassword=canal:  设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。 -e canal.instance.connectionCharset=UTF-8:  设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。 -e canal.instance.tsdb.enable=true:  启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。 -e canal.instance.gtidon=false:  关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。 -e canal.instance.filter.regex=.*\\..*:  设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。 
2.3 验证服务启动成功
docker logs <containerids> 

可以看到这样的打印:
image.png

三、springboot端集成canal客户端

3.1 添加依赖 /配置
<!--  canal begin--> <dependency>     <groupId>com.alibaba.otter</groupId>     <artifactId>canal.client</artifactId>     <version>1.1.0</version> </dependency> <dependency>     <groupId>com.alibaba.otter</groupId>     <artifactId>canal.protocol</artifactId>     <version>1.1.0</version> </dependency> <!--  canal end--> 
canal:   host: 127.0.0.1 #自己的canal服务器ip   port: 11111  #canal默认端口   destination: test #配置文件配置的名称   username: root   password: 214365   batch:     size: 100 
3.2 客户端代码
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.eco.db.entity.Record; import com.eco.fishway.service.RecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;  import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;   @Slf4j @Component public class CanalClient implements InitializingBean, DisposableBean {     @Value("${canal.host}")     private String canalHost;      @Value("${canal.port}")     private int canalPort;      @Value("${canal.destination}")     private String canalDestination;      @Value("${canal.username}")     private String canalUsername;      @Value("${canal.password}")     private String canalPassword;      @Value("${canal.batch.size}")     private int batchSize;      private final RecordService recordService;      private CanalConnector canalConnector;       private ExecutorService executorService;      public CanalClient(RecordService recordService) {         this.recordService = recordService;     }      @Override     public void afterPropertiesSet() throws Exception {         this.canalConnector = CanalConnectors.newSingleConnector(             new InetSocketAddress(canalHost, canalPort),             canalDestination,             canalUsername,             canalPassword         );         this.executorService = Executors.newSingleThreadExecutor();         this.executorService.execute(new Task());     }      @Override     public void destroy() throws Exception {         if (executorService != null) {             executorService.shutdown();         }     }      private class Task implements Runnable {         @Override         public void run() {             while (true) {                 try {                     //连接                     canalConnector.connect();                     //订阅                     canalConnector.subscribe();                     while (true) {                         Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小                         long batchId = message.getId();                         //获取批量的数量                         int size = message.getEntries().size();                          //如果没有数据                         if (batchId == -1 || size == 0) {                             // log.info("无数据");                             try {                                 // 线程休眠2秒                                 Thread.sleep(2000);                             } catch (InterruptedException e) {                                 e.printStackTrace();                             }                         } else {                             // 如果有数据,处理数据                             printEntry(message.getEntries());                         }                         canalConnector.ack(batchId);                     }                 } catch (Exception e) {                     log.error("Error occurred when running Canal Client", e);                 } finally {                     canalConnector.disconnect();                 }             }         }     }      private void printEntry(List<CanalEntry.Entry> entrys) {         for (CanalEntry.Entry entry : entrys) {             if (isTransactionEntry(entry)){                 //开启/关闭事务的实体类型,跳过                 continue;             }             //RowChange对象,包含了一行数据变化的所有特征             //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等             CanalEntry.RowChange rowChange;             try {                 rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());             } catch (Exception e) {                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);             }             //获取操作类型:insert/update/delete类型             CanalEntry.EventType eventType = rowChange.getEventType();             //打印Header信息             log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),                     eventType);             //判断是否是DDL语句             if (rowChange.getIsDdl()) {                 log.info("================》;isDdl: true,sql:{}", rowChange.getSql());             }             log.info(rowChange.getSql());             //获取RowChange对象里的每一行数据,打印出来             for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {                 //如果是删除语句                 if (eventType == CanalEntry.EventType.DELETE) {                     log.info(">>>>>>>>>> 删除 >>>>>>>>>>");                     printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");                     //如果是新增语句                 } else if (eventType == CanalEntry.EventType.INSERT) {                     log.info(">>>>>>>>>> 新增 >>>>>>>>>>");                     printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");                     //如果是更新的语句                 } else {                     log.info(">>>>>>>>>> 更新 >>>>>>>>>>");                     //变更前的数据                     log.info("------->; before");                     printColumnAndExecute(rowData.getBeforeColumnsList(), null);                     //变更后的数据                     log.info("------->; after");                     printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");                 }             }         }     }      /**      * 执行数据同步      * @param columns      * @param type      */     private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {         if(type == null){             return;         }         JSONObject jsonObject = new JSONObject();         for (CanalEntry.Column column : columns) {             jsonObject.put(column.getName(), column.getValue());         }         // 此处使用json转对象的方式进行转换         Record bean = jsonObject.toBean(Record.class);         if(type.equals("INSERT")){             // 执行新增             recordService.save(bean);             log.info("新增成功->{}", jsonObject.toJSONString(0));         }else if (type.equals("UPDATE")){             // 执行编辑             recordService.updateById(bean);             log.info("编辑成功->{}", jsonObject.toJSONString(0));         }else if (type.equals("DELETE")){             // 执行删除             recordService.removeById(bean.getRecordId());             log.info("删除成功->{}", jsonObject.toJSONString(0));         }     }     /**      * 判断当前entry是否为事务日志      */     private boolean isTransactionEntry(CanalEntry.Entry entry){         if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){             log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",                     entry.getHeader().getLogfileName(),                     entry.getHeader().getLogfileOffset(),                     entry.getEntryType()             );             return true;         }else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){             log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",                     entry.getHeader().getLogfileName(),                     entry.getHeader().getLogfileOffset(),                     entry.getEntryType()             );             return true;         }else {             return false;         }     }   } 
3.3 数据同步效果

image.png
有点感叹需求就是最好的老师,但是完不成需求就不好玩了

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!