springboot配置kafka与原生kafka配置
搭建kafka要注意版本问题,本教程使用的kafka版本是kafka_2.11-0.11.0.2.tgz;首先看下spring-boot连接kafka的使用。 1. 添加pom依赖:org.springframework.boot:spring-boot-starter-web(版本 1.5.8.RELEASE 已被注释)、org.apache.kafka:kafka-clients 等,完整配置见下文。
·
搭建kafka要注意版本问题,本教程使用的kafka版本是kafka_2.11-0.11.0.2.tgz;首先看下spring-boot连接kafka的使用。
1. 添加pom依赖
<!-- Dependencies for the Spring Boot + Kafka example. -->
<dependencies>
<!-- Spring Boot web starter. The explicit version below is commented out;
     presumably it is inherited from a parent POM / BOM - TODO confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!--<version>1.5.8.RELEASE</version>-->
</dependency>
<!-- Kafka Java client, pinned explicitly to 0.10.2.0.
     NOTE(review): spring-kafka below also brings in a kafka-clients version;
     confirm the two are compatible. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<!-- Core Spring Boot starter (no version given here). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring for Apache Kafka (no version given here). -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
2. application.properties配置
# ---- Consumer settings (injected into KafkaConsumerConfig via @Value) ----
# NOTE(review): no @Value in the config classes shown reads this key - confirm whether it is still needed.
kafka.consumer.zookeeper.connect=zookeeper-ip:2181
# Passed to ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG.
kafka.consumer.servers=kafka-ip:9092
# Passed to ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
kafka.consumer.enable.auto.commit=true
# Passed to ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG.
kafka.consumer.session.timeout=6000
# Passed to ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG.
kafka.consumer.auto.commit.interval=100
# Passed to ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
kafka.consumer.auto.offset.reset=latest
# NOTE(review): no @Value in the config classes shown reads this key; the listener shown hard-codes the topic "test".
kafka.consumer.topic=test
# Passed to ConsumerConfig.GROUP_ID_CONFIG.
kafka.consumer.group.id=test
# Passed to ConcurrentKafkaListenerContainerFactory#setConcurrency.
kafka.consumer.concurrency=10
# ---- Producer settings (injected into KafkaProducerConfig via @Value) ----
# Passed to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.
kafka.producer.servers=kafka-ip:9092
# Passed to ProducerConfig.RETRIES_CONFIG.
kafka.producer.retries=0
# Passed to ProducerConfig.BATCH_SIZE_CONFIG.
kafka.producer.batch.size=4096
# Passed to ProducerConfig.LINGER_MS_CONFIG.
kafka.producer.linger=1
# Passed to ProducerConfig.BUFFER_MEMORY_CONFIG.
kafka.producer.buffer.memory=40960
3. 添加kafkaConsumer配置类:
package com.databps.bigdaf.admin.config;
import com.databps.bigdaf.admin.manager.HomePageManager;
import com.databps.bigdaf.admin.vo.HomePageVo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that builds the listener container factory used by
 * {@code @KafkaListener} methods.
 *
 * <p>Every setting is injected from the {@code kafka.consumer.*} properties.
 * Keys and values are deserialized as strings.
 *
 * @author haipeng
 * @create 17-11-2 11:39 AM
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /**
     * Creates the container factory bean, configured with the consumer factory,
     * the configured concurrency, and a poll timeout of 1500.
     *
     * @return the listener container factory for String/String records
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    /**
     * Builds a consumer factory from {@link #consumerConfigs()}.
     *
     * <p>This method is not annotated with {@code @Bean}.
     *
     * @return a new consumer factory for String/String records
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Assembles the Kafka consumer properties from the injected values.
     *
     * @return a new mutable map of consumer configuration entries
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}
4. 添加kafkaProducer配置类
package com.databps.bigdaf.admin.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
 * Spring configuration exposing a {@link KafkaTemplate} for sending
 * String/String records. All values come from the
 * {@code kafka.producer.*} properties.
 *
 * @author haipeng
 * @create 17-11-2 11:37 AM
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String bootstrapServers;

    @Value("${kafka.producer.retries}")
    private int retryCount;

    @Value("${kafka.producer.batch.size}")
    private int batchBytes;

    @Value("${kafka.producer.linger}")
    private int lingerMs;

    @Value("${kafka.producer.buffer.memory}")
    private int bufferBytes;

    /**
     * Registers the template bean, backed by {@link #producerFactory()}.
     *
     * @return a template for String keys and String values
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Builds a producer factory from {@link #producerConfigs()}.
     *
     * @return a new producer factory for String/String records
     */
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Collects the producer properties from the injected values.
     *
     * @return a new mutable map of producer configuration entries
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        // Serialization: both key and value are plain strings.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Connection and delivery settings.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ProducerConfig.RETRIES_CONFIG, retryCount);
        // Batching and buffering settings.
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, batchBytes);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferBytes);
        return settings;
    }
}
5. 生产者调用过程:
(1)添加kafkaTemplate注入
// Injected template for sending String keys and String values; parameterized
// (instead of the raw type) to match the KafkaTemplate<String, String> bean
// declared by KafkaProducerConfig#kafkaTemplate().
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
(2)将要传输数据转化为json发送,本实例通过Gson进行转换
// Populate the audit record to publish; cmpyId is supplied by the surrounding code.
AuditVo auditVo = new AuditVo();
auditVo.setId("sdfdf");
auditVo.setCmpyId(cmpyId);
auditVo.setUser("whp");
auditVo.setPluginIp("192.168.1.53");
auditVo.setAccessTime(DateUtils.getNowDateTime());
auditVo.setAccessType("WRITE");
auditVo.setAction("write");
auditVo.setAccessResult("success");
auditVo.setServiceType("hbase");
auditVo.setResourcePath("/whp");
// Serialize the record to JSON with Gson and send it to the "test" topic.
Gson gson = new Gson();
kafkaTemplate.send("test", gson.toJson(auditVo));
(3)消费者类只要在方法上添加注解就可以了:@KafkaListener(topics = {"test"})
/**
 * Component whose listener method prints each message received
 * from the "test" topic to standard output.
 */
@Component
public class KafkaConsumer {

    /**
     * Prints the received message, prefixed with a fixed label.
     *
     * @param content the message payload
     */
    @KafkaListener(topics = {"test"})
    public void processMessage(String content) {
        final String line = "消息被消费" + content;
        System.out.println(line);
    }
}
使用原生kafka的java API进行kafka测试方法如下:
1. pom文件引入依赖
<!-- Version properties referenced by the dependencies below
     (presumably declared inside the POM's <properties> element - TODO confirm). -->
<mapr-storm-kafka.version>1.0.1</mapr-storm-kafka.version>
<scala.version>2.11</scala.version>
<kafka.version>0.10.0.0</kafka.version>
<!-- Kafka artifact; resolves to kafka_2.11 at version 0.10.0.0 with the properties above. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- storm-kafka at version 1.0.1, with three logging artifacts excluded. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${mapr-storm-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
2. 添加测试类
package com.example.demo.kafka;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
/**
 * Manual tests that talk to a Kafka cluster through the plain Kafka Java APIs:
 * one test sends a single record, the other prints records read from the topic.
 */
public class kafkaConsumer {

    private String topic = "test";

    private ConsumerConnector consumer;

    /**
     * Sends one record (empty key, value "1") to {@code topic}.
     */
    @Test
    public void Producer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master1.hdp.com:6667");
        props.put("acks", "all"); // ack mode "all": waits for every commit, the slowest mode
        props.put("retries", 0); // retries on failure; enabling retries may produce duplicate data
        props.put("batch.size", 16384); // batch buffer size for each partition
        props.put("linger.ms", 1); // how long to wait when the buffer is not full; 1 adds up to 1 ms of send delay
        props.put("buffer.memory", 33554432); // total memory the producer may use for buffering
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources closes the producer even when send() throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<String, String>(topic, "", Integer.toString(1)));
        }
    }

    /**
     * Connects via ZooKeeper, opens one stream on {@code topic}, and prints every
     * message it yields. The connector is shut down when the loop ends or fails.
     */
    @Test
    public void kafkaConsumer() {
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
        // group.id identifies a consumer group
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "largest");
        // serializer class
        // NOTE(review): this looks like a producer-side setting - confirm the consumer needs it.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        try {
            // Request a single stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put(topic, 1);
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println(it.next().message());
            }
        } finally {
            // Release the connector's resources even if consumption fails.
            consumer.shutdown();
        }
    }
}
代码请参考个人git账号:https://github.com/whpHarper/springboot-kafka
更多推荐
已为社区贡献3条内容
所有评论(0)