The previous two posts covered setting up the Storm environment and setting up the Kafka environment:
http://blog.csdn.net/ch717828/article/details/50748872

http://blog.csdn.net/ch717828/article/details/50718783

This post shows how to integrate Storm with Kafka and walks through a runnable code example.

1. Machines & Environment

I prepared three machines:

   10.101.214.71 

   10.101.214.73

   10.101.214.74

All three machines have Kafka and Storm installed; see the two posts above for details.

Note: the earlier post installed Storm 0.9.1, which is missing several packages needed for Kafka integration, so I upgraded to 0.9.2.


2. Custom Storm Logging

To print out clearly how Storm processes the messages coming from Kafka, I set up a custom log appender here.

<!-- On machines 73 and 74, edit /usr/share/storm/logback/cluster.xml -->
<appender name="mylog" class="ch.qos.logback.core.rolling.RollingFileAppender">
      <file>${storm.home}/logs/mylog.log</file><!-- output path of the log file -->
      <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
        <fileNamePattern>${storm.home}/logs/mylog.log.%i</fileNamePattern><!-- naming pattern for rotated files -->
        <minIndex>1</minIndex>
        <maxIndex>20</maxIndex><!-- minIndex/maxIndex together control how many rotated files are kept -->
      </rollingPolicy>
      <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
        <maxFileSize>100MB</maxFileSize><!-- maximum size of a single log file -->
      </triggeringPolicy>
      <encoder>
        <pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ} %c{1} [%p] %m%n</pattern> <!-- format of each log line -->
      </encoder>
  </appender>

<logger name="ch.main.MyKafkaTopology" additivity="false" >
<!-- name selects which package (or even a single class) gets routed to this appender -->
    <level value="INFO"/><!-- minimum level to log; INFO here, since these are business logs -->
    <appender-ref ref="mylog"/><!-- the name of the appender defined above -->
  </logger>

Once this is configured, all INFO logs printed by ch.main.MyKafkaTopology end up in /usr/share/storm/logs/mylog.log. Note that logback treats $ like . when building the logger hierarchy, so the loggers of the inner classes in the code below (e.g. ch.main.MyKafkaTopology$KafkaWordSplitter) inherit this appender as well.
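
To sanity-check this wiring before deploying the full topology, you can run a tiny smoke test. The class below is a hypothetical sketch of mine (not part of the project); it requests the logger by the exact name the <logger> element matches on, so when run on a worker machine with this cluster.xml on the classpath, its output should land in mylog.log:

package ch.main;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical smoke test for the logback configuration above (not part of the project).
public class LogSmokeTest {
    // Request the logger by the exact name configured in <logger name="...">,
    // so the "mylog" appender defined above receives the output.
    private static final Logger LOG = LoggerFactory.getLogger("ch.main.MyKafkaTopology");

    public static void main(String[] args) {
        LOG.info("appender wiring OK"); // should show up in ${storm.home}/logs/mylog.log
    }
}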


3. Writing the Code

pom.xml (zookeeper, log4j, and slf4j-log4j12 are excluded from the Kafka dependency so they don't conflict with the ZooKeeper and logback/slf4j stack that storm-core already brings in):

<dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.2-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.2-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

Java code:

package ch.main;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.*;


import java.util.Arrays;
import java.util.HashMap;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by chenhong on 16/2/24.
 */
public class MyKafkaTopology {


   public static class KafkaWordSplitter extends BaseRichBolt{
       private static final Logger LOG = LoggerFactory.getLogger(KafkaWordSplitter.class);
       private static final long serialVersionUID = 1L;
       private OutputCollector collector;


       @Override
       public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
           this.collector = collector;
       }

       @Override
       public void execute(Tuple input) {
           String line = input.getString(0);
           LOG.info("RECE[kafka -> splitter] "+line);
           String[] words = line.split("\\s+");
           for(String word : words){
               LOG.info("EMIT[splitter -> counter] "+word);
               collector.emit(input,new Values(word,1));
           }
           collector.ack(input);
       }

       @Override
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
       }
   }

    public static class WordCounter extends BaseRichBolt {
        private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class);
        private static final long serialVersionUID =1L;
        private OutputCollector collector;
        private Map<String,AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector=collector;
            this.counterMap = new HashMap<String,AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECE[splitter -> counter] "+word+" : "+count);
            AtomicInteger ai = this.counterMap.get(word);
            if(ai==null){
                ai= new AtomicInteger();
                this.counterMap.put(word,ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: "+this.counterMap);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Map.Entry<String,AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while(iter.hasNext()){
                Map.Entry<String,AtomicInteger> entry =iter.next();
                LOG.info(entry.getKey()+"\t:\t"+entry.getValue().get());
            }
        }
    }

    public static void main(String[] args) throws AlreadyAliveException,InvalidTopologyException,InterruptedException{
        String zks = "10.101.214.71:2181,10.101.214.73:2181,10.101.214.74:2181";
        String topic ="my-replicated-topic5";
        String zkRoot ="/kafka" ;
        String id ="word"; // 读取的status会被存在,/zkRoot/id下面,所以id类似consumer group

        BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts,topic,zkRoot,id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers= Arrays.asList(new String[]{"10.101.214.71","10.101.214.73","10.101.214.74"});
        spoutConf.zkPort=2181;

        TopologyBuilder  builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);  Kafka我们创建了一个5分区的Topic,这里并行度设置为5
        builder.setBolt("word-splitter",new KafkaWordSplitter(),2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter",new WordCounter() ).fieldsGrouping("word-splitter",new Fields("word"));

        Config config = new Config();
        String name = MyKafkaTopology.class.getSimpleName();
        if(args !=null && args.length>0 ){
          //config.put(Config.NIMBUS_HOST,args[0]);
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(name,config,builder.createTopology());
        }else{
            config.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name,config,builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }



    }

}
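
A side note on the zkRoot and id parameters: the spout commits its read offsets to ZooKeeper under {zkRoot}/{id}, so with the configuration above the state for this topology lives at /kafka/word/partition_0 through partition_4 (this is the layout I understand storm-kafka 0.9.2 to use; double-check against your version). A hypothetical sketch of mine that peeks at one of these nodes with the plain ZooKeeper client:

package ch.main;

import org.apache.zookeeper.ZooKeeper;

// Hypothetical helper (not part of the topology): dumps the offset state
// that KafkaSpout committed for partition 0. The node content is a small
// JSON document containing the committed offset, topic, and consumer id.
public class OffsetPeek {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("10.101.214.71:2181", 30000, null);
        // zkRoot + "/" + id + "/partition_<n>", as configured in MyKafkaTopology
        byte[] data = zk.getData("/kafka/word/partition_0", false, null);
        System.out.println(new String(data, "UTF-8"));
        zk.close();
    }
}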

4. Build and Run

Package the project with Maven:

mvn clean install

To use Kafka from Storm, the dependency jars need to be copied into the lib directory of the Storm installation on each node (the storm-kafka classes themselves should be bundled into your topology jar, since the Storm distribution does not ship them):

cp /usr/local/kafka/libs/kafka_2.11-0.9.0.0.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.11.7.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.1.1.7.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.7.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.6.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/share/storm/lib/
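
To verify the copies took effect, one option (a hypothetical check of mine, not among the original steps) is to compile the tiny class below and run it with the Storm lib directory on the classpath, e.g. java -cp "/usr/share/storm/lib/*:." ch.main.ClasspathCheck:

package ch.main;

// Hypothetical classpath check: each Class.forName throws
// ClassNotFoundException if the corresponding jar is missing.
public class ClasspathCheck {
    public static void main(String[] args) throws Exception {
        Class.forName("kafka.javaapi.consumer.SimpleConsumer"); // kafka_2.11
        Class.forName("scala.Option");                          // scala-library
        Class.forName("org.I0Itec.zkclient.ZkClient");          // zkclient
        System.out.println("Kafka client classes found");
    }
}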

Submit the topology:

// Submit on machine 71
storm jar StormKafka0.1-1.0-SNAPSHOT.jar ch.main.MyKafkaTopology MyKafkaTopology
// On machine 71, start a Kafka console producer to generate input
/usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh --broker-list 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092 --topic my-replicated-topic5

(type in some arbitrary lines)
// Check the logs on machines 73 and 74
cat  /usr/share/storm/logs/mylog.log 
(you should see the logs printed by MyKafkaTopology)
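
As an alternative to typing into the console producer above, you can also feed the topic from code. Below is a minimal hypothetical sketch using the Kafka 0.9 producer API (the class name TestProducer is mine; brokers and topic are the same as above):

package ch.main;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical alternative to kafka-console-producer.sh.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Each line becomes one tuple for the KafkaSpout, which the
        // splitter bolt then breaks into words.
        producer.send(new ProducerRecord<String, String>("my-replicated-topic5", "hello storm kafka"));
        producer.close();
    }
}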

Here is an excerpt of what I saw in mylog.log:


2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] aa
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] bbc
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ccc
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ddd
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] eeee
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ffffff
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] jsdkfjasnng
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123 : 1
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=18, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=19, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=20, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}



Other Issues


1. Starting Storm fails with: line 61: normclasspath = cygpath if sys.platform == 'cygwin' else identity

This is a Python version problem. Install Python 2.7, then edit /usr/bin/storm and change the shebang on the first line from #!/usr/bin/python to #!/home/tops/bin/python2.7 (adjust to wherever your Python 2.7 lives).



You can run into all kinds of odd problems during this integration, and I stepped into quite a few pits along the way; feel free to message me or leave a comment if you get stuck.


