一、kafka环境安装

1)kafka下载地址:http://kafka.apache.org/downloads.html
2)kafka安装需要依赖zookeeper,需要安装zookeeper
3)解压kafka

tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/kafka_2.12

4)修改server.properties文件

vim /usr/local/kafka_2.12/config/server.properties
broker.id=0 
port=9092 
host.name=192.168.11.51 
advertised.host.name=192.168.11.51 
log.dirs=/usr/local/kafka_2.12/kafka-logs 
num.partitions=2 
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

5)创建日志文件夹

mkdir /usr/local/kafka_2.12/kafka-logs

6)启动kafka
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

二、kafka常用命令

1)创建topic主题命令:(创建名为test的topic, 1个分区分别存放数据,数据备份总共1份)

kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic topic1 --partitions 1 --replication-factor 1 
  • –zookeeper 为zk服务列表
  • –create 命令后 --topic 为创建topic 并指定 topic name
  • –partitions 为指定分区数量
  • –replication-factor 为指定副本集数量

2)查看topic列表命令:

kafka-topics.sh --zookeeper 192.168.11.111:2181 --list

3)kafka命令发送数据:(然后我们就可以编写数据发送出去了)

kafka-console-producer.sh --broker-list 192.168.11.51:9092 --topic topic1

4)kafka命令接受数据:(然后我们就可以看到消费的信息了)

kafka-console-consumer.sh --bootstrap-server 192.168.11.51:9092 --topic topic1 --from-beginning 

5)删除topic命令:

kafka-topics.sh --zookeeper 192.168.11.111:2181 --delete --topic topic1

6)kafka查看消费进度:(当我们需要查看一个消费者组的消费进度时,则使用下面的命令)

kafka-consumer-groups.sh --bootstrap-server 192.168.11.51:9092 --describe --group group1 
  • –describe --group 为订阅组, 后面指定 group name

三、SpringBoot整合kafka

1、添加maven依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2、生产者配置
#kafka服务地址
spring.kafka.bootstrap-servers=192.168.85.200:9092
#发送消息失败时的一个重试次数
spring.kafka.producer.retries=0
#批量发送数据的配置
spring.kafka.producer.batch-size=16384
#设置kafka 生产之内存缓存区的大小(32m)
spring.kafka.producer.buffer-memory=33554432
#kafka消息的序列化配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应
#acks=1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应
#acks=-1:表示分区leader必须等待消息被成功写入到所有的ISR副本中才认为producer请求成功。
spring.kafka.producer.acks=0
3、生产者发送消息
@slf4j
@Component
public class KafkaProducerService{
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public void sendMessage(String topic, Object object){
        ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,object);
        future.addCallback(new ListenableFutureCallback<SendResult<String,Object>>(){
            @Override
            public void onSuccess(SendResult<String,Object> result){
                log.info("发送消息成功:"+result.toString());
            }
            
            @Override
            public void onFailure(Throwable ex){
                log.info("发送消息失败:"+ex.getMessage());
            }
        });
    }
}
4、消费者配置
#kafka服务地址
spring.kafka.bootstrap-servers=192.168.85.200:9092
#消息的签收方式(手工签收)
spring.kafka.consumner.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
#指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下做的处理
#latest(默认值)在偏移量无效的时候,消费之将从最新的记录开始读取数据(消费者启动之后的生产的数据)
#earliest:在偏移量无效的情况下,消费者将从起始位置读取分区的记录
spring.kafka.consumer.auto-offset-reset=earliest
#kafka消息的序列化配置
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.listener.concurrency=5
5、生产者接收消息
@Slf4j
@Component
public class KafkaConsumerService{
    //需要手动创建topic
    @KafkaListener(groupId = "group02" , topics = "topic02");
    public void onMessage(ConsumerRecord<String,Object>record,Acknowledgment acknowledgment, Consumer<?,?> consumer){
        log.info("消费者接收消息:{}",record.value());
        //手工签收消息
        acknowledgment.acknowledge();
    }
}

四、使用kafka进行海量日志收集

1、海量日志收集结构

在这里插入图片描述

2、使用log4j2日志输出

(1)引入jar
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <!- 这里需要排除掉原本的日志框架 ->
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifacktId>spring-boot-starter-logging</artifacktId>
        </exclusion>
    </exclusions>
</dependency>

<!- lombok ide需要插件才能使用 ->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifacktId>lombok</artifacktId>
</dependency>

<!- log4j2 ->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifacktId>spring-boot-starter-log4j2</artifacktId>
</dependency>

<!- log4j2强依赖disruptor ->
<dependency>
    <groupId>com.lmax</groupId>
    <artifacktId>disruptor</artifacktId>
    <version>3.3.5</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifacktId>fastjson</artifacktId>
    <version>1.2.58</version>
</dependency>
(2)日志配置

application.properties中添加配置信息

spring.appliction.name=collector
spring.http.encoding.charset=UTF=8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

在resources中添加一个log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <property name="FILE_NAME">collector</property>
        <!- 日志输出格式   时间  日志级别  线程编号  
class的全名  host名称  ip地址  应用名 类名  执行所在行  包名  方法名 日志信息## '错误信息' 换行符->
        <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property>
    </Properties>
    <Appenders>
        <!- 输出到控制台 ->
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>  
        <!- 全量日志的输出 ->
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
          <PatternLayout pattern="${patternLayout}" />
          <Policies>
              <TimeBasedTriggeringPolicy interval="1"/>
              <SizeBasedTriggeringPolicy size="500MB"/>
          </Policies>
          <DefaultRolloverStrategy max="20"/>         
        </RollingRandomAccessFile>
        <!- 错误日志的输出 ->
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
          <PatternLayout pattern="${patternLayout}" />
          <Filters>
              <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
          </Filters>              
          <Policies>
              <TimeBasedTriggeringPolicy interval="1"/>
              <SizeBasedTriggeringPolicy size="500MB"/>
          </Policies>
          <DefaultRolloverStrategy max="20"/>         
        </RollingRandomAccessFile>            
    </Appenders>
    <Loggers>
        <!-- 业务相关 异步logger -->
        <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
          <AppenderRef ref="appAppender"/>
        </AsyncLogger>
        <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
          <AppenderRef ref="errorAppender"/>
        </AsyncLogger>       
        <Root level="info">
            <Appender-Ref ref="CONSOLE"/>
            <Appender-Ref ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>         
    </Loggers>
</Configuration>
(3)日志输出
@Slf4j
@RestController
public class IndexController{
    @RequestMapping(value = "/index")
    public String index(){
        log.info("我是一条info日志");
        log.warn("我是一条warn日志");
        log.error("我是一条error日志");
        return "index";
    }
}
(4)MDC线程变量(sl4j的特性)

在输出日志的时候有三个自定的参数
[%X{hostName}] [%X{ip}] [%X{applicationName}]
这里就是将参数添加到日志的线程变量中

@Component
public class InputMDC implements EnvironmentAware{
    private static Environment environment;
    
    @Override
    public void setEnvironment(Environment environment){
        InputMDC.environment = environment;
    }
    
    //在spring启动之后就可以调用这个方法了
    public static void putMDC(){
       //NetUtil是netty的工具包 去netUtil源码里面搞一份就好了
       MDC.put("hostName",NetUtil.getLocalHostName());  
       MDC.put("ip",NetUtil.getLocalIp());
       //这里读取的就是application配置文件里面的信息
       MDC.put("applicationName",environment.getProperty("spring.application.name"));
    }
}

3、使用filebeat日志搜集

(1)安装filebeat

1)下载地址
https://www.elastic.co/downloads/beats/filebeat
2)解压

tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/filebeat-6.6.0

3)修改配置

vim /usr/local/filebeat-6.6.0/filebeat.yml

之前的删了 换成下面的

######### Filebeat Configuration ##########
filebeat.prospectors:

- input_type: log

    paths:
        #app-服务名.log
        - /usr/local/logs/app-collector.log
    #定义写入ES时的_type 值
    document_type: "app-log"
    multiline:
        #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-z]{3})\-(\d{2}|\d{4})'   #指定匹配的表达式
        pattern: '^\['    #指定匹配的表达式
        negate: true    #是否匹配到
        match: after    #如果没有匹配到,就合并到上一行的末尾
        max_lines: 2000  #最大行数
        timeout: 2s   #如果在规定时间没有新的日志事件就不等待后面的日志了,开始把数据推送出去
    fields:
        logbiz: collector
        logtopic: app-log-collector  #按服务划分用作kafka topic
        evn: dev
        
- input_type: log

    paths:
        #app-服务名.log
        - /usr/local/logs/error-collector.log
    #定义写入ES时的_type 值
    document_type: "error-log"
    multiline:
        #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-z]{3})\-(\d{2}|\d{4})'   #指定匹配的表达式
        pattern: '^\['    #指定匹配的表达式
        negate: true    #是否匹配到
        match: after    #如果没有匹配到,就合并到上一行的末尾
        max_lines: 2000  #最大行数
        timeout: 2s   #如果在规定时间没有新的日志事件就不等待后面的日志了,开始把数据推送出去
    fields:
        logbiz: collector
        logtopic: error-log-collector  #按服务划分用作kafka topic
        evn: dev
        
output.kafka:
    enabled:true
    hosts: ["192.168.85.200:9092"]
    topic: '%{[fields.logtopic]}'
    partition.hash:
        reachable_only: true
    compression: gzip
    max_message_bytes: 1000000
    #acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应
    #acks=1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应
    #acks=-1:表示分区leader必须等待消息被成功写入到所有的ISR副本中才认为producer请求成功。
    required_acks: 1
logging.to_files: true

4)检查配置是否正确
在filebeat-6.6.0根目录下

./filebeat -c filebeat.yml -configtest

5)启动filebeat之前需要先创建kafka的topic

kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1

6)启动filebeat

/usr/local/filebeat-6.6.0/filebeat &

4、使用logstash日志过滤

(1)安装logstash
# 解压安装
tar -zxvf logstash-6.6.0.tar.gz -C /usr/local/

# 启动logstash
nohup /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
(2)conf下配置文件说明:
  • logstash配置文件:/config/logstash.yml
  • JVM参数文件:/config/jvm.options
  • 日志格式配置文件:log4j2.properties
  • 制作Linux服务参数:/config/startup.options
(3)配置文件说明:
vim /usr/local/logstash-6.6.0/config/logstash.yml

–path.config 或 –f :logstash启动时使用的配置文件
–configtest 或 –t:测试 Logstash 读取到的配置文件语法是否能正常解析
–log或-l:日志输出存储位置
–pipeline.workers 或 –w:运行 filter 和 output 的 pipeline 线程数量。默认是 CPU 核数。
–pipeline.batch.size 或 –b:每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。
–pipeline.batch.delay 或 –u:每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。
–verbose:输出调试日志
–debug:输出更多的调试日志

(4)虚拟机配置
vim /usr/local/logstash-6.6.0/config/jvm.options
(5) 启动配置 比如启动时的java位置、LS的home等
vim /usr/local/logstash-6.6.0/config/startup.options
(6)数据收集目录:

/usr/local/logstash-6.6.0/data

(7) 插件目录:
/usr/local/logstash-6.6.0/vendor/bundle/jruby/1.9/gems

1)查看插件命令:
/usr/local/logstash-6.6.0/bin/logstash-plugin list
2)更新插件命令:
/usr/local/logstash-6.6.0/bin/logstash-plugin update logstash-xxxx-xxxxx
3)安装插件命令:
/usr/local/logstash-6.6.0/bin/logstash-plugin install logstash-xxxx-xxxxx
4)插件地址: https://github.com/logstash-plugins

(8) 配置logstash-script.conf(重要)
vim /usr/local/logstash-6.6.0/script/logstash-script.conf

1)语法框架

# 注释. 
input { 
    ... 
} 
filter {
    ... 
} 
output {
    ... 
}

2)数据类型

# bool类型
debug => true
# string类型
host => "hostname"
# number类型
port => 6789
# array or list类型
path => ["/var/log/message","/var/log/*.log"]
# hash类型
match => {
    "field1" => "value1"
    "field2" => "value2"
}
# codec类型
codec => "json"

#字段引用方式:
{
    "agent":  "Mozilla/5.0  (compatible;  MSIE  9.0)",
    "ip":  "192.168.24.44",
    "request":  "/index.html"
    "response":  {
        "status":  200,
        "bytes":  52353
    },
    "ua":  {
        "os":  "Windows  7"
    }
}
#获取字段值:
[response][status]
[ua][os]

3)条件判断condition

if EXPRESSION { 
    ... 
} else if EXPRESSION {
    ... 
}else {
    ...
} 

==(等于), !=(不等于), <(小于), >(大于), <=(小于等于), >=(大于等于), =~(匹配正则), !~(不匹配正则) in(包含), not in(不包含), and(与), or(或), nand(非与), xor(非或) ()(复合表达式), !()(对复合表达式结果取反)

4)使用环境变量

#缺失报错:
input { 
	tcp { 
		port => "${TCP_PORT}" 
	} 
}
#缺失使用默认值:
input { 
	tcp { 
		port => "${TCP_PORT:54321}" 
	} 
}

5)完整配置

input {
    kafka {
        #app-log-服务名称
        topics_pattern => "app-log-.*"
        bootstrap_servers => "192.168.11.51:9092"
        codec => json
        consumer_threads => 4  #增加consumer的并行消费线程数
        decorate_events => true
        #auto_offset_rest => "latest"
        group_id => "app-log-group"
    }
    kafka {
        #error-log-服务名称
        topics_pattern => "error-log-.*"
        bootstrap_servers => "192.168.11.51:9092"
        codec => json
        consumer_threads => 4
        decorate_events => true
        #auto_offset_rest => "latest"
        group_id => "app-log-group"
    }
}

filter {
    #时区转换
    ruby{
        code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
    }
    # [fields][logtopic]这串东西 对应的是filebeat的配置文件filebeat.yml里面的fields下的logtopic属性,具体的回头看filebeat的内容
    if "app-log" in [fields][logtopic]{
        grok{
            #这个是匹配日志的格式的,日志的格式可以匹配成功这条数据就不过滤,否则就过滤掉
            match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{NOTSPACE:hostName}\] \[%{NOTSPACE:ip}\] \[%{NOTSPACE:applicationName}\] \[%{NOTSPACE:location}\] \[%{NOTSPACE:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
    }
    
    if "error-log" in [fields][logtopic]{
        grok{
            match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{NOTSPACE:hostName}\] \[%{NOTSPACE:ip}\] \[%{NOTSPACE:applicationName}\] \[%{NOTSPACE:location}\] \[%{NOTSPACE:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
        }
    }
}

#输出到控制台
output {
    stdout {
        codec => rubydebug
    }
}

5、使用ElasticSearch日志持久化

ElasticSearch具体的安装和使用看我以前的文章,这里不再赘述

(1) 在logstash-script.conf中添加es的输出配置
output{
    
    if "app-log" in [fields][logtopic]{
        #es插件
        elasticsearch{
        host => ["192.168.11.35:9200"]
        #用户名密码
        user => "elastic"
        password => "123456"
        #索引名 +号开头的,就会姿容任务后面是时间格式
        #javalog-app-service-2019.01.23
        index => "app-log-%{[fields][logbiz]}-%{index_time}"
        #是否嗅探集群ip:一般设置true
        #通过嗅探机制进行es集群负载均衡发日志消息
        sniffing => true
        #logstash默认值自带一个mapping模板,进行模板覆盖
        template_overwrite => true
        }
    }
    
    if "error-log" in [fields][logtopic]{
        elasticsearch{
        host => ["192.168.11.35:9200"]
        user => "elastic"
        password => "123456"
        index => "app-log-%{[fields][logbiz]}-%{index_time}"
        sniffing => true
        template_overwrite => true
        }
    }
}

6、使用kibana日志可视化

(1) 安装Kibana
  ## 解压kibana
  wget https://artifacts.elastic.co/downloads/kibana/kibana-5.6.2-linux-x86_64.tar.gz
  tar -zxvf kibana-5.6.2-linux-x86_64.tar.gz -C /usr/local/
  mv kibana-5.6.2-linux-x86_64/ kibana-5.6.2
  # 进入kibana目录,修改配置文件
  vim /usr/local/kibana-5.6.2/config/kibana.yml
  # 修改配置如下:
  server.host: "192.168.11.35"
  elasticsearch.url: http://192.168.11.161:9200
  # 启动:
  /usr/local/kibana-5.6.2/bin/kibana &
  # 指定配置文件启动:
  nohup /usr/local/kibana-5.6.2/bin/kibana -c /usr/local/kibana-5.6.2/config/kibana.yml > /dev/null 2>&1 &
  
  # 访问:
  http://192.168.11.35:5601/app/kibana (5601为kibana默认端口)
(2) 将es索引设置到Kibana中

1)访问kibana

 http://192.168.11.35:5601/app/kibana (5601为kibana默认端口)

2)将es索引设置到Kibana中
点击manager-》index Patterns-》Create index patten-》在文本框中输入 索引的匹配规则(我这里输入app-log-*)-》next step -》选择currentDateTime -》点击完成
这部有几个需要匹配的索引就需要操作几次

3)查看es中的信息
点击Discover-》文本框中选择想要查询的索引名称就能找到相关数据了

(3)附:集成X-Pack更好的可视化数据

https://blog.csdn.net/weixin_30898109/article/details/99055801

7、使用watcher监控告警

watcher是X-Pack插件的一个功能,如果没有这个功能需要自行安装一下X-Pack

1)添加模板信息

进入Kibana-》点击Dev Tools-》运行以下内容

PUT _template/error-log-
{
  "template": "error-log-*",
  "order": 0,
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "message_field": {
            "match_mapping_type": "string",
            "path_match": "message",
            "mapping": {
              "norms": false,
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        },
        {
          "throwable_field": {
            "match_mapping_type": "string",
            "path_match": "throwable",
            "mapping": {
              "norms": false,
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        },
        {
          "string_fields": {
            "match_mapping_type": "string",
            "match": "*",
            "mapping": {
              "norms": false,
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word",
              "fields": {
                "keyword": {
                  "type": "keyword"
                }
              }
            }
          }
        }
      ],
      "_all": {
        "enabled": false
      },
      "properties": {         
        "hostName": {
          "type": "keyword"
        },
        "ip": {
          "type": "ip"
        },
        "level": {
          "type": "keyword"
        },
		"currentDateTime": {
		  "type": "date"
		}
      }
    }
  }
}
2)添加watcher

在Dev Tools中运行以下内容

PUT _xpack/watcher/watch/error_log_collector_watcher
{
  "trigger": {
    "schedule": {
        #每五秒查询一下日志信息
      "interval": "5s"
    }
  },
  #这个就是每五秒要做的事情
  "input": {
    "search": {
      "request": {
        #这是监控的索引名称,增加8小时为了统一时区
        "indices": ["<error_log_collector-{now+8h/d}>"],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                  {
                  #匹配等级为ERROR的日志,配到就报警
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    #查询的的范围,当前时间30秒内的时间,这里有个问题,同一条error会被反复查询到,所以会警报多次,需要更具实际情况自行设定
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              } 
            }
          }
        }
      }
    }
  },


# 对于上面的查询结果进行比较
  "condition": {
    # compare进行比较
    "compare": {
        # 上面的query查询的结果会放入到ctx.payload中
        #上面的那个查询到的个数要是超过0个就执行后面的任务
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
    #这里和之前的很想,之前只是查询出数量,这里就是查询出实际数据了
  "transform": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 1,
          "query": {
            "bool": {
              "must": [
                  {
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              } 
            }
          },
          "sort": [
            {
                #更具时间排序一下
                "currentDateTime": {
                    "order": "desc"
                }
            }
          ]
        }
      }
    }
  },
# 根据上面的查询、比较结果,执行actions里面定义的动作(定义多种报警类型)
  "actions": {
    # 报警名字
    "test_error": {
        #发一个post请求给指定地址,这个接口就随便写了,可以写个短信发送的功能,把收到的警报发给指定手机号
      "webhook" : {
        "method" : "POST",
        "url" : "http://192.168.11.31:8001/accurateWatch",
        "body" : "{\"title\": \"异常错误告警\", \"applicationName\": \"{{#ctx.payload.hits.hits}}{{_source.applicationName}}{{/ctx.payload.hits.hits}}\", \"level\":\"告警级别P1\", \"body\": \"{{#ctx.payload.hits.hits}}{{_source.messageInfo}}{{/ctx.payload.hits.hits}}\", \"executionTime\": \"{{#ctx.payload.hits.hits}}{{_source.currentDateTime}}{{/ctx.payload.hits.hits}}\"}"
      }
    }
 }
}

到这里为止所有的功能已经全部实现

3)watcher一些其他常用api的使用
查看一个watcher
# 
GET _xpack/watcher/watch/error_log_collector_watcher


#删除一个watcher
DELETE _xpack/watcher/watch/error_log_collector_watcher

#执行watcher
# POST _xpack/watcher/watch/error_log_collector_watcher/_execute

#查看执行结果
GET /.watcher-history*/_search?pretty
{
  "sort" : [
    { "result.execution_time" : "desc" }
  ],
  "query": {
    "match": {
      "watch_id": "error_log_collector_watcher"
    }
  }
}

GET error-log-collector-2019.09.18/_search?size=10
{

  "query": {
    "match": {
      "level": "ERROR"
    }
  }
  ,
  "sort": [
    {
        "currentDateTime": {
            "order": "desc"
        }
    }
  ] 
}


GET error-log-collector-2019.09.18/_search?size=10
{

  "query": {
    "match": {
      "level": "ERROR"
    }
  }
  ,
  "sort": [
    {
        "currentDateTime": {
            "order": "desc"
        }
    }
  ] 
}
4)附:ctx.payload取值规范

比如我们进行search搜索school里面name=zhangsan的数据:
这是通过普通的es接口查询

# payload取值规范:比如我们进行search搜索school:
GET school/_search
{
  "query": {
    "match": {
      "name": "zhangsan"
    }
  }
}

得到结果如下:

{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1.5404451,
    "hits": [
      {
        "_index": "school",
        "_type": "student",
        "_id": "1",
        "_score": 1.5404451,
        "_source": {
          "name": "zhangsan",
          "age": 25,
          "course": "elasticsearch",
          "study_date": "2018-06-15T20:30:50",
          "mark": "today is a good day"
        }
      }
    ]
  }
}

使用表示查询:ctx.payload结果集:

{{#ctx.payload.hits.hits}} {{_source.name}} {{_source.course}} {{/ctx.payload.hits.hits}}
5)附:triggers的类型

triggers的7种类型

  • hourly
  • daily
  • weekly
  • monthly
  • yearly
  • cron
  • interval

具体事例如下:

#hourly:每小时执行
   #例如:12:00, 12:15, 12:30, 12:45, 1:00, 1:15
   {
     "trigger" : {
       "schedule" : {
         "hourly" : { "minute" : [ 0, 15, 30, 45 ] }
       }
     }
   }
   
   #daily:每天执行
   #每天00:00, 12:00, and 17:00
   {
     "trigger" : {
       "schedule" : {
         "daily" : { "at" : [ "midnight", "noon", "17:00" ] }
       }
     }
   }
   #每天00:00, 00:30, 12:00, 12:30, 17:00 and 17:30
   {
     "trigger" : {
       "schedule" : {
         "daily" : {
           "at" {
             "hour" : [ 0, 12, 17 ],
             "minute" : [0, 30]
           }
         }
       }
     }
   }
   
   #weekly:指定星期几
   #周二12:00,周五17:00
   {
     "trigger" : {
       "schedule" : {
         "weekly" : [
           { "on" : "tuesday", "at" : "noon" },
           { "on" : "friday", "at" : "17:00" }
         ]
       }
     }
   }
   #周二、周五的17:00
   {
     "trigger" : {
       "schedule" : {
         "weekly" : {
           "on" : [ "tuesday", "friday" ],
           "at" : [ "noon", "17:00" ]
         }
       }
     }
   }
   
   #monthly:指定每月哪天执行
   #每月10号中午、每月20号17:00
   {
     "trigger" : {
       "schedule" : {
         "monthly" : [
           { "on" : 10, "at" : "noon" },
           { "on" : 20, "at" : "17:00" }
         ]
       }
     }
   }
   #每月10号、20号的00:00,12:00
   {
     "trigger" : {
       "schedule" : {
         "monthly" : {
           "on" : [ 10, 20 ],
           "at" : [ "midnight", "noon" ]
         }
       }
     }
   }
   #yearly-指定月、日、时
   #每年的1月10日12:00,每年的7月20日17:00
   {
     "trigger" : {
       "schedule" : {
         "yearly" : [
           { "in" : "january", "on" : 10, "at" : "noon" },
           { "in" : "july", "on" : 20, "at" : "17:00" }
         ]
       }
     }
   }
   #每年1月10日,1月20日,12月10日,12月20日的12:00,00:00
   {
     "trigger" : {
       "schedule" : {
         "yearly" : {
           "in" : [ "jan", "dec" ],
           "on" : [ 10, 20 ],
           "at" : [ "midnight", "noon" ]
         }
       }
     }
   }
   #cron-表达式
   <seconds> <minutes> <hours> <day_of_month> <month> <day_of_week> [year]
   0 5 9 * * ?
   0 0-5 9 * * ?
   0 0/15 9 * * ?
   
   #interval-周期的
   #间隔单位:s:秒、m:分钟、h:小时、d:天、w:星期
6)附:input的类型

Inputs的4种类型

  • Simple
  • Search
  • HTTP
  • Chain

具体事例如下:

#Simple Input-静态数据
   #每天12点触发
   {
     "trigger" : {
       "schedule" : {
         "daily" : { "at" : "noon" }
       }
     },
     "input" : {
       "simple" : {
         "name" : "John"
       }
     },
     "actions" : {
       "reminder_email" : {
         "email" : {
           "to" : "to@host.domain",
           "subject" : "Reminder",
           "body" : "Dear {{ctx.payload.name}}, by the time you read these lines, I'll be gone"
         }
       }
     }
   }
   
   
   #Search-搜索
   {
     "input" : {
       "search" : {
         "request" : {
           "indices" : [ "logs" ],
           "body" : {
             "query" : { "match_all" : {} }
           }
         }
       }
     },
     "condition" : {
       "compare" : { "ctx.payload.hits.total" : { "gt" : 5 }}
     }
     ...
   }
   
#Http-请求
#request.host
#request.port
#request.path
#request.headers
#request.params
#request.url:request.scheme, request.host, request.port and request.params
#request.method:head、get、post、put、delete
#request.auth
#request.body
#request.proxy.host
#request.proxy.port
#request.connection_timeout
#request.read_timeout
#response_content_type:json, yaml and text
#extract
#get请求
   {
   	"input" : {
   	  "http" : {
   	    "request" : {
   	      "host" : "example.com",
   	      "port" : 9200,
   	      "path" : "/idx/_search"
   	    }
   	  }
   	}
   }
   
#含有body体内容
   {
   	"input" : {
   		"http" : {
   			"request" : {
   			  "host" : "host.domain",
   			  "port" : 9200,
   			  "path" : "/idx/_search",
   			  "body" :  "{\"query\" :  {  \"match\" : { \"category\" : \"event\"}}}"
   			}
   		}
   	}
   }
   
#含有参数的
   {
   	"input" : {
   	  "http" : {
   	    "request" : {
   	      "host" : "host.domain",
   	      "port" : "9200",
   	      "path" : "/_cluster/stats",
   	      "params" : {
   	        "human" : "true" 
   	      }
   	    }
   	  }
   	}
   }

#含有用户密码
   {
   	"input" : {
   	  "http" : {
   	    "request" : {
   	      "host" : "host.domain",
   	      "port" : "9200",
   	      "path" : "/myservice",
   	      "auth" : {
   	        "basic" : {
   	          "username" : "user",
   	          "password" : "pass"
   	        }
   	      }
   	    }
   	  }
   	}
   }
   
#直接请求url的
   {
   	"input" : {
   	  "http" : {
   	    "request" : {
   	      "url" : "http://api.openweathermap.org/data/2.5/weather",
   	      "params" : {
   	        "lat" : "52.374031",
   	        "lon" : "4.88969",
   	        "appid" : "<your openweathermap appid>"
   	      }
   	    }
   	  }
   	}
   }
   
#Chain-input-同时设置多个input,串行
   {
   	"input" : {
   	  "chain" : {
   	    "inputs" : [ 
   	      ## 第一步input
   	      {
   	        "first" : {
   	          "simple" : { "path" : "/_search" }
   	        }
   	      },
   	      ## 第二步input (可以去使用第一步input返回的结果)
   	      {
   	        "second" : {
   	          "http" : {
   	            "request" : {
   	              "host" : "localhost",
   	              "port" : 9200,
   	              "path" : "{{ctx.payload.first.path}}" 
   	            }
   	          }
   	        }
   	      }
   	    ]
   	  }
   	}
   }
7)附:condition条件设置
#Always Condition
"condition" : {
     "always" : {}
}
#Never Condition
"condition" : {
     "never" : {}
}


#Compare Condition (进行和查询的结果进行比较语法如下:)
# eq:、not_eq、gt、gte、lt、lte
## 比如错误条数超过了5条进行报警、响应长时间超过多少毫秒进行报警等
{
     "condition" : {
       "compare" : {
         "ctx.payload.hits.total" : { 
           "gte" : 5 
         }
     }
}

#<{expression}> 正则表达式 使用 <> 中写正则表达式: 比如 当前时间 - 5分钟 进行比较,如下:
{
     "condition" : {
       "compare" : {
         "ctx.execution_time" : {
           "gte" : "<{now-5m}>"
         }
     }
}


#{{path}} 比较,这个就是最开始的示例里面的获取参数方式,如下:
{
     "condition" : {
       "compare" : {
         "ctx.payload.aggregations.status.buckets.error.doc_count" : {
           "not_eq" : "{{ctx.payload.aggregations.handled.buckets.true.doc_count}}"
         }
     }
}
   
  
#Array Compare Condition 数组比较: 比如当前的doc_count大于25 就进行报警
{
     "condition": {
       "array_compare": {
         "ctx.payload.aggregations.top_tweeters.buckets" : { 
           "path": "doc_count" ,
           "gte": { 
             "value": 25, 
           }
         }
       }
     }
}
   
#Script Condition 脚本方式
{
     "input" : {
       "search" : {
         "indices" : "log-events",
         "body" : {
           "size" : 0,
           "query" : { "match" : { "status" : "error" } }
         }
       }
     },
     "condition" : {
       "script" : {
         ## 当前返回的条数是否大于阈值,进行报警
         "inline" : "return ctx.payload.hits.total > threshold",
         "params" : {
           "threshold" : 5
         }
       }
     }
}
8)附:Action 触发器
#Email Action--发送邮件 
#如果使用发送邮件的报警,则需要在elasticsearch.yml中配置发送邮件服务的信息
xpack.notification.email:
   default_account: gmail_account
   account:
     gmail_account:
         profile: gmail
         smtp:
             auth: true
             starttls.enable: true
             host: smtp.gmail.com
             port: 587
             user: <username>
             password: <password>
     outlook_account:
         profile: outlook
         smtp:
             auth: true
             starttls.enable: true
             host: smtp-mail.outlook.com
             port: 587
             user: <username>
             password: <password>:
   	exchange_account:
         profile: outlook
         email_defaults:
             from: <email address of service account> 
         smtp:
             auth: true
             starttls.enable: true
             host: <your exchange server>
             port: 587
             user: <email address of service account> 
             password: <password>
   
#发送邮件
"actions" : {
     ## actions名字
     "send_email" : { 
       "email" : { 
         "to" : "'Recipient Name <recipient@example.com>'", 
         #"to" : ['Personal Name <user1@host.domain>', 'user2@host.domain'], 
         "subject" : "Watcher Notification", 
         "body" : "{{ctx.payload.hits.total}} error logs found" 
       }
     }
}

#发送含有附件信息的邮件
"actions" : {
     "email_admin" : {
       "email": {
         "to": "'John Doe <john.doe@example.com>'",
         "attachments" : {
           ## 附件方式
           "my_image.png" : { 
             "http" : { 
               "content_type" : "image.png",
               "request" : {
                 "url": "http://example.org/foo/my-image.png" 
               }
             }
           },
           ## xpack reporting插件生成方式:
           "dashboard.pdf" : {
             "reporting" : {
               "url": "http://example.org:5601/api/reporting/generate/dashboard/Error-Monitoring"
             }
           },
           ## 自定义附件
           "data.yml" : {
             "data" : {
               "format" : "yaml" 
             }
           }
         }
       }
     }
}

#Webhook Action,发送一个http请求
#发送github的issue
"actions" : {
     "create_github_issue" : {
       ## 因为发邮件到达率不是特别高,所以可以使用外部的接口调用方式
       ## 比如这里调用url为外部的手机短信接口进行发送 
       "webhook" : {
         ## 请求方式
         "method" : "POST",
         ## 外部请求地址
         "url" : "https://api.github.com/repos/<owner>/<repo>/issues",
         ## 请求报文
         "body" : "{
           \"title\": \"Found errors in 'contact.html'\",
           \"body\": \"Found {{ctx.payload.hits.total}} errors in the last 5 minutes\",
           \"assignee\": \"web-admin\",
           \"labels\": [ \"bug\", \"sev2\" ]
         }",
         ## 用户名密码
         "auth" : {
           "basic" : {
             "username" : "<username>", 
             "password" : "<password>"
           }
         }
       }
     }
}

#带有url参数的请求
"actions" : {
     "my_webhook" : {
       "webhook" : {
         "method" : "POST",
         "host" : "mylisteningserver",
         "port" : 9200,
         "path": ":/alert",
         "params" : {
           "watch_id" : "{{ctx.watch_id}}" 
         }
       }
     }
}

#自定义header
"actions" : {
     "my_webhook" : {
       "webhook" : {
         "method" : "POST",
         "host" : "mylisteningserver",
         "port" : 9200,
         "path": ":/alert/{{ctx.watch_id}}",
         "headers" : {
           "Content-Type" : "application/yaml" 
         },
         "body" : "count: {{ctx.payload.hits.total}}"
       }
     }
}
   
#Index Action--创建索引文档
"actions" : {
     "index_payload" : { 
       "index" : {
         "index" : "my-index", 
         "doc_type" : "my-type", 
         "doc_id": "my-id" 
       }
     }
}
   
#Logging Action--记录日志
#level:error, warn, info, debug and trace
## 日志种类:
#category:xpack.watcher.actions.logging
"actions" : {
     "log" : { 
       "transform" : { ... }, 
       ## 日志报警
       "logging" : {
         "text" : "executed at {{ctx.execution_time}}",
         ## 日志级别
         "level": "info"
       }
     }
}

8、总结(之前没看懂的看这里)

  1. 应用程序产生日志信息,通过log4j2输出到磁盘上的两个日志文件中(app-log-服务名.log和error-log-服务名.log)
  2. filebeat会监听这个两个日志文件,日志中的新数据会被整合打包推送到kafka中
  3. kafka作为一个负载缓冲的作用,将数据推送到logstash中
  4. logstash用来做日志的筛选和过滤,把需要日志推送到elasticsearch
  5. elasticsearch用于持久化日志,并为数据可视化提供数据来源
  6. 使用kibana进行数据可视化
  7. 通过x-pack插件中的watcher功能,实现警报的能力
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐