• Maven configuration
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Universal Kafka connector, for Kafka brokers >= 1.0.0 -->
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <!-- For older brokers, use the version-specific artifact instead, e.g.: -->
    <!--<artifactId>flink-connector-kafka-0.9_2.11</artifactId>-->
    <version>1.7.2</version>
</dependency>
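
For readers building with sbt instead of Maven, a rough equivalent of the dependencies above would be the following sketch (an addition for illustration, not part of the original post; it assumes scalaVersion is set to a 2.11.x release so that %% resolves the same _2.11 artifacts):

// build.sbt sketch; %% appends the Scala binary version to the artifact name
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.7.2",
  "org.apache.flink" %% "flink-streaming-scala" % "1.7.2",
  "org.apache.flink" %% "flink-connector-kafka" % "1.7.2"  // universal Kafka connector
)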

• The code is as follows
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// SimpleStringSchema now lives in api.common.serialization; the old
// org.apache.flink.streaming.util.serialization location is deprecated.
import org.apache.flink.api.common.serialization.SimpleStringSchema

object FlinkKafka {
  def main(args: Array[String]): Unit = {
    // Get the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs

    // Kafka configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // zookeeper.connect is only needed by the Kafka 0.8 consumer; the universal consumer ignores it
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "a")

    // Consume data from Kafka.
    // Flink's Kafka consumer is called FlinkKafkaConsumer08
    // (or 09 for Kafka 0.9.0.x versions, etc.,
    // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
    val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties))
    // Split each line into words and keep a running count per word
    val stream2 = stream.map(_.split("\\W+")).flatMap(_.toSeq).map((_, 1)).keyBy(0).sum(1)
    stream2.addSink(tup => { // sink: print every updated (word, count) pair
      println((tup._1 + ", count->  ", tup._2))
    })

    // Start the job
    env.execute("test kafka")
  }
}
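
The same connector artifact also ships a producer for writing results back to Kafka. The snippet below is a minimal sketch of how the word counts in stream2 could be sent to an output topic; it would go inside main, after stream2 is defined. The topic name t1-counts is made up for illustration, and the (brokerList, topic, schema) constructor of FlinkKafkaProducer is assumed here.

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// Format each (word, count) pair as a line of text and write it to Kafka.
// "t1-counts" is a hypothetical output topic chosen for this example.
val counts: DataStream[String] = stream2.map { case (word, count) => s"$word:$count" }
counts.addSink(new FlinkKafkaProducer[String]("localhost:9092", "t1-counts", new SimpleStringSchema()))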

### Test input: Kafka console producer ---------
wang@wang-pc:~$ kafka-console-producer.sh --broker-list localhost:9092 --topic t1
>a a a
>b c

### Program output: --------
(a, count->  ,1)
(a, count->  ,2)
(a, count->  ,3)
(c, count->  ,1)
(b, count->  ,1)
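
The keyed sum emits an updated running total every time a word arrives, which is why a is printed three times with counts 1, 2 and 3. The following self-contained sketch (an addition for illustration, not part of the original post) reproduces the same behavior locally by feeding the test lines with fromElements instead of Kafka:

import org.apache.flink.streaming.api.scala._

// Local word-count sketch: same pipeline as above, with a fixed in-memory input.
object WordCountLocal {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)                // single thread, so the output order is deterministic
    env.fromElements("a a a", "b c")     // the same lines typed into the console producer
      .flatMap(_.split("\\W+"))          // split each line into words
      .map((_, 1))                       // pair each word with an initial count of 1
      .keyBy(0)                          // key by the word
      .sum(1)                            // running sum per word
      .print()                           // prints (a,1), (a,2), (a,3), (b,1), (c,1)
    env.execute("local word count")
  }
}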
