
Filebeat读取日志推送kafka(docker-compose)
Filebeat是本地文件的日志数据采集器,可监控日志目录或特定日志文件(tail file),并将它们转发给Elasticsearch或Logstatsh进行索引、kafka等。带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。*** kafka常用命令。
·
简介
Filebeat是本地文件的日志数据采集器,可监控日志目录或特定日志文件(tail file),并将它们转发给Elasticsearch或Logstatsh进行索引、kafka等。带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。
- 工作流程
- 安装filebeat
mkdir filebeat/{etc,logs}
vim docker-compose-filebeat.yml
version: "3.3"
services:
filebeat:
image: filebeat:7.13.2
container_name: filebeat
#restart: always
restart: unless-stopped
user: root
privileged: true
volumes:
- ./filebeat/etc/filebeat.yml:/usr/share/filebeat/filebeat.yml
- ./filebeat/logs:/usr/share/filebeat/logs
- /var/log/test:/logs #映射本地需要收集的日志文件或目录
- 编辑配置文件
vim
filebeat.inputs:
#文件类型
- type: log
enabled: true
tail_files: true #从文件末尾开始读取
#文件位置
paths:
- /logs/mysql.log
scan_frequency: 10s #每十秒扫描一次,如果设置为0s,则Filebeat会尽可能快地感知更新(占用的CPU会变高)。默认是10s
close_older: 1h # 如果一个文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1h
#自定义字段
fields:
level: notice
appname: mysql
#日志多行合并
multiline:
# pattern for error log, if start with space or cause by
pattern: '^\\d{4}/\\d{2}/\\d{2}' #希望匹配到的结果(正则表达式)
negate: true #值为 true 或 false。使用 false 代表匹配到的行合并到上一行;使用 true 代表不匹配的行合并到上一行
match: after #值为 after 或 before。after 代表合并到上一行的末尾;before 代表合并到下一行的开头
# ============================== Filebeat modules ==============================
#设置模块文件位置
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
# ======================= Elasticsearch template setting =======================
setup.template.settings:
index.number_of_shards: 1
# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
# hosts: ["192.168.1.253:19200"]
# Protocol - either `http` (default) or `https`.
#protocol: "https"
# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
#username: "elastic"
#password: "changeme"
# ---------------------------- KAFKA Output ----------------------------
output.kafka:
enabled: true
hosts: ["192.168.0.161:9092","192.168.0.161:9093","192.168.0.161:9094"]
topic: log_filebeat
#version: "0.9.0.1"
compression: gzip
#用于从事件中删除指定字段
processors:
- drop_fields:
fields: ["beat", "input", "source", "offset"]
# ================================== Logging ===================================
#设置日志级别
logging.level: debug
name: filebeat.log
# ================================= Processors =================================
#processors:
# - add_host_metadata:
# when.not.contains.tags: forwarded
# - add_cloud_metadata: ~
# - add_docker_metadata: ~
# - add_kubernetes_metadata: ~
- 启动模块
docker-compose -f docker-compose-filebeat.yml up -d
- 搭建测试kafka集群接收日志
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
docker run -d --name kafka0 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.161:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.161:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.161:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.161:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka
docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.161:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.161:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka
docker run --name kafka-manager -d -p 9000:9000 -e ZK_HOSTS="192.168.0.161:2181" sheepkiller/kafka-manager
*** kafka常用命令
#查看kafka版本
docker exec -it kafka2 /bin/bash
find /opt/kafka_2.13-2.8.1/libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
#进入zookeeper客户端
zkCli.sh -server 192.168.0.161:2181
#创建topic
kafka-topics.sh --zookeeper 192.168.0.161:2181 --create --topic log_filebeat --partitions 3 --replication-factor 3
#删除topic
kafka-topics --zookeeper localhost:2181 --topic log_filebeat --delete
#查看topic
kafka-topics.sh --zookeeper 192.168.0.161:2181 --describe --topic log_filebeat
#topic消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic log_filebeat
#topic生产者
kafka-console-producer --broker-list test1:9092,test2:9092,test3:9092 --topic log_filebeat
- 启动消费者监听是否收到filebeat推送的日志文件
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic log_filebeat
更多推荐
所有评论(0)