@navyaijm2017 said in "canal cannot write to multiple Kafka partitions":
Solved. Here is the correct configuration:
mq config
canal.mq.dynamicTopic=db_chj_test.sbtest2
#canal.mq.partition=3
canal.mq.partitionsNum=3
canal.mq.partitionHash=db_chj_test.sbtest2:id
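One addition.
canal.mq.partition is actually the default partition to write to when canal.mq.partitionsNum and canal.mq.partitionHash are not set (the code only checks whether partitionHash is empty). The relevant source code, for reference: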
for (FlatMessage flatMessage : flatMessages) {
    /* canal.mq.partitionsNum and canal.mq.partitionHash: split the message by hash */
    if (mqDestination.getPartitionHash() != null && !mqDestination.getPartitionHash().isEmpty()) {
        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
            mqDestination.getPartitionsNum(),
            mqDestination.getPartitionHash(),
            this.mqProperties.isDatabaseHash());
        int length = partitionFlatMessage.length;
        for (int i = 0; i < length; i++) {
            // The array index i is the target partition; empty slots are skipped
            FlatMessage flatMessagePart = partitionFlatMessage[i];
            if (flatMessagePart != null) {
                records.add(new ProducerRecord<>(topicName, i, null, JSON.toJSONBytes(flatMessagePart,
                    SerializerFeature.WriteMapNullValue)));
            }
        }
    } else {
        /* canal.mq.partition: fixed partition, defaulting to 0 when unset */
        final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0;
        records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage,
            SerializerFeature.WriteMapNullValue)));
    }
}
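
To make the branching above easier to follow, here is a minimal standalone sketch of the same decision. It is not canal's code: the class PartitionSketch and the method choosePartition are hypothetical names, and the hash step (String.hashCode modulo the partition count) is only an assumption standing in for whatever MQMessageUtils.messagePartition actually does with the configured key column.

```java
public class PartitionSketch {

    /**
     * Hypothetical helper mirroring the if/else in the excerpt above.
     * If partitionHash is set, the partition is derived from the key value;
     * otherwise the fixed default partition is used (0 when unset).
     */
    public static int choosePartition(String pkValue,
                                      Integer partitionsNum,
                                      String partitionHash,
                                      Integer defaultPartition) {
        if (partitionHash != null && !partitionHash.isEmpty()) {
            // Assumption: treat a missing or non-positive partition count as 1
            int n = (partitionsNum != null && partitionsNum > 0) ? partitionsNum : 1;
            // Assumption: simple hash-modulo; floorMod keeps the result non-negative
            return Math.floorMod(pkValue.hashCode(), n);
        }
        // Corresponds to the canal.mq.partition branch
        return defaultPartition != null ? defaultPartition : 0;
    }

    public static void main(String[] args) {
        String hashRule = "db_chj_test.sbtest2:id"; // value taken from the config above
        for (int id = 1; id <= 6; id++) {
            int p = choosePartition(String.valueOf(id), 3, hashRule, null);
            System.out.println("id=" + id + " -> partition " + p);
        }
        // With no hash rule, every row goes to the default partition
        System.out.println("no hash rule -> partition " + choosePartition("1", 3, null, null));
    }
}
```

This also shows why the original setup with only canal.mq.partition=3 never spread data out: without a hash rule, every message lands on that one fixed partition.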