Integrating Kafka with Storm
Steps for integrating Kafka with Storm, using Storm 1.1.1 (the latest release at the time of writing).
The integration uses the official storm-kafka-client module.
1. Add the POM dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>1.1.1</storm.version>
<storm.kafka.client.version>1.1.1</storm.kafka.client.version>
<kafka.version>0.10.0.1</kafka.version>
<kafka.client.version>0.10.0.1</kafka.client.version>
<jdk.version>1.8</jdk.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
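<!--<scope>provided</scope>--><!-- For local mode, <scope>provided</scope> must stay commented out; enable it only when submitting to a cluster, which supplies storm-core itself -->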
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.kafka.client.version}</version>
</dependency>
<!-- To use a different Kafka client version, declare it through this dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>
</dependencies>
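2. Reading data from Kafka
Data is ingested through KafkaSpout. Note that with the new client the spout must be pointed at the Kafka brokers themselves (kafka:9092 here) rather than at ZooKeeper.
2.1 Configuration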
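// By default KafkaSpout emits each record as a tuple with five fields:
// "topic", "partition", "offset", "key", "value"; "value" carries the Kafka message payload.
builder.setSpout("kafkaSpout", new KafkaSpout<>(CollectorKafkaSpoutConfig.getKafkaSpoutConfig()), 1);

// Helper method in CollectorKafkaSpoutConfig. It needs java.util.Properties,
// org.apache.kafka.common.serialization.ByteArrayDeserializer and
// org.apache.storm.kafka.spout.KafkaSpoutConfig on the import list.
public static KafkaSpoutConfig getKafkaSpoutConfig() {
    Properties props = new Properties();
    props.put("group.id", "GROUP_ID"); // consumer group id
    //props.put("auto.offset.reset", "earliest");
    return KafkaSpoutConfig
            .builder("127.0.0.1:9092,10.0.0.1:9092", "kafka Topic") // broker list, then topic name
            .setProp(props)
            .setKey(ByteArrayDeserializer.class)   // key deserializer
            .setValue(ByteArrayDeserializer.class) // value deserializer
            .build();
}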
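Because both deserializers are ByteArrayDeserializer, the "value" field of each tuple reaches downstream bolts as a raw byte[]. The following is a minimal sketch of decoding it, assuming the producers write UTF-8 text. The class name KafkaValueDecoder is hypothetical and is not part of Storm or Kafka.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of Storm or Kafka) for decoding record payloads. */
public final class KafkaValueDecoder {

    private KafkaValueDecoder() {
        // static utility, no instances
    }

    /**
     * Decodes a payload produced by ByteArrayDeserializer.
     * Assumption: the producers write UTF-8 text. A null payload is returned as null.
     */
    public static String decode(byte[] payload) {
        if (payload == null) {
            return null;
        }
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

Inside a bolt's execute method this could be called as KafkaValueDecoder.decode(tuple.getBinaryByField("value")). If the payload is not UTF-8 text (for example Avro or Protobuf), the decoding step would need to change accordingly.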
3. Writing data to Kafka
Output is handled by KafkaBolt.
3.1 Configuration
KafkaBolt kafkaBolt = new KafkaBolt()
        .withProducerProperties(KafkaBoltConfig.getKafkaBoltConfig())
        .withTopicSelector(new DefaultTopicSelector("kafkaTopicName")) // target Kafka topic
        // Maps tuple fields to the Kafka record key and value. Note: "kafkaValue" must match the field
        // name the upstream bolt declares via outputFieldsDeclarer.declare(new Fields("kafkaValue")).
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("kafkaKeyName", "kafkaValue"));
builder.setBolt("kafkaOutBolt", kafkaBolt).shuffleGrouping("upstreamBoltId"); // id of the bolt that emits the data
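The KafkaBoltConfig.getKafkaBoltConfig() helper is not shown in the original. A minimal sketch of what it might return is given here. The property keys are standard Kafka producer settings, but every value is an assumption: the broker list is copied from the spout example, and String serializers are assumed for both key and value.

```java
import java.util.Properties;

/** Sketch of the KafkaBoltConfig helper used by the KafkaBolt; all values are assumptions. */
public final class KafkaBoltConfig {

    private KafkaBoltConfig() {
        // static utility, no instances
    }

    public static Properties getKafkaBoltConfig() {
        Properties props = new Properties();
        // Brokers to connect to (assumed: same list as the spout example).
        props.put("bootstrap.servers", "127.0.0.1:9092,10.0.0.1:9092");
        // Wait for the partition leader to acknowledge each write.
        props.put("acks", "1");
        // Assumed: the key and value tuple fields hold Strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The serializers have to match the Java types of the tuple fields being mapped. If the upstream bolt emits byte[] values, for instance, ByteArraySerializer would be the fitting choice for value.serializer.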
4. References
https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md