From c72876cfcadb4b1b6bfdd87ed49736fa7a9f16e2 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Mon, 27 Nov 2017 17:27:09 +0800 Subject: [PATCH 01/17] fix async_wait_co_sema timeout --- lib/pipe/pipe.lua | 15 ++++++++--- t/test.lua | 65 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/lib/pipe/pipe.lua b/lib/pipe/pipe.lua index 77b56a4..05e8de3 100644 --- a/lib/pipe/pipe.lua +++ b/lib/pipe/pipe.lua @@ -13,8 +13,8 @@ _M.writer = pipe_writer _M.filter = pipe_filter local to_str = strutil.to_str -local READ_TIMEOUT = 300 --seconds -local WRITE_TIMEOUT = 300 --seconds +local READ_TIMEOUT = 300 * 1000 --ms +local WRITE_TIMEOUT = 300 * 1000 --ms local function wrap_co_func(co, ...) local ok, rst, err_code, err_msg = pcall(co.func, ...) @@ -175,12 +175,15 @@ local function async_wait_co_sema(self, cos, sema, quorum, timeout, err_code) while ngx.now() <= dead_time do local n_ok = 0 + local n_active = 0 for _, co in ipairs(cos) do if co.is_dead then if co.err == nil then n_ok = n_ok + 1 end + else + n_active = n_active + 1 end end @@ -188,6 +191,10 @@ local function async_wait_co_sema(self, cos, sema, quorum, timeout, err_code) return end + if n_active + n_ok < quorum then + break + end + ngx.sleep(0.001) end @@ -246,8 +253,8 @@ function _M.new(_, rds, wrts, filters, rd_timeout, wrt_timeout) wrt_filters = filters.wrt_filters or {pipe_filter.make_write_quorum_filter(#wrts)}, - rd_timeout = rd_timeout or READ_TIMEOUT, - wrt_timeout = wrt_timeout or WRITE_TIMEOUT, + rd_timeout = (rd_timeout or READ_TIMEOUT)/1000, + wrt_timeout = (wrt_timeout or WRITE_TIMEOUT)/1000, } return setmetatable(obj, mt) diff --git a/t/test.lua b/t/test.lua index 20c32bc..839506f 100644 --- a/t/test.lua +++ b/t/test.lua @@ -153,7 +153,7 @@ function _M.test_pipe_empty_reader() local check_filter = make_check_err_filter('r', 1, 'ReadTimeout', 'TestSuccess') local cpipe, err_code, err_msg = pipe_pipe:new({empty_reader}, - {memery_writer}, {rd_filters = {check_filter}}, 2) + {memery_writer}, {rd_filters = {check_filter}}, 2000) if err_code ~= nil then return nil, err_code, err_msg end @@ -331,13 +331,13 @@ end function _M.test_pipe_read_timeout() local read_datas = {'xxx', 'yyy', 'zzz'} - local rd_timeout = 2 - local wrt_timeout = 3 + local rd_timeout = 2000 + local wrt_timeout = 3000 local timeout_reader = function(pobj, ident) for i, buf in ipairs(read_datas) do if i > 1 then - ngx.sleep(rd_timeout) + ngx.sleep(rd_timeout/1000) end local _, err_code, err_msg = pobj:write_pipe(ident, buf) @@ -375,15 +375,15 @@ end function _M.test_pipe_write_timeout() local read_datas = {'xxx', 'yyy', 'zzz'} - local rd_timeout = 3 - local wrt_timeout = 2 + local rd_timeout = 3000 + local wrt_timeout = 2000 local timeout_writer = function(pobj, ident) local n_write = 0 while true do n_write = n_write + 1 if n_write > 1 then - ngx.sleep(wrt_timeout + 2) + ngx.sleep(wrt_timeout/1000 + 2) end local data, err_code, err_msg = pobj:read_pipe(ident) if err_code ~= nil then @@ -426,6 +426,57 @@ function _M.test_pipe_write_timeout() end end +function _M.test_pipe_async_wait() + local read_datas = 'xxx' + + local rd_timeout = 3000 + local wrt_timeout = 2000 + + local writer = function(pobj, ident) + while true do + local data, err_code, err_msg = pobj:read_pipe(ident) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if data == '' then + break + end + end + + ngx.sleep(0.3) + + if ident == 1 then + ngx.sleep(wrt_timeout/1000 + 2) + return nil, 'WriterError', 'writer 1 error' + elseif ident == 2 then + return nil, 'WriterError', 'writer 2 error' + end + end + + + local cpipe, err_code, err_msg = pipe_pipe:new( + {pipe_pipe.reader.make_memery_reader(read_datas)}, + {writer, writer, writer}, + {}, + rd_timeout, wrt_timeout) + + if err_code ~= nil then + return nil, err_code, err_msg + end + + local t0 = ngx.now() + + cpipe:pipe(is_running, 3) + + ngx.update_time() + local itv = ngx.now() - t0 + + if itv > 1 then + return nil, 'TestAsyncWait', 'test async error' + end +end + function _M.test() local test_prefix = 'test_pipe_' From a47bd08f1a40060561c7f98703dbaa2741f0fd39 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Tue, 28 Nov 2017 11:30:17 +0800 Subject: [PATCH 02/17] add comment for write_data_to_ngx --- lib/pipe/writer.lua | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 84d474a..892cd27 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -15,8 +15,10 @@ local INF = math.huge local function write_data_to_ngx(pobj, ident, opts) - -- range = {start, end} - -- range is a closed interval. + opts = opts or {} + + -- range = {start, end} is rfc2612 Range header, + -- a closed interval, starts with index 0 local range = opts.range local pipe_log = opts.pipe_log @@ -253,8 +255,6 @@ function _M.make_http_writer(ips, port, verb, uri, opts) end function _M.make_ngx_writer(opts) - opts = opts or {} - return function(pobj, ident) return write_data_to_ngx(pobj, ident, opts) end @@ -262,8 +262,6 @@ end function _M.make_ngx_resp_writer(status, headers, opts) - opts = opts or {} - ngx.status = status for k, v in pairs(headers) do ngx.header[k] = v From 7a011c6558b72eb6c291d03a850b99b826c7086b Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Mon, 5 Mar 2018 19:13:37 +0800 Subject: [PATCH 03/17] set default semaphore timeout 600s --- lib/pipe/pipe.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/pipe/pipe.lua b/lib/pipe/pipe.lua index 05e8de3..e08473b 100644 --- a/lib/pipe/pipe.lua +++ b/lib/pipe/pipe.lua @@ -13,8 +13,8 @@ _M.writer = pipe_writer _M.filter = pipe_filter local to_str = strutil.to_str -local READ_TIMEOUT = 300 * 1000 --ms -local WRITE_TIMEOUT = 300 * 1000 --ms +local READ_TIMEOUT = 600 * 1000 --ms +local WRITE_TIMEOUT = 600 * 1000 --ms local function wrap_co_func(co, ...) local ok, rst, err_code, err_msg = pcall(co.func, ...) From cd9a33bfdf6f3d9650ac121afe9db21da818d19e Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Tue, 21 Nov 2017 20:00:35 +0800 Subject: [PATCH 04/17] remove http related functions to httplib --- lib/pipe/httplib.lua | 143 +++++++++++++++++++++++++++++++++++++++++++ lib/pipe/reader.lua | 90 ++++++++------------------- lib/pipe/writer.lua | 106 ++------------------------------ 3 files changed, 174 insertions(+), 165 deletions(-) create mode 100644 lib/pipe/httplib.lua diff --git a/lib/pipe/httplib.lua b/lib/pipe/httplib.lua new file mode 100644 index 0000000..16d39d1 --- /dev/null +++ b/lib/pipe/httplib.lua @@ -0,0 +1,143 @@ +local httpclient = require("acid.httpclient") +local tableutil = require("acid.tableutil") +local strutil = require("acid.strutil") + +local _M = { _VERSION = '1.0' } + +local to_str = strutil.to_str + +local BLOCK_SIZE = 1024 * 1024 +local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000} + +function _M.connect_http(ips, port, verb, uri, opts) + opts = opts or {} + + local try_times = math.max(opts.try_times or 1, 1) + + local http, _, err_code, err_msg + + for _, ip in ipairs(ips) do + local headers = tableutil.dup(opts.headers or {}, true) + headers.Host = headers.Host or ip + + local req = { + ip = ip, + port = port, + uri = uri, + verb = verb, + headers = headers, + } + + if opts.signature_cb ~= nil then + req = opts.signature_cb(req) + end + + http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS) + + local h_opts = {method=req.verb, headers=req.headers} + for i=1, try_times, 1 do + _, err_code, err_msg = http:send_request(req.uri, h_opts) + if err_code == nil then + return http + end + end + end + + if err_code ~= nil then + err_msg = to_str(err_code, ':', err_msg) + err_code = 'ConnectError' + end + + return nil, err_code, err_msg +end + +function _M.get_http_response(http, opts) + opts = opts or {} + + local _, err_code, err_msg = http:finish_request() + if err_code ~= nil then + return nil, err_code, err_msg + end + + if opts.success_status ~= nil and opts.success_status ~= http.status then + return nil, 'InvalidHttpStatus', to_str('response http status:', http.status) + end + + local resp = { + status = http.status, + headers = http.ori_headers, + } + + if opts.read_body == false then + return resp + end + + local body = {} + + while true do + local data, err_code, err_msg = http:read_body(BLOCK_SIZE) + if err_code ~= nil then + return resp, err_code, err_msg + end + + if data == '' then + break + end + + table.insert(body, data) + end + resp.body = table.concat(body) + + return resp +end + +function _M.loop_http_read(pobj, ident, http, block_size) + local bytes = 0 + + while true do + local data, err_code, err_msg = + http:read_body(block_size or BLOCK_SIZE) + if err_code ~= nil then + return nil, err_code, err_msg + end + + local rst, err_code, err_msg = pobj:write_pipe(ident, data) + if err_code ~= nil then + return nil, err_code, err_msg + end + + bytes = bytes + #data + + if data == '' then + break + end + end + + return bytes +end + +function _M.loop_http_write(pobj, ident, http) + local bytes = 0 + + while true do + local data, err_code, err_msg = pobj:read_pipe(ident) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if data == '' then + break + end + + local _, err_code, err_msg = http:send_body(data) + if err_code ~= nil then + return nil, err_code, err_msg + end + + bytes = bytes + #data + end + + return bytes +end + +return _M diff --git a/lib/pipe/reader.lua b/lib/pipe/reader.lua index cfdad1e..4b47203 100644 --- a/lib/pipe/reader.lua +++ b/lib/pipe/reader.lua @@ -1,13 +1,9 @@ local tableutil = require("acid.tableutil") -local strutil = require("acid.strutil") -local httpclient = require("acid.httpclient") - -local to_str = strutil.to_str +local httplib = require("pipe.httplib") local _M = { _VERSION = '1.0' } local BLOCK_SIZE = 1024 * 1024 -local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000} local err_socket = { [ "default" ] = "InvalidRequest", @@ -21,72 +17,40 @@ local function socket_err_code( err, default ) return err_socket[err] or default end -function _M.make_http_reader(ips, port, verb, uri, opts) - opts = opts or {} - - local ret = { - size = 0, - time = 0, - } - - return function(pobj, ident) - local http, _, err_code, err_msg - - for _, ip in ipairs(ips) do - local headers = tableutil.dup(opts.headers or {}, true) - headers.Host = headers.Host or ip - - local req = { - ip = ip, - port = port, - uri = uri, - verb = verb, - headers = headers, - } - - if opts.signature_cb ~= nil then - req = opts.signature_cb(req) - end +function _M.connect_http(ips, port, verb, uri, opts) + return httplib.connect_http(ips, port, verb, uri, opts) +end - http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS) +function _M.get_http_response(http, opts) + return httplib.get_http_response(http, opts) +end - local h_opts = {method=req.verb, headers=req.headers} - for i=1, 3, 1 do - _, err_code, err_msg = http:request(req.uri, h_opts) - if err_code == nil then - break - end - end +function _M.loop_http_read(pobj, ident, http) + return httplib.loop_http_read(pobj, ident, http) +end - if err_code ~= nil then - return nil, err_code, err_msg - end - end +function _M.make_connected_http_reader(http) + return function(pobj, ident) + return _M.loop_http_read(pobj, ident, http) + end +end - if opts.success_status ~= nil and opts.success_status ~= http.status then - return nil, 'InvalidHttpStatus', to_str('response http status:', http.status) +function _M.make_http_reader(ips, port, verb, uri, opts) + return function(pobj, ident) + local http , err_code, err_msg = _M.connect_http(ips, port, verb, uri, opts) + if err_code ~= nil then + return nil, err_code, err_msg end - while true do - local t0 = ngx.now() - local buf, err_code, err_msg = - http:read_body(opts.block_size or BLOCK_SIZE) - ret.time = ret.time + (ngx.now() - t0) - if err_code ~= nil then - return nil, err_code, err_msg - end - - local rst, err_code, err_msg = pobj:write_pipe(ident, buf) - if err_code ~= nil then - return nil, err_code, err_msg - end - - ret.size = ret.size + #buf + opts = tableutil.dup(opts, true) + opts.read_body = false - if buf == '' then - break - end + local _, err_code, err_msg = _M.get_http_response(http, opts) + if err_code ~= nil then + return nil, err_code, err_msg end + + return _M.loop_http_read(pobj, ident, http) end end diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 892cd27..301067e 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -1,19 +1,11 @@ -local httpclient = require("acid.httpclient") -local tableutil = require("acid.tableutil") -local strutil = require("acid.strutil") +local httplib = require("pipe.httplib") local rpc_logging = require("acid.rpc_logging") local acid_setutil = require("acid.setutil") local _M = { _VERSION = '1.0' } -local to_str = strutil.to_str - -local BLOCK_SIZE = 1024 * 1024 -local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000} - local INF = math.huge - local function write_data_to_ngx(pobj, ident, opts) opts = opts or {} @@ -90,106 +82,16 @@ local function write_data_to_ngx(pobj, ident, opts) return recv_right end - function _M.connect_http(ips, port, verb, uri, opts) - opts = opts or {} - - local try_times = math.max(opts.try_times or 1, 1) - - local http, _, err_code, err_msg - - for _, ip in ipairs(ips) do - local headers = tableutil.dup(opts.headers or {}, true) - headers.Host = headers.Host or ip - - local req = { - ip = ip, - port = port, - uri = uri, - verb = verb, - headers = headers, - } - - if opts.signature_cb ~= nil then - req = opts.signature_cb(req) - end - - http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS) - - local h_opts = {method=req.verb, headers=req.headers} - for i=1, try_times, 1 do - _, err_code, err_msg = http:send_request(req.uri, h_opts) - if err_code == nil then - return http - end - end - end - - if err_code ~= nil then - err_msg = to_str(err_code, ':', err_msg) - err_code = 'ConnectError' - end - - return nil, err_code, err_msg + return httplib.connect_http(ips, port, verb, uri, opts) end function _M.loop_http_write(pobj, ident, http) - local bytes = 0 - - while true do - local data, err_code, err_msg = pobj:read_pipe(ident) - if err_code ~= nil then - return nil, err_code, err_msg - end - - if data == '' then - break - end - - local _, err_code, err_msg = http:send_body(data) - if err_code ~= nil then - return nil, err_code, err_msg - end - - bytes = bytes + #data - end - - return bytes + return httplib.loop_http_write(pobj, ident, http) end function _M.get_http_response(http, opts) - opts = opts or {} - - local _, err_code, err_msg = http:finish_request() - if err_code ~= nil then - return nil, err_code, err_msg - end - - if opts.success_status ~= nil and opts.success_status ~= http.status then - return nil, 'InvalidHttpStatus', to_str('response http status:', http.status) - end - - local resp = { - status = http.status, - headers = http.headers, - } - local body = {} - - while true do - local data, err_code, err_msg = http:read_body(BLOCK_SIZE) - if err_code ~= nil then - return resp, err_code, err_msg - end - - if data == '' then - break - end - - table.insert(body, data) - end - resp.body = table.concat(body) - - return resp + return httplib.get_http_response(http, opts) end From 77a7b0b4a74befad34e470a1a4291f58d47b1aa4 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Tue, 21 Nov 2017 20:00:59 +0800 Subject: [PATCH 05/17] add make_read_max_size_filter --- lib/pipe/filter.lua | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/pipe/filter.lua b/lib/pipe/filter.lua index c700cce..b7c4966 100644 --- a/lib/pipe/filter.lua +++ b/lib/pipe/filter.lua @@ -49,5 +49,16 @@ function _M.make_read_timeout_filter(r_idx) end end +function _M.make_read_max_size_filter(max_size, r_idx) + local size = 0 + + return function(rbufs, n_rd, wbufs, n_wrt, pipe_rst) + size = size + #(rbufs[r_idx] or '') + if size > max_size then + return nil, 'EntityTooLarge', + string.format('read size %s large than %s', size, max_size) + end + end +end return _M From 7abe6f529347333f674ad948c781af7c23eba673 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Tue, 21 Nov 2017 20:05:02 +0800 Subject: [PATCH 06/17] add make_quorum_http_writers --- lib/pipe/writer.lua | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 301067e..1a0033a 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -1,4 +1,6 @@ local httplib = require("pipe.httplib") +local strutil = require("acid.strutil") +local to_str = strutil.to_str local rpc_logging = require("acid.rpc_logging") local acid_setutil = require("acid.setutil") @@ -209,4 +211,46 @@ function _M.make_buffer_writer(buffer, do_concat) end end +function _M.make_quorum_http_writers(dests, writer_opts, quorum) + local conn_threads = {} + + for _, dest in ipairs(dests) do + local th = ngx.thread.spawn(_M.connect_http, + dest.ips, dest.port, dest.method, dest.uri, writer_opts) + table.insert(conn_threads, th) + end + + local writers = {} + local n_ok = 0 + for _, th in ipairs(conn_threads) do + local wrt = {} + + local ok, http, err_code, err_msg = ngx.thread.wait(th) + if ok and err_code == nil then + n_ok = n_ok + 1 + wrt.http = http + wrt.writer = _M.make_connected_http_writer(http, writer_opts) + else + wrt.err = { + err_code = err_code or 'CoroutineError', + err_msg = err_msg or 'coroutine error, when connect', + } + end + table.insert(writers, wrt) + end + + if n_ok >= quorum then + return writers + end + + for _, wrt in ipairs(writers) do + if wrt.http ~= nil then + wrt.http:close() + end + wrt.http = nil + end + + return nil, 'NotEnoughConnect', to_str('quorum:', quorum, ", actual:", n_ok) +end + return _M From eec869cf01049a5bcd15211889ce9e369eca23fb Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Tue, 21 Nov 2017 20:10:08 +0800 Subject: [PATCH 07/17] add make_aws_put_s3_writer --- lib/pipe/writer.lua | 48 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 1a0033a..af9f45c 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -1,11 +1,13 @@ local httplib = require("pipe.httplib") local strutil = require("acid.strutil") -local to_str = strutil.to_str local rpc_logging = require("acid.rpc_logging") local acid_setutil = require("acid.setutil") +local s3_client = require('resty.aws_s3.client') +local aws_chunk_writer = require("resty.aws_chunk.writer") local _M = { _VERSION = '1.0' } +local to_str = strutil.to_str local INF = math.huge local function write_data_to_ngx(pobj, ident, opts) @@ -253,4 +255,48 @@ function _M.make_quorum_http_writers(dests, writer_opts, quorum) return nil, 'NotEnoughConnect', to_str('quorum:', quorum, ", actual:", n_ok) end +function _M.make_aws_put_s3_writer(access_key, secret_key, endpoint, params, opts) + local s3_cli, err_code, err_msg = + s3_client.new(access_key, secret_key, endpoint, opts) + if err_code ~= nil then + return nil, err_code, err_msg + end + + local request, err_code, err_msg = + s3_cli:get_signed_request(params, 'put_object', opts) + if err_code ~= nil then + return nil, err_code, err_msg + end + + return function(pobj, ident) + local chunk_writer = + aws_chunk_writer:new(request.signer, request.auth_ctx) + + local _, err_code, err_msg = s3_cli:send_request( + request.verb, request.uri, request.headers,request.body) + if err_code ~= nil then + return nil, err_code, err_msg + end + + while true do + local data, err_code, err_msg = pobj:read_pipe(ident) + if err_code ~= nil then + return nil, err_code, err_msg + end + + local chunked_data = chunk_writer:make_chunk(data) + local _, err_code, err_msg = s3_cli:send_body(chunked_data) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if data == '' then + break + end + end + + return s3_cli:finish_request() + end +end + return _M From 1aeace77fc0d747d7b469e9b5de3a55eeb5f3b01 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Thu, 8 Mar 2018 14:35:13 +0800 Subject: [PATCH 08/17] log writer/reader indent --- lib/pipe/pipe.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pipe/pipe.lua b/lib/pipe/pipe.lua index e08473b..609ed26 100644 --- a/lib/pipe/pipe.lua +++ b/lib/pipe/pipe.lua @@ -30,7 +30,7 @@ local function wrap_co_func(co, ...) err_code, err_msg = 'CoroutineError', rst end co.err = {err_code = err_code, err_msg = err_msg} - ngx.log(ngx.ERR, to_str(co.rd_or_wrt, " coroutine exit with error:", co.err)) + ngx.log(ngx.ERR, to_str(co.rd_or_wrt, ' ', co.ident, " coroutine exit with error:", co.err)) end co.is_dead = true From 85b4447151a6ea93d090ed2c4f3426c6ddcf09ad Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Wed, 21 Mar 2018 13:05:48 +0800 Subject: [PATCH 09/17] modify reader writer return value --- lib/pipe/httplib.lua | 46 +++++++++++++++++++++++++++++++++++++++----- lib/pipe/reader.lua | 10 +++++----- lib/pipe/writer.lua | 18 +++++++++++------ 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/lib/pipe/httplib.lua b/lib/pipe/httplib.lua index 16d39d1..77a605e 100644 --- a/lib/pipe/httplib.lua +++ b/lib/pipe/httplib.lua @@ -2,6 +2,10 @@ local httpclient = require("acid.httpclient") local tableutil = require("acid.tableutil") local strutil = require("acid.strutil") +local resty_sha1 = require( "resty.sha1" ) +local resty_md5 = require( "resty.md5" ) +local resty_string = require( "resty.string" ) + local _M = { _VERSION = '1.0' } local to_str = strutil.to_str @@ -91,8 +95,24 @@ function _M.get_http_response(http, opts) return resp end -function _M.loop_http_read(pobj, ident, http, block_size) - local bytes = 0 +function _M.loop_http_read(pobj, ident, http, block_size, opts) + opts = opts or {} + + local rst = { + size = 0, + md5 = nil, + sha1 = nil, + } + + local md5_alg + if opts.calc_md5 == true then + md5_alg = resty_md5:new() + end + + local sha1_alg + if opts.calc_sha1 == true then + sha1_alg = resty_sha1:new() + end while true do local data, err_code, err_msg = @@ -101,19 +121,35 @@ function _M.loop_http_read(pobj, ident, http, block_size) return nil, err_code, err_msg end - local rst, err_code, err_msg = pobj:write_pipe(ident, data) + local _, err_code, err_msg = pobj:write_pipe(ident, data) if err_code ~= nil then return nil, err_code, err_msg end - bytes = bytes + #data + if opts.calc_md5 == true then + md5_alg:update(data) + end + + if opts.calc_sha1 == true then + sha1_alg:update(data) + end + + rst.size = rst.size + #data if data == '' then + if opts.calc_md5 == true then + rst.md5 = resty_string.to_hex(md5_alg:final()) + end + + if opts.calc_sha1 == true then + rst.sha1 = resty_string.to_hex(sha1_alg:final()) + end + break end end - return bytes + return rst end function _M.loop_http_write(pobj, ident, http) diff --git a/lib/pipe/reader.lua b/lib/pipe/reader.lua index 4b47203..e69916b 100644 --- a/lib/pipe/reader.lua +++ b/lib/pipe/reader.lua @@ -25,13 +25,13 @@ function _M.get_http_response(http, opts) return httplib.get_http_response(http, opts) end -function _M.loop_http_read(pobj, ident, http) - return httplib.loop_http_read(pobj, ident, http) +function _M.loop_http_read(pobj, ident, http, block_size, opts) + return httplib.loop_http_read(pobj, ident, http, block_size, opts) end -function _M.make_connected_http_reader(http) +function _M.make_connected_http_reader(http, block_size, opts) return function(pobj, ident) - return _M.loop_http_read(pobj, ident, http) + return _M.loop_http_read(pobj, ident, http, block_size, opts) end end @@ -50,7 +50,7 @@ function _M.make_http_reader(ips, port, verb, uri, opts) return nil, err_code, err_msg end - return _M.loop_http_read(pobj, ident, http) + return _M.loop_http_read(pobj, ident, http, opts.block_size, opts) end end diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index af9f45c..294c09b 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -13,25 +13,29 @@ local INF = math.huge local function write_data_to_ngx(pobj, ident, opts) opts = opts or {} - -- range = {start, end} is rfc2612 Range header, + -- range = {from, to} is rfc2612 Range header, -- a closed interval, starts with index 0 local range = opts.range local pipe_log = opts.pipe_log + local ret = { + size = 0, + } + local recv_left, recv_right = 0, 0 local from, to if range ~= nil then - from = range['start'] + 1 + from = range['from'] + 1 - if range['end'] ~= nil then - to = range['end'] + 1 + if range['to'] ~= nil then + to = range['to'] + 1 else to = INF end if from > to then return nil, 'InvalidRange', string.format( - 'start: %d is greater than end: %d', from, to) + 'from: %d is greater than to: %d', from, to) end end @@ -76,6 +80,8 @@ local function write_data_to_ngx(pobj, ident, opts) end end + ret.size = ret.size + #data + ngx.print(data) local _, err = ngx.flush(true) if err then @@ -83,7 +89,7 @@ local function write_data_to_ngx(pobj, ident, opts) end end - return recv_right + return ret end function _M.connect_http(ips, port, verb, uri, opts) From 5535554f8497f5f348d7803bb1d87d479ad7bf0e Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Tue, 20 Mar 2018 15:05:08 +0800 Subject: [PATCH 10/17] add make_aws_put_s3_writer --- lib/pipe/writer.lua | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 294c09b..81d9061 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -1,5 +1,6 @@ local httplib = require("pipe.httplib") local strutil = require("acid.strutil") +local tableutil = require("acid.tableutil") local rpc_logging = require("acid.rpc_logging") local acid_setutil = require("acid.setutil") local s3_client = require('resty.aws_s3.client') @@ -261,7 +262,7 @@ function _M.make_quorum_http_writers(dests, writer_opts, quorum) return nil, 'NotEnoughConnect', to_str('quorum:', quorum, ", actual:", n_ok) end -function _M.make_aws_put_s3_writer(access_key, secret_key, endpoint, params, opts) +function _M.make_put_s3_writer(access_key, secret_key, endpoint, params, opts) local s3_cli, err_code, err_msg = s3_client.new(access_key, secret_key, endpoint, opts) if err_code ~= nil then @@ -275,8 +276,11 @@ function _M.make_aws_put_s3_writer(access_key, secret_key, endpoint, params, opt end return function(pobj, ident) - local chunk_writer = - aws_chunk_writer:new(request.signer, request.auth_ctx) + local chunk_writer + if opts.aws_chunk == true then + chunk_writer = + aws_chunk_writer:new(request.signer, request.auth_ctx) + end local _, err_code, err_msg = s3_cli:send_request( request.verb, request.uri, request.headers,request.body) @@ -290,8 +294,12 @@ function _M.make_aws_put_s3_writer(access_key, secret_key, endpoint, params, opt return nil, err_code, err_msg end - local chunked_data = chunk_writer:make_chunk(data) - local _, err_code, err_msg = s3_cli:send_body(chunked_data) + local send_data = data + if opts.aws_chunk == true then + send_data = chunk_writer:make_chunk(send_data) + end + + local _, err_code, err_msg = s3_cli:send_body(send_data) if err_code ~= nil then return nil, err_code, err_msg end @@ -305,4 +313,11 @@ function _M.make_aws_put_s3_writer(access_key, secret_key, endpoint, params, opt end end +function _M.make_aws_put_s3_writer(access_key, secret_key, endpoint, params, opts) + opts = tableutil.dup(opts or {}, true) + opts.aws_chunk = true + + return _M.make_put_s3_writer(access_key, secret_key, endpoint, params, opts) +end + return _M From cb4fb40e1724f3e18a8639d461d1f8d2f29ab6dd Mon Sep 17 00:00:00 2001 From: liubaohai Date: Thu, 26 Apr 2018 11:44:13 +0800 Subject: [PATCH 11/17] support http opts --- lib/pipe/httplib.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pipe/httplib.lua b/lib/pipe/httplib.lua index 77a605e..465fab9 100644 --- a/lib/pipe/httplib.lua +++ b/lib/pipe/httplib.lua @@ -36,7 +36,7 @@ function _M.connect_http(ips, port, verb, uri, opts) req = opts.signature_cb(req) end - http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS) + http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS, opts.http_opts) local h_opts = {method=req.verb, headers=req.headers} for i=1, try_times, 1 do From e76db3df38de040f92bc4f3f5c10db1133774dcf Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Thu, 13 Sep 2018 12:04:33 +0800 Subject: [PATCH 12/17] add write client rpc log --- lib/pipe/writer.lua | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 81d9061..c6bfbb7 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -17,7 +17,9 @@ local function write_data_to_ngx(pobj, ident, opts) -- range = {from, to} is rfc2612 Range header, -- a closed interval, starts with index 0 local range = opts.range - local pipe_log = opts.pipe_log + + local log = rpc_logging.new_entry('write_client') + rpc_logging.add_log(log) local ret = { size = 0, @@ -41,17 +43,12 @@ local function write_data_to_ngx(pobj, ident, opts) end while true do - local data, err, err_msg - if pipe_log ~= nil then - rpc_logging.reset_start(pipe_log) + rpc_logging.reset_start(log) - data, err, err_msg = pobj:read_pipe(ident) + local data, err, err_msg = pobj:read_pipe(ident) - rpc_logging.set_err(pipe_log, err) - rpc_logging.incr_stat(pipe_log, 'downstream', 'sendbody', #(data or '')) - else - data, err, err_msg = pobj:read_pipe(ident) - end + rpc_logging.set_err(log, err) + rpc_logging.incr_stat(log, 'upstream', 'recvbody', #(data or '')) if err ~= nil then return nil, err, err_msg @@ -83,8 +80,13 @@ local function write_data_to_ngx(pobj, ident, opts) ret.size = ret.size + #data + rpc_logging.reset_start(log) + ngx.print(data) local _, err = ngx.flush(true) + + rpc_logging.set_err(log, err) + rpc_logging.incr_stat(log, 'downstream', 'sendbody', #(data or '')) if err then return nil, 'ClientAborted', err end From 42a7f8233d5cd64f53e1227774075c3765f57893 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Thu, 13 Sep 2018 13:37:21 +0800 Subject: [PATCH 13/17] fixup: error range parameter --- t/test_ngx_resp_writer.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/test_ngx_resp_writer.t b/t/test_ngx_resp_writer.t index 93ef2c0..c085f94 100644 --- a/t/test_ngx_resp_writer.t +++ b/t/test_ngx_resp_writer.t @@ -29,8 +29,8 @@ __DATA__ local status = 200 local headers = {["Content-Length"]=3} local opts = {range ={}} - opts.range["start"] = 2 - opts.range["end"] = 4 + opts.range["from"] = 2 + opts.range["to"] = 4 local writers = {pipe_pipe.writer.make_ngx_resp_writer(status, headers, opts)} From fe6108e932de63be6a02aeb6dec2f288ea5ce129 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Wed, 24 Oct 2018 19:33:09 +0800 Subject: [PATCH 14/17] write_data_to_ngx add body_sha1 optional arg --- lib/pipe/writer.lua | 18 ++++++++++++++++++ t/test_ngx_resp_writer.t | 1 + 2 files changed, 19 insertions(+) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index c6bfbb7..5772a11 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -4,6 +4,8 @@ local tableutil = require("acid.tableutil") local rpc_logging = require("acid.rpc_logging") local acid_setutil = require("acid.setutil") local s3_client = require('resty.aws_s3.client') +local hashlib = require("acid.hashlib") +local resty_string = require("resty.string") local aws_chunk_writer = require("resty.aws_chunk.writer") local _M = { _VERSION = '1.0' } @@ -42,6 +44,12 @@ local function write_data_to_ngx(pobj, ident, opts) end end + local alg_sha1 = nil + + if opts.body_sha1 ~= nil then + alg_sha1 = hashlib:sha1() + end + while true do rpc_logging.reset_start(log) @@ -55,6 +63,12 @@ local function write_data_to_ngx(pobj, ident, opts) end if data == '' then + if alg_sha1 ~= nil then + local calc_sha1 = resty_string.to_hex(alg_sha1:final()) + if calc_sha1 ~= opts.body_sha1 then + return nil, "Sha1Notmatched", to_str("expect:", opts.sha1, ", actual:", calc_sha1) + end + end break end @@ -80,6 +94,10 @@ local function write_data_to_ngx(pobj, ident, opts) ret.size = ret.size + #data + if alg_sha1 ~= nil then + alg_sha1:update(data) + end + rpc_logging.reset_start(log) ngx.print(data) diff --git a/t/test_ngx_resp_writer.t b/t/test_ngx_resp_writer.t index c085f94..451d4d9 100644 --- a/t/test_ngx_resp_writer.t +++ b/t/test_ngx_resp_writer.t @@ -31,6 +31,7 @@ __DATA__ local opts = {range ={}} opts.range["from"] = 2 opts.range["to"] = 4 + opts.body_sha1 = "35139ef894b28b73bea022755166a23933c7d9cb" local writers = {pipe_pipe.writer.make_ngx_resp_writer(status, headers, opts)} From 1016c8b18eb00877cd4fa56614c689bbdfd3b440 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Wed, 24 Oct 2018 20:02:47 +0800 Subject: [PATCH 15/17] fixup typo --- lib/pipe/writer.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 5772a11..966c816 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -66,7 +66,7 @@ local function write_data_to_ngx(pobj, ident, opts) if alg_sha1 ~= nil then local calc_sha1 = resty_string.to_hex(alg_sha1:final()) if calc_sha1 ~= opts.body_sha1 then - return nil, "Sha1Notmatched", to_str("expect:", opts.sha1, ", actual:", calc_sha1) + return nil, "Sha1Notmatched", to_str("expect:", opts.body_sha1, ", actual:", calc_sha1) end end break From 5a43e5d1e8156782a68acf5a75342348ab5515fc Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Thu, 9 Apr 2020 20:20:37 +0800 Subject: [PATCH 16/17] add make_kill_low_write_speed_filter --- lib/pipe/filter.lua | 43 +++++++++++++++++++++++++++++++++++++++++++ lib/pipe/httplib.lua | 8 +++++--- lib/pipe/pipe.lua | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/lib/pipe/filter.lua b/lib/pipe/filter.lua index b7c4966..edcd6c8 100644 --- a/lib/pipe/filter.lua +++ b/lib/pipe/filter.lua @@ -61,4 +61,47 @@ function _M.make_read_max_size_filter(max_size, r_idx) end end +function _M.make_kill_low_write_speed_filter(pobj, assert_func, quorum) + local all_stat = pobj:get_stat() + + return function(rbufs, n_rd, wbufs, n_wrt, pipe_rst) + local ok_stat, n_ok = {}, 0 + for idx, wrt_rst in pairs(pipe_rst.write_result) do + if wrt_rst.err == nil then + local ident = pobj.wrt_cos[idx].ident + local id_stat = all_stat[ident] or {} + + if id_stat.write_time ~= nil and id_stat.write_size ~= nil then + ok_stat[ident] = { + write_size = id_stat.write_size, + write_time = id_stat.write_time, + } + n_ok = n_ok + 1 + end + end + end + + if n_ok <= quorum then + return nil, nil, nil + end + + for ident, st in pairs(ok_stat) do + local cur_speed = st.write_size/(math.max(st.write_time * 1000, 1)/1000) + + if assert_func(ok_stat, ident, st, cur_speed) then + local err = { + err_code = "WriteSlow", + err_msg = to_str(ident, " coroutine write slow, speed:", + strutil.placeholder(cur_speed/1024, '-', '%.3f'), "kb/s"), + } + + pobj.wrt_cos[ident].err = err + ngx.log(ngx.ERR, to_str("slow coroutine:", pobj.wrt_cos[ident], ", error:", err)) + + break + end + end + end +end + return _M diff --git a/lib/pipe/httplib.lua b/lib/pipe/httplib.lua index 465fab9..c4656e6 100644 --- a/lib/pipe/httplib.lua +++ b/lib/pipe/httplib.lua @@ -165,10 +165,12 @@ function _M.loop_http_write(pobj, ident, http) break end + local now = ngx.now() local _, err_code, err_msg = http:send_body(data) - if err_code ~= nil then - return nil, err_code, err_msg - end + + ngx.update_time() + pobj:incr_stat(ident, "write_size", #data) + pobj:incr_stat(ident, "write_time", ngx.now()-now) bytes = bytes + #data end diff --git a/lib/pipe/pipe.lua b/lib/pipe/pipe.lua index 609ed26..ad62acd 100644 --- a/lib/pipe/pipe.lua +++ b/lib/pipe/pipe.lua @@ -255,11 +255,32 @@ function _M.new(_, rds, wrts, filters, rd_timeout, wrt_timeout) rd_timeout = (rd_timeout or READ_TIMEOUT)/1000, wrt_timeout = (wrt_timeout or WRITE_TIMEOUT)/1000, + stat = {}, } return setmetatable(obj, mt) end +function _M.set_stat(self, ident, key, val) + self.stat[ident] = self.stat[ident] or {} + self.stat[ident][key] = val + + return val +end + +function _M.incr_stat(self, ident, key, val) + self.stat[ident] = self.stat[ident] or {} + + local prev = self.stat[ident][key] or 0 + self.stat[ident][key] = prev + val + + return self.stat[ident][key] +end + +function _M.get_stat(self) + return self.stat +end + function _M.write_pipe(pobj, ident, buf) local rd_co = pobj.rd_cos[ident] @@ -367,4 +388,20 @@ function _M.pipe(self, is_running, quorum_return) return get_pipe_result(self) end +function _M.add_read_filter(self, flt) + if flt == nil then + return + end + + table.insert(self.rd_filters, flt) +end + +function _M.add_write_filter(self, flt) + if flt == nil then + return + end + + table.insert(self.wrt_filters, flt) +end + return _M From 82d1c7e81344d7227e3fd1c0b38e589790d67bf1 Mon Sep 17 00:00:00 2001 From: shuwen Date: Thu, 18 Jun 2020 04:33:21 -0400 Subject: [PATCH 17/17] check sha1 before sending all data --- lib/pipe/writer.lua | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/pipe/writer.lua b/lib/pipe/writer.lua index 966c816..78fc518 100644 --- a/lib/pipe/writer.lua +++ b/lib/pipe/writer.lua @@ -63,12 +63,6 @@ local function write_data_to_ngx(pobj, ident, opts) end if data == '' then - if alg_sha1 ~= nil then - local calc_sha1 = resty_string.to_hex(alg_sha1:final()) - if calc_sha1 ~= opts.body_sha1 then - return nil, "Sha1Notmatched", to_str("expect:", opts.body_sha1, ", actual:", calc_sha1) - end - end break end @@ -98,6 +92,15 @@ local function write_data_to_ngx(pobj, ident, opts) alg_sha1:update(data) end + if ret.size == opts.total_size then + if alg_sha1 ~= nil then + local calc_sha1 = resty_string.to_hex(alg_sha1:final()) + if calc_sha1 ~= opts.body_sha1 then + return nil, "Sha1Notmatched", to_str("expect:", opts.body_sha1, ", actual:", calc_sha1) + end + end + end + rpc_logging.reset_start(log) ngx.print(data)