Skip to content

Commit e3a2fbe

Browse files
gpsheadclaude
andcommitted
Add subprocess.run_pipeline() for command pipe chaining
Add a new run_pipeline() function to the subprocess module that enables running multiple commands connected via pipes, similar to shell pipelines. New API: - run_pipeline(*commands, ...) - Run a pipeline of commands - PipelineResult - Return type with commands, returncodes, stdout, stderr - PipelineError - Raised when check=True and any command fails Features: - Supports arbitrary number of commands (minimum 2) - capture_output, input, timeout, and check parameters like run() - stdin= connects to first process, stdout= connects to last process - Text mode support via text=True, encoding, errors - All processes share a single stderr pipe for simplicity - "pipefail" semantics: check=True fails if any command fails Unlike run(), this function does not accept universal_newlines. Use text=True instead. Example: result = subprocess.run_pipeline( ['cat', 'file.txt'], ['grep', 'pattern'], ['wc', '-l'], capture_output=True, text=True ) Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent cfcd524 commit e3a2fbe

File tree

2 files changed

+514
-1
lines changed

2 files changed

+514
-1
lines changed

Lib/subprocess.py

Lines changed: 303 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@
6262

6363
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
6464
"getoutput", "check_output", "run", "CalledProcessError", "DEVNULL",
65-
"SubprocessError", "TimeoutExpired", "CompletedProcess"]
65+
"SubprocessError", "TimeoutExpired", "CompletedProcess",
66+
"run_pipeline", "PipelineResult", "PipelineError"]
6667
# NOTE: We intentionally exclude list2cmdline as it is
6768
# considered an internal implementation detail. issue10838.
6869

@@ -194,6 +195,36 @@ def stdout(self, value):
194195
self.output = value
195196

196197

198+
class PipelineError(SubprocessError):
199+
"""Raised when run_pipeline() is called with check=True and one or more
200+
commands in the pipeline return a non-zero exit status.
201+
202+
Attributes:
203+
commands: List of commands in the pipeline (each a list of strings).
204+
returncodes: List of return codes corresponding to each command.
205+
stdout: Standard output from the final command (if captured).
206+
stderr: Standard error output (if captured).
207+
failed: List of (index, command, returncode) tuples for failed commands.
208+
"""
209+
def __init__(self, commands, returncodes, stdout=None, stderr=None):
210+
self.commands = commands
211+
self.returncodes = returncodes
212+
self.stdout = stdout
213+
self.stderr = stderr
214+
self.failed = [
215+
(i, cmd, rc)
216+
for i, (cmd, rc) in enumerate(zip(commands, returncodes))
217+
if rc != 0
218+
]
219+
220+
def __str__(self):
221+
failed_info = ", ".join(
222+
f"command {i} {cmd!r} returned {rc}"
223+
for i, cmd, rc in self.failed
224+
)
225+
return f"Pipeline failed: {failed_info}"
226+
227+
197228
if _mswindows:
198229
class STARTUPINFO:
199230
def __init__(self, *, dwFlags=0, hStdInput=None, hStdOutput=None,
@@ -508,6 +539,47 @@ def check_returncode(self):
508539
self.stderr)
509540

510541

542+
class PipelineResult:
543+
"""A pipeline of processes that have finished running.
544+
545+
This is returned by run_pipeline().
546+
547+
Attributes:
548+
commands: List of commands in the pipeline (each command is a list).
549+
returncodes: List of return codes for each command in the pipeline.
550+
returncode: The return code of the final command (for convenience).
551+
stdout: The standard output of the final command (None if not captured).
552+
stderr: The standard error output (None if not captured).
553+
"""
554+
def __init__(self, commands, returncodes, stdout=None, stderr=None):
555+
self.commands = list(commands)
556+
self.returncodes = list(returncodes)
557+
self.stdout = stdout
558+
self.stderr = stderr
559+
560+
@property
561+
def returncode(self):
562+
"""Return the exit code of the final command in the pipeline."""
563+
return self.returncodes[-1] if self.returncodes else None
564+
565+
def __repr__(self):
566+
args = [f'commands={self.commands!r}',
567+
f'returncodes={self.returncodes!r}']
568+
if self.stdout is not None:
569+
args.append(f'stdout={self.stdout!r}')
570+
if self.stderr is not None:
571+
args.append(f'stderr={self.stderr!r}')
572+
return f"{type(self).__name__}({', '.join(args)})"
573+
574+
__class_getitem__ = classmethod(types.GenericAlias)
575+
576+
def check_returncodes(self):
577+
"""Raise PipelineError if any command's exit code is non-zero."""
578+
if any(rc != 0 for rc in self.returncodes):
579+
raise PipelineError(self.commands, self.returncodes,
580+
self.stdout, self.stderr)
581+
582+
511583
def run(*popenargs,
512584
input=None, capture_output=False, timeout=None, check=False, **kwargs):
513585
"""Run command with arguments and return a CompletedProcess instance.
@@ -578,6 +650,236 @@ def run(*popenargs,
578650
return CompletedProcess(process.args, retcode, stdout, stderr)
579651

580652

653+
def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
654+
check=False, **kwargs):
655+
"""Run a pipeline of commands connected via pipes.
656+
657+
Each positional argument should be a command (list of strings or a string
658+
if shell=True) to execute. The stdout of each command is connected to the
659+
stdin of the next command in the pipeline, similar to shell pipelines.
660+
661+
Returns a PipelineResult instance with attributes commands, returncodes,
662+
stdout, and stderr. By default, stdout and stderr are not captured, and
663+
those attributes will be None. Pass capture_output=True to capture both
664+
the final command's stdout and stderr from all commands.
665+
666+
If check is True and any command's exit code is non-zero, it raises a
667+
PipelineError. This is similar to shell "pipefail" behavior.
668+
669+
If timeout (seconds) is given and the pipeline takes too long, a
670+
TimeoutExpired exception will be raised and all processes will be killed.
671+
672+
The optional "input" argument allows passing bytes or a string to the
673+
first command's stdin. If you use this argument, you may not also specify
674+
stdin in kwargs.
675+
676+
By default, all communication is in bytes. Use text=True, encoding, or
677+
errors to enable text mode, which affects the input argument and stdout/
678+
stderr outputs.
679+
680+
.. note::
681+
When using text=True with capture_output=True or stderr=PIPE, be aware
682+
that stderr output from multiple processes may be interleaved in ways
683+
that produce invalid character sequences when decoded. For reliable
684+
text decoding, avoid text=True when capturing stderr from pipelines,
685+
or handle decoding errors appropriately.
686+
687+
Other keyword arguments are passed to each Popen call, except for stdin,
688+
stdout which are managed by the pipeline.
689+
690+
Example:
691+
# Equivalent to: cat file.txt | grep pattern | wc -l
692+
result = run_pipeline(
693+
['cat', 'file.txt'],
694+
['grep', 'pattern'],
695+
['wc', '-l'],
696+
capture_output=True, text=True
697+
)
698+
print(result.stdout) # "42\\n"
699+
print(result.returncodes) # [0, 0, 0]
700+
"""
701+
if len(commands) < 2:
702+
raise ValueError('run_pipeline requires at least 2 commands')
703+
704+
# Reject universal_newlines - use text= instead
705+
if kwargs.get('universal_newlines') is not None:
706+
raise TypeError(
707+
"run_pipeline() does not support 'universal_newlines'. "
708+
"Use 'text=True' instead."
709+
)
710+
711+
# Validate no conflicting arguments
712+
if input is not None:
713+
if kwargs.get('stdin') is not None:
714+
raise ValueError('stdin and input arguments may not both be used.')
715+
716+
if capture_output:
717+
if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
718+
raise ValueError('stdout and stderr arguments may not be used '
719+
'with capture_output.')
720+
721+
# Determine stderr handling - all processes share the same stderr pipe
722+
# When capturing, we create one pipe and all processes write to it
723+
stderr_arg = kwargs.pop('stderr', None)
724+
capture_stderr = capture_output or stderr_arg == PIPE
725+
726+
# stdin is for the first process, stdout is for the last process
727+
stdin_arg = kwargs.pop('stdin', None)
728+
stdout_arg = kwargs.pop('stdout', None)
729+
730+
processes = []
731+
stderr_read_fd = None # Read end of shared stderr pipe (for parent)
732+
stderr_write_fd = None # Write end of shared stderr pipe (for children)
733+
734+
try:
735+
# Create a single stderr pipe that all processes will share
736+
if capture_stderr:
737+
stderr_read_fd, stderr_write_fd = os.pipe()
738+
739+
for i, cmd in enumerate(commands):
740+
is_first = (i == 0)
741+
is_last = (i == len(commands) - 1)
742+
743+
# Determine stdin for this process
744+
if is_first:
745+
if input is not None:
746+
proc_stdin = PIPE
747+
else:
748+
proc_stdin = stdin_arg # Could be None, PIPE, fd, or file
749+
else:
750+
proc_stdin = processes[-1].stdout
751+
752+
# Determine stdout for this process
753+
if is_last:
754+
if capture_output:
755+
proc_stdout = PIPE
756+
else:
757+
proc_stdout = stdout_arg # Could be None, PIPE, fd, or file
758+
else:
759+
proc_stdout = PIPE
760+
761+
# All processes share the same stderr pipe (write end)
762+
if capture_stderr:
763+
proc_stderr = stderr_write_fd
764+
else:
765+
proc_stderr = stderr_arg
766+
767+
proc = Popen(cmd, stdin=proc_stdin, stdout=proc_stdout,
768+
stderr=proc_stderr, **kwargs)
769+
processes.append(proc)
770+
771+
# Close the parent's copy of the previous process's stdout
772+
# to allow the pipe to signal EOF when the previous process exits
773+
if not is_first and processes[-2].stdout is not None:
774+
processes[-2].stdout.close()
775+
776+
# Close the write end of stderr pipe in parent - children have it
777+
if stderr_write_fd is not None:
778+
os.close(stderr_write_fd)
779+
stderr_write_fd = None
780+
781+
first_proc = processes[0]
782+
last_proc = processes[-1]
783+
784+
# Handle communication with timeout
785+
start_time = _time() if timeout is not None else None
786+
787+
# Write input to first process if provided
788+
if input is not None and first_proc.stdin is not None:
789+
try:
790+
first_proc.stdin.write(input)
791+
except BrokenPipeError:
792+
pass # First process may have exited early
793+
finally:
794+
first_proc.stdin.close()
795+
796+
# Determine if we're in text mode
797+
text_mode = kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors')
798+
799+
# Read output from the last process
800+
stdout = None
801+
stderr = None
802+
803+
# Read stdout if we created a pipe for it (capture_output or stdout=PIPE)
804+
if last_proc.stdout is not None:
805+
stdout = last_proc.stdout.read()
806+
807+
# Read stderr from the shared pipe
808+
if stderr_read_fd is not None:
809+
stderr = os.read(stderr_read_fd, 1024 * 1024 * 10) # Up to 10MB
810+
# Keep reading until EOF
811+
while True:
812+
chunk = os.read(stderr_read_fd, 65536)
813+
if not chunk:
814+
break
815+
stderr += chunk
816+
817+
# Calculate remaining timeout
818+
def remaining_timeout():
819+
if timeout is None:
820+
return None
821+
elapsed = _time() - start_time
822+
remaining = timeout - elapsed
823+
if remaining <= 0:
824+
raise TimeoutExpired(commands, timeout, stdout, stderr)
825+
return remaining
826+
827+
# Wait for all processes to complete
828+
returncodes = []
829+
for proc in processes:
830+
try:
831+
proc.wait(timeout=remaining_timeout())
832+
except TimeoutExpired:
833+
# Kill all processes on timeout
834+
for p in processes:
835+
if p.poll() is None:
836+
p.kill()
837+
for p in processes:
838+
p.wait()
839+
raise TimeoutExpired(commands, timeout, stdout, stderr)
840+
returncodes.append(proc.returncode)
841+
842+
# Handle text mode conversion for stderr (stdout is already handled
843+
# by Popen when text=True). stderr is always read as bytes since
844+
# we use os.pipe() directly.
845+
if text_mode and stderr is not None:
846+
encoding = kwargs.get('encoding')
847+
errors = kwargs.get('errors', 'strict')
848+
if encoding is None:
849+
encoding = locale.getencoding()
850+
stderr = stderr.decode(encoding, errors)
851+
852+
result = PipelineResult(commands, returncodes, stdout, stderr)
853+
854+
if check and any(rc != 0 for rc in returncodes):
855+
raise PipelineError(commands, returncodes, stdout, stderr)
856+
857+
return result
858+
859+
finally:
860+
# Ensure all processes are cleaned up
861+
for proc in processes:
862+
if proc.poll() is None:
863+
proc.kill()
864+
proc.wait()
865+
# Close any open file handles
866+
if proc.stdin and not proc.stdin.closed:
867+
proc.stdin.close()
868+
if proc.stdout and not proc.stdout.closed:
869+
proc.stdout.close()
870+
# Close stderr pipe file descriptors
871+
if stderr_read_fd is not None:
872+
try:
873+
os.close(stderr_read_fd)
874+
except OSError:
875+
pass
876+
if stderr_write_fd is not None:
877+
try:
878+
os.close(stderr_write_fd)
879+
except OSError:
880+
pass
881+
882+
581883
def list2cmdline(seq):
582884
"""
583885
Translate a sequence of arguments into a command line

0 commit comments

Comments
 (0)