【kafka】docker + 单点kafka部署 + nodejs生产者和消费者
而且他的consumer和producer都是在local,可以直接连到kafka的broker,而我的consumer和producer都在另一个container,因此也遇到了访问不到的问题,但正如前面所述已经解决。在我的案例中,kafka在一个container,zookeeper在另一个container,而nodejs的consumer和producer又在另一个container,一共
1、docker-compose启动kafka参数注意事项
PLAINTEXT
表示listener的连接是不需要身份验证且没有加密的
PLAINTEXT://kafka:9092
和PLAINTEXT://:9092
写法的区别在于前者指定了ip地址和端口只能是kafka(这个kafka即kafka所在容器的别名,用于指代IP地址,其他容器能够自动将kafka解析为kafka容器的IP地址)和9092,而后者没有指定IP地址
在我的案例中,kafka在一个container,zookeeper在另一个container,而nodejs的consumer和producer又在另一个container,一共有三个containers。如果把
KAFKA_ADVERTISED_LISTENERS
这个参数写成PLAINTEXT://localhost:9092
,我的producer和consumer都能通过kafka:9092
连接到kafka容器,但consumer并不能接受到消息。因此需要把KAFKA_ADVERTISED_LISTENERS
改成PLAINTEXT://kafka:9092
或者PLAINTEXT://:9092
,便能使整个程序正常运行
KAFKA_LISTENERS
参数指定了broker将要用于创建服务器sockets的IP地址和端口,这样client才能连接进来,这里只需要指定PLAINTEXT://:9092
表示用9092端口即可
KAFKA_ADVERTISED_LISTENERS
指定了client所需要使用的IP地址和端口来连接到brokers。正如前面案例所述,如果把IP指定为localhost,那client只能在本地进行连接brokers,即kafka容器内部。就像你用flask或者nodejs写一个后端,但开放的IP地址是127.0.0.1而不是0.0.0.0时,局域网下的其他人是连接不进来的。因此这里KAFKA_ADVERTISED_LISTENERS
需要把IP定为自己容器的IP(kafka
)或者不指定,即都接受,这样位于另一个容器的producer和consumer才能连接到brokers
2、基于node-rdkafka包的kafka
除了node-rdkafka,还有其他版本的nodejs kafka包。这个版本是用js对c++的kafka做了一个包装,因此在安装的时候,需要c++编译的支持。
官方不推荐在Windows安装(需要安装其他依赖,不然直接安装会报错),因此我也没有尝试在Windows上的node进行安装这个包。而是开了一个node的container,OS是debian,来安装。
在用这个包写producer时,不用单独使用kafka自带的脚本把topic创建好,producer运行会自动创建
3、可以做什么
有kafka之后,我们自己可以做很多自己的小项目
- 比如写一个多人联机小游戏,用kafka来作为服务器的消息传递系统(各个玩家的客户端既是producer也是consumer,produce自己本地的游戏进度,consume其他玩家的游戏进度,以此同步整个游戏)。服务器后端从kafka consume消息,以此更新全局状态并返回给kafka,使所有玩家更新自己本地的游戏状态;在后端也可以做一些作弊检测等等
- 或者写一个多人线上聊天工具,当然这个没什么大用,主要是个人练手。同样每个人都是producer和consumer,kafka进行消息传递,后台服务器后端接受消息,返还消息,中间可以做一些字符串匹配看大家聊天是否文明,也可以把所有消息再存到数据库如mongodb等等
玩法还有很多
4、项目结构
整个项目参考油管上一个视频,但那位老哥是苹果系统,并没有Windows安装node-rdkafka的问题;而且他的consumer和producer都是在local,可以直接连到kafka的broker,而我的consumer和producer都在另一个container,因此也遇到了访问不到的问题,但正如前面所述已经解决
如下
根目录下三个文件分别是docker-compose启动文件,消息类型定义文件,以及node包文件
打开项目后只需要
1、docker-compose up -d
启动container,此时三个containers:一个node,一个zookeeper,一个kafka
2、进入node的container,npm i
安装包
3、打开两个terminal,node producer/index.js
启动producer,node consumer/index.js
启动consumer,就能看到消息的发送和接受(也可以npm run start:producer
和npm run start:consumer
来启动producer和consumer,这是那小哥写的,定义在package.json中)
整个消息传递过程中使用avsc包来序列化我们的自定义消息,定义在eventType.js中
可以看到我们的消息的类型是record,里面有连个字段,一个category,类型是enum,取值为DOG或者CAT;另一个是noise,类型是string。avro负责在传递消息前序列化这个object为序列,另一边接受消息后反序列化这个序列为object
另外第一个红框是我们的consumer和producer文件,第二个红框是用kafka自带的script创建topic,创建一个producer和consumer等等,我这里主要用来调试,不需要可以直接删除
项目链接:https://github.com/lujiazho/nodejs-demos/tree/main/kafka-nodejs-docker-demo
更多推荐
所有评论(0)