Logstash output Kafka with Kerberos学习

目录:

1、Kafka中的一些术语:

2、ELK流程图:

3、Kafka发布、订阅信息的流程:

4、通过Logstash收集日志到Kafka:


1、Kafka中的一些术语:

(1)Topic:话题,Kafka将消息种子(Feed)进行分类,每一类的消息称为话题。

(2)Producer:生产者,发布消息的对象称为话题生产者。

(3)Consumer:消费者,订阅消息并处理发布的消息种子的对象称为话题消费者。

(4)Broker:服务器,已发布的消息保存在一组服务器中称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker),一个Broker可以容纳多个Topic,消费者可以订阅一个或多个话题并从Broker拉数据从而消费这些已发布的消息。

(5)Partition:分区,每个Topic包含一个或多个Partition,Kafka分配的单位是Partition。

 

2、ELK流程图:

 

图2.1 ELK流程图

ELK日志分析系统的架构图的说明如下:

(1)Filebeat:

日志采集器,负责将数据tail到logstash;

(2)第一个Logstash:

data shipper,作为数据入kafka的适配器;

(3)Kafka:

消息队列,起消峰解耦的作用;

(4)Zookeeper:

分布式配置和同步服务,是Kafka代理和消费者之间的协调接口,Kafka服务器通过Zookeeper集群共享信息。

Kafka在Zookeeper中存储基本元数据,例如Topic,代理,消费者便宜等信息。

(5)第二个Logstash:

data indexer,负责数据解析和ElasticSearch入库;

(6)Elasticsearch:

存储,搜索和分析,用于存储所有的日志。

(7)Kerberos:一种认证机制,用于Kafka Broker服务器的认证。

查看ELK的中文文档:https://kibana.logstash.es/content/

 

图2.2 logstash架构图

Shipper:日志收集者,负责监控本地日志文件的变化,及时把日志文件的最新内容收集起来,输出到Redis暂存。

Indexer:日志存储者,负责从Redis接收日志,写入到Elasticsearch中。

Broker:日志中转站,用来连接多个Shipper和多个Indexer。

 

3、Kafka发布、订阅信息的流程:

①生产者定期向主题发送消息。

②Kafka代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka将在第一分区中存储一个消息,在第二分区中存储第二消息。

③消费者订阅特定主题。

④一旦消费者订阅主题,Kafka将向消费者提供主题的当前偏移,并且还将偏移保存在Zookeeper系综中。

⑤消费者将定期请求Kafka(如100 Ms)新消息。

⑥一旦Kafka收到来自生产者的消息,它将这些消息转发给消费者。

⑦消费者将收到消息并进行处理。

⑧一旦消息被处理,消费者将向Kafka代理发送确认。

⑨一旦Kafka收到确认,它将偏移更改为新值,并在Zookeeper中更新它。 由于偏移在Zookeeper中维护,消费者可以正确地读取下一封邮件,即使在服务器处于峰值期间。

以上流程将重复,直到消费者停止请求。

消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。

 

4、通过Logstash收集日志到Kafka:

(1)在/opt/package/kafka_2.10-0.10.1.0/config目录下查看kafka中producer.properties文件:

查看broker服务器位置。

 

图4.1 截图1

(2)在/tmp目录下查看kafka_jaas.conf文件:

查看serverName=”kafka”

 

图4.2 截图2

(3)因为logstash的版本较高,需要1.7版本以上的环境,因此运行logstash脚本前首先将jdk版本从1.7转换为1.8,否则会出现如下报错信息:

LoadError: JRuby ext built for wrong Java version in `com.purbon.jrmonitor.JRMonitorService': java.lang.UnsupportedClassVersionError: com/purbon/jrmonitor/JRMonitorService : Unsupported major.minor version 52.0
  require at org/jruby/RubyKernel.java:1040
   (root) at /opt/package/logstash-5.2.2/vendor/bundle/jruby/1.9/gems/jrmonitor-0.4.2/lib/jrmonitor.rb:4
  require at org/jruby/RubyKernel.java:1040
   (root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_poller/jvm.rb:1
  require at org/jruby/RubyKernel.java:1040
   (root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_poller/jvm.rb:5
  require at org/jruby/RubyKernel.java:1040
   (root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_pollers.rb:1
  require at org/jruby/RubyKernel.java:1040
   (root) at /opt/package/logstash-5.2.2/logstash-core/lib/logstash/instrument/periodic_pollers.rb:3
  require at org/jruby/RubyKernel.java:1040
   (root) at /opt/package/logstash-5.2.2/lib/bootstrap/environment.rb:70

①查看jdk当前版本:

 

图4.3 截图3

②新建shell:reload_path.sh

export JAVA_HOME=/opt/package/jdk1.8.0_112

export PATH=/opt/package/jdk1.8.0_112/bin:$PATH

③执行reload_path.sh

source reload_path.sh

④再查看jdk版本:

 

图4.4 截图4

(4)编写脚本remoatest1.sh:

input { stdin { } }
output {
    stdout{codec => rubydebug}
    kafka{
                topic_id => "mytopic"
                bootstrap_servers => "hdp1.example.com:9092"
                security_protocol => "SASL_PLAINTEXT"
                sasl_kerberos_service_name => "kafka"
                jaas_path => "/tmp/kafka_jaas.conf.demouser"
                kerberos_config => "/etc/krb5.conf"
                compression_type => "none"
                acks => "1"
        }
}

Kafka output configuration Options的说明可以参考官网:

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

这里用到的参数说明:

①topic_id:必填项,指定消费话题,指定某个topic,实际上就是订阅某个主题,然后去消费。

②bootstrap_servers:用于建立与集群的初始连接的URL列表,即Kafka的接入地址(生产者)

③jass_path:Java Authentication and Authorization Service API(JAVA认证和授权服务)的缩写,为Kafka提供用户认证和授权服务,设置jass文件的路径。

④security_protocol:使用的安全协议,可以为plaintext、ssl、sasl_plaintext、sasl_ssl等等。

⑤kerberos_config:Keberos配置文件的路径。

⑥sasl_kerberos_service_name:Kafka服务器运行的Kerberos主题名称。其可以在Kafka的jaas配置中定义。

⑦codec:用于输入数据的编码/解码器。

⑧compression_type:压缩方式,默认是none,其它可选的是gzip和snappy

(5)运行脚本:

bash ../bin/logstash -f remoatest1.conf

 

图4.5 截图5

输入hello和remoa,生产者向主题mytopic发送消息成功。

(6)查看Kafka消费者是否接收到消息:

KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf -Xmx512M"  /opt/package/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --topic mytopic --bootstrap-server hdp1.example.com:9092  --from-beginning --consumer.config /opt/package/kafka_2.10-0.10.1.0/config/consumer.properties

 

图4.6 截图6

查看到消费者接收到消息并进行处理。

(7)将一个日志文件信息通过logstash给Kafka:

输入指令:cat install.log.syslog | bash ../bin/logstash -f remoatest1.conf

 

图4.7 截图7

查看消费者接收处理结果如下:

 

图4.8 截图8


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐