目前kafka消息格式有三个版本(假定v0,v1,v2),0.10.0之前使用的是v0版本,之后慢慢演变出v1,v2,后两个版本在设计方式上没有什么特别大的区别,只是做了些空间上的优化,同样的消息,新版本的使用存储空间会更小,优化主要在于消息头部的压缩,当然还有些功能上的优化,例如添加了时间戳,相对偏移量等;这些不是今天讨论的重点,今天主要来介绍下v0版本的消息格式设计,首先来看下kafka消息的具体构成
在这里插入图片描述
由上图可知kafka消息格式,主要

CRC : 校验码
Magic :版本号:0,1,2,对应目前的三个版本
Attribute : 属性字段,目前使用后3位存储压缩类型
Key len :key长度
Key :key值
Value len:value长度
Value :具体消息内容

除了key和value以外的,我们成为消息头部,消息头部长度:

8+1+1+4+4=18 byte

有些文章指出crc效验码为4字节,用的是无符号long类型,但这里我们直接使用有符号数,所以多了4个字节
先来定义一个Message.java对象
Message.java

package com.fan.reactor.protocol;

import java.io.Serializable;

public class Message implements Serializable {

    private long crc;//8,CRC32.getValue()
    private byte magic;//1,默认0
    private byte attribute;//1,后3位用来存储压缩类型
    private int keylen ;//4
    private String key;
    private int vallen;//4
    private String value;

    public Message(long crc, byte magic, byte attribute, int keylen, String key, int vallen, String value) {
        this.crc = crc;
        this.magic = magic;
        this.attribute = attribute;
        this.keylen = keylen;
        this.key = key;
        this.vallen = vallen;
        this.value = value;
    }

    @Override
    public String toString() {
        return "Message{" +
                "crc=" + crc +
                ", magic=" + magic +
                ", attribute=" + attribute +
                ", keylen=" + keylen +
                ", key='" + key + '\'' +
                ", vallen=" + vallen +
                ", value='" + value + '\'' +
                '}';
    }
}

常量类
Contants.java

package com.fan.reactor.protocol;

public class Contants {

    //压缩类型
    public static final class Codec{
        public final static byte NONE = (byte)0;
        public final static byte GZiP = (byte)1;
        public final static byte Snappy = (byte)2;
        public final static byte LZ4 = (byte)3;
    }

    //版本号
    public static final byte MAGIC = (byte)0;
    public static final int MESSAGE_HEAD_LENGTH = 18;//默认消息头长度
}

测试序列化反序列化方法

package com.fan.reactor.protocol;

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class Main {

    public static void main(String[] args) {
        String key = "first";
        String message = "this is the first message";
        //消息转换成bytebuffer
        ByteBuffer byteBuffer = message2byte(key,message);
        
        //bytebuffer 转换成对象
        Message messageObj = byte2message(byteBuffer);
        System.out.println(messageObj);

    }

    public static ByteBuffer message2byte(String key,String message){
    //根据key和value的长度开辟ByteBuffer空间,避免空间浪费和不足
        ByteBuffer messageBuff = ByteBuffer.allocate(Contants.MESSAGE_HEAD_LENGTH+key.length()+message.length());
    //对value进行校验,生成long类型的CRC校验码
        CRC32 crc32 = new CRC32();
        byte[] data = message.getBytes();
        crc32.update(data);
    // 按顺序填充消息
        messageBuff.putLong(crc32.getValue());
        messageBuff.put(Contants.MAGIC);
        messageBuff.put(Contants.Codec.GZiP);
        messageBuff.putInt(key.length());
        messageBuff.put(key.getBytes());
        messageBuff.putInt(message.length());
        messageBuff.put(message.getBytes());
    //返回bytebuff,用于消息传输
        return messageBuff;
    }

//将接收到的消息反序列化为具体对象
    public static Message byte2message(ByteBuffer messageBuff){
        System.out.println("messageBuff current postition is :"+messageBuff.position());
        //这里特别要注意获取消息的postition
        int postition = 0;
        System.out.println("reset messageBuff's  posti+    tion");
        messageBuff.position(postition);
        long crc = messageBuff.getLong(postition);
        System.out.println("crc32:"+crc);//0
        postition +=8;
        byte magic = messageBuff.get(postition);
        System.out.println("MAGIC:"+magic);//8
        postition ++;
        byte codec = messageBuff.get(postition);
        System.out.println("Codec:"+codec);//9
        postition ++;
        int keylen = messageBuff.getInt(postition);//10
        System.out.println("Key length:"+keylen);
        byte[] keycontent = new byte[keylen];
        postition += 4;
        //重点注意
        messageBuff.position(postition);
        messageBuff.get(keycontent);
        System.out.println("keycontent:"+new String(keycontent));
        postition += keylen;
        int messagelen = messageBuff.getInt(postition);
        System.out.println("message length is :"+messagelen);
        postition += 4;
        //重点注意
        messageBuff.position(postition);
        byte[] messagescontent = new byte[messagelen];
        messageBuff.get(messagescontent);
        System.out.println("message is :"+new String(messagescontent));
        postition+=messagelen;
        System.out.println("next postition is :"+postition);
        return new Message(crc,magic,codec,keylen,new String(keycontent),messagelen,new String (messagescontent));
    }
}

输出如下

messageBuff current postition is :48
reset messageBuff's  postition
crc32:4252115574
MAGIC:0
Codec:1
Key length:5
keycontent:first
message length is :25
message is :this is the first message
next postition is :48
Message{crc=4252115574, magic=0, attribute=1, keylen=5, key='first', vallen=25, value='this is the first message'}

我们可以看到这条带key的消息总长度为48个字节,如果我们采用java默认的序列化方式,光对象头部就需要使用16字节,加上字段开销,一条空消息可能差不多就要:16(对象头部)+18(消息头部)+4(key 引用)+4 (value 引用)+k(若干对象对齐空间)>42个字节;可想而知,一个好的消息设计是多么的重要,还有一点就是,java的序列化方式,其他语言是很难或者无法解析

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐