Logstash from Kafka to Elasticsearch学习

目录:

1、Logstash input Kafka配置:

2、Logstash output Elasticsearch:

3、Logstash from Kafka to Elasticsearch:

 

1、Logstash input Kafka配置:

(1)查看/opt/package/kafka_2.10-0.10.1.0/config中的server.properties,查看到生产者的地址:

 

图1.1 截图1

(2)配置remoatest3.conf的input:

input{
        kafka{
                bootstrap_servers => "hdp1.example.com:9092"
                security_protocol => "SASL_PLAINTEXT"
                sasl_kerberos_service_name => "kafka"
                jaas_path => "/tmp/kafka_jaas.conf.demouser"
                kerberos_config => "/etc/krb5.conf"
                topics => ["mytopic"]
        }
}

配置参数参考官网地址:

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

①bootstrap_servers:用于建立与集群的初始连接的URL列表,即Kafka的接入地址(生产者)。

②security_protocol:使用的安全协议,可以为plaintext、ssl、sasl_plaintext、sasl_ssl等等。

③sasl_kerberos_service_name:Kafka服务器运行的Kerberos主题名称。其可以在Kafka的jaas配置中定义。

④jaas_path:Java Authentication and Authorization Service API(JAVA认证和授权服务)的缩写,为Kafka提供用户认证和授权服务,设置jass文件的路径。

⑤kerberos_config:Keberos配置文件的路径。

⑥topics:要订阅的主题列表,默认为“logstash”。

 

2、Logstash output Elasticsearch:

(1)在/opt/package/elasticsearch-5.2.2/config目录下查看elasticsearch.yml文件中的host的ip地址及运行HTTP服务的端口,端口注释即使用默认端口9200。

 

图2.1 截图2

(2)查看keys文件位置:

这里采用了elasticsearch的一款插件search-guard,提供加密、身份验证和授权,基于search guard SSL,也可以提供可插入的身份验证/授权模块。其功能特性:

l 基于用户和角色的权限控制

l 支持SSL和TL方式安全认证

l 支持LDAP认证

Truststore和Keystore文件用来提供客户端与服务器之间的安全数据传输。

 

图2.2 截图3

(3)配置remoatest3.conf的output:

output{
        stdout{
                codec => rubydebug
        }
        elasticsearch{
                hosts => ["kdc1.example.com:9200","kdc2.example.com:9200"]
                user => logstash
                password => logstash
                action => "index"
                index => "logstash-remoatest3-%{+YYYY.MM.dd}"
                truststore => "/opt/package/logstash-5.2.2/config/keys/truststore.jks"
                truststore_password => whoami
                ssl => true
                ssl_certificate_verification => true
                codec => "json"
        }
}

配置参数参考官网地址:

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

①host:elasticsearch集群中任意节点的主机名

②user:连接elasticsearch集群的安全认证的用户名

③password:连接elasticsearch集群的安全认证的密码

④action:默认值是index;

index是来自logstash的事件,索引文档;

delete是按id删除文档(此操作需要一个id);

create是索引文档,如果索引中已存在该id的文档,则失败;

update是按id更新文档。

⑤index:将事件写入的索引名字。可以使用%{}进行动态化。

⑥truststore:JKS(JAVA Keytool Keystore)信任库验证服务器的证书。

⑦truststore_password:证书密码

⑧ssl:启用与Elasticsearch集群的SSL / TLS安全通信。

⑨ssl_certificate_verification:验证服务器证书的选项

⑩codec:用于数据的编码/解码器。

 

3、Logstash from Kafka to Elasticsearch:

(1)首先启动生产者

KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf -Xmx512M" /opt/package/kafka_2.10-0.10.1.0/bin/kafka-console-producer.sh --topic mytopic --broker-list hdp1.example.com:9092 --producer.config /opt/package/kafka_2.10-0.10.1.0/config/producer.properties

(2)启动remoatest3.conf脚本

bash bin/logstash -f config/remoatest3.conf

(3)启动消费者

KAFKA_HEAP_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf -Xmx512M"  /opt/package/kafka_2.10-0.10.1.0/bin/kafka-console-consumer.sh --topic mytopic --bootstrap-server hdp1.example.com:9092  --from-beginning --consumer.config /opt/package/kafka_2.10-0.10.1.0/config/consumer.properties

(4)生产者生产三条消息:

 

图3.1 截图4

(5)查看remoatest3.conf脚本启动后信息,根据脚本中配置的output{stdout{codec=>rubydebug}}输出了这三条消息:

 

图3.2 截图5

(6)查看消费者是否进行处理消息:

 

图3.3 截图6

(7)在Kibana中查看Elasticsearch中是否存储了这些消息:

①查看Kibana中host及port参数:

 

图3.4 截图7

②登录进入:http://kdc1.example.com:5601,在左边导航栏中选择Dev Tools,在控制台中输入GET _cat/indices,找到这条index信息。其中简单的CRUD操作包括PUT:添加,GET:查询,POST:修改,DELETE:删除。

 

图3.5 截图8

查看到health的健康状态为green,green表示一切正常;yellow表示所有数据都可用,但某些副本尚未分配;red表示某些数据因为某些原因不可用。

(8)在Kibana查看具体内容:

GET logstash-remoatest3-2017.09.08/_search

 

图3.6 截图9

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐