lua-resty-kafka配置文档

参考网址:https://github.com/doujiang24/lua-resty-kafka

一、例子

content_by_lua '

-- 引入lua所有api

local cjson = require "cjson"

local producer = require "resty.kafka.producer"

-- 定义kafka broker地址,ip需要和kafka的host.name配置一致

local broker_list = {

{ host = "192.168.101.223", port = 9092 },

{ host = "192.168.101.224", port = 9092 }

}

local key = "key"

local message = "halo world"

local error_handle = function (topic, partition_id, queue, index, err, retryable)

ngx.log(ngx.ERR, "failed to send to kafka, topic: ", topic, "; partition_id: ", partition_id, "; retryable: ", retryable)

end

local p = producer:new(broker_list, { producer_type = "async", max_retry = 1, batch_num = 1, error_handle = error_handle })

local ok, err = p:send("test", key, message)

if not ok then

ngx.say("send err:", err)

return

end

ngx.say("send ok:", ok)

p:flush()

';

二、语法

New

p = producer:new(broker_list, producer_config?, cluster_name?)

broker_list:客户端列表

local broker_list = {

{ host = "192.168.101.223", port = 9092 },

{ host = "192.168.101.224", port = 9092 }

}

producer_config?:可选参数

producer_type生产者类型,同步或者异步,"async" or "sync"

request_timeout请求超时,默认是 2000 ms

required_acks请求应答不能为0,默认是1

max_retry信息发送重试最大次数,默认是3

retry_backoff信息发送重试补偿,默认100ms

partitioner选择分区从键和分区num的分区。

local partitioner = function (key, partition_num, correlation_id) end

默认

         local function default_partitioner(key, num, correlation_id)

local id = key and crc32(key) or correlation_id

-- partition_id is continuous and start from 0

return id % num

end

!!!以下为缓冲区参数,只有producer_type= "async"有效

flush_time队列最大缓存时间,默认1000ms

batch_num队列最大批次数量,默认200

batch_size保存缓存大小,默认是1M(最大可以为2M),需要小心,这个跟kafka配置有关,socket.request.max.byts为2-10k

max_buffering队列最大缓存大小,默认50000

error_handle错误处理,当缓冲区发送到kafka错误时处理数据。

error_handle = function (topic, partition_id, message_queue, index, err, retryable) end

失败的消息队列如{ key1, msg1, key2, msg2 },键值key在消息队列中是为””相当与orign中的nil,

Index为消息队列的长度

Retryable为true时,这意味着kafka服务器肯定没有提交这些消息。可以尝试重试发送消息,

暂时不支持压缩

cluster_name?:可选参数

指定集群的名称,默认是1(该参数是一个数值),当您有两个或多个kafka集群时,您可以指定不同的名称,只有producer_type = "async"有效

Send

ok, err = p:send(topic, key, message)

同步模式时 :

如果成功,返回当前代理和分区的偏移量(** cdata: LL **)。如果出现错误,返回nil,用字符串描述错误

异步模式时 :

消息将首先写入缓冲区。当缓冲区超过batch_num时,它将发送到kafka服务器,或者每个flush_time刷新缓冲区。