Kafka分布式消费学习

目录:

1、Logstash input Kafka配置参数解析:

2、Kafka的Topic命令查看:

3、单机多进程实现Kafka的at least once分布式消费:

4、多机多进程实现Kafka分布式消费:


1、Logstash input Kafka配置参数解析:

kafka{
		bootstrap_servers => "hdp1.example.com:9092"
		security_protocol => "SASL_PLAINTEXT"
		sasl_kerberos_service_name => "kafka"
		jaas_path => "/tmp/kafka_jaas.conf.demouser"
		kerberos_config => "/etc/krb5.conf"
		topics => ["remoa3"]
		consumer_threads => 6
		decorate_events => true
		group_id => "remoa3"
}

l topics:

官网描述:Value type is array

Default value is ["logstash"]

A list of topics to subscribe to, defaults to ["logstash"].

翻译:值类型是数组,默认值为[“logstash”]。

要订阅的主题列表,默认为[“logstash”]。

l consumer_threads:

官网描述:Value type is number

Default value is 1

Ideally you should have as many threads as the number of partitions for a perfect balance — more threads than partitions means that some threads will be idle

翻译:消费者线程:

值类型是数字。默认值为1。

理想情况下,你应该拥有与完美平衡的分区数一样多的线程数,比分区更多的线程意味着有些线程将空闲。

l decorate_events

官网描述:Value type is boolean

Default value is false

Option to add Kafka metadata like topic, message size to the event. This will add a field named kafka to the logstash event containing the following attributes: topic: The topic this message is associated with consumer_group: The consumer group used to read in this event partition: The partition this message is associated with offset: The offset from the partition this message is associated with key: A ByteBuffer containing the message key

翻译:装饰事件:

值类型是布尔型,默认值为false。

可以将Kafka元数据(如主题,消息大小)添加到事件中。这将添加一个名为kafka的字段到包含以下属性的logstash事件:

主题

该消息与consumer_group相关联的主题

用于读取这个事件分区的消费者组

此消息与偏移量相关联的分区

此消息与密钥相关联的分区偏移量

包含消息密钥的BtyeBuffer

即设置为True时在输出消息时会输出自身的信息,如Topic来源,消费消息的大小,消费者的group信息等等。

l group_id:

官网描述:Value type is string

Default value is "logstash"

The identifier of the group this consumer belongs to. Consumer group is a single logical subscriber that happens to be made up of multiple processors. Messages in a topic will be distributed to all Logstash instances with the same group_id

翻译:group_id:

值类型是字符串。默认值为”logstash”

该消费者所属的组的标识符。消费者组是单个逻辑用户,是由多个处理器组成。主题中的消息将分发给具有相同group_id的所有的Logstash实例。

即不同的组之间消费是互补影响的,相互隔离的。

 

2、Kafka的Topic命令查看:

(1)创建Topic:

 执行:bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --create --topic remoa2 --partitions 3 --replication-factor 1

创建名为remoa2的Topic,使用3个分区分别存放数据,复制因子为1,数据备份共1份。

 

图2.1 截图1

(2)显示名为remoa2的Topic的详细信息:

执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa2 --describe

 

图2.2 截图2

第一行列出了该Topic的总体情况,包括Topic名称为remoa2,分区数量为3,副本数量为1。

Partition:分区

Leader:负责读写指定分区的节点

Replicas:复制该分区日志的节点列表

Isr:”in-sync replicas”,当前活跃的副本列表

(3)显示所有的Topic的详细信息:

执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --describe

 

图2.3 截图3

(4)删除Topic:

执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --delete --topic topic1,topic2,topic3,topic4

删除名为topic1、topic2、topic3、topic4的Topic。

 

图2.4 截图4

(5)查看已存在Topic列表:

pwd所在目录为:/opt/package/kafka_2.10-0.10.1.0/bin

执行:bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --list

 

图2.5 截图5

(6)修改Topic分区数量:

执行:bash kafka-topics.sh --alter --zookeeper hdp1.example.com:2181/kafka_010 --topic remoaindex --partitions 4

将名称为remoaindex的Topic的分区修改为4。

 

图2.6 截图6

执行--describe参数可以查看到remoaindex的Topic分区数量修改为了4。

 

图2.7 截图7

 

3、单机多进程实现Kafka的at least once分布式消费:

(1)注意:

A)使用多个Logstash端协同消费同一个Topic的话,需要把两个或是多个Logstash消费端配置成相同的group_id和topic_id,同时需要把相应的Topic分成多个分区,多个消费者消费是无法保证消息的消费顺序性的。

B)Kafka的消息模型是对Topic分区以达到分布式效果。每个Topic下的不同的partitions只能有一个Owner去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由Server端协调而成。消息的消费是无序的。

C)若要保证消息的顺序,则使用一个partition。Kafka的每个partition只能同时被同一个group中的consumer消费。

(2)试验原理说明:

A)Kafka保证同一个consumer group中只有一个consumer会消费某条消息。Kafka保证在稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。

B)三种情况:

①如果某个consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据。

②如果某个consumer group中consumer数量等于partition数量,则正好一个consumer消费一个partition的数据。

③如果某个consumer group中consumer数量多于partition数量,会有部分的consumer无法消费该Topic下任何一条消息。

C)at least once:消息绝不会丢,但可能会重复传输

D)at most once:消息可能会丢,但绝不会重复传输

E)Exactly once:每条消息肯定会被传输一次且仅传输一次。

试验测试第一种情况,consumer数量为2,partition数量为3。

(3)producer.conf脚本及consumer.conf脚本内容如下:

A)producer.conf脚本内容:

input {
        beats{
                port => 5044
        }
}
 
filter{
        if "beats_input_codec_plain_applied" in [tags]{
                mutate{
                        remove_tag => ["beats_input_codec_plain_applied"]
                }
        }
        grok{
                patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"
                match => {
                        "message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"
                }
        }
}
 
output {
        stdout{
                codec => rubydebug
        }
        kafka{
                topic_id => "remoa2"
                bootstrap_servers => "hdp1.example.com:9092"
                security_protocol => "SASL_PLAINTEXT"
                sasl_kerberos_service_name => "kafka"
                jaas_path => "/tmp/kafka_jaas.conf.demouser"
                kerberos_config => "/etc/krb5.conf"
                compression_type => "none"
                acks => "1"
        }
}

B)consumer1.conf及consumer2.conf脚本内容:

input{
        kafka{
                bootstrap_servers => "hdp1.example.com:9092"
                security_protocol => "SASL_PLAINTEXT"
                sasl_kerberos_service_name => "kafka"
                jaas_path => "/tmp/kafka_jaas.conf.demouser"
                kerberos_config => "/etc/krb5.conf"
                topics => ["remoa2"]
                consumer_threads => 3
                decorate_events => true
                group_id => "remoa2"
        }
}
 
filter{
        if "beats_input_codec_plain_applied" in [tags]{
                mutate{
                        remove_tag => ["beats_input_codec_plain_applied"]
                }
        }
        grok{
                patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"
                match => {
                        "message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"
                }
        }
}
 
output{
        stdout{
                codec => rubydebug
        }
        elasticsearch{
                hosts => ["kdc1.example.com:9200","kdc2.example.com:9200"]
                user => logstash
                password => logstash
                action => "index"
                index => "logstash-kafka1-%{+YYYY.MM.dd}"
                truststore => "/opt/package/logstash-5.2.2/config/keys/truststore.jks"
                truststore_password => whoami
                ssl => true
                ssl_certificate_verification => true
                codec => "json"
        }
}

(4)在单机中启动Filebeat,然后执行两个相同的Logstash消费者脚本,其Logstash消费端配置成相同的group_id和topic_id。

执行:service filebeat start

bash ../../bin/logstash -f producer.conf

bash ../../bin/logstash -f consumer2.conf

bash ../../bin/logstash -f consumer1.conf

(5)查看到producer.conf的Logstash output Kafka中的标准输出:

 

图3.1 截图8

(6)查看到consumer1.conf的标准输出:

 

图3.2 截图9

 

图3.3 截图10

通过上下滑动查看标准输出,查看到consumer1其消费了两个分区的数据,为分区1和分区2,标准输出结果里分区1及分区2的输出结果随机交替出现。

(7)查看到consumer2.conf的标准输出:

 

图3.4 截图11

查看到consumer2消费的分区为分区0。

(8)查看名为remoa2的Topic的具体信息:

bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa2 --describe

 

图3.5 截图12

(9)在Kibana中查看到Elasticsearch中对应index名为logstash-kafka1-2017.09.13的具体内容:

GET logstash-kafka1-2017.09.13/_search


图3.6 截图13

 

图3.7 截图14


4、多机多进程实现Kafka分布式消费:

(1)创建名为remoa3的Topic,使用6个分区,数据备份一份。

bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --create --topic remoa3 --partitions 6 --replication-factor 1

(2)在两台机子分别启动三个消费者进程,两台机的logstash版本分别为5.2.2及5.2.1,实测版本兼容不影响测试。

conf脚本文件如下:

input{
        kafka{
                bootstrap_servers => "hdp1.example.com:9092"
                security_protocol => "SASL_PLAINTEXT"
                sasl_kerberos_service_name => "kafka"
                jaas_path => "/tmp/kafka_jaas.conf.demouser"
                kerberos_config => "/etc/krb5.conf"
                topics => ["remoa3"]
                consumer_threads => 6
                decorate_events => true
                group_id => "remoa3"
        }
}
 
filter{
        if "beats_input_codec_plain_applied" in [tags]{
                mutate{
                        remove_tag => ["beats_input_codec_plain_applied"]
                }
        }
        grok{
                patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"
                match => {
                        "message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"
                }
        }
}
 
output{
        stdout{
                codec => rubydebug
        }
        elasticsearch{
                hosts => ["kdc1.example.com:9200","kdc2.example.com:9200"]
                user => logstash
                password => logstash
                action => "index"
                index => "logstash-kafka3-%{+YYYY.MM.dd}"
                truststore => "/opt/package/logstash-5.2.2/config/keys/truststore.jks"
                truststore_password => whoami
                ssl => true
                ssl_certificate_verification => true
                codec => "json"
        }
}

(3)当启动filebeat进行日志采集后,可以查看到两台主机中的三个logstash进程都进行了消费。

 

图4.1 截图15

A)查看hdp1机中的三个logstash进程消费情况:

第一个进程消费了partition1中的数据,共消费数据15条。

 

图4.2 截图16

第二个进程消费了partition3中的数据,共消费数据16条。

 

图4.3 截图17

第三个进程消费了partition4中的数据,共消费数据15条。

 

图4.4 截图18

B)查看hdp2机中的三个logstash进程消费情况:

第一个进程消费了partition5中的数据,共消费数据15条。

 

图4.5 截图19

第二个进程消费了partition0中的数据,共消费数据16条。

 

图4.6 截图20

第三个进程消费了partition2中的数据,共消费数据16条。

 

图4.7 截图21

可以查看到六个消费者进程六个分区,则每个消费者消费一个分区的数据。16 + 16 + 15 + 15 + 16 + 15 = 93条数据,与测试文本中数据总量保持一致,实现了多机多进程分布式消费。

(4)查看名为remoa3的Topic的详细信息:

bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa3 --describe

 

图4.8 截图22

(5)当然,相应的在Kibana也能够查看到结果。

GET _cat/indices

GET logstash-kafka3-2017.09.13/_search

 

图4.9 截图23


图4.10 截图24

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐