Storm+Kafka集成
前面介绍了 Storm环境的搭建 和 Kafka环境的搭建,分别是下面两篇文章:
http://blog.csdn.net/ch717828/article/details/50748872
http://blog.csdn.net/ch717828/article/details/50718783
这篇介绍了如何将 Storm 和 Kafka 进行整合,并且运行了一个代码实例。
前面介绍了 Storm环境的搭建 和 Kafka环境的搭建。分别是下面两篇文章:
http://blog.csdn.net/ch717828/article/details/50748872
http://blog.csdn.net/ch717828/article/details/50718783
这篇介绍了如何将 Storm 和 Kafka进行整合,并且运行了一个代码实例
1. 机器&环境 准备
我准备了3台机器 ,分别是
10.101.214.71
10.101.214.73
10.101.214.74
且这三台机器均安装了 kafka和storm。详细参考上面两篇文章。注意,之前的文章我安装的storm版本为0.9.1 ,该版本中缺少许多与kafka集成需要的包,因此,升级为0.9.2 。
2.Storm自定义日志
为了清晰得打印出Storm处理 Kafka发送来的消息,此处自定义了一个日志。
// 在73,74机器上 修改 /usr/share/storm/logback/cluster.xml
<appender name="mylog" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${storm.home}/logs/mylog.log</file><!-- log文件输出path -->
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${storm.home}/logs/mylog.log.%i</fileNamePattern><!-- 保留多个文件的文件命名格式 -->
<minIndex>1</minIndex>
<maxIndex>20</maxIndex><!-- 这两行可以共同配置保留多少个文件 -->
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize><!-- log文件的最大大小 -->
</triggeringPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ} %c{1} [%p] %m%n</pattern> <!-- 输出的日志信息的格式 -->
</encoder>
</appender>
<logger name="ch.main.MyKafkaTopology" additivity="false" >
<!-- name 可以配置哪些包下的日志信息要输出,也可以精准到一个类 -->
<level value="INFO"/><!-- 要输出的日志信息的级别,我要输出业务日志,则配置为INFO -->
<appender-ref ref="mylog"/><!-- 上面的appender的name -->
</logger>
配置好后, ch.main.MyKafkaTopology打印出的INFO日志,均会存在 /usr/share/storm/logs/mylog.log 文件下
3.代码编写
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.2-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
java 代码
package ch.main;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by chenhong on 16/2/24.
*/
public class MyKafkaTopology {
public static class KafkaWordSplitter extends BaseRichBolt{
// private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
private static final Logger LOG = LoggerFactory.getLogger(KafkaWordSplitter.class);
private static final long serialVersionUID = 1L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getString(0);
LOG.info("RECE[kafka -> splitter] "+line);
String[] words = line.split("\\s+");
for(String word : words){
LOG.info("EMIT[splitter -> counter] "+word);
collector.emit(input,new Values(word,1));
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
public static class WordCounter extends BaseRichBolt {
// private static final Log LOG = LogFactory.getLog(WordCounter.class);
private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class);
private static final long serialVersionUID =1L;
private OutputCollector collector;
private Map<String,AtomicInteger> counterMap;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
this.counterMap = new HashMap<String,AtomicInteger>();
}
@Override
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
LOG.info("RECE[splitter -> counter] "+word+" : "+count);
AtomicInteger ai = this.counterMap.get(word);
if(ai==null){
ai= new AtomicInteger();
this.counterMap.put(word,ai);
}
ai.addAndGet(count);
collector.ack(input);
LOG.info("CHECK statistics map: "+this.counterMap);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
@Override
public void cleanup() {
LOG.info("The final result:");
Iterator<Map.Entry<String,AtomicInteger>> iter = this.counterMap.entrySet().iterator();
while(iter.hasNext()){
Map.Entry<String,AtomicInteger> entry =iter.next();
LOG.info(entry.getKey()+"\t:\t"+entry.getValue().get());
}
}
}
public static void main(String[] args) throws AlreadyAliveException,InvalidTopologyException,InterruptedException{
String zks = "10.101.214.71:2181,10.101.214.73:2181,10.101.214.74:2181";
String topic ="my-replicated-topic5";
String zkRoot ="/kafka" ;
String id ="word"; // 读取的status会被存在,/zkRoot/id下面,所以id类似consumer group
BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");
SpoutConfig spoutConf = new SpoutConfig(brokerHosts,topic,zkRoot,id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers= Arrays.asList(new String[]{"10.101.214.71","10.101.214.73","10.101.214.74"});
spoutConf.zkPort=2181;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); Kafka我们创建了一个5分区的Topic,这里并行度设置为5
builder.setBolt("word-splitter",new KafkaWordSplitter(),2).shuffleGrouping("kafka-reader");
builder.setBolt("word-counter",new WordCounter() ).fieldsGrouping("word-splitter",new Fields("word"));
Config config = new Config();
String name = MyKafkaTopology.class.getSimpleName();
if(args !=null && args.length>0 ){
//config.put(Config.NIMBUS_HOST,args[0]);
config.setNumWorkers(3);
StormSubmitter.submitTopology(name,config,builder.createTopology());
}else{
config.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name,config,builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
}
4 提交运行
使用 mvn将项目打包
mvn clean install
为了在storm中使用kafka,需要将依赖的jar文件复制到Storm集群中的lib目录下面
cp /usr/local/kafka/libs/kafka_2.11-0.9.0.0.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.11.7.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.1.1.7.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.7.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.6.jar /usr/share/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/share/storm/lib/
提交
//在 71机器上提交
storm jar StormKafka0.1-1.0-SNAPSHOT.jar ch.main.MyKafkaTopology MyKafkaTopology
//在71机器上打开 kafka启动Producer ,产生日志
/usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh --broker-list 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092 --topic my-replicated-topic5
(随便输入一些内容)
//在 73,74机器上查看日志
cat /usr/share/storm/logs/mylog.log
(可以看到 MyKafkaTopology 打出的日志)
下面是我查看mylog.log的部分日志
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] aa
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] bbc
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ccc
2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ddd
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] eeee
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ffffff
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] jsdkfjasnng
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123 : 1
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=18, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=19, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1
2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=20, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
其他
1 启动storm 发生 line 61:normclasspath = cygpath if sys.platform == 'cygwin' else identity 错误
安装python2.7
修改/usr/bin/storm
将首行显示的 #!/usr/bin/python 修改为 #!/home/tops/bin/python2.7
更多推荐
已为社区贡献2条内容
所有评论(0)