kafka与storm的集成步骤

采用官方storm-kafka-client方式进行集成
一 引入pom依赖
  <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <storm.version>1.1.1</storm.version>
        <storm.kafka.client.version>1.1.1</storm.kafka.client.version>
        <kafka.version>0.10.0.1</kafka.version>
        <kafka.client.version>0.10.0.1</kafka.client.version>
        <jdk.version>1.8</jdk.version>
 </properties>

 <dependencies>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--<scope>provided</scope>--><!--本地模式需要将<scope>provided</scope> 屏蔽掉-->
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>${storm.kafka.client.version}</version>
        </dependency>

        <!--如果需要更改kafka客户端的版本,额可以通过此依赖进行定义-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.client.version}</version>
        </dependency>

    </dependencies>
二 从kafka获取数据
通过KafkaSpout进行接入数据,注意新版本配置必须通过kafka:9092端口去获取数据,而不是通过zk去请求数据
2.1 配置
 //KafkaSpout在将数据解析的时候  默认封装5个元数据分别为:"topic","partition","offset","key","value":value即为kafka中的数据
builder.setSpout("kafkaSpout", new KafkaSpout<>(CollectorKafkaSpoutConfig.getKafkaSpoutConfig()), 1);

public static KafkaSpoutConfig getKafkaSpoutConfig(){

        Properties props = new Properties();

        props.put("group.id", "消费组GROUP_ID");
        //props.put("auto.offset.reset", "earliest");

        return KafkaSpoutConfig
                .builder("127.0.0.1:9092,10.0.0.1:9092", "kafka Topic")
                .setProp(props)
                .setKey(ByteArrayDeserializer.class)//反序列话配置
                .setValue(ByteArrayDeserializer.class)//反序列化配置
                .build();

    }
三 往kafka输出数据
通过kakfaBolt进行操作
3.1 配置
 KafkaBolt kafkaBolt = new KafkaBolt()
                .withProducerProperties(KafkaBoltConfig.getKafkaBoltConfig())
                .withTopicSelector(new DefaultTopicSelector("kafkaTopicName"))//设置kafka主题
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("kafkaKeyName", "kafkaValue"));//将数据映射到kafka消息队列key=>value,注意:kafkaValue是bolt中的outputFieldsDeclarer.declare(new Fields("kafkaValue"));定义的值
builder.setBolt("kafkaOutBolt", kafkaBolt).shuffleGrouping("bolt数据");
四 参考

https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐