Introduction

Start a Kafka instance for testing on an Alibaba Cloud server. Since resources are limited, single-node mode is used.

1. Pull the images

  • zookeeper
docker pull zookeeper
  • kafka
docker pull wurstmeister/kafka

2. Start ZooKeeper

  • Standalone mode
docker run -d --name docker_zookeeper -p 2181:2181 -t zookeeper
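Before starting Kafka, it is worth confirming that ZooKeeper is actually up and listening. A quick sketch (assuming `nc` is installed on the host; `srvr` is the one four-letter-word command whitelisted by default in recent ZooKeeper versions):

```shell
# The container should be listed with status "Up":
docker ps --filter name=docker_zookeeper

# ZooKeeper should answer the "srvr" status command on port 2181
# with its version and mode (standalone):
echo srvr | nc localhost 2181
```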

3. Start Kafka

docker run -d --name docker_kafka \
-p 9092:9092 \
--add-host=${hostname}:${SERVER_INTERNAL_IP} \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=${hostname}:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${hostname}:9092 \
-e KAFKA_JVM_PERFORMANCE_OPTS="-Xmx256m -Xms256m" \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
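Before moving on, a quick check that the broker actually came up (the exact wording of the log line may vary between image versions):

```shell
# The broker logs a "started" line once it has registered with ZooKeeper:
docker logs docker_kafka 2>&1 | grep -i "started (kafka.server"

# Port 9092 should be published on the host:
docker port docker_kafka
```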

Notes:
Because the Alibaba Cloud server's internal and public IPs differ, external clients can only access Kafka reliably if both the internal and external sides address the broker by the same hostname:

  • --add-host: adds a hosts mapping inside the container
    • hostname is the cloud server's hostname;
    • the server IP is the Alibaba Cloud internal (private) IP.

Then, add the following to the hosts file on the external machine:

# Alibaba Cloud server address mapping
${SERVER_PUBLIC_IP} ${hostname}
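After editing the hosts file, the mapping can be verified from the external machine. A sketch, using the same placeholders as above (`getent` assumes a Linux client; on macOS `dscacheutil` plays a similar role):

```shell
# The hostname should now resolve to the server's public IP:
getent hosts ${hostname}

# And the broker port should be reachable through it:
nc -zv ${hostname} 9092
```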

In addition, Kafka's JVM heap defaults to 1 GB (set via KAFKA_HEAP_OPTS in the startup scripts). Since the cloud server's resources are limited, the heap is capped here with explicit JVM flags:

  • -e KAFKA_JVM_PERFORMANCE_OPTS

4. Command-line verification

  • Enter the container
# enter the container
docker exec -it ${CONTAINER_ID} /bin/bash
cd /opt/kafka
  • Create a topic
bin/kafka-topics.sh --create --zookeeper ${hostname}:2181 --replication-factor 1 --partitions 1 --topic mykafka
  • Run a producer and send messages
bin/kafka-console-producer.sh --bootstrap-server ${hostname}:9092 --topic mykafka

For example:

bash-5.1# bin/kafka-console-producer.sh --bootstrap-server zhy:9092 --topic mykafka
>zhy
  • Run a consumer and receive messages
bin/kafka-console-consumer.sh --bootstrap-server ${hostname}:9092 --topic mykafka --from-beginning

For example:

bash-5.1# bin/kafka-console-consumer.sh --bootstrap-server zhy:9092 --topic mykafka --from-beginning
zhy
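As a further sanity check, the same script that created the topic can describe it. A sketch, run inside the container with the same placeholders as above:

```shell
# Partition count, replication factor, and the leader broker (id 0)
# should match the settings used when the topic was created:
bin/kafka-topics.sh --describe --zookeeper ${hostname}:2181 --topic mykafka
```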

5. Java API verification from the public network

  • Start a consumer:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zhy:9092"); // an IP also works here
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // subscribe to the topic mykafka
        kafkaConsumer.subscribe(Stream.of("mykafka").collect(Collectors.toList()));

        // consume the data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
            kafkaConsumer.commitAsync();
        }
    }
}
  • Producer (if the hostname mapping above is not configured for the Kafka container and the hostname is used, a client on the public network can never deliver data)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zhy:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("mykafka", "CustomProducerTest@" + i));
        }
        producer.close();
    }
}

At this point, the consumer started earlier with the Java API prints:

ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1649684611004, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@0)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@1)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@2)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@3)
ConsumerRecord(topic = mykafka, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1649684611036, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomProducerTest@4)

Meanwhile, the console consumer window opened earlier shows the same messages:

bash-5.1# bin/kafka-console-consumer.sh --bootstrap-server zhy:9092 --topic mykafka --from-beginning
zhy
CustomProducerTest@0
CustomProducerTest@1
CustomProducerTest@2
CustomProducerTest@3
CustomProducerTest@4