创建一个CanalBean对象进行接收:
public class CanalBean {//数据private List<TbCommodityInfo> data;//数据库名称private String database;private long es;//递增,从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法}public class MysqlType {private String id;private String commodity_name;private String commodity_price;private String number;private String description;//getter、setter方法}public class SqlType {private int id;private int commodity_name;private int commodity_price;private int number;private int description;}最后就可以创建一个消费者CanalConsumer进行消费:
@Slf4j@Componentpublic class CanalConsumer {@Resourceprivate RedisClient redisClient;@KafkaListener(topics = "canaltopic")public void receive(ConsumerRecord<?, ?> consumer) {String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为JAVABeanCanalBean canalBean = JSONObject.parseobject(value, CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.hasDdl();//获取类型String type = canalBean.getType();//不是DDL语句if (!isDdl) {List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间long TIME_OUT = 600L;if ("INSERT".equals(type)) {//新增语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();log.info("新增数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));//新增到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);log.info("从redis获取数据 result: {}", JSONObject.toJSONString(redisClient.getString(id)));}} else if ("UPDATE".equals(type)) {//更新语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();log.info("修改数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));//更新到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else {//删除语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();log.info("删除数据从redis, id: {}", id);//从redis中删除redisClient.deleteKey(id);}}}}}测试Mysql与Redis同步
mysql对应的表结构如下:
CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',`number` int(10) DEFAULT '0' COMMENT '商品数量',`description` varchar(2048) DEFAULT '' COMMENT '商品描述',PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';启动项目后,新增一条数据:
INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉烧包', '3.99', '3', '又大又香的叉烧包,老人小孩都喜欢');可以在控制台看到以下输出:
2022-01-02 18:12:51.317INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer: 新增数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}2022-01-02 18:12:51.320INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer: 从redis获取数据 result: "{"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}"如果更新呢?试一下Update语句:
UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='很便宜的青菜包呀,不买也开看看了喂' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';同样可以在控制台看到以下输出:
2022-01-02 18:14:44.613INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer: topic名称:canaltopic,key:null,分区位置:0,下标:6,value:{"data":[{"id":"3e71a81fd80711eaaed600163e046cc3","commodity_name":"青菜包","commodity_price":"3.99","number":"3","description":"很便宜的青菜包呀,不买也开看看了喂"}],"database":"study","es":1641118484000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"varchar(36)","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"叉烧包","description":"又大又香的叉烧包,老人小孩都喜欢"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":12,"number":4,"description":12},"table":"tb_commodity_info","ts":1641118484602,"type":"UPDATE"}2022-01-02 18:14:44.616INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer: 修改数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"青菜包","commodity_price":"3.99","description":"很便宜的青菜包呀,不买也开看看了喂","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}
推荐阅读
- oracle 和 mysql 自动按照日期备份数据库脚本
- 鸿蒙APP开发:如何实现“百度地图”的显示?需要3项认真操作才行
- Springboot使用OkHttp实现微信支付API-V3签名、证书的管理和使用
- Netty客户端断线重连实现及问题思考
- MySQL 使用 Mysqldump 备份导入数据导致主从异常
- 一行 CSS 实现 10 种现代布局
- Python实现各种加密,接口加解密不说难
- 在VUE中实现效果"换一换"功能
- Docker如何构建mysql主从?
- 太好玩了!6种Python实现「实时」显示进度条
