SpringCloudAlibaba学习-06-SpringCloud整合Kafka入门(一)
参考:https://blog.csdn.net/u011019141/article/details/108743342https://blog.csdn.net/qq12547345/article/details/119531607https://blog.csdn.net/JinXYan/article/details/90813592部署kafka服务, 使用docker-compose
参考: https://blog.csdn.net/u011019141/article/details/108743342
https://blog.csdn.net/qq12547345/article/details/119531607
https://blog.csdn.net/JinXYan/article/details/90813592
部署kafka服务, 使用docker-compose部署
docker-compose内容如下 (kafka依赖zookeeper所以会同时部署zookeeper)
version: "3.7"
services:
zookeeper_server:
image: wurstmeister/zookeeper
container_name: zookeeper_server
ports:
- 2181:2181
volumes:
- ./data:/data
logging:
options:
max-size: "50M" # 最大文件上传限制
max-file: "100"
driver: json-file
kafka_server:
image: wurstmeister/kafka
container_name: kafka_server
ports:
- 9092:9092
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper_server:2181
KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
volumes:
- ./kafka-logs:/kafka
logging:
options:
max-size: "50M" # 最大文件上传限制
max-file: "100"
driver: json-file
depends_on:
- zookeeper_server
可能会有启动项目后可能会有连接报错的可能 记得在hosts文件中添加(一般都会有的) 127.0.0.1 localhost
本来想弄一个publisher模块和subscriber模块,网上的东西复现下来各种问题,于是就先弄个能跑的demo
项目结构:
依赖文件pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.demo.springcloud_05_kafka</groupId>
<artifactId>kafka_publisher</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka_publisher</name>
<description>kafka</description>
<properties>
<spring.cloud.version>Hoxton.SR9</spring.cloud.version>
<spring.boot.version>2.3.2.RELEASE</spring.boot.version>
<spring.cloud.alibaba.version>2.2.6.RELEASE</spring.cloud.alibaba.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
<optional>true</optional>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!--spring-boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- spring-cloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml配置文件
server:
port: 8581
spring:
application:
name: kafka_app
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
autoCreateTopics: true
requiredAcks: 1
autoAddPartitions: true
bindings:
input:
destination: stream-demo
output: #这里用stream给我们提供的默认output,后面会讲到自定义output
destination: stream-demo #消息发往的目的地
content-type: text/plain #消息发送的格式,接收端不用指定格式,但是发送端要
项目文件 KafkaPublisherApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@SpringBootApplication
//@EnableBinding(Source.class)
public class KafkaPublisherApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaPublisherApplication.class, args);
}
}
消息接受端 KafkaMessageReceiveListener.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import java.util.Date;
@Slf4j
@EnableBinding(value = Sink.class)
public class KafkaMessageReceiveListener {
/**
* 从缺省通道接收消息
* @param message
*/
@StreamListener(Sink.INPUT)
public void receive(Message<String> message){
log.info("{}订阅告警消息:通道 = es_default_input,data = {}", new Date(), message);
}
}
消息发送端:KafkaMessageSender.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@EnableBinding(Source.class)
public class KafkaMessageSender {
@Autowired
private Source channel;
public void sendToDefaultChannel(String message) {
channel.output().send(MessageBuilder.withPayload(message).build());
}
}
发送消息端 KafkaSenderController.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class KafkaSenderController {
@Autowired
private KafkaMessageSender sender;
@GetMapping("/send")
public void testKafkaMessageSend(String message) {
log.info("message:{}",message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
}
}
然后启动项目浏览器访问 http://localhost:8581/send?message=%221234521313123
就可以看到接受到的消息了
更多推荐
所有评论(0)