kafka应用,2024年最新java线程池工作原理
Slf4j//异步发送涉及到的两个配置/***producer(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size 配置指定的。*值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ProducerKafka extends Thread {
Producer<Integer, String> producer;
String topic;
public ProducerKafka(String topic) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.40.137:9092,192.168.40.138:9092,192.168.40.139:9092”);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, “learn-producer”);
//异步发送涉及到的两个配置
/**
*producer(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size 配置指定的。
*值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
-
默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。
-
这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。
-
这类似于TCP的算法,
-
例如上面的代码段,可能100条消息在一个请求发送,
-
因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,
-
这个设置将增加1毫秒的延迟请求以等待更多的消息。
-
需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
-
在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
*/
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producer = new KafkaProducer<>(properties);
this.topic = topic;
}
@Override
public void run() {
int num = 0;
log.info(“start”);
while (num < 20) {
try {
String msg = “learn kafka msg” + num;
//get会拿到发送的结果,同步发送
// RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, msg)).get();
//log.info(“recordMetadata offset:{}, partition:{}, topic:{}”, recordMetadata.offset(), recordMetadata.partition(), recordMetadata.topic());
producer.send(new ProducerRecord<>(topic, msg), (recordMetadata, e) -> {
log.info(“recordMetadata offset:{}, partition:{}, topic:{}”, recordMetadata.offset(), recordMetadata.partition(), recordMetadata.topic());
});
TimeUnit.SECONDS.sleep(2);
++num;
} catch (InterruptedException e) {
log.error(“InterruptedException:{}”, e.getMessage());
}
}
}
public static void main(String[] args) {
new ProducerKafka(“learn”).start();
}
}
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
本次面试答案,以及收集到的大厂必问面试题分享:
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
779)]
一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
[外链图片转存中…(img-c2JjGR6U-1712651262779)]
更多推荐
所有评论(0)