-
-
Notifications
You must be signed in to change notification settings - Fork 34.2k
Expand file tree
/
Copy path_sync_coordinator.py
More file actions
261 lines (202 loc) · 7.64 KB
/
_sync_coordinator.py
File metadata and controls
261 lines (202 loc) · 7.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
Internal synchronization coordinator for the sample profiler.
This module is used internally by the sample profiler to coordinate
the startup of target processes. It should not be called directly by users.
"""
import importlib.util
import os
import sys
import socket
import runpy
import time
import types
from typing import List, NoReturn
class CoordinatorError(Exception):
    """Common base class for every error the coordinator raises itself.

    ``main`` catches this type, prints the message to stderr and exits
    with status 1.
    """
class ArgumentError(CoordinatorError):
    """Signals that the coordinator's command-line arguments are missing or invalid."""
class SyncError(CoordinatorError):
    """Signals that readiness could not be reported to the profiler."""
class TargetError(CoordinatorError):
    """Signals that the target could not be set up, located, or executed."""
def _validate_arguments(args: List[str]) -> tuple[int, str, List[str]]:
    """
    Split the coordinator's argument vector into its three components.

    The expected layout is ``[script, sync_port, cwd, target, *target_args]``.

    Args:
        args: Full argument vector, with the script name at index 0

    Returns:
        Tuple of (sync_port, working_directory, target_args); target_args
        begins with the target itself

    Raises:
        ArgumentError: If fewer than four arguments are given, the port is
            not an integer in the range 1..65535, or the working directory
            is not an existing directory
    """
    if len(args) < 4:
        raise ArgumentError(
            "Insufficient arguments. Expected: <sync_port> <cwd> <target> [args...]"
        )

    # The port is checked first so that a bad port is reported before a bad cwd
    try:
        sync_port = int(args[1])
        if sync_port < 1 or sync_port > 65535:
            raise ValueError("Port out of range")
    except ValueError as e:
        raise ArgumentError(f"Invalid sync port '{args[1]}': {e}") from e

    cwd = args[2]
    if not os.path.isdir(cwd):
        raise ArgumentError(f"Working directory does not exist: {cwd}")

    target_args = args[3:]
    if not target_args:
        raise ArgumentError("No target specified")

    return sync_port, cwd, target_args
# Constants for socket communication
# Total number of connection attempts made by _signal_readiness
_MAX_RETRIES = 3
# Sleep before the first retry; doubled for each further retry
_INITIAL_RETRY_DELAY_SEC = 0.1
# Timeout, in seconds, passed to socket.create_connection
_SOCKET_TIMEOUT_SEC = 2.0
# Payload sent to the profiler to announce readiness
_READY_MESSAGE = b"ready"
def _signal_readiness(sync_port: int) -> None:
    """
    Signal readiness to the profiler via TCP socket.

    Connects to 127.0.0.1 on ``sync_port``, sends ``_READY_MESSAGE`` and
    closes the connection. A failed attempt is retried, for a total of
    ``_MAX_RETRIES`` attempts, sleeping between attempts with a delay that
    starts at ``_INITIAL_RETRY_DELAY_SEC`` and doubles each time.

    Args:
        sync_port: Port number where profiler is listening

    Raises:
        SyncError: If every attempt to connect and send fails
    """
    last_error = None

    for attempt in range(_MAX_RETRIES):
        try:
            # The context manager closes the socket on success and on error
            with socket.create_connection(
                ("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT_SEC
            ) as sock:
                # sendall() keeps writing until the whole message has been
                # sent; send() may transmit only a prefix of the buffer.
                sock.sendall(_READY_MESSAGE)
            return
        except OSError as e:
            # socket.error and socket timeouts are OSError subclasses/aliases
            last_error = e
            if attempt < _MAX_RETRIES - 1:
                # Exponential backoff before retry
                time.sleep(_INITIAL_RETRY_DELAY_SEC * (2 ** attempt))

    # If we get here, all retries failed
    raise SyncError(
        f"Failed to signal readiness after {_MAX_RETRIES} attempts: {last_error}"
    ) from last_error
def _setup_environment(cwd: str) -> None:
    """
    Prepare the process for running the target.

    Changes the current working directory to ``cwd`` and makes sure ``cwd``
    is on ``sys.path`` so that modules located there can be imported.

    Args:
        cwd: Working directory to change to

    Raises:
        TargetError: If the directory change fails
    """
    try:
        os.chdir(cwd)
    except OSError as e:
        raise TargetError(f"Failed to change to directory {cwd}: {e}") from e

    # Nothing more to do when the directory is already importable
    if cwd in sys.path:
        return
    sys.path.insert(0, cwd)
def _execute_module(module_name: str, module_args: List[str]) -> None:
    """
    Run a module as ``__main__`` through ``runpy.run_module``.

    Args:
        module_name: Name of the module to execute
        module_args: Arguments to pass to the module

    Raises:
        TargetError: If the run raises ImportError or any other Exception.
            A SystemExit raised by the module is swallowed.
    """
    # Mirror 'python -m module args': sys.argv is ["__main__.py", "args"]
    sys.argv = ["__main__.py"] + module_args

    try:
        runpy.run_module(module_name, run_name="__main__", alter_sys=True)
    except SystemExit:
        # SystemExit is normal for modules
        return
    except ImportError as e:
        raise TargetError(f"Module '{module_name}' not found: {e}") from e
    except Exception as e:
        raise TargetError(f"Error executing module '{module_name}': {e}") from e
def _execute_script(script_path: str, script_args: List[str], cwd: str) -> None:
    """
    Run a Python source file as the ``__main__`` module.

    The file is read as bytes, compiled, and executed in the namespace of a
    freshly created module that is installed in ``sys.modules`` under both
    ``__main__`` and ``__mp_main__``.

    Args:
        script_path: Path to the script to execute
        script_args: Arguments to pass to the script
        cwd: Directory against which a relative script_path is resolved

    Raises:
        TargetError: If the script is missing or unreadable, has a syntax
            error, or raises an Exception while running. A SystemExit
            raised by the script is swallowed.
    """
    # Relative paths are interpreted against cwd
    script_path = (
        script_path if os.path.isabs(script_path) else os.path.join(cwd, script_path)
    )

    if not os.path.isfile(script_path):
        raise TargetError(f"Script not found: {script_path}")

    # Replace sys.argv to match original script call
    sys.argv = [script_path] + script_args

    try:
        with open(script_path, 'rb') as script_file:
            source_code = script_file.read()
    except PermissionError as e:
        raise TargetError(f"Permission denied reading script: {script_path}") from e
    except FileNotFoundError as e:
        raise TargetError(f"Script file not found: {script_path}") from e

    try:
        main_module = types.ModuleType("__main__")
        main_module.__file__ = script_path
        main_module.__builtins__ = __builtins__

        # gh-140729: Create a __mp_main__ module to allow pickling
        sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module

        code = compile(source_code, script_path, 'exec', module='__main__')
        exec(code, main_module.__dict__)
    except SystemExit:
        # SystemExit is normal for scripts
        pass
    except SyntaxError as e:
        raise TargetError(f"Syntax error in script {script_path}: {e}") from e
    except Exception as e:
        raise TargetError(f"Error executing script '{script_path}': {e}") from e
def main() -> NoReturn:
    """
    Main coordinator function.

    This function coordinates the startup of a target Python process
    with the sample profiler by signaling when the process is ready
    to be profiled.

    The steps are: validate ``sys.argv``, switch to the requested working
    directory, check that the target (a script path, or ``-m <module>``)
    can be located, signal readiness to the profiler, then execute the
    target. The function always terminates the process: exit status 0 on
    success, 1 on any coordinator error, interrupt, or unexpected exception.
    """
    try:
        # Parse and validate arguments
        sync_port, cwd, target_args = _validate_arguments(sys.argv)

        # Set up execution environment
        _setup_environment(cwd)

        # Determine execution type and validate target exists
        is_module = target_args[0] == "-m"
        if is_module:
            if len(target_args) < 2:
                raise ArgumentError("Module name required after -m")
            module_name = target_args[1]
            module_args = target_args[2:]

            # find_spec() does not only return None for unknown modules: it
            # raises ModuleNotFoundError when the parent package of a dotted
            # name is missing, and ValueError for names it cannot resolve.
            # Report those as a target error rather than an "unexpected" one.
            try:
                spec = importlib.util.find_spec(module_name)
            except (ImportError, ValueError) as e:
                raise TargetError(f"Module not found: {module_name} ({e})") from e
            if spec is None:
                raise TargetError(f"Module not found: {module_name}")
        else:
            script_path = target_args[0]
            script_args = target_args[1:]
            # Match the path resolution logic in _execute_script
            check_path = (
                script_path
                if os.path.isabs(script_path)
                else os.path.join(cwd, script_path)
            )
            if not os.path.isfile(check_path):
                raise TargetError(f"Script not found: {script_path}")

        # Signal readiness to profiler only once the target is known to exist
        _signal_readiness(sync_port)

        # Execute the target
        if is_module:
            _execute_module(module_name, module_args)
        else:
            _execute_script(script_path, script_args, cwd)

    except CoordinatorError as e:
        print(f"Profiler coordinator error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("Interrupted", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report anything unanticipated and fail
        print(f"Unexpected error in profiler coordinator: {e}", file=sys.stderr)
        sys.exit(1)

    # Normal exit
    sys.exit(0)
# Run the coordinator only when this file is executed as a script
if __name__ == "__main__":
    main()