Skip to content

Commit 7a26f15

Browse files
authored
feat: support shutdown_callback argument in XXXAsyncServer (#311)
Signed-off-by: Kazuki Yamamoto <yamamoto.kazuki.24@gmail.com>
1 parent e3917f4 commit 7a26f15

9 files changed

Lines changed: 62 additions & 10 deletions

File tree

packages/pynumaflow/pynumaflow/accumulator/async_server.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ class AccumulatorAsyncServer(NumaflowServer):
7373
max_threads: The max number of threads to be spawned;
7474
defaults to 4 and max capped at 16
7575
server_info_file: The path to the server info file
76+
shutdown_callback: Callable, executed after loop is stopped, before
77+
cancelling any tasks.
78+
Useful for graceful shutdown.
7679
7780
Example invocation:
7881
```py
@@ -139,13 +142,15 @@ def __init__(
139142
max_message_size=MAX_MESSAGE_SIZE,
140143
max_threads=NUM_THREADS_DEFAULT,
141144
server_info_file=ACCUMULATOR_SERVER_INFO_FILE_PATH,
145+
shutdown_callback=None,
142146
):
143147
init_kwargs = init_kwargs or {}
144148
self.accumulator_handler = get_handler(accumulator_instance, init_args, init_kwargs)
145149
self.sock_path = f"unix://{sock_path}"
146150
self.max_message_size = max_message_size
147151
self.max_threads = min(max_threads, MAX_NUM_THREADS)
148152
self.server_info_file = server_info_file
153+
self.shutdown_callback = shutdown_callback
149154

150155
self._server_options = [
151156
("grpc.max_send_message_length", self.max_message_size),
@@ -162,7 +167,7 @@ def start(self):
162167
_LOGGER.info(
163168
"Starting Async Accumulator Server",
164169
)
165-
aiorun.run(self.aexec(), use_uvloop=True)
170+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
166171

167172
async def aexec(self):
168173
"""

packages/pynumaflow/pynumaflow/batchmapper/async_server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def __init__(
3434
max_message_size=MAX_MESSAGE_SIZE,
3535
max_threads=NUM_THREADS_DEFAULT,
3636
server_info_file=MAP_SERVER_INFO_FILE_PATH,
37+
shutdown_callback=None,
3738
):
3839
"""
3940
Create a new grpc Async Batch Map Server instance.
@@ -46,6 +47,10 @@ def __init__(
4647
max_message_size: The max message size in bytes the server can receive and send
4748
max_threads: The max number of threads to be spawned;
4849
defaults to 4 and max capped at 16
50+
server_info_file: The path to the server info file
51+
shutdown_callback: Callable, executed after loop is stopped, before
52+
cancelling any tasks.
53+
Useful for graceful shutdown.
4954
5055
Example invocation:
5156
```py
@@ -79,6 +84,7 @@ async def handler(
7984
self.max_threads = min(max_threads, MAX_NUM_THREADS)
8085
self.max_message_size = max_message_size
8186
self.server_info_file = server_info_file
87+
self.shutdown_callback = shutdown_callback
8288

8389
self._server_options = [
8490
("grpc.max_send_message_length", self.max_message_size),
@@ -92,7 +98,7 @@ def start(self):
9298
Starter function for the Async Batch Map server, we need a separate caller
9399
to the aexec so that all the async coroutines can be started from a single context
94100
"""
95-
aiorun.run(self.aexec(), use_uvloop=True)
101+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
96102

97103
async def aexec(self):
98104
"""

packages/pynumaflow/pynumaflow/mapper/async_server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def __init__(
6060
max_message_size=MAX_MESSAGE_SIZE,
6161
max_threads=NUM_THREADS_DEFAULT,
6262
server_info_file=MAP_SERVER_INFO_FILE_PATH,
63+
shutdown_callback=None,
6364
):
6465
"""
6566
Create a new grpc Asynchronous Map Server instance.
@@ -72,11 +73,16 @@ def __init__(
7273
max_message_size: The max message size in bytes the server can receive and send
7374
max_threads: The max number of threads to be spawned;
7475
defaults to 4 and max capped at 16
76+
server_info_file: The path to the server info file
77+
shutdown_callback: Callable, executed after loop is stopped, before
78+
cancelling any tasks.
79+
Useful for graceful shutdown.
7580
"""
7681
self.sock_path = f"unix://{sock_path}"
7782
self.max_threads = min(max_threads, MAX_NUM_THREADS)
7883
self.max_message_size = max_message_size
7984
self.server_info_file = server_info_file
85+
self.shutdown_callback = shutdown_callback
8086

8187
self.mapper_instance = mapper_instance
8288

@@ -92,7 +98,7 @@ def start(self) -> None:
9298
Starter function for the Async server class, need a separate caller
9399
so that all the async coroutines can be started from a single context
94100
"""
95-
aiorun.run(self.aexec(), use_uvloop=True)
101+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
96102

97103
async def aexec(self) -> None:
98104
"""

packages/pynumaflow/pynumaflow/mapstreamer/async_server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def __init__(
3737
max_message_size=MAX_MESSAGE_SIZE,
3838
max_threads=NUM_THREADS_DEFAULT,
3939
server_info_file=MAP_SERVER_INFO_FILE_PATH,
40+
shutdown_callback=None,
4041
):
4142
"""
4243
Create a new grpc Async Map Stream Server instance.
@@ -50,6 +51,10 @@ def __init__(
5051
max_threads: The max number of threads to be spawned;
5152
defaults to 4 and max capped at 16
5253
server_type: The type of server to be used
54+
server_info_file: The path to the server info file
55+
shutdown_callback: Callable, executed after loop is stopped, before
56+
cancelling any tasks.
57+
Useful for graceful shutdown.
5358
5459
Example invocation:
5560
```py
@@ -98,6 +103,7 @@ async def map_stream_handler(_: list[str], datum: Datum) -> AsyncIterable[Messag
98103
self.max_threads = min(max_threads, MAX_NUM_THREADS)
99104
self.max_message_size = max_message_size
100105
self.server_info_file = server_info_file
106+
self.shutdown_callback = shutdown_callback
101107

102108
self._server_options = [
103109
("grpc.max_send_message_length", self.max_message_size),
@@ -111,7 +117,7 @@ def start(self):
111117
Starter function for the Async Map Stream server, we need a separate caller
112118
to the aexec so that all the async coroutines can be started from a single context
113119
"""
114-
aiorun.run(self.aexec(), use_uvloop=True)
120+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
115121

116122
async def aexec(self):
117123
"""

packages/pynumaflow/pynumaflow/reducer/async_server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ class ReduceAsyncServer(NumaflowServer):
6464
max_message_size: The max message size in bytes the server can receive and send
6565
max_threads: The max number of threads to be spawned;
6666
defaults to 4 and max capped at 16
67+
server_info_file: The path to the server info file
68+
shutdown_callback: Callable, executed after loop is stopped, before
69+
cancelling any tasks.
70+
Useful for graceful shutdown.
6771
Example invocation:
6872
```py
6973
import os
@@ -124,13 +128,15 @@ def __init__(
124128
max_message_size=MAX_MESSAGE_SIZE,
125129
max_threads=NUM_THREADS_DEFAULT,
126130
server_info_file=REDUCE_SERVER_INFO_FILE_PATH,
131+
shutdown_callback=None,
127132
):
128133
init_kwargs = init_kwargs or {}
129134
self.reducer_handler = get_handler(reducer_instance, init_args, init_kwargs)
130135
self.sock_path = f"unix://{sock_path}"
131136
self.max_message_size = max_message_size
132137
self.max_threads = min(max_threads, MAX_NUM_THREADS)
133138
self.server_info_file = server_info_file
139+
self.shutdown_callback = shutdown_callback
134140

135141
self._server_options = [
136142
("grpc.max_send_message_length", self.max_message_size),
@@ -147,7 +153,7 @@ def start(self):
147153
_LOGGER.info(
148154
"Starting Async Reduce Server",
149155
)
150-
aiorun.run(self.aexec(), use_uvloop=True)
156+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
151157

152158
async def aexec(self):
153159
"""

packages/pynumaflow/pynumaflow/reducestreamer/async_server.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,11 @@ class ReduceStreamAsyncServer(NumaflowServer):
6969
sock_path: The UNIX socket path to be used for the server
7070
max_message_size: The max message size in bytes the server can receive and send
7171
max_threads: The max number of threads to be spawned;
72-
defaults to 4 and max capped at 16
72+
defaults to 4 and max capped at 16
7373
server_info_file: The path to the server info file
74+
shutdown_callback: Callable, executed after loop is stopped, before
75+
cancelling any tasks.
76+
Useful for graceful shutdown.
7477
7578
Example invocation:
7679
```py
@@ -138,13 +141,15 @@ def __init__(
138141
max_message_size=MAX_MESSAGE_SIZE,
139142
max_threads=NUM_THREADS_DEFAULT,
140143
server_info_file=REDUCE_STREAM_SERVER_INFO_FILE_PATH,
144+
shutdown_callback=None,
141145
):
142146
init_kwargs = init_kwargs or {}
143147
self.reduce_stream_handler = get_handler(reduce_stream_instance, init_args, init_kwargs)
144148
self.sock_path = f"unix://{sock_path}"
145149
self.max_message_size = max_message_size
146150
self.max_threads = min(max_threads, MAX_NUM_THREADS)
147151
self.server_info_file = server_info_file
152+
self.shutdown_callback = shutdown_callback
148153

149154
self._server_options = [
150155
("grpc.max_send_message_length", self.max_message_size),
@@ -161,7 +166,7 @@ def start(self):
161166
_LOGGER.info(
162167
"Starting Async Reduce Stream Server",
163168
)
164-
aiorun.run(self.aexec(), use_uvloop=True)
169+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
165170

166171
async def aexec(self):
167172
"""

packages/pynumaflow/pynumaflow/sinker/async_server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ class SinkAsyncServer(NumaflowServer):
4141
max_message_size: The max message size in bytes the server can receive and send
4242
max_threads: The max number of threads to be spawned;
4343
defaults to 4 and max capped at 16
44+
server_info_file: The path to the server info file
45+
shutdown_callback: Callable, executed after loop is stopped, before
46+
cancelling any tasks.
47+
Useful for graceful shutdown.
4448
4549
Example invocation:
4650
```py
@@ -88,6 +92,7 @@ def __init__(
8892
max_message_size=MAX_MESSAGE_SIZE,
8993
max_threads=NUM_THREADS_DEFAULT,
9094
server_info_file=SINK_SERVER_INFO_FILE_PATH,
95+
shutdown_callback=None,
9196
):
9297
# If the container type is fallback sink, then use the fallback sink address and path.
9398
if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
@@ -103,6 +108,7 @@ def __init__(
103108
self.max_threads = min(max_threads, MAX_NUM_THREADS)
104109
self.max_message_size = max_message_size
105110
self.server_info_file = server_info_file
111+
self.shutdown_callback = shutdown_callback
106112

107113
self.sinker_instance = sinker_instance
108114

@@ -118,7 +124,7 @@ def start(self):
118124
Starter function for the Async server class, need a separate caller
119125
so that all the async coroutines can be started from a single context
120126
"""
121-
aiorun.run(self.aexec(), use_uvloop=True)
127+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
122128

123129
async def aexec(self):
124130
"""

packages/pynumaflow/pynumaflow/sourcer/async_server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def __init__(
2929
max_message_size=MAX_MESSAGE_SIZE,
3030
max_threads=NUM_THREADS_DEFAULT,
3131
server_info_file=SOURCE_SERVER_INFO_FILE_PATH,
32+
shutdown_callback=None,
3233
):
3334
"""
3435
Create a new grpc Async Source Server instance.
@@ -41,6 +42,10 @@ def __init__(
4142
max_message_size: The max message size in bytes the server can receive and send
4243
max_threads: The max number of threads to be spawned;
4344
defaults to 4 and max capped at 16
45+
server_info_file: The path to the server info file
46+
shutdown_callback: Callable, executed after loop is stopped, before
47+
cancelling any tasks.
48+
Useful for graceful shutdown.
4449
4550
Example invocation:
4651
```py
@@ -138,6 +143,7 @@ async def partitions_handler(self) -> PartitionsResponse:
138143
self.max_threads = min(max_threads, MAX_NUM_THREADS)
139144
self.max_message_size = max_message_size
140145
self.server_info_file = server_info_file
146+
self.shutdown_callback = shutdown_callback
141147

142148
self.sourcer_instance = sourcer_instance
143149

@@ -153,7 +159,7 @@ def start(self):
153159
Starter function for the Async server class, need a separate caller
154160
so that all the async coroutines can be started from a single context
155161
"""
156-
aiorun.run(self.aexec(), use_uvloop=True)
162+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
157163

158164
async def aexec(self):
159165
"""

packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ class SourceTransformAsyncServer(NumaflowServer):
3535
max_message_size: The max message size in bytes the server can receive and send
3636
max_threads: The max number of threads to be spawned;
3737
defaults to 4 and max capped at 16
38+
server_info_file: The path to the server info file
39+
shutdown_callback: Callable, executed after loop is stopped, before
40+
cancelling any tasks.
41+
Useful for graceful shutdown.
3842
3943
4044
Below is a simple User Defined Function example which receives a message, applies the
@@ -96,11 +100,13 @@ def __init__(
96100
max_message_size=MAX_MESSAGE_SIZE,
97101
max_threads=NUM_THREADS_DEFAULT,
98102
server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
103+
shutdown_callback=None,
99104
):
100105
self.sock_path = f"unix://{sock_path}"
101106
self.max_threads = min(max_threads, MAX_NUM_THREADS)
102107
self.max_message_size = max_message_size
103108
self.server_info_file = server_info_file
109+
self.shutdown_callback = shutdown_callback
104110

105111
self.source_transform_instance = source_transform_instance
106112

@@ -115,7 +121,7 @@ def start(self) -> None:
115121
Starter function for the Async server class, need a separate caller
116122
so that all the async coroutines can be started from a single context
117123
"""
118-
aiorun.run(self.aexec(), use_uvloop=True)
124+
aiorun.run(self.aexec(), use_uvloop=True, shutdown_callback=self.shutdown_callback)
119125

120126
async def aexec(self) -> None:
121127
"""

0 commit comments

Comments
 (0)