前言

在前面Storm系列之——基本概念一文中,提到过Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。本文就如何集成Kafka进行介绍。


准备工作

KafkaSpout其实网上已经有人写了,在github上开源了,不用我们自己造轮子。只是要注意版本问题:
0.7版本的Kafka,对应KafkaSpout可以使用Storm-contrib下面的例子
源码:https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka
Maven依赖:https://clojars.org/storm/storm-kafka

0.8版本的Kafka在API上和底层Offset的处理方式上发生了重大变化,所以老的KafkaSpout不再适用,必须使用新的KafkaAPI

这里因为0.8版本的Kafka必然是将来主流,所以我就不介绍0.7 的了,使用方式基本上是类似的。

PS:
是人写的,就会有bug,何况是别人分享出来的。所以,遇到bug,还请去github上提交一个issue告诉作者修正。

2014/7/29 更新:
wurstmeister/storm-kafka-0.8-plus 现在合并到Apache Storm了,在其external/storm-kakfa目录 
Maven依赖直接更新成:
1
2
3
4
5
         <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-kafka</artifactId>
   <version>0.9.2-incubating</version>
</dependency>
但是storm似乎没有直接把external的包加载到classpath,所以使用时,还得手动把该jar包从external/storm-kafka/下拷到storm的lib目录。
当然,也可以在maven中加上<scope>compile</scope>,直接把该jar打到你项目一起。 

使用KafkaSpout

一个KafkaSpout只能去处理一个topic的内容,所以,它要求初始化时提供如下与topic相关信息:
  • Kafka集群中的Broker地址 (IP+Port)
有两种方法指定:
1. 使用静态地址,即直接给定Kafka集群中所有Broker信息
1
2
3
4
GlobalPartitionInformation info = new GlobalPartitionInformation();
info.addPartition( 0 , new Broker( "10.1.110.24" , 9092 ));
info.addPartition( 0 , new Broker( "10.1.110.21" , 9092 ));
BrokerHosts brokerHosts = new StaticHosts(info);

2. 从Zookeeper动态读取
1
BrokerHosts brokerHosts = new ZkHosts( "10.1.110.24:2181,10.1.110.22:2181" );
推荐使用这种方法,因为Kafka的Broker可能会动态的增减 

  • topic名字
  • 当前spout的唯一标识Id (以下代称$spout_id)
  • zookeeper上用于存储当前处理到哪个Offset了 (以下代称$zk_root)
  • 当前topic中数据如何解码
了解Kafka的应该知道,Kafka中当前处理到哪的Offset是由客户端自己管理的。所以,后面两个的目的,其实是在zookeeper上建立一个 $zk_root/$spout_id 的节点,其值是一个map,存放了当前Spout处理的Offset的信息。

在Topology中加入Spout的代码:
1
2
3
4
5
6
7
8
9
String topic = "test" ;
String zkRoot = "kafkastorm" ;
String spoutId = "myKafka" ;
 
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
spoutConfig.scheme = new SchemeAsMultiScheme( new TestMessageScheme());
 
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "spout" , new KafkaSpout(spoutConfig), spoutNum);

其中TestMessageScheme就是告诉KafkaSpout如何去解码数据,生成Storm内部传递数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestMessageScheme implements Scheme {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme. class );
     
     @Override
     public List<Object> deserialize( byte [] bytes) {
         try {
             String msg = new String(bytes, "UTF-8" );
             return new Values(msg);
         } catch (InvalidProtocolBufferException e) {
             LOGGER.error( "Cannot parse the provided message!" );
         }
         
         //TODO: what happend if returns null?
         return null ;
     }
 
     @Override
     public Fields getOutputFields() {
         return new Fields( "msg" );
     }
 
}
这个解码方式是与Producer端生成时塞入数据的编码方式配套的。这里我Producer端塞入的是String的byte,所以这里也还原成String,定义输出为一个名叫"msg"的field。

后面就可以自己添加Bolt处理tuple中该field的数据了。


使用TransactionalTridentKafkaSpout

TransactionalTridentKafkaSpout是为事务性的Trident而用的。用法与KafkaSpout有所不同。
1
2
3
4
5
6
7
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic, spoutId);
kafkaConfig.scheme = new SchemeAsMultiScheme( new TestMessageScheme());
 
TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
 
TridentTopology topology = new TridentTopology();
topology.newStream( "test_str" , kafkaSpout).shuffle().each( new Fields( "msg" , new PrintFunction());

看到它并没有要求我们提供zkRoot,因为直接代码里面写死了…… -_-T
地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是  /transactional/test_str/myKafaka


常见问题

1. 本地模式无法保存Offset
KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。
本地模式,要显示的去配置
1
2
3
4
5
6
spoutConfig.zkServers = new ArrayList<String>(){{
                 add( "10.1.110.20" );
                 add( "10.1.110.21" );
                 add( "10.1.110.24" );
             }};
spoutConfig.zkPort = 2181 ;


2. 用Maven导入时,运行中SLF4J打印MutipleBinding 错误,导致无log输出。
原因是在这个KafkaSpout的pom.xml里依赖了kafka_2.9.2,而这货带了一个slf4j-simple的SLF4J绑定,修复这个问题
1
2
3
4
5
6
7
8
9
< del >< dependency >
   < groupId >net.wurstmeister.storm</ groupId >
   < artifactId >storm-kafka-0.8-plus</ artifactId >
   < version >0.2.0</ version >
   < exclusion >
     < groupId >org.slf4j</ groupId >
     < artifactId >slf4j-simple</ artifactId >
   </ exclusion >
</ dependency ></ del >

3. 如果在topology第一次启动前,往kafka里面写数据,启动Storm后,这部分数据读不出来
原因是第一次启动topology时,在zookeeper上并未创建出保存Offset信息的节点,所以默认它会取当前partition最新的Offset(Kafka自己维护的单个partition上递增序号)。
理论上,如果找不到保存的Offset信息,应该从-1的Offset读起。
这个问题我给作者提出来了,但作者认为这样可以避免重复处理,我没有想通为何会有重复处理。但好在作者说会在后续版本加入参数来控制。
刚去看了下,似乎作者已经在提交 8b764cd fix掉了。有兴趣的可以去试下。我是自己本地改了他的代码。
以上问题已修复并合并。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐