说明 kafka特点:

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;

可扩展性:kafka集群支持热扩展;  

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;  

容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障);  

高并发:支持数千个客户端同时读写。

 

kafka使用场景:

日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer;  

消息系统:解耦生产者和消费者、缓存消息等;  

用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;  

运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告; 流式处理:比如spark streaming和storm。

 

一、先介绍如何搭建kafka的集群环境,毕竟kafka的灵魂就在于分区,分区嘛当然离不开集群咯

1.先去官网下载kafka的包,然后上传至服务器,操作不说了,博主用的kafka_2.12-2.5.0.tgz

解压包:

tar -zxvf kafka_2.12-2.5.0.tgz

解压后改名:

mv kafka_2.12-2.5.0 kafka

进入config目录:

cd /home/devops/kafka/config

修改 server.properties配置文件:

vi server.properties



#以下是要修改的主要内容
(1)修改listener配置    listener=PLAINTEXT://192.168.141.128:9092  监听地址和端口号,PLAINTEXT不能删,这是协议,删了会报错

(2)修改broker.id   默认是0,可以把它看成每个主题的某个分区

(3)修改zookeeper集群地址,这里填已经搭建好的zookeeper集群地址   zookeeper.connect=192.168.141.128:2181,192.168.141.129:2181

系统环境中配置kafka路径:

vi /etc/profile


#最下方添加kafka路径
export KAFKA_HOME=/home/devops/kafka
export PATH=$PATH:$KAFKA_HOME/bin

修改完使环境变量生效:

source /etc/profile

进入kafka安装目录启动kafka:

cd /home/devops/kafka
bin/kafka-server-start.sh config/server.properties

出现INFO [KafkaServer id=1] started (kafka.server.KafkaServer)   表示启动成功

或者通过命令ps aux | grep 'kafka' 查看是否启动成功。

二、springboot集成kafka,写一个超级超级简单的demo,kafka作为一个分布式的消息中间件,灵魂在于分区思想

1.导入依赖

       <!--  kafka依赖  -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!--<version>2.2.0.RELEASE</version>-->
        </dependency>

2.写配置文件

#============== kafka ===================
# 指定kafka 代理地址,可以多个,多个地址用逗号隔开
# 集群模式
spring.kafka.bootstrap-servers= 192.168.141.128:9092,192.168.141.129:9092

#=============== provider  =======================
# 消息发送失败重试次数,为0不重试
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
#如果kafka迟迟不发送消息(这里指的是消息没堆积到指定数量),那么过了这个时间(单位:毫米)开始发送
spring.kafka.producer.properties.linger.ms=1

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 消费者组名,不同组名可以重复消费
spring.kafka.consumer.group-id=kafkainfo

#1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#3,none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=earliest

# 自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true

# 自动提交偏移量的频率,单位ms 每100ms提交一次offset
spring.kafka.consumer.auto-commit-interval=100

# kafka超时时间,单位ms
spring.kafka.consumer.session-timeout-ms=100000

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#消费监听接口监听的主题不存在时,默认会报错因此要关掉这个
spring.kafka.listener.missing-topics-fatal=false

3.创建生产者:

@Component

public class KafkaSender {

    @Autowired(required = false)
    private KafkaTemplate<String, Object> kafkaTemplate;
    private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
    private static final String TOPICNAME="topicone";


    public void send(String key, String jsonStr) {

        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPICNAME, key, jsonStr);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            //推送成功
            public void onSuccess(SendResult<String, Object> result) {
                logger.info(TOPICNAME + " 生产者 发送消息成功:" + result.toString());


            }

            @Override
            //推送失败
            public void onFailure(Throwable ex) {
                logger.info(TOPICNAME + " 生产者 发送消息失败:" + ex.getMessage());


            }
        });


    }


}

4.创建消费者:

@Component
public class KafkaReceiver {

    @KafkaListener(topics = {"topicone"})
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("topic名称:"+record.topic()+"\n"+"分区位置:"+record.partition()+"\n"+"key:"+record.key()+"\n"+"偏移量:"+record.offset()+"\n"+"消息内容:"+record.value());
    }
}

5.测试,说明一下(在这之前,kafka服务端已存在主题topicone,并存在2个分区,预期的结果是调用生产者向kafka服务端发送消息时,会向分区0和1都发送)

@RestController
public class PubParamTabController {
    @Autowired
    PubParamTabService pubParamTabService;
    @Autowired
    KafkaSender kafkaSender;

    #直接在接口中调用了,懒得写测试类了,这里有三条数据,会向kafka服务端发送三次消息
    @GetMapping(value = "/getApps",produces = "application/json;charset=UTF-8")
    public Result<List<AppTemplate>> findAllAppName(){
        List<AppTemplate> list=pubParamTabService.findAllAppName().getData();
        for (int i=0;i<list.size();i++){
            kafkaSender.send("key"+i,list.toString());
        }
        return pubParamTabService.findAllAppName();
    }
}

6.结果日志打印如下:

topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key2, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-1@0]
topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key0, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@1]
topicone 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topicone, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=key1, value=[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}], timestamp=null), recordMetadata=topicone-0@2]
[Consumer clientId=consumer-1, groupId=kafkainfo] Cluster ID: qXd7V77GSIqVopO9hGmn9A
[Consumer clientId=consumer-1, groupId=kafkainfo] Discovered group coordinator 192.168.141.129:9092 (id: 2147483646 rack: null)
[Consumer clientId=consumer-1, groupId=kafkainfo] Revoking previously assigned partitions []
kafkainfo: partitions revoked: []
[Consumer clientId=consumer-1, groupId=kafkainfo] (Re-)joining group
[Consumer clientId=consumer-1, groupId=kafkainfo] (Re-)joining group
[Consumer clientId=consumer-1, groupId=kafkainfo] Successfully joined group with generation 5
[Consumer clientId=consumer-1, groupId=kafkainfo] Setting newly assigned partitions: topicone-1, topicone-0
[Consumer clientId=consumer-1, groupId=kafkainfo] Setting offset for partition topicone-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[3], currentLeader=LeaderAndEpoch{leader=192.168.141.129:9092 (id: 1 rack: null), epoch=3}}
[Consumer clientId=consumer-1, groupId=kafkainfo] Setting offset for partition topicone-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=192.168.141.129:9092 (id: 1 rack: null), epoch=3}}
kafkainfo: partitions assigned: [topicone-1, topicone-0]
topic名称:topicone
分区位置:1
key:key2
偏移量:0
消息内容:[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}]
topic名称:topicone
分区位置:0
key:key0
偏移量:1
消息内容:[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}]
topic名称:topicone
分区位置:0
key:key1
偏移量:2
消息内容:[{"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"core","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"encrypt","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}, {"appId":null,"userId":null,"sourceIp":"null","sourceUsername":"null","sourcePassword":"null","sourcePort":"null","sourcePath":"null","maxTps":null,"maxMemory":null,"appName":"front","localhostIp":"null","deployPath":"null","approvalStatus":null,"createTime":null,"updateTime":null,"resType":null,"projectType":"null","configurePath":"null","proxyIp":"null","proxyPath":"null","proxyUsr":"null","proxyPsw":"null","businessLine":"null"}]

到这就成功啦,博主踩坑多次,一个是kafka客户端与服务端版本尽量一致,负责会报连接超时的错误。另一个就是zookeeper和kafka的配置必须严格正确,否则也会报莫名其妙的错误。

再说一次,kafka的灵魂在于分区概念,为的是减轻单台服务器压力,且宕机恢复后,能从未提交偏移量开始消费,避免重复消费!!

@KafkaListener(topics = {"topicone"}) 这个注解可以花式使用,大家可以百度一下用法,可以指定主题指定分区,指定偏移量进行消费的,可同时订阅多个主题!!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐