Kafka—Storm之KafkaSpout和KafkaBolt源码解释
转载来自:http://blog.csdn.net/ransom0512/article/details/50497261另一个比较详细的KafkaSpout详解见:http://www.cnblogs.com/cruze/p/4241181.html Storm-Kafka源代码解析说明:本文所有代码基于Storm 0.10版本,本文描述内容只涉及KafkaSpout和Kafka...
转载来自:http://blog.csdn.net/ransom0512/article/details/50497261
另一个比较详细的KafkaSpout详解见:http://www.cnblogs.com/cruze/p/4241181.html
Storm-Kafka源代码解析
说明:本文所有代码基于Storm 0.10版本,本文描述内容只涉及KafkaSpout和KafkaBolt相关,不包含trident特性。
Kafka Spout
KafkaSpout的构造函数如下:
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
}
- 1
- 2
- 3
- 1
- 2
- 3
其构造参数来自于SpoutConfig对象,Spout中用到的所有参数都来自于该对象。该对象参数说明如下:
SpoutConfig
SpoutConfig继承自KafkaConfig。两个类内部所有参数及说明如下:
/**
* Kafka地址和分区关系对应信息
* 在kafka的分区信息和地址信息都很清楚的情况下,可以以直接使用StaticHosts
* 但是该对象参数很难构建,需要的信息很多,所以我们一般情况下并不使用它。
* 我们主要用的是ZKHosts的实例。可以在其中设置Zookeeper地址等信息,然后动态获取kafka元数据
* ZKHost的参数信息见下面一段。
* 必选参数
**/
public final BrokerHosts hosts;
/**
* 要从kafka中读取的topic队列名称
* 必选参数
**/
public final String topic;
/**
* Kafka的客户端id参数,该参数一般不需要设置
* 默认值为kafka.api.OffsetRequest.DefaultClientId()
* 空字符串
**/
public final String clientId;
/**
* Kafka Consumer每次请求获取的数据量大小
* 每次获取的数据消费完毕之后,才会再获取数据
* 默认1MB
**/
public int fetchSizeBytes = 1024 * 1024;
/**
* Kafka SimpleConsumer 客户端和服务端连接的超时时间
* 单位:毫秒
**/
public int socketTimeoutMs = 10000;
/**
* Consumer每次获取数据的超时时间
* 单位:毫秒
**/
public int fetchMaxWait = 10000;
/**
* Consumer通过网络IO获取数据的socket buffet大小,
* 默认1MB
**/
public int bufferSizeBytes = 1024 * 1024;
/**
* 该参数有两个作用:
* 1:申明输出的数据字段 declareoutputFileds
* 2:对从kafka中读到的数据进行反序列化,即将byte字节数组转为tuple对象。
* 对kafka存入数据的key和message都比较关心的,可以使用KeyValueSchemeAsMultiScheme,
* 如果不关心,可以使用SchemeAsMultiScheme
* 默认接口实现一般都只会输出一个字段或者两个字段,很多时候,我们需要直接从kafka中读取到数据之后,就将每个字段解析了,然后进行简单处理再emit
* 这个时候,建议自己实现MultiScheme接口
* 必选参数
**/
public MultiScheme scheme = new RawMultiScheme();
/**
* 在拓扑提交之后,KafkaSpout会从zookeeper中读取以前的offset值,以便沿着上次位置继续读取数据。
* KafkaSpout会检查拓扑ID和zookeeper中保存的拓扑id是否相同。
* 如果不同,并且ignoreZkOffsets=true,那么就会从startOffsetTime参数位置读取数据
* 否则,沿着zookeeper中保存的offset位置继续读取数据。
* 也就是说,当ignoreZkOffsets=true的时候,kafkaspout只能保证在拓扑不杀掉的情况下,当worker进程异常退出的时候,会沿着上次读取位置继续读取数据,当拓扑重新提交的时候,就会从队列最早位置开始读取数据。
* 这样就会存在重复读取数据的问题,所以正式场景,该参数还是应该设置为false。以保证任何场景数据的只被读取一次。
**/
public boolean ignoreZkOffsets = false;
/**
* 拓扑第一次提交,zookeeper中没有保存对应offset的情况下,默认从kafka中读取的offset位置。默认从队列最早位置开始读取数据,即从队列最开始位置读取数据。
**/
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
/**
*
* 如果当前的(offset值-failed offsets中最小值) < maxOffsetBehind
* 那么就会清理failed列表中所有大于maxOffsetBehind的offset值。
* 这是为了防止failed过多,重发太多导致内存溢出
* 不过默认为了保证数据不丢失,所以maxOffsetBehind设置的最大
**/
public long maxOffsetBehind = Long.MAX_VALUE;
/**
* 当KafkaSpout初始化之后,使用从zookeeper中读取的上次记录的offset
* 从kafka中获取数据失败,返回offsetOutofRange错误之后,
* 是否使用startOffset从队列最早位置重新获取数据。
* offsetOutofrange一般发生在topic被重建,分片被删除的场景。
**/
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
/**
* metric监控信息采集间隔
**/
public int metricsTimeBucketSizeInSecs = 60;
/**
* KafkaSpout保存offset的zookeeper所在地址
* 独立出来这个属性是为了防止offset保存位置不在kafka集群中
* 如果kafka和storm在一个集群,该属性可以忽略
**/
public List<String> zkServers = null;
/**
* KafkaSpout保存offset的zookeeper端口
* 如果kafka和storm在一个集群,该属性可以忽略
**/
public Integer zkPort = null;
/**
* offset在zookeeper中保存的路径
* 路径计算方式为:${zkRoot}/${id}/${partitionId}
* 必选参数
**/
public String zkRoot = null;
/**
* kafkaSpout保存offset的不同客户端区分标志
* 建议每个拓扑使用固定的,不同的参数,以保证拓扑重新提交之后,可以从上次位置继续读取数据
* 如果两个拓扑公用同一个id,那么可能会被重复读取
* 如果在拓扑中使用了动态生成的uuid来作为id,那么每次提交的拓扑,都会从队列最开始位置读取数据
* 必选参数
**/
public String id = null;
/**
* offset刷新到zookeeper中的时间间隔
* 单位:毫秒
**/
public long stateUpdateIntervalMs = 2000;
/**
* 数据发送失败之后重试策略相关参数
**/
public long retryInitialDelayMs = 0;
/**
* 数据发送失败之后重试策略相关参数
**/
public double retryDelayMultiplier = 1.0;
/**
* 数据发送失败之后重试策略相关参数
**/
public long retryDelayMaxMs = 60 * 1000;
ZKHost中保存了kafka集群所在的zookeeper地址等信息
ZKHost
/**
* kafka集群zookeeper地址,允许包含chroot
* 比如:192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181/kafka
**/
public String brokerZkStr = null;
/**
* kafka集群中broker元数据所在地址
* 默认为/brokers
* 如果配置了chroot,那么就是/kafka/brokers
* 这个和kakfa服务端配置默认是一样的,如果服务端采用默认配置,该属性也可以使用默认值
**/
public String brokerZkPath = null; // e.g., /kafka/brokers
/**
* kafka broker分区信息刷新时间间隔,
* 单位:秒
* 当kafka有broker节点重启或者分区信息发生变化而导致数据读取失败的时候,
* 都会重新触发一次分区信息刷新
**/
public int refreshFreqSecs = 60;
KafkaSpout初始化
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
Map stateConf = new HashMap(conf);
/*
* offset保存位置的zookeeper地址
* 如果该地址为空,则默认使用Storm集群的zookeeper
*/
List<String> zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
//保存offset信息到zookeeper
_state = new ZkState(stateConf);
//kafka集群的连接器
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
//从zookeeper中读取kafka的broker信息,只保存自身实例需要用到的分区信息
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
//两个metrics监控信息,忽略
context.registerMetric("kafkaOffset", new IMetric() { ...}, _spoutConfig.metricsTimeBucketSizeInSecs);
context.registerMetric("kafkaPartition", new IMetric() {...}, _spoutConfig.metricsTimeBucketSizeInSecs);
}
以上是kafkaSpout的初始化方法,主要是完成对自身管理分区信息的刷新。
这里有一个问题,就是会创建3个zookeeper客户端连接,一个用来从kafka中读取数据,一个保存offset,一个是metrics监控信息,每个zookeeper客户端连接会创建3个线程,这样,光一个kafkaSpout就会存在9个zookeeper线程!当worker进程中有多个spout实例的时候,就会产生更多的线程,这就会很消耗性能,这个还是建议对zookeeper连接进行合并处理。
系统通过KafkaUtils.calculatePartitionsForTask方法来获取自己需要管理的分区列表:
for (int i = taskIndex; i < numPartitions; i += totalTasks) {
Partition taskPartition = partitions.get(i);
taskPartitions.add(taskPartition);
}
其中,taskIndex就对应自身spout实例的序号,比如该spout并发度为3,那么这个spout实例就可能为0,1,2。当kafka的topic有5个分区的时候,第一个spout实例管理0,3的分区;第二个spout实例管理编号为1,4的分区,第三个spout实例管理编号为2的分区。
taskId保存在Spout的Open方法的context参数中。context.getThisTaskIndex()
KafkaSpout从Kafka中如何读取数据并发送
kafkaSpout主要在nextTuple方法中读取数据并emit。
public void nextTuple() {
//获取自身实例管理的分区列表
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
try {
//_currPartitionIndex永远小于manager的大小
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
//获取数据并emit
EmitState state = managers.get(_currPartitionIndex).next(_collector);
/*
* 检查此次数据发送状态
* 如果没有取到数据或者取到的数据都已经emit完毕
* 那么就增加_currPartitionIndex值,然后就可以从下个分区中读取数据了。
*/
if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
}
/*
* 如果还有数据没有emit,就退出此次循环,等待下次nexttuple调用
* 然后仍然从当前分区中取获取数据并emit
*/
if (state != EmitState.NO_EMITTED) {
break;
}
} catch (FailedFetchException e) {
LOG.warn("Fetch failed", e);
_coordinator.refresh();
}
}
//定期保存offset数据到zookeeper
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
数据发送状态EmitState一共有三种状态
- EMITTED_MORE_LEFT
上次取到的数据还没有emit完毕- EMITTED_END,
上次取到的数据已经全部emit完毕- NO_EMITTED
本次没有取到数据,没有可供emit的数据
再来看下PartitionManager.next方法,里面就包含如何获取数据已经如何emit
public EmitState next(SpoutOutputCollector collector) {
//如果等待发送的队列为空,那么就从kafka中再取一次数据
if (_waitingToEmit.isEmpty()) {
fill();
}
while (true) {
//从等待发送的队列中获取第一个数据
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
//如果没有可供发送的数据,那么返回emit状态为没有可以emit的数据
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
//根据KeyValueSchemeAsMultiScheme接口实现,将kafka中取到的数据转为tuple
Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
//发送所有的tuple,因为kafka一条数据可能对应storm的多条
for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
}
break;
} else {
//如果tuple转化失败,返回null,直接告诉storm该条已经处理成功,即忽略数据错误
ack(toEmit.offset);
}
}
/*
* 每次从等待队列中取一条数据反序列化并emit,
* 然后判断等待队列是否还有数据,
* 如果还有数据,就告诉spout,数据还没有发送完,不要切换分区
* 如果数据已经发送完毕,就告诉spout,数据已经发送完毕,可以切换到下个分区了。
*/
if (!_waitingToEmit.isEmpty()) {
return EmitState.EMITTED_MORE_LEFT;
} else {
return EmitState.EMITTED_END;
}
}
当有数据发送失败的时候,失败的数据又会重新加入到_waitingToEmit队列中,这样就会产生一个问题,就是当数据发送失败的时候,kakfaSpout会永远只读一个分区,前天分区都不会读取,从而产生数据消费不均匀的问题。
在0.9.6以前老版本的时候哟一个问题,就是当较多数据emit失败的时候,会有很多的数据在不断重试,然后重试不断超时,又不断重新加入重试列表,从而导致一个数据发送的死循环。这个问题也就是offset超时的问题。见Storm-643, 这个问题目前在最新版本中已经解决。
KafkaBolt
KafkaBolt就比较简单,0.10版本还是使用old Producer API。
Storm所有的配置属性,都在kafka.broker.properties中保存着,这就要求在submitTopology的时候,在topologyConf中再put一个kafka.broker.properties属性,形成一个map中套map的结构。这样有一点不好的就是一个拓扑中数据只能写到一个kafka集群中,不支持同事写到多个kafka集群中。不过这个在0.11新版本中已经解决了,kafka.broker.properties被作为了一个局部变量,可以在不同的bolt实例中保存不同的配置属性。
数据写入方法如下:
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
collector.ack(input);
return; // Do not try to send ticks to Kafka
}
K key = null;
V message = null;
String topic = null;
try {
//消息的键值,不同的值在kafka中对应不同的分发方式,这个在KafkaBolt的FAQ中有介绍。
key = mapper.getKeyFromTuple(input);
//消息体
message = mapper.getMessageFromTuple(input);
//topic名称
topic = topicSelector.getTopic(input);
if(topic != null ) {
producer.send(new KeyedMessage<K, V>(topic, key, message));
} else {
LOG.warn("skipping key = " + key + ", topic selector returned null.");
}
collector.ack(input);
} catch (Exception ex) {
collector.reportError(ex);
collector.fail(input);
}
}
Storm-Kafka FAQ
KafkaSpout
- KafkaSpout excutor数量和Kafka topic分区数量的关系
当executor并发度大于topic数量的时候,就会存在有的spout实例可以读到数据, 有的spout实例读不到数据。
当executor并发度小于topic数量的时候,就会存在一个spout实例对应多个分区的情况;kafka会先从一个分区中取一次数据,当这次获取的数据emit完毕之后,就会再从下个分区中取数据。
当executor并发度等于topic数量的时候,一个spout实例对应一个分区。在实际应用中,我们也推荐这种配置方式。- 如何从kafka中读取数据,每次读取多少数据
根据fetchSizeBytes参数的配置,默认每次取1MB数据。- 数据读取失败如何处理
KafkaSpout每个PartitionManager内部保存一个重试队列,当数据发送失败的时候,加入重试队列,然后重新发送,直到成功为止。
通过maxOffsetBehind参数来解决failed数量过多导致内存溢出问题。- Topic不存在如何处理
直接报错。- 拓扑重新提交,会不会接着上次位置继续读取数据
重新提交的时候,只要id这个参数不变,那么就会沿着上次位置继续读取数据。- zookeeper中保存的kafka的offset位置有错误怎么办?
会抛出offsetOutofRange异常,然后默认从kafka分区队列最早位置开始读取数据。- 能不能在一个spout中从多个topic读取数据?
在0.10版本不行,在0.11版本中,支持按照正则方式匹配topic名称,可以从所有满足正则条件的topic中读取数据。- topic分区主备信息发生变化,如何处理
抛出异常,然后马上更新分区信息,再次读取数据。
KafkaBolt
- 写入数据,kafka topic不存在怎么办?
如果kakfa服务端允许自动创建topic,那么就会自动创建topic。
如果不允许自动创建,那么就会抛出异常- 如何写数据到指定分区?
取决于tupleToKafkaMapper的接口实现。
kafka 0.10版本使用的是old producer的API,0.11版本使用的是new Producer的API
对于old Producer
如果key == null,那么在kafka中,会随机寸照一个分区去写入数据,之后只要不重启,就都会往这个分区写入数据
如果key != null,那么就会在写入数据的时候,以utils.abs(key.hashCode)%numPartitions规则计算分区id
对于New Producer
如果key = null,那么就会使用一个递增的int值,每次发送数据的时候递增,然后执行utils.abs(nextValue)%availablePartitions.size(),数据写入会比较均衡。
如果key != null,那么就会按照Utils.abs(Utils.murmur2(record.key()))%numPartitions的规则计算分区。
当然,New Producer API也可以手工指定分区id。
更多推荐
所有评论(0)