import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.producer.*;

import org.apache.kafka.common.serialization.IntegerSerializer;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import java.util.concurrent.TimeUnit;

@Slf4j

public class ProducerKafka extends Thread {

Producer<Integer, String> producer;

String topic;

public ProducerKafka(String topic) {

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.40.137:9092,192.168.40.138:9092,192.168.40.139:9092”);

properties.put(ProducerConfig.CLIENT_ID_CONFIG, “learn-producer”);

//异步发送涉及到的两个配置

/**

*producer(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size 配置指定的。

*值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。

  • 默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。

  • 这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。

  • 这类似于TCP的算法,

  • 例如上面的代码段,可能100条消息在一个请求发送,

  • 因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,

  • 这个设置将增加1毫秒的延迟请求以等待更多的消息。

  • 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。

  • 在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。

*/

properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

producer = new KafkaProducer<>(properties);

this.topic = topic;

}

@Override

public void run() {

int num = 0;

log.info(“start”);

while (num < 20) {

try {

String msg = “learn kafka msg” + num;

//get会拿到发送的结果,同步发送

// RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, msg)).get();

//log.info(“recordMetadata offset:{}, partition:{}, topic:{}”, recordMetadata.offset(), recordMetadata.partition(), recordMetadata.topic());

producer.send(new ProducerRecord<>(topic, msg), (recordMetadata, e) -> {

log.info(“recordMetadata offset:{}, partition:{}, topic:{}”, recordMetadata.offset(), recordMetadata.partition(), recordMetadata.topic());

});

TimeUnit.SECONDS.sleep(2);

++num;

} catch (InterruptedException e) {

log.error(“InterruptedException:{}”, e.getMessage());

}

}

}

public static void main(String[] args) {

new ProducerKafka(“learn”).start();

}

}

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
img
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
img

本次面试答案,以及收集到的大厂必问面试题分享:

字节跳动超高难度三面java程序员面经,大厂的面试都这么变态吗?

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
img

779)]

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
[外链图片转存中…(img-c2JjGR6U-1712651262779)]

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐