Sending Avro-format data to a Kerberos-authenticated Kafka cluster with Java
1. Create a Maven project in IDEA
1.1 pom.xml contents
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Specify the class that contains the main-method entry point -->
                            <mainClass>com.test.AvroKafkaProducter</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
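With the assembly plugin bound to the package phase, running mvn package produces a single jar that bundles all dependencies and uses the class below as its entry point, so it can be started with java -jar on a host that has the Kerberos client configuration in place.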
1.2 Create the AvroKafkaProducter class
package com.test;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class AvroKafkaProducter {
    private static final Logger logger = LoggerFactory.getLogger(AvroKafkaProducter.class);
    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"es_test02\","
            + "\"fields\":["
            + "  { \"name\":\"id\", \"type\":\"string\" },"
            + "  { \"name\":\"name\", \"type\":\"string\" },"
            + "  { \"name\":\"inserttime\", \"type\":\"string\" }"
            + "]}";


    // Serialize a single GenericRecord into raw Avro binary (no schema attached to the bytes).
    public static byte[] recordToByte(Schema schema, GenericRecord record) {
        DatumWriter<GenericRecord> userDatumWriter = new SpecificDatumWriter<>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
        try {
            userDatumWriter.write(record, binaryEncoder);
            binaryEncoder.flush();
        } catch (IOException e) {
            logger.error("Failed to serialize the record to bytes", e);
        }
        return outputStream.toByteArray();
    }


    public static void main(String[] args) throws InterruptedException, IOException {
        // JAAS login configuration and Kerberos configuration used by the SASL/GSSAPI mechanism
        System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "198.16.5.100:9888");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");

        // Parse the Avro schema and prepare a binary encoder backed by an in-memory buffer
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        // Append several records to the same encoder, then send them as a single message value
        for (int i = 6; i < 15; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            avroRecord.put("id", "test" + i);
            avroRecord.put("name", "zhangsan" + i);
            avroRecord.put("inserttime", time + i);
            System.out.println("----- record to send -----");
            System.out.println(avroRecord.toString());
            System.out.println("--------------------------");
            // Alternative: send one record per message with recordToByte(schema, avroRecord)
            writer.write(avroRecord, encoder);
        }
        encoder.flush();
        byte[] bytes = outputStream.toByteArray();

        System.out.println(bytes.length);
        ProducerRecord<String, byte[]> record = new ProducerRecord<>("es_test10", bytes);
        producer.send(record);
        Thread.sleep(1000);
        outputStream.close();
        producer.close();
    }
}
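
The producer points java.security.auth.login.config at /etc/kafka/conf/kafka_jaas.conf. For reference, a minimal JAAS file for a keytab-based Kerberos client could look like the sketch below; the keytab path and principal are placeholders and must be replaced with the values for the actual cluster:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client@EXAMPLE.COM";
};

The broker service name does not need to appear here because sasl.kerberos.service.name is already set to kafka in the producer properties.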

This completes the demo for sending data to a Kerberos-authenticated Kafka cluster; other business requirements can be implemented by adapting it.
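
Because each message value produced above is a concatenation of several Avro records encoded with the same schema and no schema header, the consuming side has to decode it with that schema. The class below is only a sketch of such decoding, assuming the consumer has the same USER_SCHEMA available; the class and method names are illustrative:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch only: decode one Kafka message value written by the producer above.
public class AvroValueDecoder {
    public static List<GenericRecord> decode(Schema schema, byte[] bytes) throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        List<GenericRecord> records = new ArrayList<>();
        // The value holds several records back to back, so keep reading until the decoder is drained
        while (!decoder.isEnd()) {
            records.add(reader.read(null, decoder));
        }
        return records;
    }
}

Calling decode(schema, value) on each received message value returns the records in the order they were written.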
