本次应用理论阐述

前端获取kafka数据需要启用node.js服务端。服务端使用kafka-node插件,启动后获取kafka数据。vue-socket.io用于web端与启动的node服务做socket链接,从而将node服务端获取的数据发送到web页面。
启用node服务,选用的是express。有一点需要注意,socket.io的版本,正常安装的话应该是4点多的版本,但是在node与web做连接的时候,可能会出现连接不通的情况。但是换到2.0.4的版本就可以正常连接了。个人感觉是相关依赖版本之间的问题,但是没有深入的研究,有兴趣的可以调整寻找一下。

vue项目添加相关依赖

"express": "^4.18.2",
"kafka-node": "^5.0.0",
"socket.io": "^2.0.4",
"socket.io-client": "^4.7.2",
"vue-socket.io": "^3.0.10",

src同级新建node需要执行的文件夹

在这里插入图片描述

首先说node服务端代码–server文件夹下创建js文件,名字正常即可(我使用的server.js)

node端代码如下:

let express = require('express');
let app = express();
// app.all('*', function (req, res) {//本地如果出现跨域的话可以把注释这一块放开,正常不需要,看情况而定
//    //设置可以接收请求的域名
//    res.header("Access-Control-Allow-Origin", "*");
//    res.header("Access-Control-Allow-Headers", "X-Requested-With");
//    res.header("Access-Control-Allow-Methods", "PUT,POST,GET,DELETE,OPTIONS");
//    res.header("Content-Type", "application/json;charset=utf-8");
//    next();
// })
let server = require('http').createServer(app);
let io = require('socket.io')(server, { cors: true });
let kafka = require('kafka-node');
let payloads = [{//payloads 是kafka要获取数据的主题,可以传多个,此处为数组
   topic: "",//需要连接kafka的主题,主题是必填的
   groupId: '',//需要连接kafka的组名,组的话正常都会有,也可以看做必填
   partitions: 0,//需要连接kafka的分区,一般使用0就行。
}];
let options = {
   host: "",//需要连接的kafka的iP+端口
   sessionTimeout: 15000,
   groupId: '',//需要连接kafka的组名
   autoCommit: true,
   fromOffset: true,
   partitions: 0,
};
let client = new kafka.KafkaClient({
   kafkaHost: ""//需要连接的kafka的iP+端口
});
const kafkaConsumer = (payloads) => {
   let consumer = new kafka.Consumer(client, payloads, options);
   consumer.on("message", function (message) {
      io.sockets.emit('server_counter', message);
   });
   io.on("connection", (socket) => {
   	//从页面发送过来的需要订阅的kafka主题数据
      socket.on("addPlayData", (data) => {
         let topicArr = [];
         if (data.dpNo) {
            topicArr = [{ topic: data.dpNo, offset: 0 }]
         } else if (data.dpNos) {
            let dpNoArr = data.dpNos.split(',');
            dpNoArr.forEach(value => {
               let obj = {
                  topic: value,
                  // offset: 0
               }
               topicArr.push(obj)
            })
         }
         consumer.addTopics(topicArr, function (err, added) { })
      })
      //订阅的kafka如果有重复订阅的可能,需要及时删除订阅的kafka主题内容,防止重复订阅
      socket.on("removePlayData", (data) => {
         let topicArr = [];
         if (data.dpNo) {
            topicArr.push(data.dpNo)
         } else if (data.dpNos) {
            topicArr = data.dpNos.split(',');
         }
         consumer.removeTopics(topicArr, function (err, added) { })
      })
   })
}
kafkaConsumer(payloads);
server.listen(3000);//一般node启动服务端口为3000,如果需要修改端口的话,可以另外设置,这里就不做赘述了

web页面代码

1、main.js文件代码–引入vue-socket.io用来连接node服务端

import VueSocketIO from 'vue-socket.io'
Vue.use(new VueSocketIO({
  debug: false,//debug可以关闭,正常连接后关闭即可
  connection: 'http://localhost:3000',//此处为node服务端ip+端口,本地开发可以直接用localhost
  options: {//此处配置,是用来设置在需要位置触发socket接受传输数据
    autoConnect: false
  },
}))

2、在methods同级增加socket代码。

在这里插入图片描述
详细代码内容如下:

  sockets: {
    server_counter(socketsData) {//server_counter为node服务端通过socket传递的主题名称server_counter
      console.log("====", socketsData);//socketsData内容即为从kafka获取的数据内容
    },
  },
this.$socket.emit("addPlayData", data);//向socket发送要订阅的kafka主题数据
this.$socket.connect();//web端触发socket传输数据,可以放到适当位置。
this.$socket.close();//web端停止接受socket传输数据,可以放到适当位置应用,如果需要一直连接可以不用。
this.$socket.emit("removePlayData", this.nowClickData);//向socket发送要移除订阅的kafka主题数据

以上为本地开发应用。

上线部署服务器的话,server.js可以vue打包后单独放到服务器存放项目文件的文件夹中。相对的server.js文件的依赖需要在服务器单独npm一下。到此node服务端代码和web页面端代码已经结束。应用时先启用node服务端:在存放server文件夹出进行cmd,然后执行node server.js即可(server.js为个人在server文件夹下创建的js文件),web端就可以获取到kafka推送的数据。

"express": "^4.18.2",
"kafka-node": "^5.0.0",
"socket.io": "^2.0.4",
"socket.io-client": "^4.7.2",

我们是把kafka数据,做成echarts动态折线图,在测试过程中,我们kafka推送的数据频率比较高,导致web端生成echarts图形以及页面卡顿现象。偶尔也会出现kafka数据堆积导致web端收不到数据的情况,建议在应用过程中控制kafka发送数据频率。

后续在使用过程中有其他问题会持续更新,以上为个人使用总结,如有错误欢迎指出,会及时更新

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐