导航

    数据用户治理组

    • 注册
    • 登录
    • 搜索
    • 版块
    • 最新
    • 话题
    • 热门
    • 用户
    • 群组

    已解决 canal无法写入kafka多分区

    问答区
    2
    3
    875
    正在加载更多帖子
    • 从旧到新
    • 从新到旧
    • 最多赞同
    回复
    • 在新帖中回复
    登录后回复
    此主题已被删除。只有拥有主题管理权限的用户可以查看。
    • navyaijm2017
      navyaijm2017 最后由 编辑

      57677c06-2adc-47ab-bb80-95ac415a7fef-image.png
      c7d7c425-5ad5-476d-bf4f-3cff9ac0c6b6-image.png
      64cf69e9-6c87-4232-9c86-c0f13a4cbc54-image.png

      1 条回复 最后回复 回复 引用 0
      • junyu-cloudcanal
        junyu-cloudcanal @navyaijm2017 最后由 编辑

        @navyaijm2017 在 canal无法写入kafka多分区 中说:

        已解决,正确的配置

        mq config

        canal.mq.dynamicTopic=db_chj_test.sbtest2
        #canal.mq.partition=3
        canal.mq.partitionsNum=3
        canal.mq.partitionHash=db_chj_test.sbtest2:id

        补充下。

        canal.mq.partition 这个参数实际上表示的是在 canal.mq.partitionsNum 和 canal.mq.partitionHash 未设置的情况下,默认写入的partition。以下是参考源码

         for (FlatMessage flatMessage : flatMessages) {
                        /**canal.mq.partitionsNum和canal.mq.partitionHash **/
                        if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
                            FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                mqDestination.getPartitionsNum(),
                                mqDestination.getPartitionHash(),
                                this.mqProperties.isDatabaseHash());
                            int length = partitionFlatMessage.length;
                            for (int i = 0; i < length; i++) {
                                FlatMessage flatMessagePart = partitionFlatMessage[i];
                                if (flatMessagePart != null) {
                                    records.add(new ProducerRecord<>(topicName, i, null, JSON.toJSONBytes(flatMessagePart,
                                        SerializerFeature.WriteMapNullValue)));
                                }
                            }
                        } else {
                            /**canal.mq.partition**/
                            final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
                            records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage,
                                SerializerFeature.WriteMapNullValue)));
                        }
                    }
        
        1 条回复 最后回复 回复 引用 1
        • navyaijm2017
          navyaijm2017 最后由 编辑

          已解决,正确的配置

          mq config

          canal.mq.dynamicTopic=db_chj_test.sbtest2
          #canal.mq.partition=3
          canal.mq.partitionsNum=3
          canal.mq.partitionHash=db_chj_test.sbtest2:id

          junyu-cloudcanal 1 条回复 最后回复 回复 引用 0
          • 1 / 1
          • First post
            Last post
          Copyright © 2020 ClouGence, Inc.备案号:浙ICP备20007605号-2