任务背景

  1. 随着业务复杂度的提升以及微服务的兴起,传统单一项目会被按照业务规则进行垂直拆分,另外为了防止单点故障我们也会将重要的服务模块进行集群部署,通过负载均衡进行服务的调用。
  2. 那么随着节点的增多,各个服务的日志也会散落在各个服务器上。这对于我们进行日志分析带来了巨大的挑战,总不能一台一台的登录去下载日志吧。那么我们需要一种收集日志的工具将散落在各个服务器节点上的日志收集起来,进行统一的查询及管理统计。
  3. 那么ELK就可以做到这一点。

本文相关文件资料

  • 查看本站(CSDN)资源:微服务日志收集系统-ELK+Kafka相关安装文件,也可以在我的主页去搜
  • 下载地址:

Elasticsearch

  • Elastic Stack 成员
  • Lucene
  • Java

特性

  • 分布式实时全文搜索引擎
  • 分布式实时分析搜索引擎
  • 分布式实时大数据处理引擎

Logstash

  • Elastic Stack 成员
  • 是一个开源数据收集引擎,具有实时流水线功能
  • 数据处理管道
    • Input(采集)
    • Output(迁移JSON数据到ES服务器中)

在这里插入图片描述

Kibana

  • Elastic Stack 成员
  • 开源
  • 数据分析和可视化
    • 图形
    • 表格
  • Web端访问

在这里插入图片描述

ELK

在这里插入图片描述

ELK的缺点

单纯使用EIK实现分布式日志收集缺点:

  • 当产生日志的服务节点越来越多,Logstash也需要部署越来越多,扩展不好。
  • 读取IO文件,可能会产生日志丢失。
  • 读取文件不是实时性。

引入消息中间件

  • 当业务量增大时,日志跟着增多,直接传入会使LogStash压力过大,可能挂掉,所以需要增加一个缓冲区。
  • 日志数据解耦。为其他数据分析平台提供日志,可从Kafka中获取日志进行实时分析处理。
  • 引入Kafka,日志实时发布到Kafka,Logstash订阅并实时获取消息

ELK+Kafka

在这里插入图片描述

Kafka

  • 高吞吐、分布式消息系统
  • 消息中间件

概念

  • Producer:生产者
  • Consumer:消费者
  • Topic:主题
    在这里插入图片描述

ELK+Kafka环境搭建

1.将安装素材上传至服务器 cd /usr/local/soft

2.防止Elasticsearch因虚拟内存问题启动失败

  • 查看当前值: sysctl -a|grep vm.max_map_count
  • 修改配置文件:vim /etc/sysctl.conf
  • 设置虚拟内存:vm.max_map_count=262144
  • 从指定的文件加载系统参数:sysctl -p
  • 重启docker服务:systemctl restart docker

3.创建镜像li/centos7-elasticsearch

cd /usr/local/soft/Elasticsearch
docker build -t li/centos7-elasticsearch .

4.创建容器

docker run -d -p 9200:9200 -p 9300:9300 --restart=always --name env_elasticsearch  li/centos7-elasticsearch

5.验证Elasticsearch服务【先开放端口或关闭防火墙】

  • 访问http://宿主机IP:9200
    在这里插入图片描述

6.安装Kafka

修改资料中server.properties

advertised.listeners=PLAINTEXT://192.168.5.166:9092

创建镜像li/centos7-kafka

cd /usr/local/soft/Kafka
docker build -t li/centos7-kafka .

创建容器

docker run -d -p 9092:9092  --restart=always --name env_kafka  li/centos7-kafka

7.安装Logstash

修改资料中logstash.conf

bootstrap_servers => ["192.168.5.166:9092"]
hosts=> ["192.168.5.166:9200"]

修改资料中logstash.yml

xpack.monitoring.elasticsearch.url: http://192.168.5.166:9200

创建镜像

cd /usr/local/soft/Logstash
docker build -t li/centos7-logstash .

创建容器

docker run -d --restart=always --name env_logstash  li/centos7-logstash

在Logstash中定义收集规则

logstash.conf

input
  • bootstrap_servers:指向Kafka服务
  • group_id:可以自由指定
  • topics:数组形式,可以填写多个
  • type:可以自由指定
output
  • hosts:指向Elasticsearch服务地址,可以有多个,注意IP和端口和实际保持一致
  • index:表示在Elasticsearch中生成index的规则
  • user和password:Elasticsearch的用户名和密码

8.安装Kibana

修改资料中kibana.yml

elasticsearch.url: "http://192.168.5.166:9200"

创建镜像

cd /usr/local/soft/Kibana
docker build -t li/centos7-kibana .

创建容器

docker run -d -p 5601:5601  --restart=always --name env_kibana  li/centos7-kibana

验证Kibana

  • 访问http://宿主机IP:5601
    在这里插入图片描述

在项目中发送日志信息到Kafka

1.创建demo-kafka-client项目

2.添加依赖

在pom.xml中添加依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.发送消息到Kafka

@RestController
public class KafkaController {
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    @RequestMapping(value = "/sendMsgToKafka")
    public String sendMsgToKafka() {
        for (int i = 1; i <=10; i++) {
            kafkaTemplate.send("elkservice", "elk", "hello,Kafka!--->" + i);
        }
        return "发送消息到Kafka完毕";
    }
}

代码分析

kafkaTemplate.send(“elkservice”, “elk", “hello,Kafka!—>” + i);

  • "elkservice":Kafka中的topic,读取消息时需要指定
  • "elk":key值,Kafka用key值确定value存放在哪个分区
  • "hello,Kafka!--->" + i:发送的具体数据

4.添加配置

  • application.yml

    spring:
     kafka:
      producer:
        bootstrap-servers: 192.168.5.210:9092 # Kafka服务地址,可以配置多个,以逗号分隔
    

5.启动服务

  • 调用sendMsgToKafka方法发送消息
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐