kafka logstash elasticsearch kibana window安装配置
kafkalogstash elasticsearch kibana进行配置日志分析收集,window机器上搭建
主要使用 kafka logstash elasticsearch kibana进行配置日志分析收集,由于只是学习,所以在window机器上搭建
- kafka_2.10-0.10.1.1
http://kafka.apache.org/downloads - zookeeper-3.4.6
http://zookeeper.apache.org/ - logstash-5.3.2
https://www.elastic.co/cn/downloads/logstash - elasticsearch-5.3.2
https://www.elastic.co/cn/downloads/elasticsearch kibana-5.3.2-windows-x86
https://www.elastic.co/cn/downloads/kibana
由于 logstash 、kibana 、elasticsearch都是一个公司的,可各个产品首页显示的不是5.3.2 版本 ,可以点击产品页面上的View past releases. 下载对应的版本,为了避免版本冲突 建议这三个采用同一个版本
简单的原理:
kafka(在应用中采集日志)—-logstash(从kafka中收集日志到ES中)—-elasticsearch(负责存储日志信息)——–kibana(负责用来展示数据)
所以启动顺序:
先把zookeeper kafka elasticsearch
1. zookeeper启动
cd zookeeper-3.4.6/bin
zkServer.cmd
2.kafka服务启动
cd kafka_2.10-0.10.1.1\bin\windows
kafka-server-start.bat ../../config/server.properties
为了查看效果 顺带启动一下 kafka 消费者
kafka-console-consumer.bat –zookeeper localhost:2181 –topic mytopic –from-beginning
3.启动elasticsearch
cd elasticsearch-5.3.2\bin
elasticsearch.bat
elasticsearch启动后 可以看http://localhost:9200/
正常会返回一个json 对应你的elasticsearch的版本相关信息
4.配置kibana
首先修改一下kibana的配置文件 config\kibana.yml
cd kibana-5.3.2-windows-x86\config
在config增加一行配置
elasticsearch.url: “http://localhost:9200”
cd kibana-5.3.2-windows-x86\bin
kibana.bat
启动后可以进入kibana首页查看http://localhost:5601
一开始进去会提示没有index pattern。原因是因为我们还没有把数据弄到elasticsearch中
5.配置logstash
cd logstash-5.3.2\bin
我们在这个目录下 新建一个配置文件 名字:logstash.conf
内容如下:
input{
kafka {
bootstrap_servers => "localhost:9092"
topics => ["mytopic"]
enable_auto_commit => "true"
auto_offset_reset => "latest"
}
}
output{
stdout{}
elasticsearch {
hosts => [ "localhost:9200" ]
index => "logstash-cylog"
}
}
简单说明一下这个配置文件:
input:代表数据源怎么来的。我们这里使用kafka消费者
filter :代表过滤数据,这里配置代表是 将message进行json解析。(非必须)
output:输出数据,stdout代表输出到控制台中(方便直接查看效果)
elasticsearch 配置 指向本地的地址
input filter output 这个只是简单配置,详细的可以参考logstash的官网
https://www.elastic.co/cn/products/logstash 里面有非常详细,针对每个配置说明
由于我们使用需要使用到kafka-input插件 所以我们要安装kafka的插件
cd logstash-5.3.2\bin
logstash-plugin.bat install logstash-input-kafka安装插件后 启动logstash
提示成功后,就是我们最重要的一步了。
把日志发送到kafka 这里有很多种方式,我们采用最简单的log4j kafka appender
新建一个maven项目
KafkaAppenderApplication
package com.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaAppenderApplication {
public static Logger logger = LogManager.getLogger(KafkaAppenderApplication.class);
public static void main(String[] args) {
long t = System.currentTimeMillis();
logger.info("inf1o123" + t + "info");
SpringApplication.run(KafkaAppenderApplication.class, args);
System.exit(0);
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>kafkaAppender</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.1.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
log4j2.xml
<Configuration status="warn" monitorInterval="30" strict="true" schema="Log4J-V2.2.xsd">
<Appenders>
<!-- 输出到控制台 -->
<Console name="Console" target="SYSTEM_OUT">
<!-- 需要记录的级别 -->
<ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n"/>
</Console>
<!-- 输出到Kafka topci:mytopic bootstrap.servers: localhost:9092-->
<Kafka name="KAFKA" topic="mytopic">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] [%-5p] {%F:%L} - %m%n" />
<Property name="bootstrap.servers">localhost:9092</Property>
</Kafka>
</Appenders>
<Loggers>
<Root level="info"> <!-- 全局配置 -->
<AppenderRef ref="Console"/>
<AppenderRef ref="KAFKA"/>
</Root>
</Loggers>
</Configuration>
启动项目后
kafka消费者显示日志:
2017-05-04 14:16:35.221 [main] [INFO ] {KafkaAppenderApplication.java:15} - inf1
o1231493878595218info
2017-05-04 14:16:35.865 [background-preinit] [INFO ] {Version.java:30} - HV00000
1: Hibernate Validator 5.2.4.Final
然后我们可以登录到kafka 首先建立index 可以选择logstash-*
进入 Discover后 可以看到我们刚才的日志
更多推荐
所有评论(0)