Skip to content

Commit a268dd0

Browse files
committed
fix: can't pickle mp.Queue -> mp.Manager().Queue()
1 parent 50c3a40 commit a268dd0

File tree

3 files changed

+15
-6
lines changed

3 files changed

+15
-6
lines changed

requirements_dev.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
moto[all]
1+
moto[all,server]
22
pytest

taskqueue/taskqueue.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,6 @@ def soloprocess_upload(QueueClass, queue_name, tasks):
513513
tq = QueueClass(queue_name, progress=False)
514514
return tq.insert(tasks, skip_insert_counter=True)
515515

516-
error_queue = mp.Queue()
517-
518516
def multiprocess_upload(QueueClass, queue_name, tasks, parallel=True, total=None):
519517
if parallel is True:
520518
parallel = mp.cpu_count()
@@ -570,6 +568,7 @@ def capturing_soloprocess_upload(*args, **kwargs):
570568
# Don't fork, spawn entirely new processes. This
571569
# avoids accidental deadlocks.
572570
mp.set_start_method("spawn", force=True)
571+
error_queue = mp.Manager().Queue()
573572

574573
ct = 0
575574
with tqdm(desc="Upload", total=total) as pbar:

test/test_taskqueue.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import time
55

66
from moto import mock_aws
7-
87
from six.moves import range
98
import pytest
109

@@ -18,7 +17,7 @@
1817
from taskqueue.queueables import totask
1918
from taskqueue.queueablefns import tofunc, UnregisteredFunctionError, func2task
2019

21-
@pytest.fixture(scope='function')
20+
@pytest.fixture(scope='session')
2221
def aws_credentials():
2322
"""Mocked AWS Credentials for moto."""
2423
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
@@ -35,6 +34,17 @@ def sqs(aws_credentials):
3534
client.create_queue(QueueName='test-pull-queue')
3635
yield client
3736

37+
@pytest.fixture(scope='session')
38+
def sqs_server(aws_credentials):
39+
from moto.server import ThreadedMotoServer
40+
server = ThreadedMotoServer(port=0)
41+
server.start()
42+
host, port = server.get_host_and_port()
43+
os.environ['AWS_ENDPOINT_URL'] = f"http://{host}:{port}"
44+
yield
45+
del os.environ['AWS_ENDPOINT_URL']
46+
server.stop()
47+
3848
QURLS = {
3949
'sqs': 'test-pull-queue',
4050
'fq': '/tmp/removeme/taskqueue/fq',
@@ -269,7 +279,7 @@ def test_local_taskqueue():
269279
assert tq.insert(epts) == 200
270280

271281
@pytest.mark.parametrize('protocol', PROTOCOL)
272-
def test_parallel_insert_all(sqs, protocol):
282+
def test_parallel_insert_all(sqs, sqs_server, protocol):
273283
import pathos_issue
274284

275285
path = getpath(protocol)

0 commit comments

Comments
 (0)