00 前言

上回书,我们提到直接使用 Filebeat 在节点上采集日志数据然后直接传输给 Elasticsearch 存储,在日志规模较大时,可能导致 Elasticsearch 存储集群容量不足,或者出现性能瓶颈的问题。

针对这个问题我们提出了👉 使用 Redis 缓存优化 ELK 日志收集 👈

但是 ELK 使用 Redis 作为缓存的一个缺陷在于:Filebeat 采集的日志数据只能传输给 Redis 的单节点,不能传输给 Redis 集群或者 Redis 哨兵(在新版本中应该会有所改进)

针对这个问题我们提出了👉 ELK 日志收集中使用 Nginx 代理优化 Redis 缓存 👈

又是 Nginx 又是 Redis 的整起来确实有点晕, 那么有没有更加快捷的方法呢?

这就揭晓答案: 🎯 使用 Kafka 缓存优化 ELK 日志收集 🎯

01 介绍个朋友: Kafka

1.1 什么是 kafka

kafka 是一种被广泛使用的消息队列中间件,你问我什么是消息队列❓

首先, 我相信你知道操作系统中经典的生产者-消费者问题 👀: 问题描述了共享固定大小缓冲区的两个线程即所谓的“生产者”和“消费者”。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。

你可以将消息队列视为这个生产者-消费者问题中的缓冲区

在这里插入图片描述

举个更加通俗的例子,🐔 吃鸡蛋问题 🥚: 生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,在吃鸡蛋的过程中可能出现下面两种情况

  • 假设消费者消费鸡蛋的时候噎住了,生产者还在生产鸡蛋,那新生产的鸡蛋就浪费掉了; 这就是系统宕机的情况, 没有缓存消息。
  • 再比如生产者很强劲,生产者1秒钟生产 5 个鸡蛋,消费者1秒钟只能吃 2 个鸡蛋,这就出现消费者就吃不消的情况,消费者拒绝再吃了,新生成的鸡蛋又浪费了 ;这就是消息量过大的情况下消息堵塞,最终导致系统超时。

怎么解决这种浪费粮食的情况呢? 那就是 🎯 加个盘子 🎯

这个时候我们放个盘子在它们中间,生产出来的鸡蛋都放到盘子里,消费者去盘子里拿鸡蛋,这样鸡蛋就不会因为没法吃完而浪费了,而在实际系统中这个盘子就是 kafka 。

鸡蛋其实就是数据流,系统之间的交互都是通过数据流来传输的,也称为报文或者消息。

消息队列满了,其实就是盘子满了,鸡蛋放不下了,那赶紧多放几个盘子,其实就是 kafka 的扩容。

1.2 kafka 名词解释

再来介绍一些关于kafka的基础名词,比如 topic、producer、consumer、broker:

  • producer:生产者,就是它来生产鸡蛋的。
  • consumer:消费者,生出的鸡蛋它来消费。
  • topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的鸡蛋都吃的,这样不同的生产者生产出来的鸡蛋,消费者就可以选择性的吃了。
  • broker:就是盘子了。

02 Kafka 安装部署

了解完概念,是时候大展伸手了,从安装开始吧 🐎🐎🐎

2.1 安装环境准备

使用三个节点部署 kafka 集群,为三个节点配置 hosts,并使它们之间能够 ping 通

如果你没有那么多机器,可以和我一样使用虚拟机搭建一个伪分布式环境

root@master:/home/wang$ vim /etc/hosts
root@master:/home/wang$ cat /etc/hosts
127.0.0.1 localhost
127.0.1.1 master-vm

# The following lines are desirable for IPv6 capable hosts
::1     ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

# the cluster's three hosts # 配置节点,在三个节点中都要进行相同的hosts配置
172.16.255.131 master
172.16.255.132 node1
172.16.255.139 node2

三个节点中都进行如上 hosts 配置定义好主机名称后,测试彼此之间是否能够 ping 通

在这里插入图片描述

在这里插入图片描述

2.2 安装zookeeper

下载 kafka 和 zookeeper 安装包,kafka下载地址zookeeper下载地址

cd /opt/es/
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
wget https://downloads.apache.org/zookeeper/stable/apache-zookeeper-3.6.3-bin.tar.gz

安装配置 zookeeper

zookeeper 可以实现:配置管理,名字服务,提供分布式同步以及集群管理等功能 Zookeeper到底是干嘛的

🏇🏇🏇 Zookeeper到底是干嘛的? 🏇🏇🏇

在分布式的集群中,经常会由于各种原因,比如硬件故障,软件故障,网络问题,有些节点会进进出出。有新的节点加入进来,也有老的节点退出集群。

这个时候,集群中其他机器需要感知到这种变化,然后根据这种变化做出对应的决策。比如我们是一个分布式存储系统,有一个中央控制节点负责存储的分配,当有新的存储进来的时候我们要根据现在集群目前的状态来分配存储节点。这个时候我们就需要动态感知到集群目前的状态。

还有,比如一个分布式的SOA架构中,服务是一个集群提供的,当消费者访问某个服务时,就需要采用某种机制发现现在有哪些节点可以提供该服务,这也称之为服务发现,比如Alibaba开源的SOA框架Dubbo就采用了Zookeeper作为服务发现的底层机制。还有开源的Kafka队列就采用了Zookeeper作为Cosnumer的上下线管理。

解压安装 zookeeper

cd /opt/es/
tar -zxvf apache-zookeeper-3.6.3.tar.gz -C /opt/
ln -s /opt/apache-zookeeper-3.6.3/ /opt/zookeeper # 添加软连接

# 查看目录结构
root@master:/opt$ tree -L 1 /opt/
/opt/
├── apache-zookeeper-3.6.3
├── containerd
├── es
├── redis_cluster
└── zookeeper -> /opt/apache-zookeeper-3.6.3/

配置 zookeeper

# 创建数据目录
mkdir -p /data/zookeeper
# 拷贝官方配置文件
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
vim /opt/zookeeper/conf/zoo.cfg

修改配置文件如下,加入节点

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60

# 节点配置
server.1=172.16.255.131:2888:3888
server.2=172.16.255.132:2888:3888
server.3=172.16.255.139:2888:3888

为不同节点分配id,myid

# 分配id
root@master:/opt$ echo "1" > /data/zookeeper/myid
# 查看是否成功插入
root@master:/opt$ ls -lh /data/zookeeper/
total 4.0K
-rw-r--r-- 1 root root 2 Aug 19 08:10 myid
root@master:/opt$ cat /data/zookeeper/myid 
1

其他节点用相同的方式安装配置 zookeeper

🔖 一种便捷的方法 🔖:其他节点的安装配置过程和上述过程一致,唯一的不同在于myid不一样,所以可以直接使用rsync命令将所有zookeeper相关的文件传输到其他节点,然后修改myid即可。

值得注意的是,安装zookeeper的基础是该节点具备java环境,如果没有需要先安装 jdk

使用rsync命令传输zookeeper文件到其他节点,不需要修改配置文件

cd /opt
# 传输zookeeper文件到其他节点
root@master:/opt$ rsync -avz apache-zookeeper* wang@node1:/opt/es
root@master:/opt$ rsync -avz apache-zookeeper* wang@node2:/opt/es
# 在node1节点上为zookeeper创建软链接 
root@node1:/opt/es$ cd /opt/es/
root@node1:/opt/es$ mv apache-zookeeper-3.6.3/ /opt/apache-zookeeper-3.6.3
root@node1:/opt/es$ ln -s /opt/apache-zookeeper-3.6.3/ /opt/zookeeper
# 在node2节点上为zookeeper创建软链接
root@node2:/opt/es$ cd /opt/es/
root@node2:/opt/es$ mv apache-zookeeper-3.6.3/ /opt/apache-zookeeper-3.6.3
root@node2:/opt/es$ ln -s /opt/apache-zookeeper-3.6.3/ /opt/zookeeper

在其他节点上创建数据目录和myid,值得注意的是myid的编号zoo.cfg配置文件中节点配置的id号一致server.1 server.2server.myid

# 在node1节点上为zookeeper创建数据目录和myid
root@node1:/opt/es$ mkdir -p /data/zookeeper
root@node1:/opt/es$ echo "2" > /data/zookeeper/myid
# 在node2节点上为zookeeper创建数据目录和myid 
root@node2:/opt/es$ mkdir -p /data/zookeeper
root@node2:/opt/es$ echo "3" > /data/zookeeper/myid

启动 zookeeper

在三个节点中使用如下命令启动zookeeper

/opt/zookeeper/bin/zkServer.sh start

启动成功,显示如下内容

/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

启动报错,报如下错误 解决方法

/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... FAILED TO START

实际上只要 >= 3.5.5 版本都会出现这种问题。❌ 问题原因 ❌:下载了错误的版本文件,Zookeeper 从3.5.5后开始拆分为两个版本,而且他们的结构还很类似。

  • 标准版本(Apache ZooKeeper x.y.z ),下载的文件名为:apache-zookeeper-x.y.z-bin.tar.gz
  • 另一个是源码版本(Apache ZooKeeper x.y.z Source Release),下载的文件名为:apache-zookeeper-x.y.z.tar.gz

查看 zookeeper状态

使用命令/opt/zookeeper/bin/zkServer.sh status查看zookeeper状态

root@master:/opt$ /opt/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower # 表示节点关系

2.3 验证zookeeper

在集群中任意一个节点上生成数据,然后在其他节点上验证获取数据

使用如下命令登录节点并生成数据

root@master:/opt$ /opt/zookeeper/bin/zkCli.sh -server master:2181
[zk: master:2181(CONNECTED) 1] create /test "hello"
Created /test

在其他节点验证数据

root@node1:/opt$ /opt/zookeeper/bin/zkCli.sh -server node1:2181
[zk: node1:2181(CONNECTED) 0] get /test
hello
root@node2:/opt$ /opt/zookeeper/bin/zkCli.sh -server node2:2181
[zk: node2:2181(CONNECTED) 0] get /test
hello

2.4 安装kafka

整了半天准备工作,这才开始安装 Kafka 😅😅

解压安装 kafka

cd /opt/es/
tar -zxvf kafka_2.13-2.8.0.tgz -C /opt/
ln -s /opt/kafka_2.13-2.8.0/ /opt/kafka  # 添加软连接

配置 kafka

先创建日志目录/opt/kafka/logs,在通过配置文件对其进行配置如下

mkdir /opt/kafka/logs
cp /opt/kafka/config/server.properties /opt/kafka/config/server.properties.bak # 备份原始参考配置文件
vim /opt/kafka/config/server.properties # 打开配置文件

修改配置内容如下

root@master:/home/wang$ grep "^[a-Z]" /opt/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://172.16.255.131:9092
log.dirs=/opt/kafka/logs
log.retention.hours=24
zookeeper.connect=172.16.255.131:2181,172.16.255.132:2181,172.16.255.139:2181

其他节点用相同的方式安装配置 kafka

🔖 一种便捷的方法 🔖:其他节点的安装配置过程和上述过程一致,唯一的不同在于配置文件中broker.idlistener不一样,所以可以直接使用rsync命令将所有kafka相关的文件传输到其他节点,然后修改上述配置内容即可。

使用rsync命令传输kafka文件到其他节点

# 传输kafka文件到其他节点
root@master:/opt$ cd /opt/
root@master:/opt$ rsync -avz kafka* wang@node1:/opt/es/
root@master:/opt$ rsync -avz kafka* wang@node2:/opt/es/
# 其他节点中加文件移动到opt目录下
root@node1:/opt/es$ mv kafka* /opt/
root@node2:/opt/es$ mv kafka* /opt/

在其他节点中创建kafka日志目录(传输的文件中包含日志目录不需要再做修改),并修改配置文件

root@node1:/opt/kafka# vim /opt/kafka/config/server.properties # 打开配置文件
root@node1:/opt/kafka# grep "^[a-Z]" /opt/kafka/config/server.properties
broker.id=2 # 修改为对应id号
listeners=PLAINTEXT://172.16.255.132:9092 # 修改为对应主机ip
log.dirs=/opt/kafka/logs
log.retention.hours=24
zookeeper.connect=172.16.255.131:2181,172.16.255.132:2181,172.16.255.139:2181

在node2中进行类似操作完成kafka安装配置

启动kafka

在三个节点中使用如下命令启动kafka

root@master:/opt$ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

启动成功后,终端尾行会打印如下内容

[2021-08-19 12:24:31,440] INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-19 12:24:31,441] INFO Kafka commitId: ebb1d6e21cc92130 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-19 12:24:31,441] INFO Kafka startTimeMs: 1629375871435 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-19 12:24:31,442] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

后台启动kafka

使用如下命令可以实现启动kafka

root@master:/opt$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

后台启动可以通过查看日志的方式,检验kafka是否启动成功

root@master:/opt$ tail -f /opt/kafka/logs/server.log 
[2021-08-19 12:28:47,204] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-08-19 12:28:47,237] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2021-08-19 12:28:47,246] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-19 12:28:47,252] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2021-08-19 12:28:47,253] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Started socket server acceptors and processors (kafka.network.SocketServer)
[2021-08-19 12:28:47,255] INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-19 12:28:47,255] INFO Kafka commitId: ebb1d6e21cc92130 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-19 12:28:47,255] INFO Kafka startTimeMs: 1629376127253 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-19 12:28:47,256] INFO [KafkaServer id=1] started (kafka.server.KafkaServer

在其他节点中都可以用如上命令,启动kafka

2.5 验证kafka

终于可以用了 😪😪

验证kafka进程

使用如下命令验证kafka进程

root@master:/opt$ /opt/jdk/bin/jps
2835 Jps
2728 Kafka
1385 QuorumPeerMain

测试创建topic

使用如下命令创建名称为kafkatest,分区partition为3,备份replication为3的topic,创建一个topic可以理解为创建了一个聊天室,所有在聊天室内的主机可以互相发消息

root@master:/opt$ /opt/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.255.131:2181,172.16.255.132:2181,172.16.255.139:2181 --partitions 3 --replication-factor 3 --topic kafkatest
Created topic kafkatest.

测试获取toppid

在任意一个节点中都可以使用如下命令获取topic

root@master:/opt# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.255.131:2181,172.16.255.132:2181,172.16.255.139:2181 --topic kafkatest
Topic: kafkatest	TopicId: ztxIlKMkTQCHwAczt0lfog	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: kafkatest	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: kafkatest	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: kafkatest	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2

kafka测试发送消息

先创建一个名为messageTest的topic

root@master:/opt# /opt/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.255.131:2181,172.16.255.132:2181,172.16.255.139:2181 --partitions 3 --replication-factor 3 --topic messageTest
Created topic messageTest.

然后在其中一个节点使用如下命名,使用生产者producer发送消息。值得注意的是,发送消息端口9092,而不是zookeeper使用的2181

root@master:/opt# /opt/kafka/bin/kafka-console-producer.sh --broker-list 172.16.255.131:9092,172.16.255.132:9092,172.16.255.139:9092 --topic messageTest 
>hello
>,
>kafka
>!
>

其他节点可以使用如下命令,使用消费者consumer消费消息,而该命令是使用的是zookeeper的2181端口

root@master:/opt$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.255.131:2181,172.16.255.132:2181,172.16.255.139:2181 --topic messageTest --from-beginning

03 ELK使用Kafka作为缓存

惊不惊喜,意不意外! 这才真正进入这边文章的正题 😛 😜 😝

3.1 配置 Filebeat 传输数据到 kafka

配置Filebeat缓存日志数据到Kafka, Filebeat输出到Kafka配置官方文档

在Filebeat的配置文件/etc/filebeat/filebeat.yml中修改输出配置,删除ES输出配置添加Kafka输出配置如下

# 输入配置不变,还是收集Nginx日志

setup.template.settings:
  index.number_of_shards: 3

# ------------------------------ Kafka Output -------------------------------
output.kafka:
  hosts: ["172.16.255.131:9092", "172.16.255.132:9092", "172.16.255.139:9092"]
  topic: elklog

3.2 配置Logstash消费kafka日志数据

打开Logstash的配置文件,添加kafka.conf配置文件,对其进行配置,将输入源配置为kafka

root@master:/opt$ vim /etc/logstash/conf.d/kafka.conf 

配置内容如下

input {
    kafka {
        bootstrap_servers => "172.16.255.131:9092" # 仅需要配置kafka集群中的任意一个节点的ip
        topics => ["elklog"]
        group_id => "logstash"
        codec => "json"
    }
}

filter {
    mutate {
        convert => ["upstream_time","float"]
        convert => ["request_time","float"]
    }

    json {
        source => "message"
    }
}

output{
    stdout {}
    if "access" in [tags] {
            elasticsearch {
                hosts => "http://172.16.255.131:9200/"
                manage_template => false
                index => "nginx_access_kafka-%{+yyyy.MM}"
            }
    }

    if "error" in [tags] {
            elasticsearch {
                hosts => "http://172.16.255.131:9200/"
                manage_template => false
                index => "nginx_error_kafka-%{+yyyy.MM}"
            }
    }
}

3.3 验证ELK使用kafka缓存日志数据

完成上述配置之后,重新启动Filebeat和Logstash

root@master:/opt$ systemctl restart filebeat
root@master:/opt$ /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf 

❗️❗️❗️对了笔者太懒就没有把Nginx日志采集内容拷贝进来,下面的呈现效果需要您参考 ELK 收集 Nginx 日志 完成日志收集的准备工作哦 🔥🔥🔥

重新启动之后,在浏览器访问Nginx让其产生测试日志数据,在使用ES-head查看数据,可以看到日志数据被成功收集如下图

在这里插入图片描述

参考资料

Zookeeper到底是干嘛的

Filebeat输出到Kafka配置官方文档

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐