SOLVED mysql同步StarRocks删除Delete问题
-
源端: MySQL
对端: StarRocks 版本1.19.0
数据问题产生阶段:增量阶段
CloudCanal版本:2.1.0.7
问题描述:源端执行delete操作后,对端数据不会删除掉,对端使用主键模型(PRIMARY KEY)
,注意我使用了自定义代码增加了一个字段,作为StarRocks的主键之一源端DDL
CREATE TABLE `t_user_test` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(80)NULL COMMENT '姓名', `sex` int(4) NOT NULL DEFAULT '0' COMMENT '性别', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4; insert into t_user_test(name,sex) values ('张三',1); insert into t_user_test(name,sex) values ('李四',0); insert into t_user_test(name,sex) values ('王五',1); insert into t_user_test(name,sex) values ('赵六',0); insert into t_user_test(name,sex) values ('钱七',1); select * from t_user_test update t_user_test set sex =0 where id =1 update t_user_test set sex =1 insert into t_user_test(name,sex) values ('王八',0); delete from t_user_test where id = 6 -- 王八
对端DDL,没有使用自定义代码增加字段,删除生效
CREATE TABLE `t_user_test` ( `id` largeint(40) NOT NULL COMMENT "ID", `name` varchar(500) NULL COMMENT "姓名", `sex` int(4) NULL COMMENT "性别" ) ENGINE=OLAP PRIMARY KEY(`id`) COMMENT "测试表" DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_num" = "3", "in_memory" = "false", "storage_format" = "DEFAULT" );
对端DDL2 ,使用自定义代码增加一个字段,作为StarRocks的主键之一,删除不生效
CREATE TABLE `t_user_test_2` ( `id` largeint(40) NOT NULL COMMENT "ID", `tenant_id` bigint(20) NOT NULL, `name` varchar(500) NULL COMMENT "姓名", `sex` int(4) NULL COMMENT "性别" ) ENGINE=OLAP PRIMARY KEY(`id`,`tenant_id`) COMMENT "测试表2" DISTRIBUTED BY HASH(`tenant_id`) BUCKETS 1 PROPERTIES ( "replication_num" = "3", "in_memory" = "false", "storage_format" = "DEFAULT" );
对端查询SQL
select * from t_user_test order by id select * from t_user_test_2 order by id
-
粉丝群内已处理,由于自定义代码添加的列是PK,所以针对DELETE事件,自定义代码中也需要处理
-
【已解决】
cloudcanal 版本 2.2.1.0 cloudcanal-sdk版本1.0.4
StarRocks版本已经升级到2.0.3 故1.19.0版本尚未测试
请教了CC开发大佬解决方案如下:
因为我的场景是增加的自定义字段为主键PK,故需要在自定义代码中,对INSERT,UPDATE,DELETE事件分别进行处理,之前失败就是因为 没有分别处理。
之前代码:for (CustomRecordV2 recordV2 : data.getRecords()) { recordV2.addField(CustomFieldV2.buildField("tenant_id", tenantId, Types.VARCHAR, false, false, true)); }
修改后 代码
for (CustomRecordV2 recordV2 : data.getRecords()) { CustomFieldV2 tenant = CustomFieldV2.buildField("tenant_id", tenantId, Types.VARCHAR, true, false, true); switch (data.getEventType()) { case INSERT: { recordV2.getAfterColumnMap().put(tenant.getFieldName(), tenant); recordV2.getAfterKeyColumnMap().put(tenant.getFieldName(), tenant); break; } case UPDATE: { recordV2.getAfterColumnMap().put(tenant.getFieldName(), tenant); recordV2.getAfterKeyColumnMap().put(tenant.getFieldName(), tenant); recordV2.getBeforeColumnMap().put(tenant.getFieldName(), tenant); recordV2.getBeforeKeyColumnMap().put(tenant.getFieldName(), tenant); } case DELETE: { recordV2.getBeforeColumnMap().put(tenant.getFieldName(), tenant); recordV2.getBeforeKeyColumnMap().put(tenant.getFieldName(), tenant); break; } default: { break; } } }
-
补充
另外还有就是:
使用自定义代码增加额外字段
后:StarRocks 这边如果不给非NULL
字段赋值默认值,必报下面的错误Caused by: com.clougence.cloudcanal.base.service.task.exception.runtime.ApplierHandlerException: apply batch data error.msg:RuntimeException: Failed to flush data to StarRocks, Error response: {"Status":"Fail","BeginTxnTimeMs":0,"Message":"Column has no default value. column: id","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"acdcf56f-5f7a-42ec-b052-1a478d7ba402","LoadBytes":0,"StreamLoadPutTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":16482410,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0} {}
没有使用自定义代码的情况: 目前没发现会报错,即使是非NULL列没有给默认值
-
Copyright © 2020 ClouGence, Inc.备案号:浙ICP备20007605号-2