Skip to content

Commit e1ae625

Browse files
Used PyEvents in subinterpreter queues to wait for queue full and queue empty conditions.
1 parent c3bfe5d commit e1ae625

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

Lib/concurrent/interpreters/_queues.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def put(self, obj, block=True, timeout=None, *,
226226
except QueueFull:
227227
if timeout is not None and time.time() >= end:
228228
raise # re-raise
229-
time.sleep(_delay)
229+
# time.sleep(_delay)
230230
else:
231231
break
232232

@@ -261,7 +261,7 @@ def get(self, block=True, timeout=None, *,
261261
except QueueEmpty:
262262
if timeout is not None and time.time() >= end:
263263
raise # re-raise
264-
time.sleep(_delay)
264+
# time.sleep(_delay)
265265
else:
266266
break
267267
if unboundop is not None:

Modules/_interpqueuesmodule.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include "Python.h"
99
#include "pycore_crossinterp.h" // _PyXIData_t
10+
#include "pycore_lock.h" // PyEvent
1011

1112
#define REGISTERS_HEAP_TYPES
1213
#define HAS_FALLBACK
@@ -532,6 +533,8 @@ typedef struct _queue {
532533
xidata_fallback_t fallback;
533534
int unboundop;
534535
} defaults;
536+
PyEvent queue_space_available;
537+
PyEvent queue_has_item;
535538
} _queue;
536539

537540
static int
@@ -549,6 +552,8 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
549552
.maxsize = maxsize,
550553
},
551554
.defaults = defaults,
555+
.queue_space_available = (PyEvent){1},
556+
.queue_has_item = (PyEvent){0}
552557
};
553558
return 0;
554559
}
@@ -642,6 +647,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
642647
}
643648
if (queue->items.count >= maxsize) {
644649
_queue_unlock(queue);
650+
queue->queue_space_available = (PyEvent){0};
645651
return ERR_QUEUE_FULL;
646652
}
647653

@@ -661,6 +667,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
661667
queue->items.last = item;
662668

663669
_queue_unlock(queue);
670+
queue->queue_has_item = (PyEvent){1};
664671
return 0;
665672
}
666673

@@ -676,6 +683,7 @@ _queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
676683
_queueitem *item = queue->items.first;
677684
if (item == NULL) {
678685
_queue_unlock(queue);
686+
queue->queue_has_item = (PyEvent){0};
679687
return ERR_QUEUE_EMPTY;
680688
}
681689
queue->items.first = item->next;
@@ -1136,6 +1144,9 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
11361144
}
11371145
assert(queue != NULL);
11381146

1147+
// Wait for the queue to have space
1148+
PyEvent_Wait(&queue->queue_space_available);
1149+
11391150
// Convert the object to cross-interpreter data.
11401151
_PyXIData_t *xidata = _PyXIData_New();
11411152
if (xidata == NULL) {
@@ -1182,6 +1193,9 @@ queue_get(_queues *queues, int64_t qid,
11821193
// Past this point we are responsible for releasing the mutex.
11831194
assert(queue != NULL);
11841195

1196+
// Wait for the queue to have some value
1197+
PyEvent_Wait(&queue->queue_has_item);
1198+
11851199
// Pop off the next item from the queue.
11861200
_PyXIData_t *data = NULL;
11871201
err = _queue_next(queue, &data, p_unboundop);

0 commit comments

Comments
 (0)