[Solved] canal cannot write to multiple Kafka partitions
@navyaijm2017 said in "canal cannot write to multiple Kafka partitions":
Solved. The correct configuration is:
mq config
canal.mq.dynamicTopic=db_chj_test.sbtest2
#canal.mq.partition=3
canal.mq.partitionsNum=3
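canal.mq.partitionHash=db_chj_test.sbtest2:id
One addition: canal.mq.partition is actually the default partition that messages are written to when canal.mq.partitionsNum and canal.mq.partitionHash are not set (the code checks whether partitionHash is empty). The relevant source code, for reference: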
for (FlatMessage flatMessage : flatMessages) {
    /** canal.mq.partitionsNum and canal.mq.partitionHash **/
    if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
        // Split the message into one slot per partition, based on the hash rule
        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
            mqDestination.getPartitionsNum(),
            mqDestination.getPartitionHash(),
            this.mqProperties.isDatabaseHash());
        int length = partitionFlatMessage.length;
        for (int i = 0; i < length; i++) {
            FlatMessage flatMessagePart = partitionFlatMessage[i];
            if (flatMessagePart != null) {
                // The array index i is used as the target partition
                records.add(new ProducerRecord<>(topicName,
                    i,
                    null,
                    JSON.toJSONBytes(flatMessagePart, SerializerFeature.WriteMapNullValue)));
            }
        }
    } else {
        /** canal.mq.partition **/
        // No hash rule: everything goes to the fixed partition, or 0 if it is unset
        final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
        records.add(new ProducerRecord<>(topicName,
            partition,
            null,
            JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)));
    }
}
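To make the branch in the excerpt above easier to follow, here is a minimal standalone sketch of the same decision. It is not canal code: the class PartitionSketch and the method choosePartition are hypothetical names, and the hashCode-modulo step is only an assumed stand-in for whatever MQMessageUtils.messagePartition actually computes.

```java
/** Hypothetical helper, not part of canal: mirrors the if/else in the excerpt above. */
final class PartitionSketch {

    /**
     * @param partitionHash  value of canal.mq.partitionHash (null or empty if unset)
     * @param partitionsNum  value of canal.mq.partitionsNum
     * @param fixedPartition value of canal.mq.partition (null if unset)
     * @param hashKeyValue   value of the hash column (e.g. id) for one row
     */
    static int choosePartition(String partitionHash, int partitionsNum,
                               Integer fixedPartition, String hashKeyValue) {
        if (partitionHash != null && !partitionHash.isEmpty()) {
            // Assumption: a simple hashCode modulo stands in for canal's real hash logic.
            return Math.floorMod(hashKeyValue.hashCode(), partitionsNum);
        }
        // No hash rule configured: fall back to the fixed partition, defaulting to 0.
        return fixedPartition != null ? fixedPartition : 0;
    }

    public static void main(String[] args) {
        String rule = "db_chj_test.sbtest2:id";
        for (String id : new String[] {"7", "8", "9"}) {
            System.out.println("id=" + id + " -> partition "
                + choosePartition(rule, 3, null, id));
        }
        // Without a hash rule, every row lands in the same partition.
        System.out.println("no rule -> partition " + choosePartition(null, 3, null, "7"));
    }
}
```

With the hash rule set, the three sample ids in this sketch spread over partitions 1, 2 and 0; with no rule, everything goes to partition 0, which matches the symptom of only one partition receiving data.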