使用Spring Kafka实现Kafka消息的发送和订阅
本项目是在Spring Boot的基础上构建的,笔者使用的是Spring Boot 1.5.8版本。1.在项目的pom.xml文件中引入如下依赖项:org.apache.kafka:kafka-clients:0.10.2.0、org.springframework.kafka:spring-kafka:1.2.0.RELEASE
·
本项目是在Spring Boot的基础上构建的,笔者使用的是Spring Boot 1.5.8版本.
1.在项目的pom.xml文件中引入如下依赖项:
<!-- Apache Kafka client library; see the note below about matching its version to the broker. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<!-- Spring for Apache Kafka: supplies the KafkaTemplate, @EnableKafka and @KafkaListener used in the classes below. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.0.RELEASE</version>
</dependency>
备注:所依赖的kafka jar版本最好与你实际使用的kafka消息中间件版本保持一致,否则可能会出现意想不到的问题。
2.在application.properties文件中加入kafka所需要的配置信息
## Kafka-related configuration
## Kafka cluster address list (comma-separated host:port entries)
kafka.bootstrap-servers=192.168.74.80:9092,192.168.74.81:9092
## Kafka message topic
kafka.dataacquisition.topic=dataacquisition
3.Producer端配置
3.1.KafkaProducerConfig
/**
 * Spring configuration for the producing side: exposes a {@link ProducerFactory}
 * and a {@link KafkaTemplate}, both typed for String keys and String values.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Broker address list, injected from the {@code kafka.bootstrap-servers} property. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Topic name, injected from the {@code kafka.dataacquisition.topic} property. */
    @Value("${kafka.dataacquisition.topic}")
    private String topic;

    /**
     * Returns the topic name injected into this configuration.
     *
     * @return the configured topic
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Builds the producer factory from the injected broker list, using
     * {@link StringSerializer} for both record keys and record values.
     *
     * @return a producer factory for String/String records
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(settings);
        return factory;
    }

    /**
     * Creates the template used to publish messages, backed by {@link #producerFactory()}.
     *
     * @return a Kafka template for String/String records
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
3.2.Producer
@Component
public class Producer {
/**
* The constant logger.
*/
private static final Logger logger = LogManager.getLogger(Producer.class);
/**
* The Kafka template.
*/
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* Send.
*
* @param topic the topic
* @param message the message
*/
public void send(String topic , String message) {
logger.debug("发送消息到kafka消息系统, message:" + message);
kafkaTemplate.send(topic, message);
logger.debug("发送消息到kafka消息系统结束");
}
}
4.Consumer端配置
4.1.KafkaConsumerConfig
/**
 * Spring configuration for the consuming side: exposes a {@link ConsumerFactory}
 * and the listener container factory, both typed for String keys and String values.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Broker address list, injected from the {@code kafka.bootstrap-servers} property. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Topic name, injected from the {@code kafka.dataacquisition.topic} property. */
    @Value("${kafka.dataacquisition.topic}")
    private String topic;

    /**
     * Returns the topic name injected into this configuration.
     *
     * @return the configured topic
     */
    public String getTopic() {
        return topic;
    }

    /**
     * Builds the consumer factory from the injected broker list, using
     * {@link StringDeserializer} for both keys and values.
     *
     * @return a consumer factory for String/String records
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> settings = new HashMap<>();
        // The topic name is reused as the consumer group id.
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(settings);
        return factory;
    }

    /**
     * Creates the listener container factory and points it at {@link #consumerFactory()}.
     *
     * @return the concurrent listener container factory for String/String records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConsumerFactory<String, String> consumers = consumerFactory();
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumers);
        return containerFactory;
    }
}
4.2.Consumer
/**
 * Kafka listener that converts each incoming message into a {@code Merchandise}
 * and passes it to the {@code DataHandleService}.
 */
@Component
public class Consumer {

    /** Class-wide logger. */
    private static final Logger logger = LogManager.getLogger(Consumer.class);

    /** Service that processes each converted message. */
    @Autowired
    private DataHandleService dataHandleService;

    /**
     * Handles one message from the topic named by the
     * {@code kafka.dataacquisition.topic} property.
     *
     * <p>The raw message is logged at info level, converted with
     * {@code JsonUtils.jsonToObject}, and the result is passed to the data handle service.
     *
     * @param message the raw message payload
     */
    @KafkaListener(topics = "${kafka.dataacquisition.topic}")
    public void receive(String message) {
        logger.info("接收到kafka消息系统的message:" + message);
        dataHandleService.handle(JsonUtils.jsonToObject(message, Merchandise.class));
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)