想要在自己的服务器上容器化部署Kafka,主要需要部署kafka broker和zookeeper两部分。

一、准备工作

保证自己的服务器上已经安装好了docker和kubernetes。

二、编写yaml文件(全部代码)

kafka.yaml

将下面yaml文件中【value: “PLAINTEXT://x.x.x.x:30192”】的【x.x.x.x】修改为你的linux服务器的IP地址!!!!!

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
  namespace: kafka-cluster
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: registry.cn-hangzhou.aliyuncs.com/skywxw/kafka:wurstmeister_kafka_213270
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_BROKER_ID
              value: "0"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper.kafka-cluster.svc.cluster.local:2181"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://x.x.x.x:30192"
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://0.0.0.0:9092"
            - name: KAFKA_PORT
              value: "9092" 
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "PLAINTEXT:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "PLAINTEXT"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_LOG_RETENTION_HOURS
              value: "168"
          volumeMounts:
            - name: kafka-storage
              mountPath: /kafka-logs
      volumes:
        - name: kafka-storage
          emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: kafka-cluster
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
      nodePort: 30192
  selector:
    app: kafka

zookeeper.yaml

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper
  namespace: kafka-cluster
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: registry.cn-hangzhou.aliyuncs.com/skywxw/kafka:wurstmeister_zookeeper
        ports:
        - containerPort: 2181
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: kafka-cluster
spec:
  ports:
  - port: 2181
    targetPort: 2181
  selector:
    app: zookeeper

注意,这里的两个image使用的是楼主从docker hub中下载后托管在阿里云上的wurstmeister镜像,方面国内用户直接拉取,对应的分别是:

  • wurstmeister/kafka:2.13-2.7.0(sha256:4bad02cf8f07d0bf65d5cc73cce7aa75f9a90e32b585f867fce7c3fff229bd6d)
  • wurstmeister/zookeeper:latest(sha256:3f43f72cb2832e7a5fbed7f7dbcd0c43004357974d8f32555d101bd53e81e74f)

你也可以修改image的配置为你自己想要的镜像版本。

三、启动kafka服务器

1. 移动文件至服务器

将刚刚编写好的两个yaml文件移动到你的服务器中你想要的文件夹下,并通过cd命令进入该文件夹。

2. 创建一个namespace

输入下面的命令,创建一个名为“kafka-cluster”的命名空间。

% kubectl create namespace kafka-cluster

3. 部署yaml文件

输入下面的命令,启动zookeeper和kafka的pod和service.

% kubectl apply -f zookeeper.yaml -n kafka-cluster
% kubectl apply -f kafka.yaml -n kafka-cluster

4.测试

输入下面的命令,应该能够看到已经有两个pod被部署成功,为【RUNNING】状态.

% kubectl get pods -n kafka-cluster

输入下面的命令,应该能够看到已经有两个service被部署成功,同时能看到kafka的9092端口被映射到了30192端口,以方面你的外部应用能够访问该kafka服务器。

% kubectl get services -n kafka-cluster

四、使用kafka服务器

在本地使用python启动一个生产者和一个消费者即可使用刚刚部署好的kafka服务器,其中kafka_server应该是你服务器的IP地址x.x.x.x,端口号为30192。

producer.py

from kafka import KafkaProducer
import time

# Kafka服务器地址和端口
kafka_server = 'x.x.x.x:30192'
topic_name = 'testkafka'

# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=kafka_server)

# 要发送的消息
message = b'Run succeed'

while True:
    producer.send(topic_name, message)
    print("data_preprocessed:", message)
    producer.flush()
    time.sleep(3)

producer.close()

consumer.py

from kafka import KafkaConsumer

# Kafka Broker的地址和端口
kafka_broker = 'x.x.x.x:30192'  # 使用上面kubectl输出的NodePort

# Kafka topic名称
topic_name = 'testkafka'  # 替换为你之前发送消息时使用的topic

# 创建Kafka消费者
consumer = KafkaConsumer(topic_name, group_id='tests', bootstrap_servers=[kafka_broker])

# 循环接收消息并处理
for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")

# 关闭消费者
consumer.close()

同时运行上面的两个py文件,你可以发现你的消费者已经可以及时收到来自生产者的信息了。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐