Kafka utility wrapper (KafkaUtils)
Maven dependencies
<!--kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.1</version>
</dependency>
<!-- log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
log4j.properties
log4j.rootLogger=INFO,R,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [%t] [%c]-[%p] %m%n
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=./logs/info.log
#log4j.appender.R.File=./logs/compSearcher.log
log4j.appender.R.DatePattern='.'yyyy-MM-dd
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [%c]-[%p] %m%n
The KafkaUtils wrapper
package com.wg.utils;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
public class KafkaUtils {
private static Producer<String,String> producer =null;
private static String broker_ip="xx.xx.10.182:9092";
private static String zk_ip="xx.xx.10.182:2181";
public static String group_id="test";
static {
Properties prop = loadOutProp("./conf/kafka.properties");
broker_ip= prop.getProperty("broker.list");
zk_ip= prop.getProperty("zookeeper.connect");
group_id=prop.getProperty("group.id");
}
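// Note (not part of the original post): the external ./conf/kafka.properties read above is assumed to
// provide the three keys used by the static block, for example:
//   broker.list=xx.xx.10.182:9092
//   zookeeper.connect=xx.xx.10.182:2181
//   group.id=test
// The host/port values are placeholders copied from the defaults above.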
/**
* Calling send() by itself does not guarantee that the message was written to Kafka.
* To send synchronously and check whether each message succeeded, call get() on the Future
* returned by each call to send(). (get() blocks the current thread until the send result
* comes back, whether it is a success or a failure.)
* @param topic
* @param key
* @param value
* @return true if the record was acknowledged, false otherwise
*/
public static boolean send(String topic,String key,String value){
ProducerRecord<String,String> r = new ProducerRecord<String,String>(topic,key,value);
try {
if(producer==null){
producer = new KafkaProducer<String,String>(getProducerProp());
}
producer.send(r).get();
System.out.println("send to topic "+topic);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
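/**
* Sketch only, not part of the original wrapper: an asynchronous variant of send().
* Instead of blocking on Future.get(), it registers a Callback that the producer invokes
* once the broker has acknowledged (or rejected) the record. The method name sendAsync is
* an illustrative addition.
*/
public static void sendAsync(String topic, String key, String value) {
if (producer == null) {
producer = new KafkaProducer<String, String>(getProducerProp());
}
ProducerRecord<String, String> r = new ProducerRecord<String, String>(topic, key, value);
producer.send(r, new org.apache.kafka.clients.producer.Callback() {
public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata, Exception e) {
if (e != null) {
// the send failed; log the error (the record may be lost unless retries kick in)
e.printStackTrace();
} else {
System.out.println("sent to " + metadata.topic() + " partition " + metadata.partition() + " offset " + metadata.offset());
}
}
});
}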
private static Properties getProducerProp() {
// build a java.util.Properties object
Properties props = new Properties();
// bootstrap.servers: required, no default. Used to create the connections to the Kafka brokers.
props.put("bootstrap.servers", broker_ip);
// key.serializer: required, no default. Anything sent to the broker must be a byte array,
// so every part of the message has to be serialized first; this parameter serializes the message key.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value.serializer: required, no default. Same idea as key.serializer, but used for the message body:
// it turns the value part of the message into a byte array.
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// acks controls the durability of produced messages. Valid values: 0, 1, -1 (all).
props.put("acks", "-1");
//props.put(ProducerConfig.ACKS_CONFIG, "1");
// the producer retries failed sends internally; the default of 0 means no retries.
props.put("retries", 3);
//props.put(ProducerConfig.RETRIES_CONFIG, 3);
// batch.size plays a major role when tuning producer throughput and latency. Default 16384, i.e. 16 KB.
props.put("batch.size", 16384);
//props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840);
// linger.ms controls send latency; the default of 0 means records are sent immediately,
// without waiting for the batch to fill up.
props.put("linger.ms", 10);
//props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// buffer.memory is the size, in bytes, of the buffer the producer uses to cache messages. Default 33554432, i.e. 32 MB.
props.put("buffer.memory", 33554432);
//props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put("max.block.ms", 3000);
//props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
// compression.type decides whether the producer compresses messages; default is none (no compression).
// Supported codecs: GZIP, Snappy, LZ4.
//props.put("compression.type", "none");
//props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
// max.request.size limits the size of a producer request, i.e. the largest message the producer can send.
//props.put("max.request.size", 10485760);
//props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
// request.timeout.ms: once the producer has sent a request, the broker must return its result
// within this window. Default 30 s.
//props.put("request.timeout.ms", 60000);
//props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
// the Properties object built above is used to construct the KafkaProducer.
// With the constructor below there is no need to list the key and value serializer classes
// in the Properties explicitly:
// Serializer<String> keySerializer = new StringSerializer();
// Serializer<String> valueSerializer = new StringSerializer();
// Producer<String, String> producer = new KafkaProducer<String, String>(props,
// keySerializer, valueSerializer);
return props;
}
/**
* Configuration for the case where losing the logs written back to Kafka is not acceptable,
* so messages have to be sent synchronously.
* @return
*/
private Producer<Integer, String> initKafkaProducer(){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_ip);//format: host1:port1,host2:port2,...
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);//a batch size of zero will disable batching entirely
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);//send message without delay
props.put(ProducerConfig.ACKS_CONFIG, "1");//returns success once the partition leader has written the record locally; in extreme cases data can still be lost
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
return kafkaProducer;
}
private static Properties loadInProp(String fileName) {
Properties properties = new Properties();
try {
properties.load(KafkaUtils.class.getClassLoader().getResourceAsStream(fileName));
} catch (IOException e) {
e.printStackTrace();
}
return properties;
}
private static Properties loadOutProp(String fileName) {
Properties properties = new Properties();
try {
properties.load(new FileReader(fileName));
} catch (IOException e) {
e.printStackTrace();
}
return properties;
}
private static Properties getConsumerProp(){
Properties props = new Properties();
//bootstrap.servers is required
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,broker_ip);
//consumer group id
props.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
//whether offsets are committed back to Kafka automatically in the background
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//how often (in ms) consumer offsets are auto-committed to Kafka; only takes effect when enable.auto.commit is true
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//failure-detection (heartbeat) interval: if no heartbeat arrives within this window the consumer is removed
//and a rebalance is triggered; the value must lie between group.min.session.timeout.ms and group.max.session.timeout.ms
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
//key and value deserializer classes
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//start consuming from the beginning
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//start consuming from the latest data
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//maximum number of records returned by a single poll; set this value sensibly:
//if Kafka receives no heartbeat from the consumer within SESSION_TIMEOUT_MS_CONFIG, it kicks the consumer
//out of the group, rebalances, and assigns its partitions to other consumers.
//If this is set badly, consumers keep getting evicted and the same messages keep being re-consumed.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
return props;
}
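/**
* Sketch only, not part of the original wrapper: when enable.auto.commit is switched off,
* offsets have to be committed manually, e.g. with commitSync() after a batch has been processed,
* so that records are not marked as consumed before the processing actually succeeded.
* The method name consumeWithManualCommit is an illustrative addition.
*/
public static void consumeWithManualCommit(String topic) {
Properties props = getConsumerProp();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("processed offset " + record.offset() + " value " + record.value());
}
// commit only after the whole batch has been processed
consumer.commitSync();
}
}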
private static kafka.consumer.ConsumerConfig initJavaConsumerConfig(String group_id) {
Properties props = new Properties();
props.put("zookeeper.connect", zk_ip);
props.put("group.id", group_id );
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "400000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("fetch.message.max.bytes", "100000000");
props.put("max.poll.records","5000");
props.put("rebalance.backoff.ms","8000");
props.put("rebalance.max.retries","7");
kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
return consumerConfig;
}
public static KafkaConsumer<String, String> getConsumer(){
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getConsumerProp());
return consumer;
}
/**
* Intended for multi-threaded consumption
* @param topic
* @param thread_num should match the number of partitions of the topic
* @return
*/
public static List<KafkaStream<byte[], byte[]>> getConsumerStreams(String topic,String group_Id,int thread_num) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
ConsumerConnector consumerConnector = getConsumerConnector(group_Id);
topicCountMap.put(topic,thread_num);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(topic);
return kafkaStreams;
}
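/**
* Sketch only, not part of the original wrapper: one way to drive the streams returned by
* getConsumerStreams() is to hand each stream to its own thread, e.g. via a fixed-size
* thread pool matching the number of streams. The method name consumeWithThreads is an
* illustrative addition.
*/
public static void consumeWithThreads(String topic, String groupId, int threadNum) {
List<KafkaStream<byte[], byte[]>> streams = getConsumerStreams(topic, groupId, threadNum);
java.util.concurrent.ExecutorService pool = java.util.concurrent.Executors.newFixedThreadPool(threadNum);
for (final KafkaStream<byte[], byte[]> stream : streams) {
pool.submit(new Runnable() {
public void run() {
// each thread blocks on its own stream iterator
testConsumerStream(stream);
}
});
}
}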
/**
* Single-threaded consumption
* @param topic
* @return
*/
public static KafkaStream<byte[], byte[]> getConsumerStream(String topic,String group_id) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
ConsumerConnector consumerConnector = getConsumerConnector(group_id);
topicCountMap.put(topic,1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
return stream;
}
public static void main(String[] args) {
KafkaStream<byte[], byte[]> stream = getConsumerStream("test", "test0");
testConsumerStream(stream);
}
private static ConsumerConnector getConsumerConnector(String group_id) {
kafka.consumer.ConsumerConfig consumerProp = initJavaConsumerConfig(group_id);
ConsumerConnector javaConsumerConnector = Consumer
.createJavaConsumerConnector(consumerProp);
return javaConsumerConnector;
}
/**
* Usage example: fetching multiple records in one poll
* @param topic
*/
public static void testConsumer(String topic){
KafkaConsumer consumer = getConsumer();
consumer.subscribe(Arrays.asList(topic));
// consumer.subscribe(Collections.singletonList(this.topic));
while (true) {
//consumer.poll()
long t1 = System.currentTimeMillis();
ConsumerRecords<String, String> records = consumer.poll(2000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value()
+ ") offset " + record.offset()
+ " partition " + record.partition() + ")");
}
long t2 = System.currentTimeMillis();
System.out.println("one poll cost==="+(t2-t1)+" ms ");
}
}
public static void testConsumerStream(KafkaStream<byte[], byte[]> stream){
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
String str = new String(it.next().message());
System.out.println("str======>>>>"+str);
}
}
/** @deprecated */
private static kafka.javaapi.producer.Producer<String, String> getOldProducer(){
Properties props = new Properties();
props.put("metadata.broker.list", broker_ip);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "sync");
props.put("queue.buffering.max.ms", "10000");
props.put("request.timeout.ms", "10000");
// Enables the acknowledgement mechanism; otherwise sends are fire-and-forget and data may be lost.
// Valid values are 0, 1 and -1.
props.put("request.required.acks", "1");
kafka.producer.ProducerConfig producerConfig = new kafka.producer.ProducerConfig(props);
kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<>(producerConfig);
return producer;
}
private static kafka.javaapi.producer.Producer<String, String> oldProducer=null;
/**
* Writes data into Kafka using the old producer API.
* @deprecated
* @param topic
* @param input
* Note: the problem with this method is that it does not raise an error when a send fails.
*/
public static void oldSend(String topic, String input) {
if(oldProducer==null){
oldProducer = getOldProducer();
}
KeyedMessage<String, String> message = new KeyedMessage<String, String>(
topic, input);
//so that messages are spread randomly across different partitions:
/* String key = UUID.randomUUID().toString().substring(0, 5);
KeyedMessage<String, String> message = new KeyedMessage<>(topic, key, input);*/
oldProducer.send(message);
}
public static String getKey(int partionNum){
return UUID.randomUUID().toString().substring(0, partionNum);
}
}
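For a quick end-to-end check, a small driver along the following lines can be used. This is a sketch only: the class name KafkaUtilsDemo and the topic "test" are placeholders, and it assumes ./conf/kafka.properties points at a reachable cluster.
public class KafkaUtilsDemo {
public static void main(String[] args) {
// send one record with a random 5-character key, then consume the same topic in a loop
KafkaUtils.send("test", KafkaUtils.getKey(5), "hello kafka");
KafkaUtils.testConsumer("test");
}
}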