docker依赖于zookeeper,首先安装zookeeper

一、安装zookeeper

1 拉取镜像

在这里插入图片描述

2 创建network

在启动之前,先指定一个网络

docker network create app-tier --driver bridge

3 启动容器

启动zookeeper容器

docker run -d --name zookeeper-server 
--network app-tier 
-p 2181:2181 
-e ALLOW_ANONYMOUS_LOGIN=yes 
bitnami/zookeeper:latest

测试是否成功
进入zookeeper

docker exec -it zookeeper-server /bin/sh

执行代码

zkCli.sh -server 10.249.53.1

二、安装kafka

1 拉取kafka镜像

在这里插入图片描述

2 启动kafka容器

docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_ZOOKEEPER_CONNECT=10.249.53.1:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.249.53.1:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    bitnami/kafka:latest

进入kafka

docker exec -it kafka /bin/sh

3 创建topic

-- 创建topic

./kafka-topics.sh --bootstrap-server 10.249.53.1:9092 --create  --replication-factor 1 --partitions 1 --topic kfk

查看topic
-- 分区topic

./kafka-topics.sh --list --bootstrap-server 10.249.53.1:9092

4 创建生产者

-- 生产者

./kafka-console-producer.sh --broker-list 10.249.53.1:9092 --topic kfk

5 创建消费者

-- 消费者

./kafka-console-consumer.sh --bootstrap-server 10.249.53.1:9092 --topic kfk --from-beginning

三、kafka的java api

1 producer

public class ProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();

        //指定Kafka集群的IP地址和端口号
        props.put("bootstrap.servers",
                "10.249.53.1:9092");

        //指定等待所有副本节点的应答
        props.put("acks","all");

        //指定消息发送最大尝试次数
        props.put("retries",1);

        //指定一批消息处理大小
        props.put("batch.size",16384);

        //指定请求延时
        props.put("linger.ms",1);

        //指定缓存区内存大小
        props.put("buffer.memory",33554432);

        //设置key序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //设置value序列化
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //生产数据
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        for (int i =0; i < 50; i++){
            producer.send(new ProducerRecord<String, String>
                    ("kfk",Integer.toString(i),"hello world-" + i)).get();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        producer.close();

    }
}

2 消费者

public class ConsumerTest {
    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("bootstrap.servers", "10.249.53.1:9092");

        props.put("group.id", "kfk1");

        props.put("enable.auto.commit", "true");

        props.put("auto.commit.interval.ms", "1000");

        props.put("auto.offset.reset","earliest");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String,  String> consumer  =  new
                KafkaConsumer<String,  String>(props);

        // 订阅topic
        consumer.subscribe(Arrays.asList("kfk"));



        // 消费数据
        while (true) {

            ConsumerRecords<String,  String> records  =
                    consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)

                System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
        }
    }
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐