发送自定义数据/对象请查看《发送自定义数据/自定义对象》一文;不建议直接发送自定义对象,因为耦合性太强。

一,生产者的使用:

package com.kuxingseng.lbw.mq;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class MyProducer {
    private final static Logger logger = LoggerFactory.getLogger(MyProducer.class);

    private String TOPIC = "LBW_EPM";
    protected static String IP = "192.168.199.128";
    protected static String PORT = "9092";

    private Properties properties = getProperties();
    private static MyProducer producer = new MyProducer();

    private ObjectMapper mapper = new ObjectMapper();

    public static MyProducer getInstance() {
        if (producer == null) {
            producer = new MyProducer();
        }
        return producer;
    }

    public void send() {
        List<Map<String, String>> list = new ArrayList<>();
        Map<String, String> map = new HashMap<>();
        map.put("1112", "33333");
        map.put("99999", "22222");
        list.add(map);

        Producer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord record = null;
        try {
            record = new ProducerRecord<String, String>(TOPIC, mapper.writeValueAsString(list)); //转化为json字符串
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    logger.info("kafka send successful");
                } else {
                    if (e instanceof RetriableException) {
                        //处理可重试异常
                        try {
                            logger.error("kafka send fail Retry sending.");
                            Thread.sleep(3000);
                            MyProducer.getInstance().send();
                        } catch (InterruptedException e1) {
                            logger.error("kafka error :", e1);
                        }
                    } else {
                        throw new KafkaException("kafka server message error.");
                    }
                }
            }
        });
        producer.close();
    }

    public static Properties getProperties() {
        Properties props = new Properties();
        //集群地址,多个服务器用 逗号 ","分隔
        props.put("bootstrap.servers", IP + ":" + PORT);
        //key 的序列化,此处以字符串为例,使用kafka已有的序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //发送字符串 json数据 可以使用StringSerializer kafka默认序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //使用自定义序列化器 发送自定义数据
//        props.put("value.serializer", "com.kuxingseng.lbw.mq.KafkaEntityDataSerializer");
        props.put("request.required.acks", "1");
        return props;
    }
}

二,消费者解析json字符串。

package com.kuxingseng.lbw.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka consumer that polls the {@code LBW_EPM} topic and logs record values,
 * pretty-printing values that parse as JSON arrays.
 *
 * <p>Runs until the thread is interrupted; the consumer is always closed on exit.
 */
public class MyConsumer implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

    private final Properties props = getProperties();
    private final String TOPIC = "LBW_EPM";

    // ObjectMapper is thread-safe and expensive to create; reuse a single instance.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void run() {
        //创建消息者实例 (create the consumer instance)
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic1的消息 (subscribe to the topic)
        consumer.subscribe(Arrays.asList(TOPIC));
        try {
            while (true) {
                // NOTE(review): this sleep adds 3s of latency on top of the 1s
                // poll timeout each iteration — poll() already blocks, so the
                // sleep is likely unnecessary; kept to preserve demo behavior.
                Thread.sleep(3000);
                //到服务器中读取记录 (fetch records from the broker)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("value:" + record.value());
                    // Parse once and use the result — the original called
                    // isJsonArray() and then readValue() again, parsing twice.
                    try {
                        List<?> payload = mapper.readValue(record.value(), List.class);
                        logger.info(payload.toString());
                    } catch (IOException e) {
                        // Not a JSON array — skip quietly, as the original did.
                        logger.debug("record value is not a JSON array, skipping");
                    }
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so owning code can observe the shutdown request.
            Thread.currentThread().interrupt();
            logger.error("MyConsumer error:", e);
        } finally {
            consumer.close();
        }
    }

    /**
     * Consumer configuration.
     *
     * @return properties with bootstrap servers, the mandatory group id and
     *         String key/value deserializers
     */
    private static Properties getProperties() {
        //配置信息 (configuration)
        Properties props = new Properties();
        //kafka服务器地址 (Kafka broker address)
        props.put("bootstrap.servers", "192.168.199.128:9092");
        //必须指定消费者组 (a consumer group id is mandatory)
        props.put("group.id", "DEMO");
        //设置数据key和value的反序列化处理类 (key/value DEserializers — the original comment said "serialization")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //接受字符串/json字符串 可以使用默认的反序列化 /也可自定义反序列化器 (plain/JSON strings can use the default deserializer, or a custom one)
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("value.deserializer", "com.kuxingseng.lbw.mq.KafkaEntityDeserializer");
        return props;
    }


    /**
     * Returns {@code true} if {@code content} parses as a JSON array.
     */
    public boolean isJsonArray(String content) {
        try {
            mapper.readValue(content, List.class);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Returns {@code true} if {@code content} parses as a JSON object.
     * NOTE(review): unused within this class; kept for callers elsewhere.
     */
    public boolean isJson(String content) {
        try {
            mapper.readValue(content, Map.class);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

(图片占位符:原文此处有运行结果截图)

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐