11038f207e48f691a53f755b9f2e0a9d.png

前言

本文会设计到代码,本文最后会写如何消费数据。用到的kafka的版本为1.1.0.那么在写代码之前先来认识一些关于消费者的一些概念的东西

偏移量offset

首先在老版本中,kafka把偏移量写入到的是zookeeper 中,但是zookeeper并不是一个负责高并发读写的这么一个工具,所以从设计上存在缺陷,于是,后来kafka在新版本中,默认就设置了一个consumer_offsets话题,并有默认的分区为50个,当有偏移量需要写入的时候,就使写入到这个topic中,在提交的时候,key是以group.id+topic+分区号,如果你的kafka部署了一个很大的集群,那么可以应付高并发。

Coordinator

我们知道在consumer消费者中会有消费者组,也就是consumer group,那么消费者组的里面的消费者是不可以消费同一份数据的,比如一个消费者组中有两个消费者,其中一个消费者消费了msg-A ,那么另外一个消费者就不能再次消费这个数据msg-A.

当然了在实际中的情况是,每个consumer消费是一个分区,比如有3个分区,这个时候我们启动一个consumer group中有三个consumer最合适的,这样子,每个consumer都可以消费一个分区的数据。

那么如果分区多了,比如这个时候有10个分区,他又是如何分配的呢?是由睡来分配的呢?

这个时候coordinator就出场了,他主要就是管理将分区分配给consumer group中的每个分区的。

以下是这个分配的过程。

首先假设我们有三台broker,这个时候就会选举出一台broker作为coordinator,选举过程是根据group ID 计算出来一个hash值,计算出来以后就是一个数字,然后用这个数字对_consumer_offset的分区数(默认是50)取模,比如2,拿到2以后就到集群中去看partition为2的这个分区的leaderpartition在哪一台服务器上面,那么那一台服务器就是coordinator服务器。

选举出coordinator之后就开始指定分区消费方案,这里要要注意,不是coordinator来制定。

当上面选举出coordinator之后,这个时候,coordinator服务器会把这个消费者组中的所有消费者向coordinator向注册leader,然后选举出一个leader,这个消费者成为leader之后就开始制定分区的消费方案,然后将这个消费方案发给coordinator,coordinator会将这个分区方案发给这个消费者组中的每个消费者。

分区方案划分策略:

1.range策略

range策略就是按照partiton的序号范围

p0~3 consumer1

p4~7 consumer2

p8~11 consumer3

默认就是这个策略;这个策略就是,以序号范围分配给每个消费者。

2.round-robin策略

consumer1:0,3,6,9

consumer2:1,4,7,10

consumer3:2,5,8,11

这个就是滚动分配,比如第一个分区就是0,然后这个时候1分区就给第二个消费者,以此进行,直到所有的分区分配完毕。

3.sticky策略

这个策略很简单,就是前两个策略的集合,刚开始的时候,以段来分配,然后如果其中某个消费者挂了之后,那么以前他所管理的分区就以滚动分区的方式分配给其他存活的消费者。

代码

好,有了上面的基础知识,让我们开始写代码吧!

同样写代码之前需要先引入maven依赖

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
      </dependency>

具体代码:

package com.cn.niochat.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class KafkaConsumeMsg {
    private static ExecutorService threadPool = Executors.newFixedThreadPool(20);

    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList("order-topic"));
        try {
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
                for(ConsumerRecord<String, String> record : records) {
                    JSONObject order = JSONObject.parseObject(record.value());
                    threadPool.submit(new CreditManageTask(order));
                }
            }
        } catch(Exception e) {
            e.printStackTrace();
            consumer.close();
        }
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop03:9092,hadoop04:9092,hadoop05:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("heartbeat.interval.ms", 1000); // 这个尽量时间可以短一点
        props.put("session.timeout.ms", 10 * 1000); // 如果说kafka broker在10秒内感知不到一个consumer心跳
        props.put("max.poll.interval.ms", 30 * 1000); // 如果30秒才去执行下一次poll
        // 就会认为那个consumer挂了,此时会触发rebalance
        // 如果说某个consumer挂了,kafka broker感知到了,会触发一个rebalance的操作,就是分配他的分区
        // 给其他的cosumer来消费,其他的consumer如果要感知到rebalance重新分配分区,就需要通过心跳来感知
        // 心跳的间隔一般不要太长,1000,500
        props.put("fetch.max.bytes", 10485760);
        props.put("max.poll.records", 500); // 如果说你的消费的吞吐量特别大,此时可以适当提高一些
        props.put("connection.max.idle.ms", -1); // 不要去回收那个socket连接
        // 开启自动提交,他只会每隔一段时间去提交一次offset
        // 如果你每次要重启一下consumer的话,他一定会把一些数据重新消费一遍
        props.put("enable.auto.commit", "true");
        // 每次自动提交offset的一个时间间隔
        props.put("auto.commit.ineterval.ms", "1000");
        // 每次重启都是从最早的offset开始读取,不是接着上一次
        props.put("auto.offset.reset", "earliest");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        return consumer;
    }

    static class CreditManageTask implements Runnable {
        private JSONObject order;
        public CreditManageTask(JSONObject order) {
            this.order = order;
        }

        @Override
        public void run() {
            System.out.println("对订单进行积分的维护......" + order.toJSONString());
            // 就可以做一系列的数据库的增删改查的事务操作
        }

    }
}

总结

以上就是全部内容了,点击关注不迷路哦!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐