缘起

有一个埋点收集系统,架构是Nginx+Flume。web,小程序,App等客户端将数据报送至Nginx,Nginx将请求写入本地文件,然后Flume读取日志文件的数据,将日志写入Kafka。这个架构本来没什么问题,但是部署在K8s容器就有问题了,当前一个Nginx后面是3个Flume,Nginx根据渠道将日志写入web.log,mp.log,app.log,3个log文件各对应一个Flume将数据写入Kafka,遇到的问题首先是health check问题,K8s一个容器只能提供1个health check地址(最佳实战),因为Nginx后面有3个Flume,所以无法感知这3个健康状态,最多感知一个。第二个动态扩容问题,比如某一段时间web报送数据较多,但其它两个渠道较少,扩容成两个pod会浪费资源,第三个是优雅退出问题,由于Nginx速度较快,写入文件也比较快,Flume处理的较慢,导致如果我想把这个容器关闭的话,不知道Flume有没有把所有的日志都写入Kafka。
优化思路:Flume其实是有多个进口和多个出口的,对于客户的业务来讲,入口只有Nginx,出口只有Kafka,于是决定将Flume去掉,使用Nginx+Lua脚本形式直接将Nginx的日志写入Kafka,同时写一份文件以备出现问题补数据使用,也作为Bug定位追踪使用。

环境准备

  1. openresty-1.15.8.2
    下载地址:http://openresty.org/cn/download.html
  2. lua-resty-kafka 是lua版本的kafka驱动,内置producer
    下载地址:https://github.com/doujiang24/lua-resty-kafka
  3. zlib的lua库 解gzip使用的,如果你不用gzip 可以不安装到脚本
    下载地址:https://github.com/madler/zlib
  4. lua-zlib lua调用gzip使用的库
    下载地址:https://github.com/brimworks/lua-zlib
  5. Nginx和上面组件编译用的依赖:(我用的debian10.11)
apt-get update -y && apt-get install --fix-missing zlib1g zlib1g-dev libpcre3-dev libssl-dev perl make build-essential curl cmake -y

步骤

  1. 解压openresty并编译安装
cp openresty-1.15.8.2.tar.gz /opt/app/source/
tar -zxvf openresty-1.15.8.2.tar.gz

cd openresty-1.15.8.2 && ./configure --prefix=/opt/app/openresty && make && make install
  1. 将kafka驱动放入lualib 就是将lua-resty-kafka-0.10.zip解压开,把lua-resty-kafka-0.10/lib下的resty文件夹直接拷贝进/opt/app/openresty/site/lualib/下
unzip lua-resty-kafka-0.10.zip
cd lua-resty-kafka-0.10/lib 
cp -r resty  /opt/app/openresty/site/lualib/
  1. 安装lua写的zlib库和lua-zlib库
tar -zxvf zlib-master.tar.gz
cp -a zlib-master/* /opt/app/openresty/site/lualib/

tar -zxvf lua-zlib-master.tgz
cd lua-zlib-master \
    && cmake -DLUA_INCLUDE_DIR=/opt/app/openresty/luajit/include/luajit-2.1 -DLUA_LIBRARIES=/opt/app/openresty/luajit/lib -DUSE_LUAJIT=ON -DUSE_LUA=OFF \
    && make \
    && cp zlib.so /opt/app/openresty/lualib/zlib.so
  1. 编写kafkaconfig.lua并放入/opt/app/openresty/site/lualib/ 这一步是为了配置kafka的ip,端口和topic。
    内容是:
kafka_broker_list={
    {host="192.168.1.1",port=9092},
    {host="192.168.1.2",port=9092},
    {host="192.168.1.3",port=9092}
}

kafka_topic_mp="mp_topic"
kafka_topic_app="app_topic"
kafka_topic_web="web_topic"
  1. 编写nginx配置文件nginx.conf.内容如下:
worker_processes  auto;
error_log /opt/app/logs/error.log;
events {
    worker_connections 10240;
}

http {
    gzip on;
    gzip_types application/javascript text/plain text/xml text/css application/x-javascript application/xml text/javascript application/x-httpd-php image/jpeg image/gif image/png;
    gzip_vary on;
    gzip_comp_level 9;
    
    server {
        listen 80;
        include /opt/app/openresty/nginx/conf/web.conf;
        }

    }
}

web.conf

location ^~ /health {
    default_type text/html;
    return 200 'ok';
}
# app 批量
location ^~ /app {
    if ($request_method != POST) {
        return 405;
    }
    default_type 'text/plain';
    expires off;
    add_header Last-Modified '';
    add_header Cache-Control 'no-cache';
    add_header Pragma "no-cache";
    empty_gif;
    echo_read_request_body;
    access_by_lua_file /opt/app/openresty/nginx/conf/app.lua;
    access_log   /opt/app/logs/app.log;
}

app.lua:

ngx.req.read_body()
local body = ngx.req.get_body_data()

#引入kafka的生产者
local producer = require "resty.kafka.producer"
#引入上面写的kafka配置
local kafka_config = require "kafkaconfig"

# 因为引入了kafkaconfig 里面的变量直接访问就好
# local p = producer:new(kafka_broker_list)
# --------> 2023-03-16 分界线 start<------------
# 请使用以下方式初始化producer 相比于上面的初始化,下面添加了broker刷新时间
# 可以在网络抖动获取不到boker list,防止后续无法继续使用的问题
# 详细见我另一篇文章 https://blog.csdn.net/codeblf2/article/details/129505283
local p = producer:new(kafka_broker_list, {producer_type = "async",refresh_interval=10000})
# --------> 2023-03-16 分界线 end <------------

# 中间为nil的参数是kafka将本条消息写入哪个分区所使用的key,为nil代表轮询写入
local offset, err = p:send(kafka_topic_app, nil, body)
if not offset then
    ngx.say("send err:", err)
    return
end

以上示例只展示了app的,web和mp的类似即可。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐