Logstash output Kafka with Kerberos学习
1、Kafka中的一些术语:2、ELK流程图:3、Kafka发布、订阅信息的流程:4、通过Logstash收集日志到Kafka:
Logstash output Kafka with Kerberos学习
目录:
1、Kafka中的一些术语:
2、ELK流程图:
3、Kafka发布、订阅信息的流程:
4、通过Logstash收集日志到Kafka:
1、Kafka中的一些术语:
(1)Topic:话题,Kafka将消息种子(Feed)进行分类,每一类的消息称为话题。
(2)Producer:生产者,发布消息的对象称为话题生产者。
(3)Consumer:消费者,订阅消息并处理发布的消息种子的对象称为话题消费者。
(4)Broker:服务器,已发布的消息保存在一组服务器中称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker),一个Broker可以容纳多个Topic,消费者可以订阅一个或多个话题并从Broker拉数据从而消费这些已发布的消息。
(5)Partition:分区,每个Topic包含一个或多个Partition,Kafka分配的单位是Partition。
2、ELK流程图:
图2.1 ELK流程图
ELK日志分析系统的架构图的说明如下:
(1)Filebeat:
日志采集器,负责将数据tail到logstash;
(2)第一个Logstash:
data shipper,作为数据入kafka的适配器;
(3)Kafka:
消息队列,起消峰解耦的作用;
(4)Zookeeper:
分布式配置和同步服务,是Kafka代理和消费者之间的协调接口,Kafka服务器通过Zookeeper集群共享信息。
Kafka在Zookeeper中存储基本元数据,例如Topic,代理,消费者便宜等信息。
(5)第二个Logstash:
data indexer,负责数据解析和ElasticSearch入库;
(6)Elasticsearch:
存储,搜索和分析,用于存储所有的日志。
(7)Kerberos:一种认证机制,用于Kafka Broker服务器的认证。
查看ELK的中文文档:https://kibana.logstash.es/content/
图2.2 logstash架构图
Shipper:日志收集者,负责监控本地日志文件的变化,及时把日志文件的最新内容收集起来,输出到Redis暂存。
Indexer:日志存储者,负责从Redis接收日志,写入到Elasticsearch中。
Broker:日志中转站,用来连接多个Shipper和多个Indexer。
3、Kafka发布、订阅信息的流程:
①生产者定期向主题发送消息。
②Kafka代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka将在第一分区中存储一个消息,在第二分区中存储第二消息。
③消费者订阅特定主题。
④一旦消费者订阅主题,Kafka将向消费者提供主题的当前偏移,并且还将偏移保存在Zookeeper系综中。
⑤消费者将定期请求Kafka(如100 Ms)新消息。
⑥一旦Kafka收到来自生产者的消息,它将这些消息转发给消费者。
⑦消费者将收到消息并进行处理。
⑧一旦消息被处理,消费者将向Kafka代理发送确认。
⑨一旦Kafka收到确认,它将偏移更改为新值,并在Zookeeper中更新它。 由于偏移在Zookeeper中维护,消费者可以正确地读取下一封邮件,即使在服务器处于峰值期间。
以上流程将重复,直到消费者停止请求。
消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。
4、通过Logstash收集日志到Kafka:
(1)在/opt/package/kafka_2.10-0.10.1.0/config目录下查看kafka中producer.properties文件:
查看broker服务器位置。
图4.1 截图1
(2)在/tmp目录下查看kafka_jaas.conf文件:
查看serverName=”kafka”
图4.2 截图2
(3)因为logstash的版本较高,需要1.7版本以上的环境,因此运行logstash脚本前首先将jdk版本从1.7转换为1.8,否则会出现如下报错信息:
LoadError: JRuby ext built for wrong Java version in `com.purbon.jrmonitor.JRMonitorService': java.lang.UnsupportedClassVersionError: com/purbon/jrmonitor/JRMonitorService : Unsupported major.minor version 52.0
require at org/jruby/RubyKernel.java:1040
(root) at /opt/package/logstash-5.2.2/vendor/bundle/jruby/1.9/gems/jrmonitor-0.4.2/lib/jrmonitor.rb:4
require at org/jruby/RubyKernel.java:1040
(root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_poller/jvm.rb:1
require at org/jruby/RubyKernel.java:1040
(root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_poller/jvm.rb:5
require at org/jruby/RubyKernel.java:1040
(root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_pollers.rb:1
require at org/jruby/RubyKernel.java:1040
(root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_pollers.rb:3
require at org/jruby/RubyKernel.java:1040
(root) at /opt/package/logstash-5.2.2/lib/bootstrap/environment.rb:70
①查看jdk当前版本:
图4.3 截图3
②新建shell:reload_path.sh
export JAVA_HOME=/opt/package/jdk1.8.0_112
export PATH=/opt/package/jdk1.8.0_112/bin:$PATH
③执行reload_path.sh
source reload_path.sh
④再查看jdk版本:
图4.4 截图4
(4)编写脚本remoatest1.sh:
input { stdin { } }
output {
stdout{codec => rubydebug}
kafka{
topic_id => "mytopic"
bootstrap_servers => "hdp1.example.com:9092"
security_protocol => "SASL_PLAINTEXT"
sasl_kerberos_service_name => "kafka"
jaas_path => "/tmp/kafka_jaas.conf.demouser"
kerberos_config => "/etc/krb5.conf"
compression_type => "none"
acks => "1"
}
}
Kafka output configuration Options的说明可以参考官网:
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
这里用到的参数说明:
①topic_id:必填项,指定消费话题,指定某个topic,实际上就是订阅某个主题,然后去消费。
②bootstrap_servers:用于建立与集群的初始连接的URL列表,即Kafka的接入地址(生产者)
③jass_path:Java Authentication and Authorization Service API(JAVA认证和授权服务)的缩写,为Kafka提供用户认证和授权服务,设置jass文件的路径。
④security_protocol:使用的安全协议,可以为plaintext、ssl、sasl_plaintext、sasl_ssl等等。
⑤kerberos_config:Keberos配置文件的路径。
⑥sasl_kerberos_service_name:Kafka服务器运行的Kerberos主题名称。其可以在Kafka的jaas配置中定义。
⑦codec:用于输入数据的编码/解码器。
⑧compression_type:压缩方式,默认是none,其它可选的是gzip和snappy
(5)运行脚本:
bash ../bin/logstash -f remoatest1.conf
图4.5 截图5
输入hello和remoa,生产者向主题mytopic发送消息成功。
(6)查看Kafka消费者是否接收到消息:
KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf -Xmx512M" /opt/package/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --topic mytopic --bootstrap-server hdp1.example.com:9092 --from-beginning --consumer.config /opt/package/kafka_2.10-0.10.1.0/config/consumer.properties
图4.6 截图6
查看到消费者接收到消息并进行处理。
(7)将一个日志文件信息通过logstash给Kafka:
输入指令:cat install.log.syslog | bash ../bin/logstash -f remoatest1.conf
图4.7 截图7
查看消费者接收处理结果如下:
图4.8 截图8
更多推荐
所有评论(0)