33import io
44import os
55import pickle
6+ import struct
67import subprocess
78import sys
89import unittest
@@ -83,16 +84,27 @@ def have_python_version(py_version):
8384 return py_executable_map .get (py_version , None )
8485
8586
86- @support .requires_resource ('cpu' )
87+ def read_exact (f , n ):
88+ buf = b''
89+ while len (buf ) < n :
90+ chunk = f .read (n - len (buf ))
91+ if not chunk :
92+ raise EOFError
93+ buf += chunk
94+ return buf
95+
96+
8797class AbstractCompatTests (pickletester .AbstractPickleTests ):
8898 py_version = None
99+ worker = None
89100
90101 @classmethod
91102 def setUpClass (cls ):
92103 assert cls .py_version is not None , 'Needs a python version tuple'
93104 if not have_python_version (cls .py_version ):
94105 py_version_str = "." .join (map (str , cls .py_version ))
95106 raise unittest .SkipTest (f'Python { py_version_str } not available' )
107+ cls .addClassCleanup (cls .close_worker )
96108 # Override the default pickle protocol to match what xpickle worker
97109 # will be running.
98110 highest_protocol = highest_proto_for_py_version (cls .py_version )
@@ -101,8 +113,31 @@ def setUpClass(cls):
101113 cls .enterClassContext (support .swap_attr (pickle , 'HIGHEST_PROTOCOL' ,
102114 highest_protocol ))
103115
104- @staticmethod
105- def send_to_worker (python , data ):
116+ @classmethod
117+ def start_worker (cls ):
118+ target = os .path .join (os .path .dirname (__file__ ), 'xpickle_worker.py' )
119+ worker = subprocess .Popen ([* python , target ],
120+ stdin = subprocess .PIPE ,
121+ stdout = subprocess .PIPE ,
122+ stderr = subprocess .PIPE ,
123+ # For windows bpo-17023.
124+ shell = is_windows )
125+ cls .worker = worker
126+
127+ @classmethod
128+ def close_worker (cls ):
129+ worker = cls .worker
130+ if worker is None :
131+ return
132+ cls .worker = None
133+ worker .stdin .close ()
134+ worker .stdout .close ()
135+ worker .stderr .close ()
136+ worker .terminate ()
137+ worker .wait ()
138+
139+ @classmethod
140+ def send_to_worker (cls , python , data ):
106141 """Bounce a pickled object through another version of Python.
107142 This will send data to a child process where it will
108143 be unpickled, then repickled and sent back to the parent process.
@@ -112,33 +147,40 @@ def send_to_worker(python, data):
112147 Returns:
113148 The pickled data received from the child process.
114149 """
115- target = os . path . join ( os . path . dirname ( __file__ ), 'xpickle_worker.py' )
116- worker = subprocess . Popen ([ * python , target ],
117- stdin = subprocess . PIPE ,
118- stdout = subprocess .PIPE ,
119- stderr = subprocess .PIPE ,
120- # For windows bpo-17023.
121- shell = is_windows )
122- stdout , stderr = worker . communicate ( data )
123- if worker . returncode == 0 :
124- return stdout
125- # if the worker fails, it will write the exception to stdout
150+ worker = cls . worker
151+ if worker is None :
152+ target = os . path . join ( os . path . dirname ( __file__ ), 'xpickle_worker.py' )
153+ worker = subprocess .Popen ([ * python , target ] ,
154+ stdin = subprocess .PIPE ,
155+ stdout = subprocess . PIPE ,
156+ stderr = subprocess . PIPE ,
157+ # For windows bpo-17023.
158+ shell = is_windows )
159+ cls . worker = worker
160+
126161 try :
127- exception = pickle .loads (stdout )
128- except (pickle .UnpicklingError , EOFError ):
162+ worker .stdin .write (struct .pack ('!i' , len (data )) + data )
163+ worker .stdin .flush ()
164+
165+ size , = struct .unpack ('!i' , read_exact (worker .stdout , 4 ))
166+ if size > 0 :
167+ return read_exact (worker .stdout , size )
168+ # if the worker fails, it will write the exception to stdout
169+ if size < 0 :
170+ stdout = read_exact (worker .stdout , - size )
171+ try :
172+ exception = pickle .loads (stdout )
173+ except (pickle .UnpicklingError , EOFError ):
174+ pass
175+ else :
176+ if isinstance (exception , Exception ):
177+ # To allow for tests which test for errors.
178+ raise exception
179+ _ , stderr = worker .communicate ()
129180 raise RuntimeError (stderr )
130- else :
131- if support .verbose > 1 :
132- print ()
133- print (f'{ data = } ' )
134- print (f'{ stdout = } ' )
135- print (f'{ stderr = } ' )
136- if isinstance (exception , Exception ):
137- # To allow for tests which test for errors.
138- raise exception
139- else :
140- raise RuntimeError (stderr )
141-
181+ except :
182+ cls .close_worker ()
183+ raise
142184
143185 def dumps (self , arg , proto = 0 , ** kwargs ):
144186 # Skip tests that require buffer_callback arguments since
@@ -148,9 +190,8 @@ def dumps(self, arg, proto=0, **kwargs):
148190 self .skipTest ('Test does not support "buffer_callback" argument.' )
149191 f = io .BytesIO ()
150192 p = self .pickler (f , proto , ** kwargs )
151- p .dump ((proto , arg ))
152- f .seek (0 )
153- data = bytes (f .read ())
193+ p .dump (arg )
194+ data = struct .pack ('!i' , proto ) + f .getvalue ()
154195 python = py_executable_map [self .py_version ]
155196 return self .send_to_worker (python , data )
156197
0 commit comments