「You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server」
之前我们的配置,对于5.7以下版本应该是可以的 。但对于高版本,我们需要指定server-id 。
我们给这个MySQL指定为2(只要不与其他库id重复):
server-id=2创建数据库Canal使用账号mysql> select user, host from user;+------------------+-----------+| user| host|+------------------+-----------+| root| %|| debian-sys-maint | localhost || mysql.session| localhost || mysql.sys| localhost || root| localhost |+------------------+-----------+5 rows in setCREATE USER canal IDENTIFIED BY 'xxxx';(填写密码)GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;show grants for 'canal' 配置Canal服务去Github下载最近的Canal稳定版本包:
- https://github.com/alibaba/canal/releases
mkdir /tmp/canaltar zxvf canal.deployer-$version.tar.gz-C /tmp/canal配置文件设置:主要有两个文件配置,一个是conf/canal.properties一个是
conf/example/instance.properties 。
为了快速运行Demo,只修改
conf/example/instance.properties里的数据库连接账号密码即可
# username/passwordcanal.instance.dbUsername=canalcanal.instance.dbPassword=xxxxxxxcanal.instance.connectionCharset = UTF-8运行Canal服务请先确保机器上有JDK,接着运行Canal启动脚本:sh bin/startup.sh下图即成功运行:
文章插图
Java客户端代码我在秒杀系统系列文章的代码仓库里(miaosha-job)编写了如下客户端代码
仓库源码地址:
https://github.com/qqxx6661/miaosha
package job;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com.alibaba.otter.canal.protocol.Message;import com.google.protobuf.InvalidProtocolBufferException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;public class CanalClient {private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);public static void main(String[] args) {// 第一步:与canal进行连接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"example", "", "");connector.connect();// 第二步:开启订阅connector.subscribe();// 第三步:循环订阅while (true) {try {// 每次读取 1000 条Message message = connector.getWithoutAck(1000);long batchID = message.getId();int size = message.getEntries().size();if (batchID == -1 || size == 0) {LOGGER.info("当前暂时没有数据,休眠1秒");Thread.sleep(1000);} else {LOGGER.info("-------------------------- 有数据啦 -----------------------");printEntry(message.getEntries());}connector.ack(batchID);} catch (Exception e) {LOGGER.error("处理出错");} finally {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 获取每条打印的记录*/public static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {// 第一步:拆解entry 实体Header header = entry.getHeader();EntryType entryType = entry.getEntryType();// 第二步: 如果当前是RowData,那就是我需要的数据if (entryType == EntryType.ROWDATA) {String tableName = header.getTableName();String schemaName = header.getSchemaName();RowChange rowChange = null;try {rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {e.printStackTrace();}EventType eventType = rowChange.getEventType();LOGGER.info(String.format("当前正在操作表 %s.%s,执行操作= %s", schemaName, tableName, eventType));// 如果是‘查询’ 或者 是 ‘DDL’ 操作,那么sql直接打出来if (eventType == EventType.QUERY || rowChange.getIsDdl()) {LOGGER.info("执行了查询语句:[{}]", rowChange.getSql());return;}// 第三步:追踪到 columns 级别rowChange.getRowDatasList().forEach((rowData) -> {// 获取更新之前的column情况List<Column> beforeColumns = rowData.getBeforeColumnsList();// 获取更新之后的 column 情况List<Column> afterColumns = rowData.getAfterColumnsList();// 当前执行的是 删除操作if (eventType == EventType.DELETE) {printColumn(beforeColumns);}// 当前执行的是 插入操作if (eventType == EventType.INSERT) {printColumn(afterColumns);}// 当前执行的是 更新操作if (eventType == EventType.UPDATE) {printColumn(afterColumns);// 进行删除缓存操作deleteCache(afterColumns, tableName, schemaName);}});}}}/*** 每个row上面的每一个column 的更改情况* @param columns*/public static void printColumn(List<Column> columns) {columns.forEach((column) -> {String columnName = column.getName();String columnValue = https://www.isolves.com/it/sjk/MYSQL/2020-06-12/column.getValue();String columnType = column.getMysqlType();// 判断 该字段是否更新boolean isUpdated = column.getUpdated();LOGGER.info(String.format("数据列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));});}/*** 秒杀下单接口删除库存缓存*/public static void deleteCache(List
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 基于MIT协议,一个开源的Python微信公众号开发框架——WeRoBot
- mysql5.7性能提升一百倍调优宝典
- IT管理者应该了解的四项开源工具
- 免费且开源的流程图工具 Draw.io
- 在 Linux 上使用开源软件创建 SDN
- 18条MySQL优化技巧
- MySQL库表设计小技巧
- MySQL数据库教程-环境与集成开发工具
- 开源技术大神总结的7条Linux核心知识点
- 一文看懂开源许可证,能不能商用再也不抓瞎
