一、前言

老周这里编译 Kafka 的版本是 2.7,为啥采用这个版本来搭建源码的阅读环境呢?因为该版本相对来说比较新。而我为啥不用 2.7 后的版本呢?比如 2.8,这是因为去掉了 ZooKeeper,还不太稳定,生产环境也不太建议使用,所以以 2.7 版本进行源码搭建并研究。

二、环境准备

  • JDK:1.8.0_241
  • Scala:2.12.8
  • Gradle:6.6
  • Zookeeper:3.4.14

三、环境搭建

3.1 JDK 环境搭建

这个就不用我说了吧,搞 Java 的本机都有 JDK 环境。

3.2 Scala 环境搭建

下载链接:https://www.scala-lang.org/download/2.12.8.html

在这里插入图片描述

这里老周是 Mac OS 系统,这里大家看着自己的系统来下就好了哈。

3.2.1 配置 Scala 环境变量

终端输入以下命令进行编辑:

vim ~/.bash_profile

# 这里的路径是你安装
SCALA_HOME=/Users/Riemann/Tools/scala-2.12.8
export SCALA_HOME
export PATH=$PATH:$SCALA_HOME/bin

# 使环境变量生效,在命令行执行。
source  ~/.bash_profile

3.2.2 验证

终端输入以下命令:

scala -version

出现以下提示,说明 Scala 环境搭建成功。
在这里插入图片描述

3.3 Gradle 环境搭建

首先来到 Gradle官网:https://services.gradle.org/distributions/

如下图:
在这里插入图片描述
我们选择想要安装的发布版本,gradle-x.x-bin.zip 是需要下载的安装发布版,gradle-x.x-src.zip 是源码,gradle-x.x-all.zip 则是下载全部的文件。 我本地为 gradle-6.6。

Gradle下载的源码不需要安装,我们将下载的压缩包在本机的目录下直接解压即可,解压后的目录如下图所示。

在这里插入图片描述

3.3.1 配置 Gradle 环境变量

终端输入以下命令进行编辑:

vim ~/.bash_profile

# 这里的路径是你安装
GRADLE_HOME=/Users/Riemann/Tools/gradle-6.6
export GRADLE_HOME
export PATH=$PATH:$GRADLE_HOME/bin

# 使环境变量生效,在命令行执行。
source  ~/.bash_profile

3.3.2 验证

终端输入以下命令:

gradle -v

出现以下提示,说明 Gradle 环境搭建成功。

在这里插入图片描述
3.4 Zookeeper 环境搭建

Zookeeper 环境老周在 Linux 环境已经搭建好了的,直接用。这里我也给出搭建的步骤,不管你是啥系统,都是类似的~

3.4.1 下载

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

3.4.2 解压

tar -zxvf zookeeper-3.4.14.tar.gz

3.4.3 进入 zookeeper-3.4.14 目录,创建 data 文件夹

 cd zookeeper-3.4.14 
 mkdir data

3.4.4 修改配置文件

cd conf
mv zoo_sample.cfg zoo.cfg

3.4.5 修改 zoo.cfg 中的 data 属性

dataDir=/root/zookeeper-3.4.14/data

3.4.6 zookeeper 服务启动

进入 bin 目录,启动服务输入命令

./zkServer.sh start

输出以下内容表示启动成功

在这里插入图片描述

3.5 Kafka 源码环境搭建

官网下载对应版本的源码包,网址:http://kafka.apache.org/downloads

在这里插入图片描述
下载完后解压,这个源码文件还需要导入依赖 jar 包,个人使用 IDEA 来 import 导入项目,导入完后需使用前面配置好的 gradle 作为 Gradle home 地址。

3.5.1 导入 Kafka 源码至 IDEA
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
3.5.2 修改 build.gradle

接下来还不能导 jar 包,需要把镜像文件下载服务器更换为国内的私服,否则会相当慢,直接导致 “time out” 报错。

进入 kafka 源码包,修改 build.gradle 文件,在原来配置上,添加 ali 私服配置。

buildscript {
    repositories {
        maven {
            url 'http://maven.aliyun.com/nexus/content/groups/public/'
        }
        maven {
            url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'
        }
    }
}
 
allprojects {
    repositories {
        maven {
            url 'http://maven.aliyun.com/nexus/content/groups/public/'
        }
        maven {
            url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'
        }
    }
}

在这里插入图片描述
3.5.3 代码构建

可以用命令来构建,也可以在 idea 图形界面的 gradle 来构建,这里肯定是 idea 图形化界面操作更简单,但这里也提供 gradle 的命令来构建。

./gradlew clean build -x test

去找一下直接下载 Wrapper 所需的 Jar 包,手动把这个 Jar 文件拷贝到 kafka 路径下的 gradle/wrapper 子目录下,然后重新执行 gradlew build 命令去构建工程。

链接: https://pan.baidu.com/s/1W6EHysWY3ZWQZRWNdNZn3Q 提取码: hpj5

gradle 其它命令:

# 构建 jar包并运行
./gradlew jar

# 构建项目,看你是idea工具还是eclipse
./gradlew idea
./gradlew eclipse

# 构建源码包
./gradlew srcJar

# 构建javadoc文档
./gradlew aggregatedJavadoc

# 清理并构建
./gradlew clean

四、代码结构

在这里插入图片描述

4.1 代码安装包结构

  • bin 目录:保存 Kafka 工具行脚本,我们熟知的 kafka-server-start 和 kafka-console-producer 等脚本都存放在这里。

  • checkstyle 目录:代码规范,自动化检测。

    Checkstyle 是什么,关于格式化的讨论就不曾中断过,到底什么才是正确的,什么才是错误的,到现在也没有完整的定论。但随着时间发展,渐渐衍生出一套规范出来。没有什么绝对的正确和错误,关键在于规范的定义。最出名的就是 google style guide,Checkstyle 就是以这种风格开发出的一个自动化插件,来辅助判断代码格式是否满足规范。

    该目录下的文件定义了工程代码格式的规范,我们可以在 build.gradle 中看到相关 checkstyle 的配置和自动化代码格式化配置:

    checkstyle 配置:
    在这里插入图片描述
    在这里插入图片描述
    scala 自动化代码格式化配置:
    在这里插入图片描述

  • clients 目录:保存 Kafka 客户端代码,比如生产者和消费者的代码都在该目录下。

  • config 目录:保存 Kafka 的配置文件,其中比较重要的配置文件是 server.properties。

  • connect 目录:保存 Connect 组件的源代码。 Kafka Connect 组件是用来实现 Kafka 与外部系统之间的实时数据传输的。

  • core 目录:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目录下。

  • docs 目录:Kafka 设计文档以及组件相关结构图。

  • examples 目录:Kafka 样例相关目录。

  • generator 目录:Kafka 消息类处理模块,主要是根据 clients 模块下的 message json 文件生成对应的 java 类,在 build.gradle 文件中,可以看到定义了一个任务 processMessages:
    在这里插入图片描述

  • gradle 目录:gradle 的脚本和依赖包定义等相关文件。

  • jmh-benchmarks 目录:Kafka 代码微基准测试相关类。

    JMH,即 Java Microbenchmark Harness,是专门用于代码微基准测试的工具套件。何谓 Micro Benchmark 呢?简单的来说就是基于方法层面的基准测试,精度可以达到微秒级。当你定位到热点方法,希望进一步优化方法性能的时候,就可以使用 JMH 对优化的结果进行量化的分析。

    JMH 比较典型的应用场景有:

    • 想准确的知道某个方法需要执行多长时间,以及执行时间和输入之间的相关性;
    • 对比接口不同实现在给定条件下的吞吐量,找到最优实现。
  • kafka-logs 目录:server.properties 文件中配置 log.dirs 生成的目录。

  • log4j-appender 目录:

    A log4j appender that produces log messages to Kafka

    这个目录里面就一个 KafkaLog4jAppender 类。

  • raft 目录:raft 一致性协议相关。

  • streams 目录:

    Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.

    提供一个基于 Kafka 的流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。

    Kafka Streams 是一个用来构建流处理程序的库,特别是其输入是一个 Kafka topic,输出是另一个 Kafka topic 的程序(或者是调用外部服务,或者是更新数据库,或者其它)。它使得你以一种分布式以及容错的方式来做这件事情。

  • tests 目录:此目录的内容介绍如何进行 Kafka 系统集成和性能测试。

  • tools 目录:工具类模块。

  • vagrant 目录:介绍如何在 Vagrant 虚拟环境中运行 Kafka,提供了相关的脚本文件和说明文档。

    Vagrant 是一个基于 Ruby 的工具,用于创建和部署虚拟化开发环境。它使用 Oracle 的开源 VirtualBox 虚拟化系统,使用 Chef 创建自动化虚拟环境。

4.2 项目结构

项目结构的话主要关注 core 目录,core 目录 是 Kafka 核心包,有集群管理,分区管理,存储管理,副本管理,消费者组管理,网络通信,消费管理等核心类。
在这里插入图片描述

  • admin 包:执行管理命令的功能;
  • api 包:封装请求和响应 DTO 对象;
  • cluster 包:集群对象,例如 Replica 类代表一个分区副本,Partition 类代表一个分区;
  • common 包:通用 jar 包;
  • controller包: 和kafkaController(kc)相关的类,重点模块,一个kafka集群只有一个leader kc,该kc负责 分区管理,副本管理,并保证集群信息在集群中同步;
  • coordinator 包:保存了消费者端的 GroupCoordinator 代码和用于事务的 TransactionCoordinator 代码。对 coordinator 包进行分析,特别是对消费者端的 GroupCoordinator 代码进行分析,是 Broker 端协调者组件设计原理的关键。
  • log 包:保存了 Kafka 最核心的日志结构代码,包括日志、日志段、索引文件等, 另外,该包下还封装了 Log Compaction 的实现机制,是非常重要的源码包。
  • network 包:封装了 Kafka 服务器端网络层的代码,特别是 SocketServer.scala 这个文件,是 Kafka 实现 Reactor 模式的具体操作类,非常值得一读。
  • consumer 包:后面会丢弃该包,用 clients 包下 consumer 相关类代替。
  • server 包:顾名思义,它是 Kafka 的服务器端主代码,里面的类非常多,很多关键的 Kafka 组件都存放在这里,比如状态机、Purgatory 延时机制等。
  • tools 包:工具类。

五、环境验证

下面我们来验证一下 Kafka 源码环境是否搭建成功。

5.1 首先,我们在 core/src/main 目录下新建 resources 目录,再将 conf 目录下的 log4j.properties 配置文件拷贝到 resources 目录下。

如下图所示:
在这里插入图片描述
5.2 修改 conf 目录下的 server.properties 文件

log.dirs=/Users/Riemann/Code/framework-source-code-analysis/kafka-2.7.0-src/kafka-logs

server.properties 文件中的其他配置暂时不用修改。

5.3 在 IDEA 中配置 kafka.Kafka 这个入口类

具体配置如下图所示:

在这里插入图片描述
5.4 启动 Kafka Broker

启动成功的话,控制台输出没有异常,且能看到如下输出:

在这里插入图片描述

5.5 可能出现以下异常

5.5.1 异常1

log4j:WARN No appenders could be found for logger (kafka.utils.Log4jControllerRegistration$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

在 project structure 中加入 slf4j-log4j12-1.7.30.jar 和 log4j-1.2.17.jar 两个日志包,当然也可以在 build.gradle 中添加对应的配置来添加包。

方法1:
在这里插入图片描述
方法2:

compile group: 'log4j', name: 'log4j', version: '1.2.17'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.30'
compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.30'

加到 build.gradle 文件中的 core 模块:

在这里插入图片描述
5.5.2 异常2

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

在这里插入图片描述
5.6 发送、消费 message

我们这里使用 Kafka 自带的脚本工具来验证上面搭建的 Kafka 源码环境

首先,我们进入到 ${KAFKA_HOME}/bin 目录,通过 kafka-topics.sh 命令来创建一个名为 topic_test 的 topic:

执行效果如下图所示:

在这里插入图片描述
然后我们通过 kafka-console-consumer.sh 命令启动一个命令行的 consumer 来消费 topic_test 这个 topic,如下:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test

在这里插入图片描述
接下来,我们通过 kafka-console-producer.sh 命令启动一个命令行的 producer 向 topic_test 这个 topic 中生成数据,如下:

在这里插入图片描述
当我们输入一条 message 并回车之后,message 会发送到 topic_test 这个 topic 中。

在这里插入图片描述

我们输入完 message 并回车之后,就可以在 consumer 处收到该 message 了,效果如下图所示:
在这里插入图片描述
大功告成,后续会陆续分析 Kafka Broker 端的源码,尽情期待~

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐