storm kafka集成
前言在前面Storm系列之——基本概念一文中,提到过Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。Kafka的基本介绍: http://blog.csdn.net/xeseo/article/details/18311955准
·
前言
在前面Storm系列之——基本概念一文中,提到过Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。
准备工作
KafkaSpout其实网上已经有人写了,在github上开源了,不用我们自己造轮子。只是要注意版本问题:
0.7版本的Kafka,对应KafkaSpout可以使用Storm-contrib下面的例子
源码:https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka
Maven依赖:https://clojars.org/storm/storm-kafka
0.8版本的Kafka在API上和底层Offset的处理方式上发生了重大变化,所以老的KafkaSpout不再适用,必须使用新的KafkaAPI
这里因为0.8版本的Kafka必然是将来主流,所以我就不介绍0.7 的了,使用方式基本上是类似的。
PS:
是人写的,就会有bug,何况是别人分享出来的。所以,遇到bug,还请去github上提交一个issue告诉作者修正。
2014/7/29 更新:
wurstmeister/storm-kafka-0.8-plus 现在合并到Apache Storm了,在其external/storm-kakfa目录
Maven依赖直接更新成:
1
2
3
4
5
|
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.2-incubating</version>
</dependency>
|
当然,也可以在maven中加上<scope>compile</scope>,直接把该jar打到你项目一起。
使用KafkaSpout
一个KafkaSpout只能去处理一个topic的内容,所以,它要求初始化时提供如下与topic相关信息:
- Kafka集群中的Broker地址 (IP+Port)
有两种方法指定:1. 使用静态地址,即直接给定Kafka集群中所有Broker信息
1234GlobalPartitionInformation info =
new
GlobalPartitionInformation();
info.addPartition(
0
,
new
Broker(
"10.1.110.24"
,
9092
));
info.addPartition(
0
,
new
Broker(
"10.1.110.21"
,
9092
));
BrokerHosts brokerHosts =
new
StaticHosts(info);
2. 从Zookeeper动态读取推荐使用这种方法,因为Kafka的Broker可能会动态的增减
1BrokerHosts brokerHosts =
new
ZkHosts(
"10.1.110.24:2181,10.1.110.22:2181"
);
- topic名字
- 当前spout的唯一标识Id (以下代称$spout_id)
- zookeeper上用于存储当前处理到哪个Offset了 (以下代称$zk_root)
- 当前topic中数据如何解码
了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端自己管理的。所以,后面两个的目的,其实是在zookeeper上建立一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。
在Topology中加入Spout的代码:
1
2
3
4
5
6
7
8
9
|
String topic =
"test"
;
String zkRoot =
"kafkastorm"
;
String spoutId =
"myKafka"
;
SpoutConfig spoutConfig =
new
SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
spoutConfig.scheme =
new
SchemeAsMultiScheme(
new
TestMessageScheme());
TopologyBuilder builder =
new
TopologyBuilder();
builder.setSpout(
"spout"
,
new
KafkaSpout(spoutConfig), spoutNum);
|
其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public
class
TestMessageScheme
implements
Scheme {
private
static
final
Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.
class
);
@Override
public
List<Object> deserialize(
byte
[] bytes) {
try
{
String msg =
new
String(bytes,
"UTF-8"
);
return
new
Values(msg);
}
catch
(InvalidProtocolBufferException e) {
LOGGER.error(
"Cannot parse the provided message!"
);
}
//TODO: what happend if returns null?
return
null
;
}
@Override
public
Fields getOutputFields() {
return
new
Fields(
"msg"
);
}
}
|
后面就可以自己添加Bolt处理tuple中该field的数据了。
使用TransactionalTridentKafkaSpout
TransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不同。
1
2
3
4
5
6
7
|
TridentKafkaConfig kafkaConfig =
new
TridentKafkaConfig(brokerHosts, topic, spoutId);
kafkaConfig.scheme =
new
SchemeAsMultiScheme(
new
TestMessageScheme());
TransactionalTridentKafkaSpout kafkaSpout =
new
TransactionalTridentKafkaSpout(kafkaConfig);
TridentTopology topology =
new
TridentTopology();
topology.newStream(
"test_str"
, kafkaSpout).shuffle().each(
new
Fields(
"msg"
,
new
PrintFunction());
|
地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是 /transactional/test_str/myKafaka
常见问题
1. 本地模式无法保存Offset
KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。
本地模式,要显示的去配置
1
2
3
4
5
6
|
spoutConfig.zkServers =
new
ArrayList<String>(){{
add(
"10.1.110.20"
);
add(
"10.1.110.21"
);
add(
"10.1.110.24"
);
}};
spoutConfig.zkPort =
2181
;
|
1
2
3
4
5
6
7
8
9
|
<
del
><
dependency
>
<
groupId
>net.wurstmeister.storm</
groupId
>
<
artifactId
>storm-kafka-0.8-plus</
artifactId
>
<
version
>0.2.0</
version
>
<
exclusion
>
<
groupId
>org.slf4j</
groupId
>
<
artifactId
>slf4j-simple</
artifactId
>
</
exclusion
>
</
dependency
></
del
>
|
以上问题已修复并合并。
更多推荐
已为社区贡献2条内容
所有评论(0)