1. Java对象序列化成JSON的序列化类:JsonSerializer

public class JsonSerializer {

private final ObjectMapper jsonMapper = new ObjectMapper();

public String toJSONString(T r) {

try {

return jsonMapper.writeValueAsString®;

} catch (JsonProcessingException e) {

throw new IllegalArgumentException("Could not serialize record: " + r, e);

}

}

public byte[] toJSONBytes(T r) {

try {

return jsonMapper.writeValueAsBytes®;

} catch (JsonProcessingException e) {

throw new IllegalArgumentException("Could not serialize record: " + r, e);

}

}

}

  1. 向kafka发送消息的工具类:KafkaProducer:

public class KafkaProducer implements Consumer {

private final String topic;

private final org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;

private final JsonSerializer serializer;

public KafkaProducer(String kafkaTopic, String kafkaBrokers) {

this.topic = kafkaTopic;

this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(createKafkaProperties(kafkaBrokers));

this.serializer = new JsonSerializer<>();

}

@Override

public void accept(UserBehavior record) {

// 将对象序列化成byte数组

byte[] data = serializer.toJSONBytes(record);

// 封装

ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic, data);

// 发送

producer.send(kafkaRecord);

// 通过sleep控制消息的速度,请依据自身kafka配置以及flink服务器配置来调整

try {

Thread.sleep(500);

}catch(InterruptedException e){

e.printStackTrace();

}

}

/**

  • kafka配置

  • @param brokers The brokers to connect to.

  • @return A Kafka producer configuration.

*/

private static Properties createKafkaProperties(String brokers) {

Properties kafkaProps = new Properties();

kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());

kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());

return kafkaProps;

}

}

  1. 最后是应用类SendMessageApplication,CSV文件路径、kafka的topic和borker地址都在此设置,另外借助java8的Stream API,只需少量代码即可完成所有工作:

public class SendMessageApplication {

public static void main(String[] args) throws Exception {

// 文件地址

String filePath = “D:\temp\202005\02\UserBehavior.csv”;

// kafka topic

String topic = “user_behavior”;

// kafka borker地址

String broker = “192.168.50.43:9092”;

Stream.generate(new UserBehaviorCsvFileReader(filePath))

.sequential()

.forEachOrdered(new KafkaProducer(topic, broker));

}

}

验证

  1. 请确保kafka已经就绪,并且名为user_behavior的topic已经创建;

  2. 请将CSV文件准备好;

  3. 确认SendMessageApplication.java中的文件地址、kafka topic、kafka broker三个参数准确无误;

  4. 运行SendMessageApplication.java;

  5. 开启一个 控制台消息kafka消息,参考命令如下:

./kafka-console-consumer.sh \

–bootstrap-server 127.0.0.1:9092 \

–topic user_behavior \

–consumer-property group.id=old-consumer-test \

–consumer-property consumer.id=old-consumer-cl \

–from-beginning

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
img
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加V获取:vip1024b (备注Java)
img

最后

还有Java核心知识点+全套架构师学习资料和视频+一线大厂面试宝典+面试简历模板可以领取+阿里美团网易腾讯小米爱奇艺快手哔哩哔哩面试题+Spring源码合集+Java架构实战电子书+2021年最新大厂面试题。
在这里插入图片描述

一个人可以走的很快,但一群人才能走的更远。如果你从事以下工作或对以下感兴趣,欢迎戳这里加入程序员的圈子,让我们一起学习成长!

AI人工智能、Android移动开发、AIGC大模型、C C#、Go语言、Java、Linux运维、云计算、MySQL、PMP、网络安全、Python爬虫、UE5、UI设计、Unity3D、Web前端开发、产品经理、车载开发、大数据、鸿蒙、计算机网络、嵌入式物联网、软件测试、数据结构与算法、音视频开发、Flutter、IOS开发、PHP开发、.NET、安卓逆向、云计算

(https://bbs.csdn.net/forums/4304bb5a486d4c3ab8389e65ecb71ac0)

AI人工智能、Android移动开发、AIGC大模型、C C#、Go语言、Java、Linux运维、云计算、MySQL、PMP、网络安全、Python爬虫、UE5、UI设计、Unity3D、Web前端开发、产品经理、车载开发、大数据、鸿蒙、计算机网络、嵌入式物联网、软件测试、数据结构与算法、音视频开发、Flutter、IOS开发、PHP开发、.NET、安卓逆向、云计算

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐