66import time
77import unittest
88import weakref
9- from test import support
9+ from test . support import gc_collect
1010from test .support import import_helper
1111from test .support import threading_helper
1212
@@ -420,11 +420,12 @@ class BaseSimpleQueueTest:
420420 def setUp (self ):
421421 self .q = self .type2test ()
422422
423- def feed (self , q , seq , rnd ):
423+ def feed (self , q , seq , rnd , sentinel ):
424424 while True :
425425 try :
426426 val = seq .pop ()
427427 except IndexError :
428+ q .put (sentinel )
428429 return
429430 q .put (val )
430431 if rnd .random () > 0.5 :
@@ -463,11 +464,10 @@ def consume_timeout(self, q, results, sentinel):
463464 return
464465 results .append (val )
465466
466- def run_threads (self , n_feeders , n_consumers , q , inputs ,
467- feed_func , consume_func ):
467+ def run_threads (self , n_threads , q , inputs , feed_func , consume_func ):
468468 results = []
469469 sentinel = None
470- seq = inputs + [ sentinel ] * n_consumers
470+ seq = inputs . copy ()
471471 seq .reverse ()
472472 rnd = random .Random (42 )
473473
@@ -481,11 +481,11 @@ def wrapper(*args, **kwargs):
481481 return wrapper
482482
483483 feeders = [threading .Thread (target = log_exceptions (feed_func ),
484- args = (q , seq , rnd ))
485- for i in range (n_feeders )]
484+ args = (q , seq , rnd , sentinel ))
485+ for i in range (n_threads )]
486486 consumers = [threading .Thread (target = log_exceptions (consume_func ),
487487 args = (q , results , sentinel ))
488- for i in range (n_consumers )]
488+ for i in range (n_threads )]
489489
490490 with threading_helper .start_threads (feeders + consumers ):
491491 pass
@@ -543,7 +543,7 @@ def test_order(self):
543543 # Test a pair of concurrent put() and get()
544544 q = self .q
545545 inputs = list (range (100 ))
546- results = self .run_threads (1 , 1 , q , inputs , self .feed , self .consume )
546+ results = self .run_threads (1 , q , inputs , self .feed , self .consume )
547547
548548 # One producer, one consumer => results appended in well-defined order
549549 self .assertEqual (results , inputs )
@@ -553,7 +553,7 @@ def test_many_threads(self):
553553 N = 50
554554 q = self .q
555555 inputs = list (range (10000 ))
556- results = self .run_threads (N , N , q , inputs , self .feed , self .consume )
556+ results = self .run_threads (N , q , inputs , self .feed , self .consume )
557557
558558 # Multiple consumers without synchronization append the
559559 # results in random order
@@ -564,7 +564,7 @@ def test_many_threads_nonblock(self):
564564 N = 50
565565 q = self .q
566566 inputs = list (range (10000 ))
567- results = self .run_threads (N , N , q , inputs ,
567+ results = self .run_threads (N , q , inputs ,
568568 self .feed , self .consume_nonblock )
569569
570570 self .assertEqual (sorted (results ), inputs )
@@ -574,7 +574,7 @@ def test_many_threads_timeout(self):
574574 N = 50
575575 q = self .q
576576 inputs = list (range (1000 ))
577- results = self .run_threads (N , N , q , inputs ,
577+ results = self .run_threads (N , q , inputs ,
578578 self .feed , self .consume_timeout )
579579
580580 self .assertEqual (sorted (results ), inputs )
@@ -591,6 +591,7 @@ class C:
591591 q .put (C ())
592592 for i in range (N ):
593593 wr = weakref .ref (q .get ())
594+ gc_collect () # For PyPy or other GCs.
594595 self .assertIsNone (wr ())
595596
596597
0 commit comments