diff --git a/src/ngx_stream_lua_util.c b/src/ngx_stream_lua_util.c index 7a432b35..1020775a 100644 --- a/src/ngx_stream_lua_util.c +++ b/src/ngx_stream_lua_util.c @@ -1051,8 +1051,10 @@ ngx_stream_lua_run_thread(lua_State *L, ngx_stream_lua_request_t *r, return NGX_AGAIN; } - ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx); - ctx->uthreads--; + if (ctx->cur_co_ctx->co_ref != LUA_NOREF) { + ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx); + ctx->uthreads--; + } if (ctx->uthreads == 0) { if (ngx_stream_lua_entry_thread_alive(ctx)) { @@ -1090,7 +1092,9 @@ ngx_stream_lua_run_thread(lua_State *L, ngx_stream_lua_request_t *r, lua_xmove(ctx->cur_co_ctx->co, next_co, nrets); } - if (ctx->cur_co_ctx->is_uthread) { + if (ctx->cur_co_ctx->is_uthread + && ctx->cur_co_ctx->co_ref != LUA_NOREF) + { ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx); ctx->uthreads--; } @@ -1201,8 +1205,10 @@ ngx_stream_lua_run_thread(lua_State *L, ngx_stream_lua_request_t *r, return NGX_AGAIN; } - ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx); - ctx->uthreads--; + if (ctx->cur_co_ctx->co_ref != LUA_NOREF) { + ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx); + ctx->uthreads--; + } if (ctx->uthreads == 0) { if (ngx_stream_lua_entry_thread_alive(ctx)) { diff --git a/t/127-uthread-kill.t b/t/127-uthread-kill.t index fdcead11..1eb07a42 100644 --- a/t/127-uthread-kill.t +++ b/t/127-uthread-kill.t @@ -11,6 +11,7 @@ plan tests => repeat_each() * (blocks() * 5 + 1); $ENV{TEST_NGINX_RESOLVER} ||= '8.8.8.8'; $ENV{TEST_NGINX_MEMCACHED_PORT} ||= '11211'; +$ENV{TEST_NGINX_REDIS_PORT} ||= '6379'; #no_shuffle(); no_long_string(); @@ -313,3 +314,83 @@ thread created: zombie [alert] stream lua tcp socket abort resolver --- error_log + + + +=== TEST 7: no phantom uthreads decrement after killing parent uthread +--- stream_server_config + content_by_lua_block { + local redis_port = $TEST_NGINX_REDIS_PORT + local dns_threads = {} + + dns_threads[1] = ngx.thread.spawn(function() + local sock = ngx.socket.tcp() + sock:settimeout(2000) + local ok, err = sock:connect("127.0.0.1", redis_port) + if not ok then + return nil, err + end + + sock:send("PING\r\n") + local line = sock:receive() + sock:setkeepalive() + return line + end) + + dns_threads[2] = ngx.thread.spawn(function() + local child = coroutine.create(function() + local sock = ngx.socket.tcp() + sock:settimeout(2000) + local ok, err = sock:connect("127.0.0.1", redis_port) + if not ok then + return nil, err + end + + sock:send("PING\r\n") + local line = sock:receive() + sock:setkeepalive() + return line + end) + + local ok, res = coroutine.resume(child) + return res + end) + + ngx.thread.wait(dns_threads[1], dns_threads[2]) + + for _, t in ipairs(dns_threads) do + ngx.thread.kill(t) + end + + local probe_threads = {} + for i = 1, 10 do + probe_threads[i] = ngx.thread.spawn(function() + local sock = ngx.socket.tcp() + sock:settimeout(2000) + local ok, err = sock:connect("127.0.0.1", redis_port) + if not ok then + return nil, err + end + + sock:send("PING\r\n") + local line = sock:receive() + sock:setkeepalive() + return line + end) + end + + local ok_count = 0 + for i = 1, #probe_threads do + local ok, res = ngx.thread.wait(probe_threads[i]) + if ok and res then + ok_count = ok_count + 1 + end + end + + ngx.say("ok_count=", ok_count) + } +--- stream_response +ok_count=10 +--- no_error_log +[error] +[alert]