Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions Doc/library/subprocess.rst
Original file line number Diff line number Diff line change
Expand Up @@ -803,14 +803,29 @@ Instances of the :class:`Popen` class have the following methods:

.. note::

When the ``timeout`` parameter is not ``None``, then (on POSIX) the
function is implemented using a busy loop (non-blocking call and short
sleeps). Use the :mod:`asyncio` module for an asynchronous wait: see
When ``timeout`` is not ``None`` and the platform supports it, an
efficient event-driven mechanism is used to wait for process termination:

- Linux >= 5.3 uses :func:`os.pidfd_open` + :func:`select.poll`
- macOS and other BSD variants use :func:`select.kqueue` +
``KQ_FILTER_PROC`` + ``KQ_NOTE_EXIT``
- Windows uses ``WaitForSingleObject``

If none of these mechanisms are available, the function falls back to a
busy loop (non-blocking call and short sleeps).

.. note::

Use the :mod:`asyncio` module for an asynchronous wait: see
:class:`asyncio.create_subprocess_exec`.

.. versionchanged:: 3.3
*timeout* was added.

.. versionchanged:: 3.15
if *timeout* is not ``None``, use efficient event-driven implementation
on Linux >= 5.3 and macOS / BSD.

.. method:: Popen.communicate(input=None, timeout=None)

Interact with process: Send data to stdin. Read data from stdout and stderr,
Expand Down
14 changes: 14 additions & 0 deletions Doc/whatsnew/3.15.rst
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,20 @@ ssl

(Contributed by Ron Frederick in :gh:`138252`.)

subprocess
----------

* :meth:`subprocess.Popen.wait`: when ``timeout`` is not ``None`` and the
platform supports it, an efficient event-driven mechanism is used to wait for
process termination:

- Linux >= 5.3 uses :func:`os.pidfd_open` + :func:`select.poll`.
- macOS and other BSD variants use :func:`select.kqueue` + ``KQ_FILTER_PROC`` + ``KQ_NOTE_EXIT``.
- Windows keeps using ``WaitForSingleObject`` (unchanged).

If none of these mechanisms are available, the function falls back to the
traditional busy loop (non-blocking call and short sleeps).
(Contributed by Giampaolo Rodola in :gh:`83069`).

symtable
--------
Expand Down
145 changes: 143 additions & 2 deletions Lib/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,60 @@ def _use_posix_spawn():
return False


def _can_use_pidfd_open():
# Availability: Linux >= 5.3
if not hasattr(os, "pidfd_open"):
return False
try:
pidfd = os.pidfd_open(os.getpid(), 0)
except OSError as err:
if err.errno in {errno.EMFILE, errno.ENFILE}:
# transitory 'too many open files'
return True
# likely blocked by security policy like SECCOMP (EPERM,
# EACCES, ENOSYS)
return False
else:
os.close(pidfd)
return True


def _can_use_kqueue():
# Availability: macOS, BSD
names = (
"kqueue",
"KQ_EV_ADD",
"KQ_EV_ONESHOT",
"KQ_FILTER_PROC",
"KQ_NOTE_EXIT",
)
if not all(hasattr(select, x) for x in names):
return False
kq = None
try:
kq = select.kqueue()
kev = select.kevent(
os.getpid(),
filter=select.KQ_FILTER_PROC,
flags=select.KQ_EV_ADD | select.KQ_EV_ONESHOT,
fflags=select.KQ_NOTE_EXIT,
)
kq.control([kev], 1, 0)
return True
except OSError as err:
if err.errno in {errno.EMFILE, errno.ENFILE}:
# transitory 'too many open files'
return True
return False
finally:
if kq is not None:
kq.close()


_CAN_USE_PIDFD_OPEN = not _mswindows and _can_use_pidfd_open()
_CAN_USE_KQUEUE = not _mswindows and _can_use_kqueue()


# These are primarily fail-safe knobs for negatives. A True value does not
# guarantee the given libc/syscall API will be used.
_USE_POSIX_SPAWN = _use_posix_spawn()
Expand Down Expand Up @@ -2046,14 +2100,100 @@ def _try_wait(self, wait_flags):
sts = 0
return (pid, sts)

def _wait_pidfd(self, timeout):
"""Wait for PID to terminate using pidfd_open() + poll().
Linux >= 5.3 only.
"""
if not _CAN_USE_PIDFD_OPEN:
return False
try:
pidfd = os.pidfd_open(self.pid, 0)
except OSError:
# May be:
# - ESRCH: no such process
# - EMFILE, ENFILE: too many open files (usually 1024)
# - ENODEV: anonymous inode filesystem not supported
# - EPERM, EACCES, ENOSYS: undocumented; may happen if
# blocked by security policy like SECCOMP
return False

try:
poller = select.poll()
poller.register(pidfd, select.POLLIN)
events = poller.poll(timeout * 1000)
if not events:
raise TimeoutExpired(self.args, timeout)
return True
finally:
os.close(pidfd)

def _wait_kqueue(self, timeout):
"""Wait for PID to terminate using kqueue(). macOS and BSD only."""
if not _CAN_USE_KQUEUE:
return False
try:
kq = select.kqueue()
except OSError:
# likely EMFILE / ENFILE (too many open files)
return False

try:
kev = select.kevent(
self.pid,
filter=select.KQ_FILTER_PROC,
flags=select.KQ_EV_ADD | select.KQ_EV_ONESHOT,
fflags=select.KQ_NOTE_EXIT,
)
try:
events = kq.control([kev], 1, timeout) # wait
except OSError:
return False
else:
if not events:
raise TimeoutExpired(self.args, timeout)
return True
finally:
kq.close()

def _wait(self, timeout):
"""Internal implementation of wait() on POSIX."""
"""Internal implementation of wait() on POSIX.

Uses efficient pidfd_open() + poll() on Linux or kqueue()
on macOS/BSD when available. Falls back to polling
waitpid(WNOHANG) otherwise.
"""
if self.returncode is not None:
return self.returncode

if timeout is not None:
endtime = _time() + timeout
if timeout < 0:
raise TimeoutExpired(self.args, timeout)
started = _time()
endtime = started + timeout

# Try efficient wait first.
if self._wait_pidfd(timeout) or self._wait_kqueue(timeout):
# Process is gone. At this point os.waitpid(pid, 0)
# will return immediately, but in very rare races
# the PID may have been reused.
# os.waitpid(pid, WNOHANG) ensures we attempt a
# non-blocking reap without blocking indefinitely.
with self._waitpid_lock:
if self.returncode is not None:
return self.returncode # Another thread waited.
(pid, sts) = self._try_wait(os.WNOHANG)
assert pid == self.pid or pid == 0
if pid == self.pid:
self._handle_exitstatus(sts)
return self.returncode
# os.waitpid(pid, WNOHANG) returned 0 instead
# of our PID, meaning PID has not yet exited,
# even though poll() / kqueue() said so. Very
# rare and mostly theoretical. Fallback to busy
# polling.
elapsed = _time() - started
endtime -= elapsed

# Enter a busy loop if we have a timeout. This busy loop was
# cribbed from Lib/threading.py in Thread.wait() at r71065.
delay = 0.0005 # 500 us -> initial delay of 1 ms
Expand Down Expand Up @@ -2085,6 +2225,7 @@ def _wait(self, timeout):
# http://bugs.python.org/issue14396.
if pid == self.pid:
self._handle_exitstatus(sts)

return self.returncode


Expand Down
119 changes: 119 additions & 0 deletions Lib/test/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,8 @@ def test_wait(self):
def test_wait_timeout(self):
p = subprocess.Popen([sys.executable,
"-c", "import time; time.sleep(0.3)"])
with self.assertRaises(subprocess.TimeoutExpired) as c:
p.wait(timeout=0)
with self.assertRaises(subprocess.TimeoutExpired) as c:
p.wait(timeout=0.0001)
self.assertIn("0.0001", str(c.exception)) # For coverage of __str__.
Expand Down Expand Up @@ -4094,5 +4096,122 @@ def test_broken_pipe_cleanup(self):
self.assertTrue(proc.stdin.closed)



class FastWaitTestCase(BaseTestCase):
"""Tests for efficient (pidfd_open() + poll() / kqueue()) process
waiting in subprocess.Popen.wait().
"""
CAN_USE_PIDFD_OPEN = subprocess._CAN_USE_PIDFD_OPEN
CAN_USE_KQUEUE = subprocess._CAN_USE_KQUEUE
COMMAND = [sys.executable, "-c", "import time; time.sleep(0.3)"]
WAIT_TIMEOUT = 0.0001 # 0.1 ms

def assert_fast_waitpid_error(self, patch_point):
# Emulate a case where pidfd_open() or kqueue() fails.
# Busy-poll wait should be used as fallback.
exc = OSError(errno.EMFILE, os.strerror(errno.EMFILE))
with mock.patch(patch_point, side_effect=exc) as m:
p = subprocess.Popen(self.COMMAND)
with self.assertRaises(subprocess.TimeoutExpired):
p.wait(self.WAIT_TIMEOUT)
self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
self.assertTrue(m.called)

@unittest.skipIf(not CAN_USE_PIDFD_OPEN, reason="needs pidfd_open()")
def test_wait_pidfd_open_error(self):
self.assert_fast_waitpid_error("os.pidfd_open")

@unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
def test_wait_kqueue_error(self):
self.assert_fast_waitpid_error("select.kqueue")

@unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
def test_kqueue_control_error(self):
# Emulate a case where kqueue.control() fails. Busy-poll wait
# should be used as fallback.
p = subprocess.Popen(self.COMMAND)
kq_mock = mock.Mock()
kq_mock.control.side_effect = OSError(
errno.EPERM, os.strerror(errno.EPERM)
)
kq_mock.close = mock.Mock()

with mock.patch("select.kqueue", return_value=kq_mock) as m:
with self.assertRaises(subprocess.TimeoutExpired):
p.wait(self.WAIT_TIMEOUT)
self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
self.assertTrue(m.called)

def assert_wait_race_condition(self, patch_target, real_func):
# Call pidfd_open() / kqueue(), then terminate the process.
# Make sure that the wait call (poll() / kqueue.control())
# still works for a terminated PID.
p = subprocess.Popen(self.COMMAND)

def wrapper(*args, **kwargs):
ret = real_func(*args, **kwargs)
try:
os.kill(p.pid, signal.SIGTERM)
os.waitpid(p.pid, 0)
except OSError:
pass
return ret

with mock.patch(patch_target, side_effect=wrapper) as m:
status = p.wait(timeout=support.SHORT_TIMEOUT)
self.assertTrue(m.called)
self.assertEqual(status, 0)

@unittest.skipIf(not CAN_USE_PIDFD_OPEN, reason="needs pidfd_open()")
def test_pidfd_open_race(self):
self.assert_wait_race_condition("os.pidfd_open", os.pidfd_open)

@unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
def test_kqueue_race(self):
self.assert_wait_race_condition("select.kqueue", select.kqueue)

def assert_notification_without_immediate_reap(self, patch_target):
# Verify fallback to busy polling when poll() / kqueue()
# succeeds, but waitpid(pid, WNOHANG) returns (0, 0).
def waitpid_wrapper(pid, flags):
nonlocal ncalls
ncalls += 1
if ncalls == 1:
return (0, 0)
return real_waitpid(pid, flags)

ncalls = 0
real_waitpid = os.waitpid
with mock.patch.object(subprocess.Popen, patch_target, return_value=True) as m1:
with mock.patch("os.waitpid", side_effect=waitpid_wrapper) as m2:
p = subprocess.Popen(self.COMMAND)
with self.assertRaises(subprocess.TimeoutExpired):
p.wait(self.WAIT_TIMEOUT)
self.assertEqual(p.wait(timeout=support.SHORT_TIMEOUT), 0)
self.assertTrue(m1.called)
self.assertTrue(m2.called)

@unittest.skipIf(not CAN_USE_PIDFD_OPEN, reason="needs pidfd_open()")
def test_pidfd_open_notification_without_immediate_reap(self):
self.assert_notification_without_immediate_reap("_wait_pidfd")

@unittest.skipIf(not CAN_USE_KQUEUE, reason="needs kqueue() for proc")
def test_kqueue_notification_without_immediate_reap(self):
self.assert_notification_without_immediate_reap("_wait_kqueue")

@unittest.skipUnless(
CAN_USE_PIDFD_OPEN or CAN_USE_KQUEUE,
"fast wait mechanism not available"
)
def test_fast_path_avoid_busy_loop(self):
# assert that the busy loop is not called as long as the fast
# wait is available
with mock.patch('time.sleep') as m:
p = subprocess.Popen(self.COMMAND)
with self.assertRaises(subprocess.TimeoutExpired):
p.wait(self.WAIT_TIMEOUT)
self.assertEqual(p.wait(timeout=support.LONG_TIMEOUT), 0)
self.assertFalse(m.called)

if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix error handling in perf jitdump initialization on memory allocation failure.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
:meth:`subprocess.Popen.wait`: when ``timeout`` is not ``None``, an efficient
event-driven mechanism now waits for process termination, if available. Linux
>= 5.3 uses :func:`os.pidfd_open` + :func:`select.poll`. macOS and other BSD
variants use :func:`select.kqueue` + ``KQ_FILTER_PROC`` + ``KQ_NOTE_EXIT``.
Windows keeps using ``WaitForSingleObject`` (unchanged). If none of these
mechanisms are available, the function falls back to the traditional busy loop
(non-blocking call and short sleeps). Patch by Giampaolo Rodola.
3 changes: 2 additions & 1 deletion Python/perf_jit_trampoline.c
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,8 @@ static void* perf_map_jit_init(void) {
0 // Offset 0 (first page)
);

if (perf_jit_map_state.mapped_buffer == NULL) {
if (perf_jit_map_state.mapped_buffer == MAP_FAILED) {
perf_jit_map_state.mapped_buffer = NULL;
close(fd);
return NULL; // Memory mapping failed
}
Expand Down
Loading