一、OpenResty安装

OpenResty® - 中文官方站

1.下载安装包

wget https://openresty.org/download/openresty-1.11.2.5.tar.gz

2. 解压

tar -zxvf openresty-1.11.2.5.tar.gz

3. 安装

cd openresty-1.11.2.5
./configure 
make
make install

4. 配置环境变量,OpenResty默认安装在/usr/local/openresty目录下

vi /etc/profile
export PATH=/usr/local/openresty/nginx/sbin:$PATH
source /etc/profile

二、hello world

修改/usr/local/openresty/nginx/conf/nginx.conf

worker_processes  1;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    sendfile        on;

    server {
        listen       80;
        server_name  localhost;
        location / {
            root   html;
            index  index.html index.htm;
        }
		location /hello {
			default_type text/html;
			content_by_lua 'ngx.say("<h1> hello, OpenResty </h1>")';
		}
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

启动nginx,访问ip/hello,可以看到如下输出,说明配置成效 

三、编写lua脚本,作为输出

在ngxin目录下创建lua目录,并编写hello.lua,内容如下

ngx.say('<h1>hello,OpenResty lua</h1>')

修改nginx.conf中的/hello请求

location /hello {
	default_type text/html;
	content_by_lua_file /usr/local/openresty/nginx/lua/hello.lua;
}

请求hello返回如下,说明修改生效

 四、lua操作mysql数据库 (lua-resty-mysql 的使用)

lua-resty-mysql相关操作说明

1. 创建mysql连接对象 mysql:new()

--如果创建失败,返回nil,错误信息err
db, err = mysql:new()

2. 建立连接 db:connect(options)

--语法格式
ok, err, errcode, sqlstate = db:connect(options)
--options 必填项
-- host:mysql地址
-- port:mysql端口
-- database:数据库名称
-- user:用户名
-- password:密码


--options 可选项
-- charset:编码集
-- max_packet_size:mysql最大返回的数据大小,默认1M
-- ssl:是否使用ssl加密,默认false
-- pool:数据库连接池,如果不设置,默认为:user:database:host:port、user:database:path
-- pool_size:数据库连接池大小(针对每个worker,不是整个nginx服务器)
             如果不设置backlog,总的数据库连接不会有限制
             如果不设置,并且backlog没有设置,则不会创建连接池
             如果不设置,但是设置了backlog,poolsize默认为lua_socket_pool_size
             当连接不够时,超过存活时间的连接会断开,断开后可以建立新的连接
-- backlog:设置后可以限制总的数据库连接

3. 设置超时时间 db:set_timeout(time)

--超时时间,单位毫秒
db:set_timeout(time)

4. 设置空闲存活时间 db:set_keepalive(max_idle_timeout, pool_size)

--max_idle_timeout 最大空闲时间
--pool_size 连接池大小
ok, err = db:set_keepalive(max_idle_timeout, pool_size)

5. 获取当前连接重复使用次数 db:get_reused_times()

-- 如果当前连接不来自于连接池,返回0
-- 如果当前连接来自连接池,返回一个非0值
-- 这方法可用来判断当前连接是不是来自于连接池
-- 如果出错,返回nil和错误信息
times, err = db:get_reused_times()

6. 发送查询语句,不需要等待响应结果 db:send_query(sql)

-- 如果发送成功,返回发送成功的响应数据
-- 如果发送失败,返回nil、错误信息err
-- 使用read_result读取响应结果
bytes, err = db:send_query(sql)

7. 读取响应结果 db:read_result()

-- res为查询到的结果数据,可以为一个,也可以为多个
-- 如果出错了,结果为nil,错误信息,错误信息状态码,sql错误码
res, err, errcode, sqlstate = db:read_result()
-- nrows为限制结果数量
res, err, errcode, sqlstate = db:read_result(nrows)

8. send_query 和 read_result的简化:db:query(sql)

-- 如果出错了,结果为nil,错误信息,错误信息状态码,sql错误码
res, err, errcode, sqlstate = db:query(sql)
res, err, errcode, sqlstate = db:query(sql, nrows)

9. 关闭连接 db:close()

-- 关闭成功,返回1
ok, err = db:close()

10. 防止sql注入

因为我们nginx可以直接操作数据库,而链接中带过来的参数如果存在特殊字符,就会导致我们数据库不安全

local name = ngx.unescape_uri(ngx.var.arg_name)
local quoted_name = ngx.quote_sql_str(name)
local sql = "select * from users where name = " .. quoted_name

11. 案例

创建数据库openresty,并创建表user和相关数据

CREATE TABLE `users` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `birthday` date DEFAULT NULL,
  `salary` double(10,2) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;


INSERT INTO `users` (`id`, `username`, `birthday`, `salary`) VALUES ('1', 'xiaowang', '1991-03-15', '9000.00');
INSERT INTO `users` (`id`, `username`, `birthday`, `salary`) VALUES ('2', 'xiaoma', '1992-07-15', '8000.00');

编写mysql.lua

cjson 可以转json字符串

ngx.header.content_type="application/json;charset=utf8"
local mysql = require "resty.mysql"
local db, err = mysql:new()
if not db then
    ngx.say("failed to instantiate mysql: ", err)
    return
end
db:set_timeout(10000)

local ok, err, errcode, sqlstate = db:connect{
    host = "192.168.8.59",
    port = 3306,
    database = "openresty",
    user = "xxx",
    password = "xxx"
}
if not ok then
    ngx.say("failed to connect: ", err, ": ", errcode, " ", sqlstate)
    return
end
ngx.say("connected to mysql.")
local bytes,err = db:send_query("select * from users")
if not bytes then
    ngx.say("bad result: ", err)
    return
end
ngx.say("send ",bytes," bytes.")
local res,err,errcode,sqlstate = db:read_result()
if not res then
  ngx.say("bad result: ",err," :",errcode, ":" ,sqlstate," .")
end
local cjson = require 'cjson'
for i,v in ipairs(res) do
  ngx.say(v.id..","..v.username..","..v.birthday..",".. v.salary)
end
ngx.say("查询数据为:",cjson.encode(res))
db:close()

编写配置文件nginx.conf

location = /mysql{
    content_by_lua_file /usr/local/openresty/nginx/lua/mysql.lua;
}

访问/mysql,可以得到如下打印信息

 四、lua操作redis (resty.redis的使用)

1. 创建redis对象 redis:new()

-- 如果创建失败,返回nil,错误信息err
red, err = redis:new()

2. 建立连接 red:connect()

-- host ip地址
-- port 端口号
-- options_table 可选参数
ok, err = red:connect(host, port, options_table?)
 
-- optional_tables:可选参数
-- ssl:boolean,是否使用ssl加密连接,默认false
-- ssl_verify:是否验证ssl证书,默认false
-- server_name: Specifies the server name for the new TLS extension 
                Server Name Indication (SNI) when connecting over SSL
-- pool:连接池名称,如果不设置,默认为host:port、unix-socket-path
-- pool_size:连接池大小,
   1.如果不设置,并且backlog也没有设置,连接池不会创建
   2.如果不设置,backlog设置了,连接池大小默认为:lua_socket_pool_size 
   3.连接池最多持有pool size个连接,但不会限制总连接,可通过设置backlog限制总连接超过pool size的连接会进入队列排队,如果队列满了,则报错。
-- backlog:超过连接池pool size的连接会进入队列排队,队列满了会报错:too many waiting connect operations

3. 设置超时时间: red:set_timeout(time)

-- time 单位毫秒 
red:set_timeout(time)

4. 设置不同的情形的超时时间 red:set_timeouts(connect_timeout, send_timeout, read_timeout)

-- connect_timeout 连接超时时间
-- send_timeout 写操作超时时间
-- read_timeout 读操作超时时间
red:set_timeouts(connect_timeout, send_timeout, read_timeout)

5. 设置空闲存活时间

-- max_idle_timeout 存活时间单位ms
-- 连接池大小
-- 会将当前连接放进连接池中
ok, err = red:set_keepalive(max_idle_timeout, pool_size)

6. 连接重用次数

-- 获取当前连接的重复使用次数
-- 错误返回nil和错误信息err
times, err = red:get_reused_times()

7. 流水线操作

-- 初始化流水线
red:init_pipeline()
red:init_pipeline(n)
-- 提交流水线
res, err = red:commit_pipeline()
-- 取消流水线
red:cancel_pipeline()

8. 字符串操作

-- 设置key-value 
-- 失败返回nil和错误信息err
ok, err = red:set(key,value)

-- 根据key获取value
-- 失败返回nil和错误信息err
res, err = red:get(key)

9. 权限认证

-- 认证失败返回nil和错误信息err
res, err = red:auth(password)

10. 案例

1. 编写redis.lua脚本文件

ngx.header.content_type="application/json;charset=utf8"
-- 引入 Redis
local redis = require "resty.redis"     
--创建Redis对象
local red = redis:new()  
--设置超时数据为3s          
red:set_timeout(3000)    
--设置redis连接信息         
local ok,err = red:connect("192.168.8.59",6379)   
--判断是否连接成功
if not ok then                         
   ngx.say("failed to connection redis",err)
   return
end
ngx.say("connected to redis.")
--存入 数据
ok,err = red:set("username","TOM")     
--判断是否存入成功
if not ok then                             
   ngx.say("failed to set username",err)
   return
end
--从 redis中获取数据
local res,err = red:get("username")    
--将数据写会消息体中
ngx.say(res) 
red:close()

2. nginx.conf增加/redis请求路径

location = /redis{
   content_by_lua_file /usr/local/openresty/nginx/lua/redis.lua;
}

3. 请求/redis

 五、常用的nginx lua api

1. 获取路径占位符 /item/1001

local id = ngx.var[1]

2. 获取请求头信息

local headers = ngx.req.get_headers()

3. 获取get请求参数

-- 返回table类型 
local args = ngx.req.get_uri_args()
-- xx 表示url上的参数名称 ,多个同名会返回第一个的值
-- ?xx=1&xx=2&xx=3 会返回1 ,而 get_uri_args 返回["1","2","3"]
local xx = ngx.var.arg_xx

4. 获取请求体

-- 读取请求体
ngx.req.read_body()
-- 获取表单参数,返回table类型
local postParams = ngx.req.get_post_args()

5. 获取json参数

-- 读取请求体
ngx.req.read_body()
-- 获取请求体中json参数,返回string类型
local jsonBody = ngx.req.get_body_data()

6. 获取请求方式

-- 获取请求方式()GET POST PUT DELETE 等
local method,err = ngx.req.get_method();

7.  日志打印

-- 日志级别从高到低
-- ngx.STDERR
-- ngx.EMERG
-- ngx.ALERT
-- ngx.CRIT
-- ngx.ERR
-- ngx.WARN
-- ngx.NOTICE
-- ngx.INFO
-- ngx.DEBUG
ngx.log(ngx.ERR,'这是日志的信息') 

8. 案例

1. 编写lua脚本

ngx.header.content_type="application/json;charset=utf8"
local cjson = require "cjson"
 
--请求方法  
local method = ngx.req.get_method()
ngx.say("请求方式 : ",method) 
ngx.say("=======================")
--请求的http协议版本  
ngx.say("http协议版本 : ",ngx.req.http_version())
ngx.say("=======================")
  
--get请求uri参数  
if (method == "GET") then
    ngx.say("get请求参数")
	local uri_args = ngx.req.get_uri_args()
	for k, v in pairs(uri_args) do  
		ngx.say(k, ": ", v)  
	end  
elseif(method == "POST") then
	ngx.req.read_body()
	local post_args = ngx.req.get_post_args()  
	if post_args ~= nil then
	    ngx.say("post请求参数表单方式")
		for k, v in pairs(post_args) do
			if type(v) == "table" then
			   ngx.say(k, ":", table.concat(v,","))
			else
			   ngx.say(k, ": ", v) 
			end
		end
	end
	ngx.say("=======================")
	ngx.say("post get_body_data")
	local p_body_data = ngx.req.get_body_data()
	ngx.say(p_body_data)
end
ngx.say("=======================")

--请求头  
local headers = ngx.req.get_headers()
ngx.say("headers: ")
for k,v in pairs(headers) do  
    if type(v) == "table" then
        ngx.say(k, " : ", table.concat(v, ","))  
    else 
        ngx.say(k, " : ", v)
    end  
end  
ngx.say("=======================")

2. 配置nginx.conf

location = /req{
   content_by_lua_file /usr/local/openresty/nginx/lua/request.lua;
}

3. get请求

4. post请求,表单形式

 

5. post请求 json

 六、lua_resty_kafka 集成

1. 安装kafka依赖

#下载lua_resty_kafka ,当前目录/opt
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip  
#解压
unzip master.zip 
#复制脚本文件到openresty的lualib/resty 目录下
cp -rf /opt/lua-resty-kafka-master/lib/resty/kafka/ /usr/local/openresty/lualib/resty/

2. 编写lua脚本

ngx.header.content_type="application/json;charset=utf8"
local producer = require("resty.kafka.producer")

-- kafka的集群信息,单机也是可以的
local broker_list = {
    {host = "192.168.168.160", port = 9092},
}
-- 定义最终kafka接受到的数据是怎样的json格式
local log_json = {}
-- 增加read_body之后即可获取到消息体,默认情况下可能会是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()

-- 定义kafka同步生产者,也可设置为异步 async
-- 注意!!!当设置为异步时,在测试环境需要修改batch_num,默认是200条,若大不到200条kafka端接受不到消息
-- encode()将log_json日志转换为字符串
-- 发送日志消息,send配套之第一个参数topic:
-- 发送日志消息,send配套之第二个参数key,用于kafka路由控制:
-- key为nill(空)时,一段时间向同一partition写入数据
-- 指定key,按照key的hash写入到对应的partition

-- batch_num修改为1方便测试
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)

local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("lua_test",nil, sendMsg)
if not ok then
   ngx.say("the message send error :",err)
   ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
   ngx.say("the message send successful")
else
   ngx.say("internal error ")
end

3. 修改nginx.conf

location = /kfk{
    content_by_lua_file /usr/local/openresty/nginx/lua/kfk.lua;
}

4. 编写Java消费端,使用SpringBoot继承kafka

pom依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

application.yml

spring:
  kafka:
    bootstrap-servers: 192.168.168.160:9092
    consumer:
      group-id: auto-dev

listener

@Component
@Slf4j
public class ConsumerListener {
    @KafkaListener(topics = {"lua_test"})
    public void onMessage(ConsumerRecord<String, String> record){
        log.info("【topic】 {} 【消息】{}",record.topic(), record.value());
    }
}

3. 测试结果

发送请求

消费者消费消息

七、nginx的相关命令

#到nginx目录下,使用默认配置文件启动
./sbin/ngxin
#启动时指定配置文件
./sbin/ngxin -c /opt/nginx/nginx.conf
#设置前缀路径
./sbin/ngxin -p /home/www
#重启Nginx
nginx -s reopen 
#程序加载配置文件(nginx.conf),然后以优雅的方式重启Nginx。
nginx -s reload 
#强制停止Nginx服务
nginx -s stop 
#优雅的停止Nginx服务
nginx -s quit 

八、总结

OpenResty是个利器,大家好好把握!哈哈哈

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐