    SOLVED canal cannot write to multiple Kafka partitions

    • navyaijm2017
      navyaijm2017 last edited by

      (three screenshots attached; images not available)

      • junyu-cloudcanal
        junyu-cloudcanal @navyaijm2017 last edited by

        @navyaijm2017 said in "canal cannot write to multiple Kafka partitions":

        Solved. The correct configuration:

        mq config

        canal.mq.dynamicTopic=db_chj_test.sbtest2
        #canal.mq.partition=3
        canal.mq.partitionsNum=3
        canal.mq.partitionHash=db_chj_test.sbtest2:id

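        One addition.

        canal.mq.partition only specifies the default partition that messages are written to when canal.mq.partitionsNum and canal.mq.partitionHash are not set. If canal.mq.partition is not set either, partition 0 is used. The relevant source code, for reference: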

        for (FlatMessage flatMessage : flatMessages) {
            // Hash routing: used when canal.mq.partitionHash is set
            // (together with canal.mq.partitionsNum).
            if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
                FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                    mqDestination.getPartitionsNum(),
                    mqDestination.getPartitionHash(),
                    this.mqProperties.isDatabaseHash());
                int length = partitionFlatMessage.length;
                for (int i = 0; i < length; i++) {
                    FlatMessage flatMessagePart = partitionFlatMessage[i];
                    if (flatMessagePart != null) {
                        // The array index i is the target partition.
                        records.add(new ProducerRecord<>(topicName, i, null, JSON.toJSONBytes(flatMessagePart,
                            SerializerFeature.WriteMapNullValue)));
                    }
                }
            } else {
                // Fixed routing: canal.mq.partition, or partition 0 if it is unset.
                final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
                records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage,
                    SerializerFeature.WriteMapNullValue)));
            }
        }
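
The branch in the excerpt above can be reduced to a small standalone sketch. `PartitionRoutingSketch` and `choosePartition` are hypothetical names, not part of canal. The hashCode-modulo step is only an assumed stand-in for `MQMessageUtils.messagePartition`, whose real hashing is not shown in this thread.

```java
/** Hypothetical helper, not part of canal: mirrors the if/else in the excerpt above. */
class PartitionRoutingSketch {

    /**
     * @param partitionHash value of canal.mq.partitionHash, or null if unset
     * @param partition     value of canal.mq.partition, or null if unset
     * @param partitionsNum value of canal.mq.partitionsNum
     * @param pkValue       value of the hash column (for example "id") for one row
     */
    static int choosePartition(String partitionHash, Integer partition,
                               int partitionsNum, String pkValue) {
        if (partitionHash != null && !partitionHash.isEmpty()) {
            // Assumption: a plain hashCode-modulo stands in for the real
            // MQMessageUtils.messagePartition logic.
            return Math.floorMod(pkValue.hashCode(), partitionsNum);
        }
        // No hash rule: every message goes to one fixed partition, 0 by default.
        return partition != null ? partition : 0;
    }

    public static void main(String[] args) {
        // With a hash rule, rows are spread over partitions 0..partitionsNum-1.
        for (String id : new String[] {"1", "2", "3", "4"}) {
            System.out.println("id=" + id + " -> partition "
                + choosePartition("db_chj_test.sbtest2:id", null, 3, id));
        }
        // Without a hash rule, canal.mq.partition (or 0) decides.
        System.out.println(choosePartition(null, null, 3, "1"));
        System.out.println(choosePartition(null, 2, 3, "1"));
    }
}
```

Kafka partition indices start at 0, so on a topic with three partitions only 0, 1 and 2 are valid targets, and a fixed canal.mq.partition of 3 would fall outside that range.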
        
        • navyaijm2017
          navyaijm2017 last edited by

          Solved. The correct configuration:

          mq config

          canal.mq.dynamicTopic=db_chj_test.sbtest2
          #canal.mq.partition=3
          canal.mq.partitionsNum=3
          canal.mq.partitionHash=db_chj_test.sbtest2:id
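
To check how such a configuration is read, the sketch below loads the four lines above with `java.util.Properties`. It assumes canal parses them with standard Java properties syntax. `MqConfigSketch`, `load` and `splitHashRule` are hypothetical names, and the `schema.table:column` reading of the hash rule is inferred from the value shown here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper, not part of canal. */
class MqConfigSketch {

    // The configuration from the post, one property per line.
    static final String CONFIG = String.join("\n",
        "canal.mq.dynamicTopic=db_chj_test.sbtest2",
        "#canal.mq.partition=3",
        "canal.mq.partitionsNum=3",
        "canal.mq.partitionHash=db_chj_test.sbtest2:id");

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected for an in-memory reader.
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Splits "schema.table:column" into {"schema.table", "column"} (assumed format). */
    static String[] splitHashRule(String rule) {
        int colon = rule.lastIndexOf(':');
        return new String[] {rule.substring(0, colon), rule.substring(colon + 1)};
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        // The '#' line is a comment, so this key is absent and prints null.
        System.out.println(props.getProperty("canal.mq.partition"));
        System.out.println(props.getProperty("canal.mq.partitionsNum"));
        String[] rule = splitHashRule(props.getProperty("canal.mq.partitionHash"));
        System.out.println("table=" + rule[0] + ", hash column=" + rule[1]);
    }
}
```

Because the canal.mq.partition line is commented out, the key is simply absent. With canal.mq.partitionHash set, the hash branch of the source excerpt is the one that applies.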

    Copyright © 2020 ClouGence, Inc. ICP filing no.: 浙ICP备20007605号-2