Navigation

    数据用户治理组

    • Register
    • Login
    • Search
    • Categories
    • Recent
    • Tags
    • Popular
    • Users
    • Groups

    SOLVED CloudCanalProcessorV2使用问题-字段无法正确赋值

    CloudCanal
    cloudcanal cloudcanalprocessorv2 自定义代码
    2
    2
    1162
    Loading More Posts
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes
    Reply
    • Reply as topic
    Log in to reply
    This topic has been deleted. Only users with topic management privileges can see it.
    • 微希夷
      微希夷 last edited by

      需求

      通过自定义代码实现对一个表修改日志的自动记录:每次数据源表有变化,不管新增,修改,删除,目标表都需要新增一条记录。

      问题

      新增成功了,但是有个字段(is_del)赋值不对。

      配置

      ### 数据源表
      create table test1
      (
          id     bigint not null primary key auto_increment comment ' ID',
          type   int(4) null comment '类型',
          num    int(8) null comment '排序',
          status int(8) null comment '状态'
      )
          comment '测试';
      
      ### 目标表
      create table test1_log
      (
          id       bigint     not null primary key auto_increment comment 'ID',
          is_del   tinyint(1) not null comment '是否删除,0初始1修改2删除',
          test1_id bigint     null comment 'test1_ID',
          type     int(4)     null comment '类型',
          num      int(8)     null comment '排序',
          status   int(8)     null comment '状态'
      )
          comment '测试日志';
      

      字段映射
      3dc65c01-4447-4b08-9dbf-0720cd703b80-image.png
      自定义代码

      public class MySqlToMySql_log implements CloudCanalProcessorV2 {
          protected static final Logger log = LoggerFactory.getLogger("custom_processor");
      
          private SchemaInfo targetTable = new SchemaInfo(null, "dev", "test1");
      
          private CustomFieldV2 isDel_0 = CustomFieldV2.buildField("is_del", 0, Types.TINYINT, false, true, false);
          private CustomFieldV2 isDel_1 = CustomFieldV2.buildField("is_del", 1, Types.TINYINT, false, true, false);
          private CustomFieldV2 isDel_2 = CustomFieldV2.buildField("is_del", 2, Types.TINYINT, false, true, false);
      
          @Override
          public void start(ProcessorContext context) {
              // do nothing
          }
      
          @Override
          public List<CustomData> process(CustomData data) {
              log.info("开始。。。。。。。。。。。。。。");
              List<CustomData> re = new ArrayList<>();
              if (data.getSchemaInfo().equals(targetTable)) {
                  handleFactTable(re, data);
              } else {
                  re.add(data);
              }
              log.info("结束。。。。。。。。。。。。。。");
              return re;
          }
      
          protected void handleFactTable(List<CustomData> re, CustomData data) {
              Iterator<CustomRecordV2> it = data.getRecords().iterator();
              CustomData res = new CustomData(data.getSchemaInfo(), EventTypeInSdk.INSERT, new ArrayList<>());
              while (it.hasNext()) {
                  CustomRecordV2 recordV2 = it.next();
                  switch (data.getEventType()) {
                      case INSERT: {
                          CustomFieldV2 dataIsDel = isDel_0.cloneField();
                          recordV2.getAfterColumnMap().put(dataIsDel.getFieldName(), dataIsDel);
                          res.getRecords().add(recordV2.cloneRecord());
                          break;
                      }
                      case UPDATE: {
                          CustomFieldV2 dataIsDel = isDel_1.cloneField();
                          recordV2.getAfterColumnMap().put(dataIsDel.getFieldName(), dataIsDel);
                          recordV2.getBeforeColumnMap().clear();
                          recordV2.getBeforeKeyColumnMap().clear();
                          res.getRecords().add(recordV2.cloneRecord());
                          break;
                      }
                      case DELETE: {
                          CustomFieldV2 dataIsDel = isDel_2.cloneField();
                          recordV2.getAfterColumnMap().put(dataIsDel.getFieldName(), dataIsDel);
                          recordV2.getBeforeColumnMap().clear();
                          recordV2.getBeforeKeyColumnMap().clear();
                          res.getRecords().add(recordV2.cloneRecord());
                          break;
                      }
                      default: {
                          break;
                      }
                  }
              }
              log.info("结果CustomData:{}", res);
              re.add(res);
          }
      
          @Override
          public void stop() {
              // do nothing
          }
      }
      
      
      ### 第一次执行
      INSERT INTO dev.test1 (id, type, num, status) VALUES (1, 1, 1, 1);
      INSERT INTO dev.test1 (id, type, num, status) VALUES (2, 2, 2, 2);
      INSERT INTO dev.test1 (id, type, num, status) VALUES (3, 3, 3, 3);
      
      ### 第二次执行
      UPDATE dev.test1 t SET t.num = 0 WHERE t.id = 1;
      UPDATE dev.test1 t SET t.num = 0 WHERE t.id = 2;
      UPDATE dev.test1 t SET t.num = 0 WHERE t.id = 3;
      
      ### 第三次执行
      DELETE FROM dev.test1 WHERE id = 1;
      DELETE FROM dev.test1 WHERE id = 2;
      DELETE FROM dev.test1 WHERE id = 3;
      

      第一次执行结果
      eb6e4691-134f-4daf-8b1a-e54c69f97b28-image.png
      第二次执行结果
      893807cf-c4bd-4320-acaa-d081d8a8c22d-image.png
      第三次执行结果
      726eb05c-158d-4dfd-9f64-f1079e0e3190-image.png

      希望预期时第一次执行结果里is_del为0,第二次执行结果里is_del为1,第三次执行结果里is_del为2。
      但是三次执行结果里is_del都是0

      junyu-cloudcanal 1 Reply Last reply Reply Quote 0
      • junyu-cloudcanal
        junyu-cloudcanal @微希夷 last edited by

        @微希夷 不好意思,回复有点晚。

        private CustomFieldV2 isDel_0 = CustomFieldV2.buildField(“is_del”, 0, Types.TINYINT, false, true, false);

        把最后一个字段isUpdated 改成true.

        1 Reply Last reply Reply Quote 0
        • 1 / 1
        • First post
          Last post
        Copyright © 2020 ClouGence, Inc.备案号:浙ICP备20007605号-2