VUE2 前端应用kafka-node + vue-socket.io 获取kafka数据在前端展示
前端获取kafka数据需要启用node.js服务端。服务端使用kafka-node插件,启动后获取kafka数据。vue-socket.io用于web端与启动的node服务做socket链接,从而将node服务端获取的数据发送到web页面。后续在使用过程中有其他问题会持续更新,以上为个人使用总结,如有错误欢迎指出,会及时更新。个人感觉是相关依赖版本之间的问题,但是没有深入的研究,有兴趣的可以调整寻
·
本次应用理论阐述
前端获取kafka数据需要启用node.js服务端。服务端使用kafka-node插件,启动后获取kafka数据。vue-socket.io用于web端与启动的node服务做socket链接,从而将node服务端获取的数据发送到web页面。
启用node服务,选用的是express。有一点需要注意,socket.io的版本,正常安装的话应该是4点多的版本,但是在node与web做连接的时候,可能会出现连接不通的情况。但是换到2.0.4的版本就可以正常连接了。个人感觉是相关依赖版本之间的问题,但是没有深入的研究,有兴趣的可以调整寻找一下。
vue项目添加相关依赖
"express": "^4.18.2",
"kafka-node": "^5.0.0",
"socket.io": "^2.0.4",
"socket.io-client": "^4.7.2",
"vue-socket.io": "^3.0.10",
src同级新建node需要执行的文件夹

首先说node服务端代码–server文件夹下创建js文件,名字正常即可(我使用的server.js)
node端代码如下:
let express = require('express');
let app = express();
// app.all('*', function (req, res) {//本地如果出现跨域的话可以把注释这一块放开,正常不需要,看情况而定
// //设置可以接收请求的域名
// res.header("Access-Control-Allow-Origin", "*");
// res.header("Access-Control-Allow-Headers", "X-Requested-With");
// res.header("Access-Control-Allow-Methods", "PUT,POST,GET,DELETE,OPTIONS");
// res.header("Content-Type", "application/json;charset=utf-8");
// next();
// })
let server = require('http').createServer(app);
let io = require('socket.io')(server, { cors: true });
let kafka = require('kafka-node');
let payloads = [{//payloads 是kafka要获取数据的主题,可以传多个,此处为数组
topic: "",//需要连接kafka的主题,主题是必填的
groupId: '',//需要连接kafka的组名,组的话正常都会有,也可以看做必填
partitions: 0,//需要连接kafka的分区,一般使用0就行。
}];
let options = {
host: "",//需要连接的kafka的iP+端口
sessionTimeout: 15000,
groupId: '',//需要连接kafka的组名
autoCommit: true,
fromOffset: true,
partitions: 0,
};
let client = new kafka.KafkaClient({
kafkaHost: ""//需要连接的kafka的iP+端口
});
const kafkaConsumer = (payloads) => {
let consumer = new kafka.Consumer(client, payloads, options);
consumer.on("message", function (message) {
io.sockets.emit('server_counter', message);
});
io.on("connection", (socket) => {
//从页面发送过来的需要订阅的kafka主题数据
socket.on("addPlayData", (data) => {
let topicArr = [];
if (data.dpNo) {
topicArr = [{ topic: data.dpNo, offset: 0 }]
} else if (data.dpNos) {
let dpNoArr = data.dpNos.split(',');
dpNoArr.forEach(value => {
let obj = {
topic: value,
// offset: 0
}
topicArr.push(obj)
})
}
consumer.addTopics(topicArr, function (err, added) { })
})
//订阅的kafka如果有重复订阅的可能,需要及时删除订阅的kafka主题内容,防止重复订阅
socket.on("removePlayData", (data) => {
let topicArr = [];
if (data.dpNo) {
topicArr.push(data.dpNo)
} else if (data.dpNos) {
topicArr = data.dpNos.split(',');
}
consumer.removeTopics(topicArr, function (err, added) { })
})
})
}
kafkaConsumer(payloads);
server.listen(3000);//一般node启动服务端口为3000,如果需要修改端口的话,可以另外设置,这里就不做赘述了
web页面代码
1、main.js文件代码–引入vue-socket.io用来连接node服务端
import VueSocketIO from 'vue-socket.io'
Vue.use(new VueSocketIO({
debug: false,//debug可以关闭,正常连接后关闭即可
connection: 'http://localhost:3000',//此处为node服务端ip+端口,本地开发可以直接用localhost
options: {//此处配置,是用来设置在需要位置触发socket接受传输数据
autoConnect: false
},
}))
2、在methods同级增加socket代码。

详细代码内容如下:
sockets: {
server_counter(socketsData) {//server_counter为node服务端通过socket传递的主题名称server_counter
console.log("====", socketsData);//socketsData内容即为从kafka获取的数据内容
},
},
this.$socket.emit("addPlayData", data);//向socket发送要订阅的kafka主题数据
this.$socket.connect();//web端触发socket传输数据,可以放到适当位置。
this.$socket.close();//web端停止接受socket传输数据,可以放到适当位置应用,如果需要一直连接可以不用。
this.$socket.emit("removePlayData", this.nowClickData);//向socket发送要移除订阅的kafka主题数据
以上为本地开发应用。
上线部署服务器的话,server.js可以vue打包后单独放到服务器存放项目文件的文件夹中。相对的server.js文件的依赖需要在服务器单独npm一下。到此node服务端代码和web页面端代码已经结束。应用时先启用node服务端:在存放server文件夹出进行cmd,然后执行node server.js即可(server.js为个人在server文件夹下创建的js文件),web端就可以获取到kafka推送的数据。
"express": "^4.18.2",
"kafka-node": "^5.0.0",
"socket.io": "^2.0.4",
"socket.io-client": "^4.7.2",
我们是把kafka数据,做成echarts动态折线图,在测试过程中,我们kafka推送的数据频率比较高,导致web端生成echarts图形以及页面卡顿现象。偶尔也会出现kafka数据堆积导致web端收不到数据的情况,建议在应用过程中控制kafka发送数据频率。
后续在使用过程中有其他问题会持续更新,以上为个人使用总结,如有错误欢迎指出,会及时更新
更多推荐
所有评论(0)