Kafka: Let the Producer Auto-Create Topics While Preventing the Consumer from Auto-Creating Them
Background
We recently needed to build a data pipeline from MySQL to our big data platform. The plan we settled on: Maxwell syncs the MySQL binlog into Kafka, Flink consumes the Kafka data and writes it to Kudu, and Impala views over Kudu and Hive provide a unified query layer. Kudu keeps the most recent seven days of data; anything older rolls down into Hive tables.
A Maxwell instance maps to one MySQL instance and to multiple Kafka topics. To deal with ordering and data skew, we ended up with one topic per table, so the requirement became: the Maxwell producer must be able to auto-create topics, while the Flink consumer must not.
Steps
First, delete a topic and see what happens. The moment it is deleted, the topic gets auto-created again. The Kafka server log looks like this:
Log when the topic is deleted:
......
[2020-07-02 15:08:32,044] INFO Log for partition autoCreatTest-0 is renamed to /data/kafka-logs/autoCreatTest-0.f10b571f162249fa85dbcdfa2ae6fb01-delete and is scheduled for deletion (kafka.log.LogManager)
[2020-07-02 15:08:32,048] INFO Log for partition autoCreatTest-1 is renamed to /data/kafka-logs/autoCreatTest-1.3be2864a4fac4132826f09eecb1079d5-delete and is scheduled for deletion (kafka.log.LogManager)
[2020-07-02 15:08:32,049] INFO Log for partition autoCreatTest-2 is renamed to /data/kafka-logs/autoCreatTest-2.557a65f116284c4bb2560b1829a4ac3e-delete and is scheduled for deletion (kafka.log.LogManager)
Log when the topic is (re)created:
[2020-07-02 15:08:32,555] INFO Creating topic autoCreatTest with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(228, 229, 227), 1 -> ArrayBuffer(229, 227, 228), 2 -> ArrayBuffer(227, 228, 229)) (kafka.zk.AdminZkClient)
[2020-07-02 15:08:32,560] INFO [KafkaApi-228] Auto creation of topic autoCreatTest with 3 partitions and replication factor 3 is successful (kafka.server.KafkaApis)
......
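As an aside, the delete step does not have to go through the command line. Below is a minimal sketch using the AdminClient that ships with kafka-clients 0.11+; the topic name and broker address are simply reused from the logs above and the demo further down, and how the topic gets deleted makes no difference to what follows.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.30.130.227:9093");
        try (AdminClient admin = AdminClient.create(props)) {
            // Asks the broker to delete the topic; the broker must have delete.topic.enable=true
            admin.deleteTopics(Collections.singletonList("autoCreatTest")).all().get();
        }
    }
}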
Since we use flink-connector-kafka-0.11_2.11, we read the Kafka 0.11.0.2 source. From the log we can see that topic creation is triggered by the broker class kafka.server.KafkaApis.
Searching that class for the log message "Auto creation of topic" shows it is logged here:
private def createTopic(topic: String,
                        numPartitions: Int,
                        replicationFactor: Int,
                        properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
  AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
  info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
    .format(topic, numPartitions, replicationFactor))
  ......
Searching for where createTopic is called turns up three call sites. Two of them create the internal topics __consumer_offsets and __transaction_state; the one that actually concerns us is this:
private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
                             errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
  ......
  else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
    createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
  }
  ......
}
So when one of our consumers or producers fetches a topic's metadata, the broker auto-creates the topic as long as the cluster has auto.create.topics.enable=true. But there is a second condition, allowAutoTopicCreation, and following the call chain shows where it comes from:
def handleTopicMetadataRequest(request: RequestChannel.Request) {
  val metadataRequest = request.body[MetadataRequest]
  ......
  else
    getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.listenerName,
      errorUnavailableEndpoints)
  ......
}

def handle(request: RequestChannel.Request) {
  ......
  try {
    ApiKeys.forId(request.requestId) match {
      ......
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      ......
    }
  }
  ......
}
From here it is fairly clear that this flag is sent over in the client's request, so we can switch to the client code. Searching for metadata in kafka/clients/consumer/KafkaConsumer.java:
public class KafkaConsumer<K, V> implements Consumer<K, V> {
    private final Metadata metadata;
    ......
    // inside the KafkaConsumer constructor
    this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
            true, false, clusterResourceListeners);
    ......
}
Now look at the Metadata constructor:
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners)
Now it makes sense: the 0.11.0.2 client's KafkaConsumer hard-codes allowAutoTopicCreation to true when it fetches metadata, so no wonder the consumer auto-creates topics too. With the cause known, the fix is easy: change it to false, rebuild the jar, and test with a simple consumer demo. Sure enough, the topic is no longer created:
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.30.130.227:9093");
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("autoCreateTest"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
    }
}
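Beyond watching the server log, a quick way to confirm the topic really was not recreated is to list the topics after the demo has run. A minimal sketch with the same 0.11+ AdminClient, reusing the broker address and topic name from the demo above:

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class CheckTopicExists {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.30.130.227:9093");
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() only reports existing topics; it never triggers auto-creation
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("autoCreateTest exists: " + topics.contains("autoCreateTest"));
        }
    }
}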
We thought that would be the end of it: just swap the kafka-client version our Flink job depends on. But after repackaging, the topic was still being auto-created. Back to it: using IDEA's debugger together with the Kafka server log to find the exact line at which the topic gets created, we first put a breakpoint on the first line of FlinkKafkaConsumerBase.run(), but by the time execution got there the topic already existed. Moving the breakpoint earlier, to the first line of open(), and stepping through line by line with F7:
FlinkKafkaConsumerBase-182: List<KafkaTopicPartition> allPartitions = this.partitionDiscoverer.discoverPartitions();
->
AbstractPartitionDiscoverer-51:newDiscoveredPartitions = this.getAllPartitionsForTopics(this.topicsDescriptor.getFixedTopics());
->
Kafka09PartitionDiscoverer-77:kafkaConsumer.partitionsFor(topic)
->
KafkaConsumer-1389: Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs);
->
MetadataRequest-49:
public Builder(List<String> topics, boolean allowAutoTopicCreation) {
    super(ApiKeys.METADATA);
    this.topics = topics;
    this.allowAutoTopicCreation = allowAutoTopicCreation;
}
And there it finally was: Flink is configured to discover new Kafka partitions, so before it actually starts consuming it fetches partition info for all of its topics, and that step is what auto-creates the topic. After also changing the hard-coded true on line 1389 of KafkaConsumer to false and repackaging, the Flink consumer finally stopped auto-creating topics.
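For context, this is roughly what the Flink side of such a job looks like. flink.partition-discovery.interval-millis is the connector's real property key for periodic partition discovery, but the topic name, group id, interval, and address below are illustrative values, not our production job:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class FlinkConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.30.130.227:9093");
        props.setProperty("group.id", "test");
        // Enables periodic discovery of new partitions; note that the initial discovery in
        // open() already sends the metadata request that was auto-creating the topic
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("autoCreateTest", new SimpleStringSchema(), props);

        env.addSource(consumer).print();
        env.execute("flink-kafka-consumer-sketch");
    }
}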
Postscript
After discovering MetadataRequest.Builder, we went through every place in the consumer that calls it and changed the hard-coded true to false, to rule out any remaining paths. Looking at newer Kafka releases, it turns out Kafka 2.3 already addressed this with KAFKA-7320: the consumer can simply be given allow.auto.create.topics=false to disable consumer-side auto-creation. Unfortunately the Flink version we run cannot use a client that new. We lost a fair amount of time, but at least we got a decent tour of the Kafka consumer's flow, so it was not a wasted effort.
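For anyone on kafka-clients 2.3 or later (not an option for us, as noted), disabling consumer-side auto-creation is just one extra property; a minimal sketch:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoAutoCreateConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.30.130.227:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Added by KAFKA-7320 (KIP-361) in Kafka 2.3: metadata requests from this consumer
        // no longer ask the broker to auto-create missing topics
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("autoCreateTest"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}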