flink消费kafka数据
maven配置<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.7.2</version></dependency>...
·
- maven配置
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<!--<artifactId>flink-connector-kafka-0.9_2.11</artifactId>-->
<version>1.7.2</version>
</dependency>
- 代码如下
import java.util.Properties
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
/**
 * Streaming word count over a Kafka topic.
 *
 * Reads string records from topic `t1` via the broker at `localhost:9092`,
 * splits every record into words on runs of non-word characters, keeps a
 * running count per word and prints each updated count to stdout.
 */
object FlinKafka {

  /**
   * Builds the streaming job and submits it for execution.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs

    // Kafka configuration.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // NOTE(review): the Flink connector docs list zookeeper.connect as required
    // only for the Kafka 0.8 consumer; kept here to preserve the original
    // configuration -- confirm whether it can be dropped.
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "a")

    // Consume data from Kafka.
    // Flink's Kafka consumer is called FlinkKafkaConsumer08
    // (or 09 for Kafka 0.9.0.x versions, etc.,
    // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
    val stream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties))

    // split() yields an empty token for an empty record or one that starts with
    // a non-word character; drop those so "" is never counted as a word.
    val counts: DataStream[(String, Int)] = stream
      .flatMap(_.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // Sink: print every updated (word, count) pair.
    counts.addSink { wordCount =>
      // The tuple is built explicitly instead of relying on argument
      // auto-tupling of println(a, b); the printed text, e.g. "(a, count-> ,1)",
      // is unchanged.
      println((wordCount._1 + ", count-> ", wordCount._2))
    }

    // Launch the job.
    env.execute("test kafka")
  }
}
### 输入测试数据: kafka生产者---------
wang@wang-pc:~$ kafka-console-producer.sh --broker-list localhost:9092 --topic t1
>a a a
>b c
### 程序运行结果如下: --------
(a, count-> ,1)
(a, count-> ,2)
(a, count-> ,3)
(c, count-> ,1)
(b, count-> ,1)
更多推荐
已为社区贡献5条内容
所有评论(0)