Knowledge Notes - Using Kafka
package com.test
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.Cluster
import java.util
import java.util.Properties
// Kafka producer test
object SparkKafka {
  def main(args: Array[String]): Unit = {
    // Basic configuration
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // separate multiple servers with commas
    // Key serializer
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) // this variant would need: import org.apache.kafka.common.serialization.StringSerializer
    // Value serializer
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[CustomPartitioner])
    // Producer-side tuning
    // Grow the RecordAccumulator buffer from the default 32 MB to 64 MB
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64 * 1024 * 1024)
    // batch.size: default 16 KB, raised to 64 KB
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4) // 64 KB batches
    // linger.ms: default 0 ms; 5-100 ms is a common range
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 50) // 50 ms
    // compression.type: enable snappy compression
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
    // acks: -1 is equivalent to "all"
    properties.put(ProducerConfig.ACKS_CONFIG, "-1")
    // retries
    properties.put(ProducerConfig.RETRIES_CONFIG, 3)
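    // Added note: for the transaction demo near the end of main to actually run,
    // transactional.id (and idempotence, which is on by default in recent clients)
    // must be set HERE, before the KafkaProducer is constructed, e.g.:
    // properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    // properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1")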
    val kafkaProducer = new KafkaProducer[String, String](properties)
    for (k <- 1 to 5) {
      println("kafkaProducer=" + kafkaProducer + " k=" + k)
      // Asynchronous: producer -> buffer
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k))
      // Asynchronous with callback
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k), new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (e == null) {
            println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
          } else {
            e.printStackTrace() // only print the stack trace when the send actually failed
          }
        }
      })
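      // Note: the callback runs on the producer's I/O (sender) thread, so it should
      // return quickly and must not block.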
      // Synchronous: producer -> buffer -> broker (get() blocks until acknowledged)
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k)).get()
    }
    // Partitioning: the default is org.apache.kafka.clients.producer.internals.DefaultPartitioner
    // Partitions let a topic be split across multiple brokers, giving:
    // - load balancing
    // - parallelism
    // Send to an explicit partition (partition 0)
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", 0, "", "topic1:"), new Callback {
      override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
        if (e == null) {
          println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
        } else {
          e.printStackTrace()
        }
      }
    })
    // Key-based partitioning: hash(key) % number of partitions
    kafkaProducer.send(new ProducerRecord[String, String]("topic1", "key1", "topic1:"), new Callback {
      override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
        if (e == null) {
          println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
        } else {
          e.printStackTrace()
        }
      }
    })
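    // Note: records with the same key always hash to the same partition, which is
    // what preserves per-key ordering.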
    // Default strategy when no key is given: sticky partitioning
    for (k <- 1 to 500) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "topic1:" + k), new Callback {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (e == null) {
            println("topic=" + recordMetadata.topic() + " partition=" + recordMetadata.partition())
          } else {
            e.printStackTrace()
          }
        }
      })
      Thread.sleep(20)
    }
    // Exercise the custom partitioner ("partitioner" in the value routes to partition 0)
    for (k <- 1 to 50) {
      kafkaProducer.send(new ProducerRecord[String, String]("topic1", "partitioner:" + k))
    }
    // acks and the ISR: the ISR lists the leader plus the followers that are in sync;
    // a follower is dropped if it lags longer than replica.lag.time.max.ms
    // acks=0  -> record only in producer memory, can be lost
    // acks=1  -> leader has written it, can still be lost if the leader fails
    // acks=-1 -> leader and every ISR replica have written it
    // At-least-once (no loss, possible duplicates): acks=-1 + replication factor >= 2 + min.insync.replicas >= 2
    // At-most-once (possible loss, no duplicates): acks=0
    // Exactly-once (no loss, no duplicates): idempotence + transactions
    // Idempotent dedup key: <PID (new on every producer restart), partition, sequence number (monotonically increasing)>
    // Transactions (note: transactional.id must be set on the Properties BEFORE the
    // producer is built, as sketched above; otherwise initTransactions() throws)
    // properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1")
    kafkaProducer.initTransactions()
    kafkaProducer.beginTransaction()
    try {
      for (k <- 1 to 5) {
        kafkaProducer.send(new ProducerRecord[String, String]("topic1", "transaction:" + k))
      }
      kafkaProducer.commitTransaction()
    } catch {
      case e: Exception =>
        kafkaProducer.abortTransaction()
    }
    // Ordering within a partition:
    // versions < 1.x: set max.in.flight.requests.per.connection = 1
    // versions >= 1.x: enable idempotence and keep max.in.flight.requests.per.connection <= 5
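    // The corresponding setting (it would belong in the Properties block at the top,
    // before the producer is built):
    // properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5)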
    kafkaProducer.close()
  }
  // Custom partitioner: values containing "partitioner" go to partition 0, everything else to partition 1
  class CustomPartitioner extends Partitioner {
    override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
      if (value.toString.contains("partitioner")) 0 else 1
    }
    override def close(): Unit = {}

    override def configure(map: util.Map[String, _]): Unit = {}
  }
}
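
For reference, here is a minimal self-contained sketch of the transactional path with the configuration in the right place: transactional.id and idempotence are set before the KafkaProducer is built, so initTransactions() succeeds. The broker address, topic name, and transactional id reuse the placeholder values from the listing above; adjust them for a real cluster.

package com.test

import org.apache.kafka.clients.producer._
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.ProducerFencedException
import java.util.Properties

// Minimal transactional producer sketch: the exactly-once settings are applied
// before the producer is constructed, unlike the inline demo above
object TransactionalKafkaSketch {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.put(ProducerConfig.ACKS_CONFIG, "all") // idempotence requires acks=all
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionid1") // must be unique per producer instance

    val producer = new KafkaProducer[String, String](properties)
    producer.initTransactions() // registers transactionid1 with the transaction coordinator
    try {
      producer.beginTransaction()
      for (k <- 1 to 5) {
        producer.send(new ProducerRecord[String, String]("topic1", "transaction:" + k))
      }
      producer.commitTransaction() // all five records become visible atomically
    } catch {
      case e: ProducerFencedException =>
        // another producer with the same transactional.id took over; this instance must stop
        producer.close()
        throw e
      case e: KafkaException =>
        e.printStackTrace()
        producer.abortTransaction() // none of the records in this transaction become visible
    }
    producer.close()
  }
}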