spring cloud初学者-spring-kafka
上一篇博客我们讲了如何安装kafka,这一章我们就讲一下如何用springcloud来连接kafka并发布消息和获取消息。创建程序pom文件<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/200
·
上一篇博客我们讲了如何安装kafka,这一章我们就讲一下如何用springcloud来连接kafka并发布消息和获取消息。
创建程序
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mhb.kafka</groupId>
<artifactId>kafka-first</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<spring-shell.version>1.0.0.RELEASE</spring-shell.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-releases</id>
<url>https://repo.spring.io/libs-release</url>
</repository>
</repositories>
</project>
spring-boot-maven-plugin非常的强大,他提供了很多方便的功能。
- 它收集类路径上的所有jar,并构建一个可运行的“über-jar”,这样可以更方便地执行和传输您的服务。
- 他会定位public static void main()方法来标记为可运行类。
- 他会为你默认选择jar包的版本号。
创建生产者
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.host}")
private String kafkaHost;
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
producer需要kafka的地址和一些必要设置,在spring-kafka里边我们通过map来赋值,并通过工厂来设置
创建消费者
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.host}")
private String kafkaHost;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaHost);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return propsMap;
}
}
消费者同样是配置kafka的地址,和一些参数,然后通过工厂创建消费者,这里注意的是添加@EnableKafka标签。
业务类
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class Listener {
@KafkaListener(topics = {"test-topic"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
Optional<?> kafkaKey = Optional.ofNullable(record.key());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("listen1 " + message + "::::" + kafkaKey);
}
}
}
@KafkaListener帮组我们来确定这是consumer的topic来监听,其实在内部实现也是通过不断的循环来实现的。而if内部的实现就可以是我们的内部类了
启动项目
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
@Autowired
KafkaTemplate kafkaTemplate;
@Override
public void run(String... strings) throws Exception {
for (int i=0; i<10; i++)
kafkaTemplate.send("test-topic", "key-1", "hello");
}
}
通过spring自带的KafkaTemplate来发送消息,这样我们就能看到控制台的消息了。
小结
虽然这种方法很方便,但是因为现在最新的版本才是1.2+,所以kafka的很多功能他还是不完善的,不让动态订阅,取消订阅的功能,希望在以后的作用总我们可以看到他的更新。
更多推荐
已为社区贡献1条内容
所有评论(0)