Skip to content

Commit a73c932

Browse files
committed
fix: can't pickle mp.Queue -> mp.Manager().Queue()
1 parent 50c3a40 commit a73c932

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

taskqueue/taskqueue.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,6 @@ def soloprocess_upload(QueueClass, queue_name, tasks):
513513
tq = QueueClass(queue_name, progress=False)
514514
return tq.insert(tasks, skip_insert_counter=True)
515515

516-
error_queue = mp.Queue()
517-
518516
def multiprocess_upload(QueueClass, queue_name, tasks, parallel=True, total=None):
519517
if parallel is True:
520518
parallel = mp.cpu_count()
@@ -570,6 +568,7 @@ def capturing_soloprocess_upload(*args, **kwargs):
570568
# Don't fork, spawn entirely new processes. This
571569
# avoids accidental deadlocks.
572570
mp.set_start_method("spawn", force=True)
571+
error_queue = mp.Manager().Queue()
573572

574573
ct = 0
575574
with tqdm(desc="Upload", total=total) as pbar:

test/test_taskqueue.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import time
55

66
from moto import mock_aws
7-
87
from six.moves import range
98
import pytest
109

@@ -18,7 +17,7 @@
1817
from taskqueue.queueables import totask
1918
from taskqueue.queueablefns import tofunc, UnregisteredFunctionError, func2task
2019

21-
@pytest.fixture(scope='function')
20+
@pytest.fixture(scope='session')
2221
def aws_credentials():
2322
"""Mocked AWS Credentials for moto."""
2423
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
@@ -35,6 +34,21 @@ def sqs(aws_credentials):
3534
client.create_queue(QueueName='test-pull-queue')
3635
yield client
3736

37+
@pytest.fixture(scope='session')
38+
def sqs_server(aws_credentials):
39+
from moto.server import ThreadedMotoServer
40+
import boto3
41+
server = ThreadedMotoServer(port=0)
42+
server.start()
43+
host, port = server.get_host_and_port()
44+
endpoint = f"http://{host}:{port}"
45+
os.environ['AWS_ENDPOINT_URL'] = endpoint
46+
client = boto3.client('sqs', endpoint_url=endpoint)
47+
client.create_queue(QueueName='test-pull-queue')
48+
yield client
49+
del os.environ['AWS_ENDPOINT_URL']
50+
server.stop()
51+
3852
QURLS = {
3953
'sqs': 'test-pull-queue',
4054
'fq': '/tmp/removeme/taskqueue/fq',
@@ -269,7 +283,7 @@ def test_local_taskqueue():
269283
assert tq.insert(epts) == 200
270284

271285
@pytest.mark.parametrize('protocol', PROTOCOL)
272-
def test_parallel_insert_all(sqs, protocol):
286+
def test_parallel_insert_all(sqs_server, protocol):
273287
import pathos_issue
274288

275289
path = getpath(protocol)

0 commit comments

Comments
 (0)