Kafka如何做到全局有序
全局有序?业务: 1 2 3 5消费: 1 5 2 31.一个topic 一个分区 3个 虽然保证全局有序,但是性能下降生产(很多公司也在使用,或则没有吧太在意)2.单分区有序,那么我们想方法把同一个特征数据写到一个分区p0 p1 p2id money业务系统:insert into t values(1,1)update t set age= 200 where id =1...
全局有序?
业务: 1 2 3 5
消费: 1 5 2 3
1.一个topic 一个分区 3个 虽然保证全局有序,但是性能下降 生产(很多公司也在使用,或则没有吧太在意)
2.单分区有序,那么我们想方法把同一个特征数据写到一个分区
p0 p1 p2
id money
业务系统:
insert into t values(1,1)
update t set age= 200 where id =1
update t set age= 400 where id =1
update t set age= 1000000 where id =1
delete from t where id =1
最终的结果的是0条
如何把特征的数据写一个分区里面
producer send api (key,value)
key: erp.t.1 null
value:SQL数据
业务系统: 特征值
insert into t values(1,1) erp.t.1 hash 5 %3=1…2 -->p2
update t set age= 200 where id =1 erp.t.1
update t set age= 400 where id =1 erp.t.1
update t set age= 1000000 where id =1 erp.t.1
delete from t where id =1 erp.t.1
源码读取:https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
这5条记录都发送到一个分区 有序的发送 ,那么消费时也不会出现紊乱
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
重点:
只要把同一个表的同一个主键的数据发到同一个分区即可(如果多数据库得加入数据库名)
分区定义如下:
private int partitionDefine(String keyToPartition) {
if (keyToPartition == null) {
return new Random().nextInt(numPartitions);
} else {
return Math.abs(keyToPartition.hashCode()) % numPartitions;
}
}
传入的参数 tableName+主键
这样,消费到的数据就是有序的。不同的场景灵活运用即可。
更多推荐
所有评论(0)