接着上一篇补充

官网上关于这一块迷迷糊糊的看不懂,自己总结了下其中的差异:

    我们一般没做特殊处理的就是同步模式,生产者发送消息,然后交给消费者,这里面我们也可以对消息的结果进行处理,防止消息丢失

kafkademo中,修改REST接口如下:   

ListenableFuture返回对象自带callback方法;我们可以根据成功或者失败进行下一步操作,不过这里主要讲的不是这个

在同步消息推送有可能存在的问题,如果你只用一个kafka或者部署在一台机子上时,当网络波动时,就会出现大量的信息丢失情况造成阻塞状态,考虑到这种情况,建议采用异步推送方式

具体实现:

第一步:@EnableAsync//开启异步模式 ,将此注解配置到config里

第二步:方法上加@Async

为了保证我们对于同步的缺陷进一步完善,我们可以进行异常抛出,在配置yml配置如下:

kafka:
  bootstrap-servers: 服务地址
  producer:
    group-id: newgroup
    buffer-memory: 40960

buffer-memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException

测试下,我们启动demo发送消息,关闭demo1,demo2,模拟异常

发送成功;

开启demo1,demo2,发现:

同步,异步模式下都行,想模拟测试没有好的办法,哎

继续整理发现

kafka发送消息居然还可以指定分区,时间,学习到了。。

至于异步推送的缺陷,暂时还处于学习了解,希望大佬解惑!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐