Integrating Storm with Kafka
This example uses Storm 1.1.1 and Kafka 2.11 (the kafka-clients dependency below is 0.10.0.0).
1. Create the project and add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
        <!-- Comment out for local testing; uncomment when running on the cluster -->
        <!-- <scope>provided</scope> -->
    </dependency>
</dependencies>
2. Write the main topology class
Adjust the number of worker processes and executor threads as needed; if they are not set, both default to 1.
package cn.itcast.storm.kafka;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class MainTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpoutConfig.Builder<String, String> kafkaBuilder =
                KafkaSpoutConfig.builder("node-1:9092,node-2:9092,node-3:9092", "testcheng");
        // Set the consumer group the spout belongs to
        kafkaBuilder.setGroupId("testgroup");
        // Build the KafkaSpoutConfig
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = kafkaBuilder.build();
        // Create the KafkaSpout from the config
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
        // Receive data with 5 executor threads
        builder.setSpout("kafkaSpout", kafkaSpout, 5);
        // Process data with 2 executor threads
        builder.setBolt("printBolt", new PrintBolt(), 2).localOrShuffleGrouping("kafkaSpout");

        Config config = new Config();
        if (args.length > 0) {
            // Cluster submission mode
            config.setDebug(false);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Local test mode
            config.setDebug(true);
            // Use 2 worker processes
            config.setNumWorkers(2);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafkaSpout", config, builder.createTopology());
        }
    }
}
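Besides the group id, KafkaSpoutConfig.Builder exposes a few other knobs that are often worth tuning. The lines below are a minimal, optional sketch (assuming the same imports as MainTopology); the specific values — a 2-second commit interval, the UNCOMMITTED_EARLIEST start strategy, and a cap of 250 uncommitted offsets — are illustrative assumptions, not values required by this example.

// Optional spout tuning (illustrative values); replaces the builder lines in main()
KafkaSpoutConfig.Builder<String, String> tunedBuilder =
        KafkaSpoutConfig.builder("node-1:9092,node-2:9092,node-3:9092", "testcheng");
tunedBuilder.setGroupId("testgroup");
// How often processed offsets are committed back to Kafka
tunedBuilder.setOffsetCommitPeriodMs(2000);
// Where to start reading when the group has no committed offset yet
tunedBuilder.setFirstPollOffsetStrategy(
        KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
// Cap on emitted-but-not-yet-committed offsets (acts as a back-pressure safeguard)
tunedBuilder.setMaxUncommittedOffsets(250);
KafkaSpout<String, String> tunedSpout = new KafkaSpout<>(tunedBuilder.build());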
3. Write the bolt that processes the messages
package cn.itcast.storm.kafka;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseBasicBolt {
    /**
     * execute is called by Storm for every tuple the bolt receives.
     * @param tuple
     * @param basicOutputCollector
     */
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // The KafkaSpout emits (topic, partition, offset, key, value), so index 4 is the message value.
        // Print to stderr so the output stands out (red in most IDE consoles).
        System.err.println(tuple.getValue(4));
        System.err.println(tuple.getValues());
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt only prints; it emits nothing downstream.
    }
}
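Reading the tuple by position works, but it is brittle if the spout's record translator changes. A hedged alternative sketch (the class name PrintValueBolt is hypothetical; it assumes the default record translator's field names topic, partition, offset, key, value) reads the message by field name instead:

package cn.itcast.storm.kafka;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical variant of PrintBolt that reads the Kafka record value by field name
public class PrintValueBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // "topic" and "value" are the field names declared by the spout's default record translator
        String topic = tuple.getStringByField("topic");
        String value = tuple.getStringByField("value");
        System.err.println(topic + " -> " + value);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Nothing is emitted downstream
    }
}

To try it, swap new PrintBolt() for new PrintValueBolt() in the setBolt call of MainTopology.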
4. Test
Start a Kafka console producer and send a few test messages:
bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092 --topic testcheng
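If you prefer to drive the test from code rather than the console producer, a minimal sketch with the kafka-clients API works too (same brokers and topic as above; the class name TestProducer and the key/message strings are illustrative assumptions):

package cn.itcast.storm.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical test producer; sends a handful of messages to the topic the spout reads from
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("testcheng", "key-" + i, "message-" + i));
        }
        producer.close();
    }
}

Each message should then show up in the PrintBolt output as the value at index 4 plus the full tuple.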
If this helped, please like, comment, and follow. Thanks for reading and for your support!