Kafka部署到服务器

下载

从官方镜像地址下载到安装目录并解压

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar -zxf kafka_2.11-2.0.0.tgz -C /data1/ehserver/env

修改配置文件

修改 config/server.properties 配置文件

修改0:

# 每个 Kafka 实例的唯一 id
broker.id=0

修改1:

listeners=PLAINTEXT://:19092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# 10.1.120.111 为当前 Kafka 服务器的 ip 地址,这个配置是必须的,否则只有本机的程序可以连上 kafka
advertised.listeners=PLAINTEXT://10.1.120.111:19092

修改2:

# 日志的存储位置,如果所有实例都是部署在同一台服务器上,则每个 Kafka 实例应该使用不同的目录
log.dirs=/data1/ehserver/env/kafka_2.11-2.0.0/logs/kafka-logs-1

mkdir /data1/ehserver/env/kafka_2.11-2.0.0/logs/kafka-logs-1 -pv

修改3:

# zookeeper 的连接信息
zookeeper.connect=localhost:2181

新增:

# 将默认的 delete 改成压缩模式
log.cleanup.policy=compact

启动服务

nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

()因为要启动 3 个副本,所以将 Kafka 目录复制两份后,修改对应的配置,再次启动两个节点)单节点不用执行

添加开机自启动

vim /etc/rc.local
su - everhomes -c "cd /data1/ehserver/env/zookeeper-3.4.13 && bin/zkServer.sh start"
su - everhomes -c "cd /data1/ehserver/env/kafka_2.11-2.0.0 && nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &"

初始化数据

删除 topic (新装kafka不用执行)

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic system-event

创建 topic,指定 partition 为 100

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 100 --topic system-event

修改服务器配置

# kafka
spring.kafka:
  enabled: true # 如果是自己的环境不需要 Kafka, 则把这个值配置为 false
  bootstrap-servers: localhost:19092 # kafka的ip:port
  properties:
    request.timeout.ms: 20000
  producer:
    retries: 0
    batch-size: 16384
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
  consumer:
    group-id: ${spring.application.name}-group
    auto-offset-reset: earliest
    enable-auto-commit: true
    auto-commit-interval: 100
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

Kafka常用命令

检查总共有多少个 group

bin/kafka-consumer-groups.sh --bootstrap-server 10.1.120.56:9092 --list

检查对应的 group 的消费者情况

bin/kafka-consumer-groups.sh --bootstrap-server 10.1.120.56:9092 --describe --group ehcore-group --members --verbose

检查消息堆积情况

bin/kafka-consumer-groups.sh --bootstrap-server 10.1.120.56:9092 --describe --group ehcore-group

启动一个消费者(不会影响程序正常消费)

bin/kafka-console-consumer.sh --bootstrap-server 10.1.120.56:9092 --topic user-logoff --from-beginning
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐