导言

项目主要是通过Filebeat读取日志文件,传输到Kafka上,Logstash获取Kafka的消息,过滤日志信息,传输到ElasticSearch上。实现数据的实时统计。楼主也是刚接触了ElasticSearch,有什么错误的,或者更好的操作,可以提供下,一起讨论。

1. 下载Filebeat的基础文件(根据自己的系统选择对应的文件

https://www.elastic.co/cn/downloads/beats/filebeat)
根据版本选择

2.filebeat的配置文件(filebeat.yml)

在这里插入图片描述

3.日志源是json形式
1.D:\program\php\a.log
	{"ip":"168.168.1.1"}
2.D:\program\php\b.log
	{“ips”:"127.0.0.2"}
4.配置filebeat的配置文件如下
filebeat.inputs:
# 日志类型
- type: log
  fields:
  # 用于后面的识别判断
    log_source: ip
    # 是否实时读取
  enabled: true
  # 日志文件位置
  paths:
     -D:\program\php\a.log
  # 日志文件中包含id数据进行收集   
  include_lines: ['id']  
  # 多长时间去检测新的文件生成的时间 ,默认为 10s  
  scan_frequency: 1s
- type: log
  fields:
    log_source: ips
  enabled: true  
  paths:
     -D:\program\php\b.log
  # 日志文件中包含ids数据进行收集   
  include_lines: ['ids']  
  # 多长时间去检测新的文件生成的时间 ,默认为 10s  
  scan_frequency: 1s
processors:
# 删除掉filebeat再传输过程中附加的多余字段"log", "ecs","agent","agent"
- drop_fields:
     fields: ["log", "ecs","agent","agent"]     
   # 输出源,这是是输出到kafka         
output.kafka:
  enabled: true
  hosts: ["127.0.0.1:9092"]
  # 并发负载均衡kafka输出的数量
  worker: 2
  topic: "ip"
  # 单条消息的大小,默认值为10M
  max_message_bytes: 1024
  # 配置多个topic输出
  topics:
  	# 根据上列设置的log_source设置传输到kafka的topics
    - topic: "%{[fields.log_source]}"
      when.contains:
      	# 日志文件中包含的内容传输
        message: "ip"
    - topic: "%{[fields.log_source]}"
      when.contains:
        message: "ips" 
# 进程数量
max_procs: 2
queue.mem:
  # 消息队列大小,默认值为4096(filebeat最大的可能占用的内存是max_message_bytes * queue.mem.events )
  events: 4096
# 发送的最小事件数,默认为 2048
  flush.min_events: 512
# 最长等待时间,设为为 0,事件将立即使用
  flush.timeout: 1s

5.运行命令 ./filebeat -e -c filebeat-test.yml (在filebeat的根目录下运行)
6 Filebeat配置信息参数(后续补充)
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐