前言

docker已经安装
(如果没安装,请点此链接docker安装和基本原理

温馨提示:仅限于开发和测试哈。

Docker安装单机版zookeeper和kafka

1.从docker仓库拉取kafka和zookeeper镜像

docker pull wurstmeister/kafka:2.11-0.11.0.3
docker pull wurstmeister/zookeeper

注意:我这里控制了kafka的版本,没有控制zookeeper的版本(latest最新的版本),这里就要看你需要的版本

2.创建容器并后台运行zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

3.创建kakfa容器并后台运行kafka,记得修改自己服务器或者虚拟机对应的ip

docker run --name kafka01 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.16.4:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.16.4:9092 \	
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-d  wurstmeister/kafka:2.11-0.11.0.3

注意:KAFKA_ADVERTISED_LISTENERS如果你的kafka服务对外提供访问的话(非内网即非同一网段),那就换成公网的ip

4.进入kafka01容器

docker exec -it kafka01 /bin/bash

在这里插入图片描述

5.创建topic,副本数量1个,分区数量1个

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.16.4:2181 --replication-factor 1 --partitions 1 --topic metric

提示:Created topic “metric”. 表示创建成功。

6.查看topic的list

/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.16.4:2181

在这里插入图片描述

7.生产者查看

/opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.16.4:9092 --topic metric 

在这里插入图片描述
复制一条数据导入到kafka里面

8.消费者查看

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.16.4:9092 --topic metric --from-beginning

在这里插入图片描述
因为我之前往里面传过数据,所以这里显示了多条,上图仅供参考哈。

在IDEA里面编写Flink程序

pom文件里面引入依赖

		<properties>
        	<flink.version>1.9.0</flink.version>
        	<scala.binary.version>2.11</scala.binary.version>
    	</properties>
    
		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

Flink代码

package com.happy.connectors;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * @author happy
 * @create 2020-07-08 22:21
 *
 */
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        if (args.length!=1){
            System.err.println("请传入有效信息,如192.168.2.112:9092");
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Properties参数定义
        Properties props = new Properties();
        props.put("bootstrap.servers",args[0]);
        props.put("group.id","metric-group");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//key反序列化
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//value反序列化

        props.put("auto.offset.reset","latest");//偏移量最新earliest

        DataStreamSource<String> metricStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "metric",   //kafka topic
                new SimpleStringSchema(),//String 序列化
                props
        )).setParallelism(1);

        metricStreamSource.print(); //把从kafka读取到的数据打印并输出

        env.execute("Flink DataSource");
    }
}

控制台显示如下,那么接下来就等下消费数据吧,此时你可以往producer里面丢数据,具体命令上面已经给出。

[Kafka 0.10 Fetcher for Source: Custom Source (1/1)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
[Kafka 0.10 Fetcher for Source: Custom Source (1/1)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
[Kafka 0.10 Fetcher for Source: Custom Source (1/1)] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 106.12.2.73:9092 (id: 2147483647 rack: null) for group metric-group.
本程序完整代码:github地址 
https://github.com/DeveloperZJQ/learning-flink/tree/master/flink-learning-connectors/src/main/java/com/happy/connectors```

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐