【原创】运维基础之OpenResty,Nginx+Lua+Kafka

使用docker部署

1 下载并解压lua-resty-kafka

# wget https://github.com/doujiang24/lua-resty-kafka/archive/v0.06.tar.gz

# tar xvf v0.06.tar.gz

2 准备配置文件testkafka.conf

# vi testkafka.conf

    # Module search root: must be the directory that contains resty/, so that
    # require "resty.kafka.client" maps to <root>/resty/kafka/client.lua.
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    # Read the request body before the Lua handler runs, so that
    # ngx.req.get_body_data() has something to return.
    lua_need_request_body on;
    server {
        listen 80;
        server_name testkafka;
        location /test {
            # Demo handler: sends one message to Kafka with a sync producer and
            # once more with an async producer, appends the message to a local
            # log file, then echoes details of the current request.
            # The *_block form is used so the Lua source needs no nginx string escaping.
            content_by_lua_block {
                local testfile = "/tmp/test.log"

                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                -- Change host, port and topic to match your Kafka cluster.
                local broker_list = {
                    { host = "127.0.0.1", port = 9092 }
                }

                local topic = "test"
                local key = "key"
                local message = "halo world"

                -- usually we do not use this library directly
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata(topic)
                if not brokers then
                    -- on failure the second return value carries the error
                    ngx.say("fetch_metadata failed, err:", partitions)
                    -- stop here, consistent with the send error branches below
                    return
                end
                --ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))

                -- sync producer_type
                local p = producer:new(broker_list)

                -- The local log is best effort: when the file cannot be opened,
                -- record the reason in the nginx error log and keep going
                -- instead of failing the request with a nil index error.
                local f, open_err = io.open(testfile, "a+")
                if f then
                    f:write(topic .. ":" .. key .. ":" .. message .. "\n")
                    f:close()
                else
                    ngx.log(ngx.ERR, "failed to open ", testfile, ": ", open_err)
                end

                local offset, err = p:send(topic, key, message)
                if not offset then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send success, offset: ", tonumber(offset))

                -- this is async producer_type and bp will be reused in the whole nginx worker
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, async_err = bp:send(topic, key, message)
                if not ok then
                    ngx.say("send err:", async_err)
                    return
                end

                -- Echo request details back to the caller.
                ngx.say("host : ", ngx.var.host)
                ngx.say("uri : ", ngx.var.uri)
                ngx.say("args : ", ngx.var.args)
                ngx.say("body : ", ngx.req.get_body_data())
                ngx.say("client ip : ", ngx.var.remote_addr)
                ngx.say("time : ", ngx.var.time_local)
                ngx.say("send success, ok:", ok)
            }
        }
    }

功能:发送kafka、写日志到/tmp/test.log,打印请求信息

修改其中broker的ip和端口,以及topic名;

3 启动docker

$ docker run -d -p 80:80 -v /path/to/testkafka.conf:/etc/nginx/conf.d/testkafka.conf -v /path/to/lua-resty-kafka-0.06/lib/resty/kafka:/usr/local/openresty/lualib/resty/kafka openresty/openresty

挂载testkafka.conf以及kafka lib目录

4 测试

# curl 'http://testkafka/test?a=1&b=2' -d 'hello' -x 127.0.0.1:80

send success, offset: 13

host : testkafka

uri : /test

args : a=1&b=2

body : hello

client ip : 172.17.0.1

time : 08/Mar/2019:14:26:20 +0000

send success, ok:true

5 更多

1)可以将nginx访问日志发送到kafka

2)可以将请求数据作为消息发送到kafka(从uri中的path解析出topic)

6 报错

有可能报错:no resolver defined to resolve

这是因为kafka broker配置的是hostname,而不是ip,而nginx遇到hostname必须通过dns解析,而不能依靠/etc/hosts来解析,所以会报以上错误,这时有两种解决方法:

1)安装dnsmasq,并在nginx配置中用resolver指令指向它(例如 resolver 127.0.0.1;);

2)修改kafka配置中的advertised.host.name,将其修改为ip即可;

参考:https://github.com/doujiang24/lua-resty-kafka