@@ -2194,6 +2194,100 @@ def test_pipeline_stdout_to_devnull(self):
21942194 self .assertIsNone (result .stdout )
21952195 self .assertEqual (result .returncodes , [0 , 0 ])
21962196
2197+ def test_pipeline_large_data_no_deadlock (self ):
2198+ """Test that large data doesn't cause pipe buffer deadlock.
2199+
2200+ This test verifies that the multiplexed I/O implementation properly
2201+ handles cases where pipe buffers would fill up. Without proper
2202+ multiplexing, this would deadlock because:
2203+ 1. First process outputs large data filling stdout pipe buffer
2204+ 2. Middle process reads some, processes, writes to its stdout
2205+ 3. If stdout pipe buffer fills, middle process blocks on write
2206+ 4. But first process is blocked waiting for middle to read more
2207+ 5. Classic deadlock
2208+
2209+ The test uses data larger than typical pipe buffer size (64KB on Linux)
2210+ to ensure the multiplexed I/O is working correctly.
2211+ """
2212+ # Generate data larger than typical pipe buffer (64KB)
2213+ # Use 256KB to ensure we exceed buffer on most systems
2214+ large_data = 'x' * (256 * 1024 )
2215+
2216+ # Pipeline: input -> double the data -> count chars
2217+ # The middle process outputs twice as much, increasing buffer pressure
2218+ result = subprocess .run_pipeline (
2219+ [sys .executable , '-c' ,
2220+ 'import sys; data = sys.stdin.read(); print(data + data)' ],
2221+ [sys .executable , '-c' ,
2222+ 'import sys; print(len(sys.stdin.read().strip()))' ],
2223+ input = large_data , capture_output = True , text = True , timeout = 30
2224+ )
2225+
2226+ # Original data doubled = 512KB = 524288 chars
2227+ # Second process strips whitespace (removes trailing newline) then counts
2228+ expected_len = 256 * 1024 * 2 # doubled data, newline stripped
2229+ self .assertEqual (result .stdout .strip (), str (expected_len ))
2230+ self .assertEqual (result .returncodes , [0 , 0 ])
2231+
2232+ def test_pipeline_large_data_three_stages (self ):
2233+ """Test large data through a three-stage pipeline.
2234+
2235+ This is a more complex deadlock scenario with three processes,
2236+ where buffer pressure can occur at multiple points.
2237+ """
2238+ # Use 128KB of data
2239+ large_data = 'y' * (128 * 1024 )
2240+
2241+ # Pipeline: input -> uppercase -> add prefix to each line -> count
2242+ # We use line-based processing to create more buffer churn
2243+ result = subprocess .run_pipeline (
2244+ [sys .executable , '-c' ,
2245+ 'import sys; print(sys.stdin.read().upper())' ],
2246+ [sys .executable , '-c' ,
2247+ 'import sys; print("".join("PREFIX:" + line for line in sys.stdin))' ],
2248+ [sys .executable , '-c' ,
2249+ 'import sys; print(len(sys.stdin.read()))' ],
2250+ input = large_data , capture_output = True , text = True , timeout = 30
2251+ )
2252+
2253+ self .assertEqual (result .returncodes , [0 , 0 , 0 ])
2254+ # Just verify we got a reasonable numeric output without deadlock
2255+ output_len = int (result .stdout .strip ())
2256+ self .assertGreater (output_len , len (large_data ))
2257+
2258+ def test_pipeline_large_data_with_stderr (self ):
2259+ """Test large data with stderr output from multiple processes.
2260+
2261+ Ensures stderr collection doesn't interfere with the main data flow
2262+ and doesn't cause deadlocks when multiple processes write stderr.
2263+ """
2264+ # 64KB of data
2265+ data_size = 64 * 1024
2266+ large_data = 'z' * data_size
2267+
2268+ result = subprocess .run_pipeline (
2269+ [sys .executable , '-c' , '''
2270+ import sys
2271+ sys.stderr.write("stage1 processing\\ n")
2272+ data = sys.stdin.read()
2273+ sys.stderr.write(f"stage1 read {len(data)} bytes\\ n")
2274+ print(data)
2275+ ''' ],
2276+ [sys .executable , '-c' , '''
2277+ import sys
2278+ sys.stderr.write("stage2 processing\\ n")
2279+ data = sys.stdin.read()
2280+ sys.stderr.write(f"stage2 read {len(data)} bytes\\ n")
2281+ print(len(data.strip()))
2282+ ''' ],
2283+ input = large_data , capture_output = True , text = True , timeout = 30
2284+ )
2285+
2286+ self .assertEqual (result .stdout .strip (), str (data_size ))
2287+ self .assertIn ('stage1 processing' , result .stderr )
2288+ self .assertIn ('stage2 processing' , result .stderr )
2289+ self .assertEqual (result .returncodes , [0 , 0 ])
2290+
21972291
21982292def _get_test_grp_name ():
21992293 for name_group in ('staff' , 'nogroup' , 'grp' , 'nobody' , 'nfsnobody' ):
0 commit comments