Knowledge Notes - Using Kafka


package com.test

import org.apache.kafka.clients.producer._
import org.apache.kafka.common.Cluster

import java.util
import java.util.Properties

// Kafka producer demo
object SparkKafka {

  def main(args: Array[String]): Unit = {

    // Basic connection settings
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // separate multiple brokers with commas
    // Key serializer
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // alternative, needs import org.apache.kafka.common.serialization.StringSerializer:
    //    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    // Value serializer
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Custom partitioner (defined at the bottom of this object)
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[CustomPartitioner])

    // Producer-side tuning
    // Grow the RecordAccumulator buffer from the 32 MB default to 64 MB
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024)
    // batch.size: raise from the 16 KB default to 64 KB
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4) // 64 KB batches
    // linger.ms: default 0 ms; 5-100 ms is a common range
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 50) // 50 ms
    // compression.type: enable snappy compression
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")

    // acks=-1 ("all"): leader and all ISR replicas must persist the record
    properties.put(ProducerConfig.ACKS_CONFIG, "-1")
    // Retries on transient send failures
    properties.put(ProducerConfig.RETRIES_CONFIG, 3)

    val kafkaProducer = new KafkaProducer[String, String](properties)

    for (k <- 1 to 5) {
      System.out.println("kafkaProducer=" + kafkaProducer + "  k=" + k)

      // Asynchronous send: the record only goes to the producer buffer
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k))
      // Asynchronous send with a completion callback
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k), new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (e != null) {
            e.printStackTrace()
          } else {
            System.out.println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
          }
        }
      })

      // Synchronous send: producer -> buffer -> broker; get() blocks until the broker acknowledges
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k)).get()

    }

    // Partitioning: the default is org.apache.kafka.clients.producer.internals.DefaultPartitioner
    // Splitting a topic across multiple brokers gives load balancing and higher parallelism
    // Send to an explicit partition (0); an explicit partition bypasses any partitioner
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", 0, "", "topic1:"), new Callback {
      override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
        if (e != null) {
          e.printStackTrace()
        } else {
          System.out.println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
        }
      }
    })

    // Key-based strategy: DefaultPartitioner picks hash(key) % numPartitions
    // (note: the custom partitioner configured above takes precedence in this demo)
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", "key1", "topic1:"), new Callback {
      override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
        if (e != null) {
          e.printStackTrace()
        } else {
          System.out.println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
        }
      }
    })

    // Default strategy without a key: sticky partitioning - DefaultPartitioner fills a batch
    // for one partition before switching (again overridden by the custom partitioner here)
    for (k <- 1 to 500) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k), new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (e != null) {
            e.printStackTrace()
          } else {
            System.out.println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
          }
        }
      })
      Thread.sleep(20)
    }

    // Exercise the custom partitioner: values containing "partitioner" go to partition 0
    for (k <- 1 to 50) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "partitioner:" + k))
    }

    // acks and the ISR: the ISR lists the leader plus the followers that are in sync;
    // a follower is dropped from the ISR if it lags longer than replica.lag.time.max.ms
    // acks=0 -> record only in producer memory, can be lost
    // acks=1 -> leader has persisted it, can still be lost on leader failover
    // acks=-1 -> leader plus every ISR replica has persisted it
    // At least once (no loss, duplicates possible): acks=-1 + replication factor >= 2 + min.insync.replicas >= 2
    // At most once (loss possible, no duplicates): acks=0
    // Exactly once (no loss, no duplicates): idempotence + transactions (sketch below)
    // Idempotent dedup key: <PID (renewed on producer restart), partition, sequence number (monotonically increasing)>
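
    // Minimal sketch of the exactly-once producer settings named above
    // (assumed typical values, not part of this demo; they would have to be
    // set before new KafkaProducer is called):
    // properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    // properties.put(ProducerConfig.ACKS_CONFIG, "all") // idempotence requires acks=all
    // properties.put(ProducerConfig.RETRIES_CONFIG, Int.MaxValue)
    // broker/topic side: replication factor >= 2 and min.insync.replicas >= 2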
    // Transactions: transactional.id must be set before the producer is created,
    // and a transactional producer may only send inside a transaction,
    // so this demo uses a dedicated producer instance
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1")
    val txProducer = new KafkaProducer[String, String](properties)
    txProducer.initTransactions()
    txProducer.beginTransaction()
    try {
      for (k <- 1 to 5) {
        txProducer.send(new ProducerRecord[String, String]("topic1", "transaction:" + k))
      }
      txProducer.commitTransaction()
    } catch {
      case e: Exception =>
        txProducer.abortTransaction()
    }
    txProducer.close()

    // Ordering is guaranteed only within a partition
    // Kafka < 1.x: set max.in.flight.requests.per.connection = 1
    // Kafka >= 1.x: enable idempotence and keep max.in.flight.requests.per.connection <= 5
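
    // Sketch of the in-order settings above (assumed values; they would have
    // to be set before the producer is created):
    // properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    // properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")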

    kafkaProducer.close()
  }

  // Custom partitioner implementation
  class CustomPartitioner extends Partitioner {
    // Route values containing "partitioner" to partition 0, everything else to partition 1
    override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
      if (value != null && value.toString.contains("partitioner")) 0 else 1
    }

    override def close(): Unit = {
      // no resources to release
    }

    override def configure(map: util.Map[String, _]): Unit = {
      // no extra configuration needed
    }
  }


}
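
To check which partition each record actually landed in, here is a minimal consumer sketch. It assumes the same broker at localhost:9092 and the existing topic1; the group id "verify-partitions" is made up for this example.

package com.test

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import java.time.Duration
import java.util.{Collections, Properties}

object VerifyPartitions {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-partitions") // hypothetical group id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("topic1"))
    try {
      // Poll a few times and print where each record was stored
      for (_ <- 1 to 10) {
        val records = consumer.poll(Duration.ofSeconds(1))
        val it = records.iterator()
        while (it.hasNext) {
          val record = it.next()
          System.out.println("partition=" + record.partition() + " offset=" + record.offset() + " value=" + record.value())
        }
      }
    } finally {
      consumer.close()
    }
  }
}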