|
104 | 104 | import socket |
105 | 105 | import socketserver |
106 | 106 | import sys |
| 107 | +import threading |
107 | 108 | import time |
108 | 109 | import urllib.parse |
109 | 110 |
|
|
136 | 137 |
|
137 | 138 | # Data larger than this will be read in chunks, to prevent extreme |
138 | 139 | # overallocation. |
139 | | -_MIN_READ_BUF_SIZE = 1 << 20 |
| 140 | +_READ_BUF_SIZE = 1 << 20 |
140 | 141 |
|
141 | 142 | class HTTPServer(socketserver.TCPServer): |
142 | 143 |
|
@@ -1287,30 +1288,62 @@ def run_cgi(self): |
1287 | 1288 | stderr=subprocess.PIPE, |
1288 | 1289 | env = env |
1289 | 1290 | ) |
| 1291 | + def finish_request(): |
| 1292 | + # throw away additional data [see bug #427345, gh-34546] |
| 1293 | + while select.select([self.rfile._sock], [], [], 0)[0]: |
| 1294 | + if not self.rfile._sock.recv(1): |
| 1295 | + break |
1290 | 1296 | if self.command.lower() == "post" and nbytes > 0: |
1291 | | - cursize = 0 |
1292 | | - data = self.rfile.read(min(nbytes, _MIN_READ_BUF_SIZE)) |
1293 | | - while (len(data) < nbytes and len(data) != cursize and |
1294 | | - select.select([self.rfile._sock], [], [], 0)[0]): |
1295 | | - cursize = len(data) |
1296 | | - # This is a geometric increase in read size (never more |
1297 | | - # than doubling our the current length of data per loop |
1298 | | - # iteration). |
1299 | | - delta = min(cursize, nbytes - cursize) |
1300 | | - data += self.rfile.read(delta) |
| 1297 | + def _in_task(): |
| 1298 | + """Pipe the input into the process stdin""" |
| 1299 | + bytes_left = nbytes |
| 1300 | + # We need to wait until either there's new data in rfile, |
| 1301 | + # or the process has exited. |
| 1302 | + # This spins (with short sleeps) polling for process exit. |
| 1303 | + TIMEOUT = 0.1 |
| 1304 | + while ( |
| 1305 | + bytes_left |
| 1306 | + and not p.returncode |
| 1307 | + and select.select([self.rfile._sock], [], [], TIMEOUT)[0] |
| 1308 | + ): |
| 1309 | + data = self.rfile.read(min(bytes_left, _READ_BUF_SIZE)) |
| 1310 | + if not data: |
| 1311 | + break |
| 1312 | + bytes_left -= len(data) |
| 1313 | + p.stdin.write(data) |
| 1314 | + finish_request() |
| 1315 | + try: |
| 1316 | + p.stdin.close() |
| 1317 | + except OSError: |
| 1318 | + # already closed |
| 1319 | + pass |
| 1320 | + request_relay_thread = threading.Thread(target=_in_task) |
| 1321 | + request_relay_thread.start() |
1301 | 1322 | else: |
1302 | 1323 | data = None |
1303 | | - # throw away additional data [see bug #427345] |
1304 | | - while select.select([self.rfile._sock], [], [], 0)[0]: |
1305 | | - if not self.rfile._sock.recv(1): |
1306 | | - break |
1307 | | - stdout, stderr = p.communicate(data) |
1308 | | - self.wfile.write(stdout) |
1309 | | - if stderr: |
1310 | | - self.log_error('%s', stderr) |
1311 | | - p.stderr.close() |
| 1324 | + finish_request() |
| 1325 | + request_relay_thread = None |
| 1326 | + def _out_task(): |
| 1327 | + """Pipe the process's stdout into the socket""" |
| 1328 | + while data := p.stdout.read(_READ_BUF_SIZE): |
| 1329 | + self.wfile.write(data) |
| 1330 | + response_relay_thread = threading.Thread(target=_out_task) |
| 1331 | + response_relay_thread.start() |
| 1332 | + stderr_chunks = [] |
| 1333 | + def _err_task(): |
| 1334 | + """Collect all of stderr, to log as single message""" |
| 1335 | + while data := p.stderr.read(_READ_BUF_SIZE): |
| 1336 | + stderr_chunks.append(data) |
| 1337 | + error_log_thread = threading.Thread(target=_err_task) |
| 1338 | + error_log_thread.start() |
| 1339 | + status = p.wait() |
| 1340 | + response_relay_thread.join() |
1312 | 1341 | p.stdout.close() |
1313 | | - status = p.returncode |
| 1342 | + error_log_thread.join() |
| 1343 | + self.log_error('%s', b''.join(stderr_chunks)) |
| 1344 | + p.stderr.close() |
| 1345 | + if request_relay_thread: |
| 1346 | + request_relay_thread.join() |
1314 | 1347 | if status: |
1315 | 1348 | self.log_error("CGI script exit status %#x", status) |
1316 | 1349 | else: |
|
0 commit comments