项目主机规划

主机名

外网IP

安装软件

所属集群

elstack

192.168.1.176

elasticsearch8.4

logstash

192.168.1.178

logstash8.4+Kibana8.4

kafka

192.168.1.179

kafka+zookeeper

1、环境准备

1.1 配置hosts文件

[yu@elk~]$ sudo vim /etc/hosts

192.168.1.177 elk

192.168.1.178 logstash

192.168.1.179 kafka

推送hosts文件到 yu的其他主机上

[yu@elk~]$ sudo scp /etc/hosts 192.168.1.178:/etc/hosts

[yu@elk~]$ sudo scp /etc/hosts 192.168.1.179:/etc/hosts

1.2、elk软件下载地址

 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.4.0-linux-x86_64.tar.gz

 wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.4.0-linux-x86_64.tar.gz

 wget https://artifacts.elastic.co/downloads/logstash/logstash-8.4.0-linux-x86_64.tar.gz

 wget https://artifacts.elastic.co/downloads/kibana/kibana-8.4.0-linux-x86_64.tar.gz

1.3、kafka+zookeeper下载地址

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.g

wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgzkafka_2.12-3.6.1https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz

 2、部署elasticsearch

2.1、上传elasticsearch安装包

下载地址

2.2、解压软件包

[yu@elk~]$  cd /tmp

[yu@elk tmp]$ sudo tar -zxvf elasticsearch-8.4.0-linux-x86_64.tar.gz -C /usr/local/

2.3、给文件授权

[yu@elk~]$ sudo chown -R yu:yu /usr/local/elasticsearch-8.4.0/

2.4、修改linux系统打开文件最大数

[yu@elk~]$ sudo vim /etc/sysctl.conf

vm.max_map_count=655360

[yu@elk~]$ sudo  sysctl -p /etc/sysctl.conf

[yu@elk~]$ sudo  vim /etc/security/limits.conf 

#在文件最后添加以下行

#修改最大打开文件数

* soft nofile 65536

* hard nofile 65536

# 修改最大进程数

* soft nproc 65536

* hard nproc 65536

2.5、安装jdk

由于YU 3个软件都需要JDK只支持,所以只要安装Elasticsearch + Logstash + Kibana的服务器都要装JDK,

Elasticsearch安装包自带jdk,我们使用自带版本

[yu@elk~]$  ll /usr/local/elasticsearch-8.4.0/

推送jdk到logstash服务器上

[yu@elk~]$  sudo scp -r  /usr/local/elasticsearch-8.4.0/jdk  logstash:/usr/local/

2.6、配置JDK环境变量

[yu@elk~]$ sudo vim /etc/profile  #在文件最后加入一下行

export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME  PATH CLASSPATH

[yu@elk~]$  source /etc/profile   #使环境变量生效

2.7、修改配置文件

[yu@elk~]$ cd /usr/local/elasticsearch-8.4.0/

[yu@elkelasticsearch-8.4.0]$ vim config/elasticsearch.yml

cluster.name: my-application

node.name: node-1

bootstrap.memory_lock: false  #防止启动检测报错

network.host: 0.0.0.0   #修改监听IP

http.port: 9200

cluster.initial_master_nodes: ["node-1"]

xpack.security.enabled: false   #关闭安全认证,否则使用http访问不到es,第一次启动没有这个参数,第二次启动再关闭

xpack.security.http.ssl:

  enabled: false

2.8、启动Elasticsearch

[yu@elkelasticsearch-8.4.0]$  ./bin/elasticsearch

2.8、修改配置文件后再次启动,后台启动

[yu@elk elasticsearch-8.4.0]$ vim config/elasticsearch.yml

xpack.security.enabled: false   #关闭安全认证,否则使用http访问不到es,第一次启动没有这个参数,第二次启动再关闭

xpack.security.http.ssl:

  enabled: false

[yu@elk elasticsearch-8.4.0]$ ./bin/elasticsearch &

2.9、测试

网页 192.168.1.177:9200

2.10、加入开机自启动

[yu@elk elasticsearch-8.4.0]$ sudo  vim /etc/init.d/elasticsearch

#!/bin/sh

#chkconfig: 2345 80 05

#description: elasticsearch

#author: taft

export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk

export PATH=$PATH:$JAVA_HOME/bin

export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME  PATH CLASSPATH

case "$1" in

start)

    su yu<<!

    cd /usr/local/elasticsearch-8.4.0/

    ./bin/elasticsearch -d

!

    echo "elasticsearch startup"

    ;;

stop)

    es_pid=`jps | grep Elasticsearch | awk '{print $1}'`

    kill -9 $es_pid

    echo "elasticsearch stopped"

    ;;

restart)

    es_pid=`jps | grep Elasticsearch | awk '{print $1}'`

    kill -9 $es_pid

    echo "elasticsearch stopped"

    su yu<<!

    cd /usr/local/elasticsearch-8.4.0/

    ./bin/elasticsearch -d

!

    echo "elasticsearch startup"

    ;;

*)

    echo "start|stop|restart"

    ;;

esac

exit $?

su yu<<! 切换为yu用户执行下面的命令,<<! 相当于<<EOF

注意:

以上脚本的用户为yu,如果你的用户不是,则需要替换

以上脚本的JAVA_HOME以及elasticsearch_home如果不同请替换

[yu@elk~]$ sudo chmod +x /etc/init.d/elasticsearch #给脚本添加权限

[yu@elk~]$ sudo  chkconfig --add /etc/init.d/elasticsearch  #添加开机自启动

[yu@elk~]$ sudo  chkconfig elasticsearch on

2.11、测试脚本

[yu@elk~]$  sudo systemctl restart elasticsearch

[yu@elk~]$  ps -aux | grep elasticsearch

3、安装Kibana

3.1、上传Kibana安装包

[yu@logstash ~]$   cd /tmp

[yu@logstash ~]$ ls

3.2、解压软件包

[yu@logstash tmp]$  sudo  tar -zxvf kibana-8.4.0-linux-x86_64.tar.gz -C /usr/local/

3.3、给文件授权

[yu@logstash ~]$ sudo chown -R yu:yu /usr/local/kibana-8.4.0

3.4、修改配置文件

[yu@logstash tmp]$ cd

[yu@logstash ~]$  sudo  vim /usr/local/kibana-8.4.0/config/kibana.yml

#开启以下选项并修改

server.port: 5601

server.host: "192.168.1.178"

elasticsearch.hosts: ["http://192.168.1.176:9200"]  #修改elasticsearch地址,多个服务器请用逗号隔开。

i18n.locale: "zh-CN"                          #修改语言为中文

注意: host:与IP地址中间有个空格不能删除,否则报错。

3.5、启动服务

[yu@logstash ~]$  /usr/local/kibana-8.4.0/bin/kibana

3.6、kibana 设置后台启动

[yu@logstash ~]$ nohup /usr/local/kibana-8.4.0/bin/kibana &

[yu@logstash ~]$ ps -ef | grep kibana

3.7、网页测试

192.168.1.177:5601

4、搭建kafka服务 在kafka服务器上

4.1、环境搭建

4.11、上传kafka、zookeeper、jdk安装包 到/tmp目录下

[yu@kafka ~]$ ll  /tmp

4.1.2、解压jdk

[yu@kafka ~]$ cd /tmp

[yu@kafka tmp]$ sudo  tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/local/

4.1.3配置JDK环境变量

[yu@kafka tmp]$ sudo  vim /etc/profile 

#在文件最后加入一下行

JAVA_HOME=/usr/local/jdk1.8.0_171

PATH=$JAVA_HOME/bin:$PATH

CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar

export PATH JAVA_HOME CLASSPATH

[yu@kafka tmp]$ source /etc/profile

4.1.4、查看java环境

[yu@kafka tmp]$ java –version

4.2、安装zookeepe

4.2.1解压zookeeper安装包

[yu@kafka tmp]$ sudo tar -zxf apache-zookeeper-3.7.2-bin.tar.gz  -C /usr/local/

4.2.2、创建快照日志存放目录并赋予权限:

[yu@kafka tmp]$ sudo mkdir -p /data/zk/data

[yu@kafka tmp]$  sudo chmod -R 777 /data/zk/data

4.2.3、 创建事务日志存放目录:

[yu@kafka tmp]$ sudo mkdir -p /data/zk/datalog

[yu@kafka tmp]$  sudo chmod -R 777  /data/zk/datalog

4.2.4、生成配置文件

[yu@kafka tmp]$  cd /usr/local/apache-zookeeper-3.7.2-bin/conf/

[yu@kafka conf]$ sudo cp zoo_sample.cfg  zoo.cfg

4.2.5、修改主配置文件zoo.cfg

dataDir=/data/zk/data        #修改这一行为我们创建的目录

dataLogDir=/data/zk/datalog   #添加这一行

4.2.6、添加path环境变量

[yu@kafka conf]$ sudo vim /etc/profile

export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.7.2-bin

export PATH=$ZOOKEEPER_HOME/bin:$PATH

[yu@kafka conf]$  source /etc/profile

4.2.7、给文件授权

[yu@kafka conf]$  sudo chown -R yu:yu /usr/local/apache-zookeeper-3.7.2-bin

4.2.8、启动Zookeeper

[yu@kafka conf]$ zkServer.sh start

4.2.9、验证

[yu@kafka conf]$ zkServer.sh status

4.2.10、添加开机启动

在/lib/systemd/system/文件夹下创建一个启动脚本zookeeper.service

[yu@kafka conf]$ sudo vim /lib/systemd/system/zookeeper.service

[Unit]

Description=Zookeeper service

After=network.target

[Service]

Type=forking

Environment="JAVA_HOME=/usr/local/jdk1.8.0_171"

ExecStart=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh start

ExecStop=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh stop

[Install]

WantedBy=multi-user.target

[yu@kafka conf]$  sudo systemctl daemon-reload

[yu@kafka conf]$ sudo  systemctl enable zookeeper

测试:

[yu@kafka conf]$ jps

67608 Jps

67439 QuorumPeerMain

[yu@kafka conf]$ kill 67439

[yu@kafka conf]$ jps

67620 Jps

[yu@kafka conf]$ sudo systemctl start zookeeper

[yu@kafka conf]$ ps -ef |grep zookeeper

4.3.Kafka 单节点单Broker部署

这里部署的是kafka单节点,生产环境应该部署kafka多节点集群。

4.3.1解压软件到指定目录

[yu@kafka conf]$ cd /tmp

[yu@kafka tmp]$ sudo  tar -zxvf kafka_2.12-3.6.1.tgz  -C /usr/local/

4.3.2 修改配置文件

[yu@kafka tmp]$ sudo  vim /usr/local/kafka_2.12-3.6.1/config/server.properties

# broker的全局唯一编号,不能重复

broker.id=0

# 监听

listeners=PLAINTEXT://:9092  #开启此项

# 日志目录

log.dirs=/data/kafka/log      #修改日志目录

# 配置zookeeper的连接(如果不是本机,需要该为ip或主机名)

zookeeper.connect=localhost:2181

auto.create.topics.enable=true  自动创建topic  添加这一条

4.3.3、创建日志目录并改权限

[yu@kafka tmp]$ sudo mkdir -p /data/kafka/log

[yu@kafka tmp]$  sudo chmod -R 777 /data/kafka/log

4.3.4添加path环境变量

[yu@kafka tmp]$ sudo vim /etc/profile

export KAFKA_HOME=/usr/local/kafka_2.12-3.6.1

export PATH=$KAFKA_HOME/bin:$PATH

[yu@kafka tmp]$  source /etc/profile

4.3.5、给文件授权

[yu@kafka tmp]$ sudo chown -R yu:yu /usr/local/kafka_2.12-3.6.1

4.3.6、启动kafka

后台启动kafka

[yu@kafka tmp]$ kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.6.1/config/server.properties

4.3.7添加开机自启

将kafka添加到开机自启服务

在/lib/systemd/system/文件夹下创建一个启动脚本kafka.service

[yu@kafka tmp]$ sudo  vim /lib/systemd/system/kafka.service

[Unit]

Description=Apache Kafka server (broker)

After=network.target  zookeeper.service

[Service]

Type=simple

Environment="PATH=/usr/local/jdk1.8.0_171/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin"

ExecStart=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.6.1/config/server.properties

ExecStop=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-stop.sh

Restart=on-failure

[Install]

WantedBy=multi-user.target

[yu@kafka tmp]$ sudo  systemctl daemon-reload

[yu@kafka tmp]$ sudo systemctl enable kafka

测试

[yu@kafka tmp]$ jps

68931 Jps

68799 Kafka

[yu@kafka tmp]$ kill 68799

[yu@kafka tmp]$ jps

68954 Jps

[yu@kafka tmp]$ sudo  systemctl start kafka

[yu@kafka tmp]$ sudo  systemctl status  kafka

5、 设置filebeat 在kafka服务器上,收集kafka服务器的日志

5.1、上传软件包

5.2、解压缩软件

[yu@kafka tmp]$ sudo  tar -zxvf filebeat-8.4.0-linux-x86_64.tar.gz -C /usr/local/

5.3、给文件授权

[yu@kafka tmp]$  sudo chown -R yu:yu /usr/local/filebeat-8.4.0-linux-x86_64

[yu@ kafka tmp]$  sudo chmod o+r  /var/log/*

5.4、修改配置文件

Filebeat 要求配置文件只能由其所有者写入,不允许组(group)和其他用户(others)也有写权限。

[yu@kafka tmp]$ sudo vim /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml

filebeat.idle_timeout: 2s

filebeat.name: filebeat-shiper

filebeat.spool_size: 50000

filebeat.inputs:

- type: log

  paths:

    - /usr/local/kafka_2.12-3.6.1/logs/controller.log

  fields:

    topic: topic-kafka-controller-log

  enabled: true

  backoff: 1s

  backoff_factor: 2

  close_inactive: 1h

  encoding: plain

  harvester_buffer_size: 262144

  max_backoff: 10s

  max_bytes: 10485760

  scan_frequency: 10s

  tail_lines: true

- type: log

  paths:

    - /usr/local/kafka_2.12-3.6.1/logs/state-change.log

  fields:

    topic: topic-kafka-state-change-log

  enabled: true

  backoff: 1s

  backoff_factor: 2

  close_inactive: 1h

  encoding: plain

  harvester_buffer_size: 262144

  max_backoff: 10s

  max_bytes: 10485760

  scan_frequency: 10s

  tail_lines: true

- type: log

  paths:

    -  /usr/local/kafka_2.12-3.6.1/logs/server.log

  fields:

    topic: topic-kafka-run-log

  enabled: true

  backoff: 1s

  backoff_factor: 2

  close_inactive: 1h

  encoding: plain

  harvester_buffer_size: 262144

  max_backoff: 10s

  max_bytes: 10485760

  scan_frequency: 10s

  tail_lines: true

- type:  log

  paths:

    -  /usr/local/kafka_2.12-3.6.1/logs/kafka-request.log

  fields:

    topic: topic-kafka-request-log

  enabled: true

  backoff: 1s

  backoff_factor: 2

  close_inactive: 1h

  encoding: plain

  harvester_buffer_size: 262144

  max_backoff: 10s

  max_bytes: 10485760

  scan_frequency: 10s

  tail_lines: true

- type:  log

  paths:

    -  /var/log/messages

  fields:

    topic: topic-kafka-messages-log

  enabled: true

  backoff: 1s

  backoff_factor: 2

  close_inactive: 1h

  encoding: plain

  harvester_buffer_size: 262144

  max_backoff: 10s

  max_bytes: 10485760

  scan_frequency: 10s

  tail_lines: true

output.kafka:          

  bulk_flush_frequency: 0

  bulk_max_size: 2048

  codec.format:

    string: '%{[message]}'

  compression: gzip

  compression_level: 4

  hosts:

  - 192.168.1.179:9092

  max_message_bytes: 10485760

  partition.round_robin:

    reachable_only: true

  required_acks: 1

  topic: '%{[fields.topic]}'

setup.ilm.enabled: false

配置文件详解

全局配置

filebeat.idle_timeout: 2s:

#Filebeat在没有数据发送时保持打开状态的超时时间。

filebeat.name: filebeat-apache:

#Filebeat实例的名称,这有助于在日志和监控中识别它。每个单独的filebeat实例的名称必须不同

filebeat.spool_size: 50000:

#Filebeat内部使用的队列大小,它决定了可以缓冲多少个事件。

输入配置

type: log:

#指定输入类型为日志。

paths:

#定义Filebeat需要监控的日志文件的路径。这里指定的是/usr/local/apache/logs/access_log。

fields:

#允许你添加自定义字段到输出的事件中。这里添加了一个topic字段,值为topic-apache-access-log。

enabled: true:

#启用此输入配置。

backoff: 1s和 close_inactive: 1h:

#在读取文件遇到错误时,Filebeat会等待backoff指定的时间,然后每次重试等待的时间会乘以backoff_factor。

close_inactive:

#如果文件在给定的时间段内没有新事件,Filebeat将关闭文件句柄。

#句柄(Handle)在计算机科学中是一个用来标识对象或资源的引用或者指针。在操作系统和许多软件应用程序中,句柄被用作一个内部标识符,它允许程序访问和操作系统或应用程序中的对象或资源,而无需直接知道对象的内存地址或其他详细信息。

encoding: plain

#指定日志文件的编码方式。

harvester_buffer_size: 262144

#读取文件时使用的缓冲区大小。

max_backoff: 10s

#最大重试等待时间。

max_bytes: 10485760

#单个事件的最大字节数。

scan_frequency: 10s

#Filebeat检查指定路径下的新文件的频率。

tail_lines: true

#如果设置为true,Filebeat将从文件末尾开始读取,这通常用于实时日志流。

输出配置

output.kafka:

#指定输出到Kafka的配置。

  bulk_flush_frequency: 0

#批量发送事件到Kafka的频率,这里设置为0,意味着立即发送。

bulk_max_size:

#批量发送事件的最大数量。

  codec.format:

    string: '%{[message]}'

#这表示Filebeat将使用字符串格式来格式化输出到Kafka的消息,并且该字符串的内容将是日志事件中的message字段。%{[message]} 是一个字段引用,它告诉Filebeat从每个日志事件中提取message字段的值,并将其作为Kafka消息的内容。

  compression: gzip 和  compression_level: 4

#启用压缩,并设置压缩级别。

hosts:

#Kafka服务器的地址和端口。

max_message_bytes: 10485760

#Kafka消息的最大字节数。

  partition.round_robin:

    reachable_only: true

#partition.round_robin 设置启用了轮询(round-robin)策略,即Filebeat将按顺序将数据分发到Kafka的可用分区。这种策略确保在没有特定分区选择逻辑的情况下,数据能够均匀地在所有分区之间分布。reachable_only: true 是partition.round_robin下的一个子选项,它指定Filebeat只选择可达(reachable)的分区来发送数据。

  required_acks: 1

#Kafka要求的确认数量,确保消息已经被写入。

  topic: '%{[fields.topic]}'

#使用前面在fields中定义的topic字段的值作为Kafka的topic。

setup.ilm.enabled: false

#禁用索引生命周期管理(ILM)。

5.5、启动filebeat

[yu@kafka tmp]$  cd /usr/local/filebeat-8.4.0-linux-x86_64/ && ./filebeat -e -c filebeat.yml &

-e: 这个选项告诉Filebeat以“输出到标准错误”(Error output)的模式运行。这意味着所有的日志和错误信息会直接打印到控制台,而不是写入日志文件。这有助于在调试过程中快速查看问题。 测试时使用测试成功后取消这一个选项

-c filebeat.yml: 这个选项指定了Filebeat的配置文件路径。在这个例子中,配置文件是filebeat.yml,它应该包含Filebeat如何收集、处理和转发日志的指令。

5.6、再开一个终端查看kafka主题是否创建

[yu@kafka logs]$ kafka-topics.sh --bootstrap-server kafka:9092 --list

topic-kafka-controller-log

topic-kafka-run-log

topic-kafka-state-change-log

表示kafka接收到filebeat收集到的日志

5.7设置开机自启动

[yu@kafka logs]$ sudo  echo "nohup /usr/local/filebeat-8.4.0-linux-x86_64/filebeat -e -c /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml &" >> /etc/rc.d/rc.local

6、搭建logstash服务器

6.1、上传软件包

6.2、解压缩软件

[yu@logstash tmp]$ sudo  tar -zxvf logstash-8.4.0-linux-x86_64.tar.gz -C /usr/local/

6.3、给文件授权

[yu@logstash tmp]$ sudo chown -R yu:yu /usr/local/logstash-8.4.0/

6.4、做软连接

[yu@logstash tmp]$ sudo  ln -s /usr/local/logstash-8.4.0/bin/* /usr/local/bin/

6.5、生成配置文件

input {

  kafka {

    bootstrap_servers => "192.168.1.179:9092"

    topics_pattern => "topic.*"

    consumer_threads => 5

    decorate_events => true

    codec => plain { charset => "UTF-8" }

    auto_offset_reset => "latest"

    group_id => "logstash1"

  }

}

filter {

  # 如果需要转换时间戳为本地时间,请取消注释并正确编写Ruby代码

  # 注意:下面的代码可能需要调整,确保它符合您的具体需求

  #ruby {

  #  code => "event.set('@timestamp', Time.parse(event.get('time')).localtime)"

  #}

  mutate {

    remove_field => ["beat"]

  }

  grok {

    match => {

      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)"

    }

 overwrite => ["message"]

    tag_on_failure => ["_grokparsefailure"]

  }

  if "[@metadata][kafka][topic]" {

    mutate {

      add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }

    }

  }

}

output {

  # 确保topic_name字段存在才发送到Elasticsearch

  if [topic_name] {

    elasticsearch {

      hosts => ["192.168.1.177:9200"]

      # 使用topic_name字段来构建索引名

      index => "%{topic_name}-%{+YYYY.MM.dd}"

    }

  }

  stdout {

    codec => rubydebug

  }

}

配置文件详解

Input部分

input { 

  kafka { 

    bootstrap_servers => "192.168.1.179:9092" 

#Kafka集群的地址和端口。

    topics_pattern => "topic.*" 

#订阅所有以"topic."开头的Kafka主题。

    consumer_threads => 5 

#消费者线程数,用于并行读取Kafka消息。

    decorate_events => true 

#的配置选项用于控制是否在Logstash事件中添加与Kafka相关的元数据字段。当设置为true时,Logstash会向每个事件添加一些额外的字段,这些字段包含了关于Kafka消息的一些信息。

    codec => plain { charset => "UTF-8" } 

#告诉Logstash以UTF-8编码的方式从Kafka消息中读取纯文本数据,并将其转换为Logstash事件以供后续处理。这样,无论Kafka中的消息包含何种字符,Logstash都能够正确解析并处理它们。codec => json 用于指定输入插件使用JSON格式来解码数据。这通常用于处理存储在Kafka等消息队列中的JSON格式的消息。

    auto_offset_reset => "latest" 

#它决定了当Logstash开始消费Kafka主题时,从哪里开始读取消息。

auto_offset_reset 参数有以下几个可能的值:

"earliest": 从最早的记录开始读取,即从头开始消费。

"latest": 从最新的记录开始读取,即只消费在Logstash开始消费之后新产生的消息。

"none": 如果找不到初始偏移量,就抛出异常。

    group_id => "logstash1" 

  } 

}

Filter部分

filter { 

  # ... Ruby代码块(已注释) 

  mutate { 

    remove_field => ["beat"] 

#mutate:移除beat字段

  } 

  grok { 

    match => { 

      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)" 

#使用正则表达式解析message字段,并提取出时间、日志级别、线程、类名、行号和消息内容等字段。

    } 

    overwrite => ["message"] 

#将原始message字段替换为grok解析后的内容。

    tag_on_failure => ["_grokparsefailure"] 

#如果grok解析失败,则给事件添加一个标签_grokparsefailure

  } 

  if "[@metadata][kafka][topic]" { 

    mutate { 

      add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" } 

#if条件判断:如果事件中存在[@metadata][kafka][topic]字段,则添加一个名为topic_name的新字段,其值为Kafka主题名。

    } 

  } 

}

Output部分

output { 

  if [topic_name] { 

#只有当事件中存在topic_name字段时,才执行下面的输出操作。

    elasticsearch { 

      hosts => ["192.168.1.177:9200"] 

      index => "%{topic_name}-%{+YYYY.MM.dd}" 

#使用topic_name和日期来构建索引名。

    } 

  } 

  stdout { 

#输出到标准输出(通常用于调试)。

    codec => rubydebug 

#使用rubydebug编码,以易读的格式显示事件内容

  } 

}

6.6、启动logstash

[yu@logstash tmp]$ logstash -f /usr/local/logstash-8.4.0/config/logstash.conf

6.7后台启动

[yu@logstash tmp]$ sudo nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &

6.8、开机自启动

[yu@logstash tmp]$ sudo echo "source /etc/profile" >> /etc/rc.local #让开机加载java环境

[yu@logstash tmp]$ sudo echo "nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &" >> /etc/rc.local

[yu@logstash tmp]$ sudo chmod +x /etc/rc.local

7、登录kibana网页查看索引生成情况生成数据视图

8、Zabbix与ELK整合实现对安全日志数据的实时监控告警

     有些时候,我们希望在收集日志的时候,能够将日志中的异常信息(警告、错误、失败等信息)及时的提取出来,因为日志中的异常信息意味着操作系统、应用程序可能存在故障,如果能将日志中的故障信息及时的告知运维人员,那么运维就可以第一时间去进行故障排查和处理,进而也就可以避免很多故障的发生。

     ELK(更确切的说应该是logstash)可以实时的读取日志的内容,并且还可以过滤日志信息,通过ELK的读取和过滤功能,就可以将日志中的一些异常关键字(errorfailedOutOffWarning)过滤出来,然后通过logstashzabbix插件将这个错误日志信息发送给zabbix,那么zabbix在接收到这个数据后,结合自身的机制,然后发起告警动作,这样就实现了日志异常zabbix实时告警的功能了。

8.1、Logstash与zabbix插件的使用

    Logstash支持多种输出介质,比如syslogHTTPTCPelasticsearchkafka等,而有时候我们想将收集到的日志中一些错误信息输出,并告警时,就用到了logstash-output-zabbix这个插件,此插件可以将Logstashzabbix进行整合,也就是将Logstash收集到的数据进行过滤,将有错误标识的日志输出到zabbix中,最后通过zabbix的告警机制进行触发、告警。

   logstash-output-zabbix是一个社区维护的插件,它默认没有在Logstash中安装,但是安装起来也很容易,

8.1.1下载阿里源

[yu@logstash ~]$ sudo wget   -O   /etc/yum.repos.d/CentOS-Base.repo    http://mirrors.aliyun.com/repo/Centos-7.repo

8.1.2直接在logstash中运行如下命令即可:

[yu@logstash ~]$ logstash-plugin install logstash-output-zabbix

Using bundled JDK: /usr/local/logstash-8.4.0/jdk

Validating logstash-output-zabbix

Resolving mixin dependencies

Installing logstash-output-zabbix

Installation successful

列出目前已经安装的插件

[yu@logstash ~]$ logstash-plugin list

列出已安装的插件及版本信息

[yu@logstash ~]$ logstash-plugin list --verbose

8.2、logstash-output-zabbix插件的使用

logstash-output-zabbix安装好之后,就可以在logstash配置文件中使用了,下面是一个logstash-output-zabbix使用的例子:

zabbix {

        zabbix_host =>"[@metadata][zabbix_host]"

        zabbix_key =>"[@metadata][zabbix_key]"

        zabbix_server_host =>"x.x.x.x"

        zabbix_server_port =>"xxxx"

        zabbix_value ="xxxx"

        }

其中:

zabbix_host:表示Zabbix主机名字段名称, 可以是单独的一个字段, 也可以是 @metadata 字段的子字段, 是必需的设置,没有默认值。

zabbix_key表示Zabbix项目键的值,也就是zabbix中的item,此字段可以是单独的一个字段, 也可以是 @metadata 字段的子字段,没有默认值。

zabbix_server_host:表示Zabbix服务器的IP或可解析主机名,默认值是 “localhost”,需要设置为zabbix server服务器所在的地址。

zabbix_server_port表示Zabbix服务器开启的监听端口,默认值是10051。

zabbix_value表示要发送给zabbix item监控项的值对应的字段名称,默认值是 “message”,也就是将”message”字段的内容发送给上面zabbix_key定义的zabbix item监控项,当然也可以指定一个具体的字段内容发送给zabbix item监控项。

8.3、将logstash与zabbix进行整合

这里我们以logstash收集日志,然后对日志进行读取,最后选择关键字进行过滤并调用zabbix告警的流程,来看看如何配置logstash实现zabbix告警。

先说明一下我们的应用需求:通过对系统日志文件的监控,然后去过滤日志信息中的一些关键字,例如ERROR、Failed、WARNING等,将日志中这些信息过滤出来,然后发送到zabbix上,最后借助zabbix的报警功能实现对系统日志中有上述关键字的告警。

对于过滤关键字,进行告警,不同的业务系统,可能关键字不尽相同,例如对http系统,可能需要过滤500、403、503等这些错误码,对于java相关的系统,可能需要过滤OutOfMemoryError、PermGen、Java heap等关键字。在某些业务系统的日志输出中,可能还有一些自定义的错误信息,那么这些也需要作为过滤关键字来使用。

8.3.1配置logstash事件配置文件

input { 

  kafka { 

    bootstrap_servers => "192.168.1.179:9092" 

    topics_pattern => "topic.*" 

    consumer_threads => 5 

    decorate_events => true 

    codec => plain { charset => "UTF-8" } 

    auto_offset_reset => "latest" 

    group_id => "logstash1" 

  } 

filter { 

  # 移除不需要的字段 

  mutate { 

    remove_field => ["beat"] 

  } 

  # 使用grok来解析日志 

  grok { 

    match => { 

      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)" 

    } 

    overwrite => ["message"] 

    tag_on_failure => ["_grokparsefailure"] 

  } 

  # 添加topic_name字段 

  if "[@metadata][kafka][topic]" { 

    mutate { 

      add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" } 

    } 

  } 

  # 为Zabbix准备特定的字段 

  if [topic_name] { 

    mutate { 

      add_field => { "zabbix_topic" => "%{topic_name}" } 

      add_field => { "zabbix_level" => "%{level}" } 

      add_field => { "zabbix_message" => "%{msg}" } 

      # 如果需要,继续添加其他Zabbix需要的字段 

    } 

  } 

output { 

  # 发送给Elasticsearch 

  if [topic_name] { 

    elasticsearch { 

      hosts => ["192.168.1.177:9200"] 

      index => "%{topic_name}-%{+YYYY.MM.dd}" 

    } 

  } 

  # 发送给Zabbix 

  if [zabbix_topic] and [zabbix_level] and [zabbix_message] { 

    zabbix { 

      zabbix_host => "your_zabbix_server_host" 

      zabbix_port => 10051 

      zabbix_sender_host => "logstash_host_name" 

      # 定义item key和对应的字段 

      item_key => { 

        "topic" => "%{zabbix_topic}" 

        "level" => "%{zabbix_level}" 

        "message" => "%{zabbix_message}" 

        # ... 其他键值对 ... 

      } 

      # 其他可选参数,比如zabbix_server等 

    } 

  } 

}

                        }

可以在一个服务器上同时启动多个logstash进程。但是,当同时启动多个logstash进程时,需要指定不同的path.data,否则会报错。例如,使用以下命令启动两个logstash进程:

./logstash -f /etc/logstash/config.d/xxx1.conf --path.data=/etc/logstash/data1/ & 

./logstash -f /etc/logstash/config.d/xxx2.conf --path.data=/etc/logstash/data2/ &

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐