Kafka versions matter when setting up a cluster; this tutorial uses kafka_2.11-0.11.0.2.tgz. Let's first look at connecting to Kafka from Spring Boot.

1. Add the pom dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. application.properties configuration (note these are custom keys, bound with @Value in the config classes below, not Spring Boot's built-in spring.kafka.* auto-configuration keys)

kafka.consumer.zookeeper.connect=zookeeper-ip:2181
kafka.consumer.servers=kafka-ip:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=kafka-ip:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

3. Add the Kafka consumer configuration class:

package com.databps.bigdaf.admin.config;

import com.databps.bigdaf.admin.manager.HomePageManager;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;

/**
 * @author haipeng
 * @create 2017-11-02 11:39
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

  @Value("${kafka.consumer.servers}")
  private String servers;

  @Value("${kafka.consumer.enable.auto.commit}")
  private boolean enableAutoCommit;

  @Value("${kafka.consumer.session.timeout}")
  private String sessionTimeout;

  @Value("${kafka.consumer.auto.commit.interval}")
  private String autoCommitInterval;

  @Value("${kafka.consumer.group.id}")
  private String groupId;

  @Value("${kafka.consumer.auto.offset.reset}")
  private String autoOffsetReset;

  @Value("${kafka.consumer.concurrency}")
  private int concurrency;

  @Autowired
  private HomePageManager homePageManager;

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
  }

  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    // propsMap.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    return propsMap;
  }
}
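One wiring detail worth knowing: @KafkaListener methods are backed by containers built from the factory bean named kafkaListenerContainerFactory by default, which is exactly what the @Bean method above is called, so the listener in step 5 needs no extra configuration. If the factory bean had a different name, the listener would have to select it explicitly; a hypothetical sketch:

@KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String content) {
  System.out.println(content);
}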

4. Add the Kafka producer configuration class

package com.databps.bigdaf.admin.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * @author haipeng
 * @create 2017-11-02 11:37
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

  @Value("${kafka.producer.servers}")
  private String servers;

  @Value("${kafka.producer.retries}")
  private int retries;

  @Value("${kafka.producer.batch.size}")
  private int batchSize;

  @Value("${kafka.producer.linger}")
  private int linger;

  @Value("${kafka.producer.buffer.memory}")
  private int bufferMemory;

  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
  }

  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
}
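Because kafkaTemplate.send() returns a ListenableFuture<SendResult<K, V>> in these spring-kafka versions, callers can react to delivery results instead of sending fire-and-forget. A minimal sketch; the AuditSender class name and its package are my own illustration, not part of the original project:

package com.databps.bigdaf.admin.service; // hypothetical package for this sketch

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class AuditSender {

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  public void send(String json) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", json);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
      @Override
      public void onSuccess(SendResult<String, String> result) {
        // the record metadata tells us where the message landed
        System.out.println("Sent to " + result.getRecordMetadata().topic()
            + "-" + result.getRecordMetadata().partition()
            + " at offset " + result.getRecordMetadata().offset());
      }

      @Override
      public void onFailure(Throwable ex) {
        ex.printStackTrace(); // delivery failed after the configured retries
      }
    });
  }
}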

5. Calling the producer:

(1) Inject the KafkaTemplate:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

(2) Convert the payload to JSON before sending; this example uses Gson:

AuditVo auditVo = new AuditVo();
long sortData = Long.parseLong(DateUtils.getNowDateTime());
auditVo.setId("sdfdf");
auditVo.setCmpyId(cmpyId);
auditVo.setUser("whp");
auditVo.setPluginIp("192.168.1.53");
auditVo.setAccessTime(DateUtils.getNowDateTime());
auditVo.setAccessType("WRITE");
auditVo.setAction("write");
auditVo.setAccessResult("success");
auditVo.setServiceType("hbase");
auditVo.setResourcePath("/whp");
Gson gson = new Gson();
kafkaTemplate.send("test", gson.toJson(auditVo));

(3) For the consumer, simply annotate a method with @KafkaListener(topics = {"test"}):

@Component
public class KafkaConsumer {

  @KafkaListener(topics = {"test"})
  public void processMessage(String content) {
    System.out.println("Message consumed: " + content);
  }
}
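To get the original object back on the consumer side, reverse the Gson step. A minimal sketch, assuming AuditVo is on the consumer's classpath and exposes a getUser() accessor matching the setUser() call above:

import com.google.gson.Gson;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class AuditKafkaConsumer {

  private final Gson gson = new Gson();

  @KafkaListener(topics = {"test"})
  public void processMessage(String content) {
    // reverse the producer's gson.toJson(auditVo)
    AuditVo auditVo = gson.fromJson(content, AuditVo.class);
    System.out.println("Audit event from user " + auditVo.getUser());
  }
}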

Testing Kafka with the plain Java client API (without Spring) works as follows:

1. Add the pom dependencies (the logging artifacts are excluded from storm-kafka to avoid slf4j conflicts)

<properties>
    <mapr-storm-kafka.version>1.0.1</mapr-storm-kafka.version>
    <scala.version>2.11</scala.version>
    <kafka.version>0.10.0.0</kafka.version>
</properties>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_${scala.version}</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>${mapr-storm-kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2. Add the test class

package com.example.demo.kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

public class kafkaConsumer {

  private String topic = "test";

  @Test
  public void Producer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master1.hdp.com:6667");
    props.put("acks", "all"); // "all" waits for every in-sync replica to acknowledge: the slowest but safest setting
    props.put("retries", 0); // retries on failure; a non-zero value may produce duplicate messages
    props.put("batch.size", 16384); // batch buffer size per partition
    props.put("linger.ms", 1); // how long to wait for a batch to fill; 1 adds at most 1 ms of latency when the buffer is not full
    props.put("buffer.memory", 33554432); // total memory the producer may use for buffering
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>(topic, "", Integer.toString(1)));
    producer.close();
  }
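  // An optional variant (my addition, not in the original repo): send() also
  // accepts a Callback, so delivery success or failure can be logged
  // asynchronously instead of firing and forgetting. A minimal sketch:
  @Test
  public void producerWithCallback() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master1.hdp.com:6667");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>(topic, "", "1"),
        new org.apache.kafka.clients.producer.Callback() {
          public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata, Exception exception) {
            if (exception != null) {
              exception.printStackTrace(); // delivery failed
            } else {
              System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
            }
          }
        });
    producer.close();
  }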

  private ConsumerConnector consumer;

  @Test
  public void kafkaConsumer() {
    Properties props = new Properties();
    // ZooKeeper configuration
    props.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
    // group.id identifies the consumer group
    props.put("group.id", "jd-group");
    // ZooKeeper session timeout
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "largest");
    // serializer class
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    ConsumerConfig config = new ConsumerConfig(props);
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("test", new Integer(1));
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
    Map<String, List<KafkaStream<String, String>>> consumerMap =
        consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get("test").get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
      System.out.println(it.next().message());
    }
  }
}
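The consumer test above uses the old ZooKeeper-based high-level consumer (the kafka.consumer.* classes); the unused ConsumerRecord/ConsumerRecords imports hint at the newer broker-based consumer that kafka-clients has shipped since 0.9. A minimal sketch of the same loop against the new API, assuming the same broker and topic; this class is my own illustration:

package com.example.demo.kafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumer {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "master1.hdp.com:6667"); // brokers, not ZooKeeper
    props.put("group.id", "jd-group");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));
    while (true) { // loops forever; stop the process to exit
      // poll() returns all records fetched since the last call
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
      }
    }
  }
}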

The full code is available in my GitHub repo: https://github.com/whpHarper/springboot-kafka
