阅读量:0
目录
项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求
原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。
具体步骤
一、打开mysql的binlog
1.1 打开 MySQL 配置文件 my.cnf
(通常位于 /etc/mysql/my.cnf
或 /etc/my.cnf
)并添加或修改以下设置:
[mysqld] server-id=1 log-bin=mysql-bin binlog-format=row
注意 :确保binlog-format是 row模式
1.2 重启mysql服务
具体命令根据你的服务器类型决定
1.3 验证是否生效
SHOW MASTER STATUS;
二、 部署canal 服务端(docker)
2.1 下载启动脚本(可能需要梯子)
# 下载脚本 wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
2.2 启动服务
# 构建一个destination name为test的队列 sh run.sh -e canal.auto.scan=false \ -e canal.destinations=test \ -e canal.instance.master.address=127.0.0.1:3306 \ -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.instance.filter.regex=.*\\..*
参数解释:
-e canal.auto.scan=false: 关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。 -e canal.destinations=test: 设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。 -e canal.instance.master.address=127.0.0.1:3306: 指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。 -e canal.instance.dbUsername=canal: 设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。 -e canal.instance.dbPassword=canal: 设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。 -e canal.instance.connectionCharset=UTF-8: 设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。 -e canal.instance.tsdb.enable=true: 启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。 -e canal.instance.gtidon=false: 关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。 -e canal.instance.filter.regex=.*\\..*: 设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>
可以看到这样的打印:
三、springboot端集成canal客户端
3.1 添加依赖 /配置
<!-- canal begin--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.0</version> </dependency> <!-- canal end-->
canal: host: 127.0.0.1 #自己的canal服务器ip port: 11111 #canal默认端口 destination: test #配置文件配置的名称 username: root password: 214365 batch: size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.eco.db.entity.Record; import com.eco.fishway.service.RecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Component public class CanalClient implements InitializingBean, DisposableBean { @Value("${canal.host}") private String canalHost; @Value("${canal.port}") private int canalPort; @Value("${canal.destination}") private String canalDestination; @Value("${canal.username}") private String canalUsername; @Value("${canal.password}") private String canalPassword; @Value("${canal.batch.size}") private int batchSize; private final RecordService recordService; private CanalConnector canalConnector; private ExecutorService executorService; public CanalClient(RecordService recordService) { this.recordService = recordService; } @Override public void afterPropertiesSet() throws Exception { this.canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress(canalHost, canalPort), canalDestination, canalUsername, canalPassword ); this.executorService = Executors.newSingleThreadExecutor(); this.executorService.execute(new Task()); } @Override public void destroy() throws Exception { if (executorService != null) { executorService.shutdown(); } } private class Task implements Runnable { @Override public void run() { while (true) { try { //连接 canalConnector.connect(); //订阅 canalConnector.subscribe(); while (true) { Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小 long batchId = message.getId(); //获取批量的数量 int size = message.getEntries().size(); //如果没有数据 if (batchId == -1 || size == 0) { // log.info("无数据"); try { // 线程休眠2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // 如果有数据,处理数据 printEntry(message.getEntries()); } canalConnector.ack(batchId); } } catch (Exception e) { log.error("Error occurred when running Canal Client", e); } finally { canalConnector.disconnect(); } } } } private void printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (isTransactionEntry(entry)){ //开启/关闭事务的实体类型,跳过 continue; } //RowChange对象,包含了一行数据变化的所有特征 //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等 CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } //获取操作类型:insert/update/delete类型 CanalEntry.EventType eventType = rowChange.getEventType(); //打印Header信息 log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType); //判断是否是DDL语句 if (rowChange.getIsDdl()) { log.info("================》;isDdl: true,sql:{}", rowChange.getSql()); } log.info(rowChange.getSql()); //获取RowChange对象里的每一行数据,打印出来 for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { //如果是删除语句 if (eventType == CanalEntry.EventType.DELETE) { log.info(">>>>>>>>>> 删除 >>>>>>>>>>"); printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE"); //如果是新增语句 } else if (eventType == CanalEntry.EventType.INSERT) { log.info(">>>>>>>>>> 新增 >>>>>>>>>>"); printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT"); //如果是更新的语句 } else { log.info(">>>>>>>>>> 更新 >>>>>>>>>>"); //变更前的数据 log.info("------->; before"); printColumnAndExecute(rowData.getBeforeColumnsList(), null); //变更后的数据 log.info("------->; after"); printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE"); } } } } /** * 执行数据同步 * @param columns * @param type */ private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) { if(type == null){ return; } JSONObject jsonObject = new JSONObject(); for (CanalEntry.Column column : columns) { jsonObject.put(column.getName(), column.getValue()); } // 此处使用json转对象的方式进行转换 Record bean = jsonObject.toBean(Record.class); if(type.equals("INSERT")){ // 执行新增 recordService.save(bean); log.info("新增成功->{}", jsonObject.toJSONString(0)); }else if (type.equals("UPDATE")){ // 执行编辑 recordService.updateById(bean); log.info("编辑成功->{}", jsonObject.toJSONString(0)); }else if (type.equals("DELETE")){ // 执行删除 recordService.removeById(bean.getRecordId()); log.info("删除成功->{}", jsonObject.toJSONString(0)); } } /** * 判断当前entry是否为事务日志 */ private boolean isTransactionEntry(CanalEntry.Entry entry){ if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){ log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getEntryType() ); return true; }else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){ log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getEntryType() ); return true; }else { return false; } } }
3.3 数据同步效果
有点感叹需求就是最好的老师,但是完不成需求就不好玩了