Spring Boot推送消息到Kafka
本文详细介绍了如何在 Spring Boot 项目中连接 Kafka 并将消息推送到 Kafka,同时由消费者通过 WebSocket 将收到的消息推送到前端页面展示。
·
Pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo</groupId>
    <artifactId>boot-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka client integration -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- JSON libraries -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <!-- Declared exactly once: Maven requires groupId:artifactId to be unique within <dependencies>. -->
        <!-- NOTE(review): fastjson releases before 1.2.83 have published deserialization vulnerabilities; consider upgrading. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Utility libraries -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-core</artifactId>
            <version>5.7.11</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-json</artifactId>
            <version>5.7.11</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>

        <!-- WebSocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- Thymeleaf templates (serves templates/index.html) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <finalName>push-api</finalName>
        <plugins>
            <!-- Skip test execution during the build. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
            <!-- This plugin is required for the generated jar to be runnable. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <includeSystemScope>true</includeSystemScope>
                    <jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
application.yml
server:
  port: 8800
  servlet:
    context-path: /push-api

# Path of the JSON file that is pushed to Kafka after startup (read via ${json.area}).
json:
  area: D:\\workspace\\本地\\boot-demo\\src\\main\\resources\\json\\test.json

## Logging
logging:
  level:
    org:
      springframework:
        boot:
          autoconfigure: error
    root: info
    com.demo: debug
  config: classpath:logback.xml
  log-print-switch: 1 # Business requirement: whether consumed messages are logged (0: do not log, 1: log)

spring:
  thymeleaf:
    suffix: .html
    mode: HTML
    check-template: false
    enabled: true
    prefix: classpath:/templates/
    cache: false
  ## Kafka integration
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    # With security authentication
    #bootstrap-servers: 127.0.0.1:9092
    #bootstrap-servers: 127.0.0.1:21007,127.0.0.2:21007,127.0.0.3:21007
    producer:
      retries: 0 # Number of times a message is re-sent after a failed send
      # acks=0   : the producer does not wait for any response from the server before considering the write successful.
      # acks=1   : the producer receives a success response as soon as the partition leader has received the message.
      # acks=all : the producer receives a success response only after all in-sync replicas have received the message.
      acks: 1 # Acknowledgement level: how many replicas must confirm before the producer gets an ack (0, 1, all/-1)
      batch-size: 65536 # Batch size in bytes: messages bound for the same partition are grouped into one batch of at most this size
      buffer-memory: 524288 # Size of the producer's in-memory send buffer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer
      # Server address
      bootstrap-servers: 127.0.0.1:9092
      # With security authentication
      #bootstrap-servers: 127.0.0.1:9092
      #bootstrap-servers: 127.0.0.1:21007,127.0.0.2:21007,127.0.0.3:21007
      #properties:
      #  linger:
      #    ms: 0 # 0 means every message is handed to Kafka as soon as it is received
      #  sasl:
      #    mechanism: PLAIN
      #  security:
      #    protocol: SASL_PLAINTEXT
    consumer:
      group-id: kafka-test-group # Default consumer group id
      # earliest: resume from the committed offset if one exists; otherwise consume from the beginning
      # latest  : resume from the committed offset if one exists; otherwise consume only newly produced records
      # none    : resume from the committed offsets if every partition has one; throw if any partition has none
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true # Whether offsets are committed automatically
      auto-commit-interval: 20000
      #properties:
      #  sasl:
      #    mechanism: PLAIN
      #  security:
      #    protocol: SASL_PLAINTEXT
      # Written as max-poll-records: a nested max/poll/records key is not a Spring Boot consumer property.
      max-poll-records: 100
    listener:
      # Number of threads to run in the listener containers.
      concurrency: 5
      # The listener is responsible for acks, on every call ... (the remainder of this file is truncated in the source)
启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
/**
 * Entry point of the demo application; also declares the shared {@link RestTemplate} bean.
 *
 * @author huangyuchen
 * @date 2022/12/5 16:21
 */
@SpringBootApplication
public class DemoApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments; forwarded to Spring so that options such as
     *             {@code --server.port=...} passed on the command line are not dropped
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    /**
     * Builds a {@link RestTemplate} with the builder's default settings.
     *
     * @return a new {@link RestTemplate} instance
     */
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplateBuilder().build();
    }
}
Websocket配置
package com.demo.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * WebSocket configuration.
 *
 * <p>Declares the single bean this demo needs for its WebSocket support.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the {@link ServerEndpointExporter} bean.
     *
     * @return a freshly constructed exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
package com.demo.websocket.common;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author huangyuchen
* @Description: WebSocket服务端代码,包含接收消息,推送消息等接口【test_topic】
* test_topic 代表当前websocketServer 接受的kafka Topic
*/
@SuppressWarnings("ALL")
@Slf4j
@Component
@ServerEndpoint(value = "/socket/test_topic/{id}")
public class WebSocketServer {
// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static AtomicInteger online = new AtomicInteger();
// concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
private static Map<String, Session> sessionPools = new HashMap<>();
/**
* 发送消息方法
* @param session 客户端与socket建立的会话
* @param message 消息
* @throws IOException
*/
public void sendMessage(Session session, String message) throws IOException {
if (session != null) {
session.getBasicRemote().sendText(message);
}
}
/**
* 连接建立成功调用
* @param session 客户端与socket建立的会话
* @param signalId 客户端的signalId
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "id") String id) {
sessionPools.put(id, session);
addOnlineCount();
log.info("【test_topic】 "+"加入webSocket!当前人数为" + online);
try {
sendMessage(session, "欢迎" + id+ "加入连接!");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 关闭连接时调用
* @param signalId 关闭连接的客户端的信号机ID
*/
@OnClose
public void onClose(@PathParam(value = "id") String id) {
sessionPools.remove(id);
subOnlineCount();
log.info("【test_topic】 " + id+ " 断开webSocket连接!当前人数为" + online);
}
/**
* 收到客户端消息时触发(群发)
* @param message
* @throws IOException
*/
@OnMessage
public void onMessage(String message) throws IOException {
for (Session session : sessionPools.values()) {
try {
sendMessage(session, message);
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
/**
* 发生错误时候
* @param session
* @param throwable
*/
@OnError
public void onError(Session session, Throwable throwable) {
log.info("【test_topic】 发生错误");
throwable.printStackTrace();
}
/**
* 给指定用户发送消息
* @param id 登录的客户端zhi'da
* @param message 消息
* @throws IOException
*/
public void sendInfo(String id, String message) {
Session session = sessionPools.get(id);
try {
sendMessage(session, message);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void addOnlineCount() {
online.incrementAndGet();
}
public static void subOnlineCount() {
online.decrementAndGet();
}
}
消费者
package com.demo.kafka;
import com.alibaba.fastjson.JSONObject;
import com.demo.websocket.common.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @version : V1.0
* @Description : 消费者
*/
@Component
@Slf4j
public class KafkaConsumer {
@Value("${logging.log-print-switch}")
//#是否打印log日志【业务要求】(0:不输出log日志,1:输出log日志)
private String logprintswitch;
@Autowired
private WebSocketServer webSocketServer;
/**
* 读取kafka数据
* @param record
* @param topic->signaler_rt_phase
*/
@KafkaListener(topics = "test_topic", groupId = "test_group")
public void topic_message(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
JSONObject jsonObject = JSONObject.parseObject(msg.toString());
//打印log日志
if ("1".equals(logprintswitch)) {
log.info( "Topic:" + topic + ",Message:" + msg);
}
try {
webSocketServer.onMessage(message.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
生产者
package com.demo.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * Kafka producer: publishes objects to the topic named by {@code TopicConstant.signaler_rt_phase}
 * and logs the outcome of each send.
 *
 * @version : V1.0
 */
@Component
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends one message. The call does not wait for the result; success or failure is
     * reported through the callback registered on the returned future.
     *
     * @param obj the message value to publish
     */
    public void send(Object obj) {
        // NOTE(review): KafkaConsumer listens on "test_topic"; confirm that
        // TopicConstant.signaler_rt_phase resolves to the same topic name.
        ListenableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(TopicConstant.signaler_rt_phase, obj);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // Log at error level and pass the throwable so the stack trace is kept.
                log.error("生产者 发送消息【失败】:{}", throwable.getMessage(), throwable);
            }

            @Override
            public void onSuccess(SendResult<String, Object> sendResult) {
                log.info("生产者 发送消息【成功】:{}", sendResult.getProducerRecord().value());
            }
        });
    }
}
启动程序后开始推送
程序启动后读取 application.yml 中 json.area 配置的路径所指向的 JSON 文件,并每隔 1 秒将其内容推送到 Kafka 一次
package com.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.demo.constant.BasicConstant;
import com.demo.kafka.KafkaProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
/**
* @date 2022/12/6 16:28
* @decrption 程序启动后直接开始推送
*/
@Component
@Order(value = 1)
@Slf4j
public class ApplicationStartRunner implements ApplicationRunner {
@Value(value = "${json.area}")
private String area;
private JSONObject jsonObject = new JSONObject();
@Autowired
private KafkaProducer kafkaProducer;
public void init() {
File file = new File(area);
InputStream config = null;
try {
config = new FileInputStream(file);
jsonObject = JSON.parseObject(config, JSONObject.class);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run(ApplicationArguments args) throws Exception {
// 读取需要推送的json文件
init();
// 循环推送
while(true){
System.out.println("task is running.");
kafkaProducer.send(JSON.toJSONString(jsonObject));
Thread.sleep(1000);
}
}
}
前端页面
在 resources下新建 /templates/index.html
<!DOCTYPE HTML>
<html>
<head>
    <title>WebSocket</title>
</head>
<body>
Welcome<br/>
<input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button>
<div id="message">
</div>
<script type="text/javascript">
    var websocket = null;

    // Only create the socket and attach handlers when the browser supports WebSocket;
    // otherwise websocket stays null and the helper functions below become no-ops.
    if ('WebSocket' in window) {
        websocket = new WebSocket("ws://localhost:8800/push-api/socket/test_topic/1");

        // Called when a connection error occurs.
        websocket.onerror = function () {
            setMessageInnerHTML("error");
        };

        // Called when the connection has been established.
        websocket.onopen = function (event) {
            setMessageInnerHTML("open");
        };

        // Called when a message is received.
        websocket.onmessage = function (event) {
            setMessageInnerHTML(event.data);
        };

        // Called when the connection is closed.
        websocket.onclose = function () {
            setMessageInnerHTML("close");
        };
    } else {
        alert('Not support websocket');
    }

    // Close the socket when the window is closed, so the server does not see the
    // connection dropped without a proper close.
    window.onbeforeunload = function () {
        closeWebSocket();
    };

    // Appends a message to the page. The text is inserted as a text node, not as HTML,
    // because it comes from the socket and must not be interpreted as markup.
    function setMessageInnerHTML(innerHTML) {
        var box = document.getElementById('message');
        box.appendChild(document.createTextNode(innerHTML + ' '));
        box.appendChild(document.createElement('br'));
    }

    // Closes the connection, if one was created.
    function closeWebSocket() {
        if (websocket !== null) {
            websocket.close();
        }
    }

    // Sends the content of the text box, provided the connection is open.
    function send() {
        if (websocket === null || websocket.readyState !== WebSocket.OPEN) {
            setMessageInnerHTML("websocket is not connected");
            return;
        }
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</body>
</html>
测试一下
新建demo.json
写入内容
{
"name": "tom",
"age": 1
}
并将application.yml 中的json.area路径改为
json:
area: E:\\workspace\\本地\\boot-demo\\src\\main\\resources\\json\\demo.json
启动项目,访问页面
更多推荐
已为社区贡献1条内容
所有评论(0)