kafka的知识了解

  • 名词简单说明;
    • topic : 主题,特指kafka处理的消息源的不同分类
    • partition : topic物理上的分组,一个topic可以分为多个partition
      • 多个副本:选其中一个为leader,其余都是follower

    创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。flowers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

分区与leader与follower区别

  • 引入依赖

    	<dependency>
              <groupId>org.springframework.kafka</groupId>
              <artifactId>spring-kafka</artifactId>
          </dependency>
    

简单使用方式

  • 配置文件
 # 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=1.1.1.1:9092

#-----product 生产者参数-----
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

demo:


 @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    public void send(){
        //send(topic,消息信息)
        kafkaTemplate.send("test","242323424");
    }

附录一:

文献:

附录二

  • ProducerConfig values:生产者的默认配置:
	
	bootstrap.servers = [1.1.1.1:9092]
	acks = 1
		# 0:客户端发送了,认为是发送成功
	 	# 1:客户端发送了,分区leader收到了,认为是发送成功
		# all: 客户端发送了,分区leader收到了,其他副本follower也有了,认为是发送成功
	batch.size = 16384
	 	# 每个Batch要存放batch.size大小的数据后,才可以发送出去
	linger.ms = 0
		# 一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。避免达不到size导致无法发送
	buffer.memory = 33554432
		# 约束KafkaProducer能够使用的内存缓冲的大小的
		# 个人理解:类似于 文件字符流的管道大小
	retries = 2147483647
		# 失败重新发送次数
	retry.backoff.ms = 100
		# 每次重试的间隔是多少毫秒
		max.request.size = 1048576
		# 决定了每次发送给Kafka服务器请求消息的最大大小
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000

	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

附录3

kafka配置图,来自http://shixinke.com/java/kafka-configuration

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐