Skip to content

Commit ecab230

Browse files
Added block keywords queue.get and queue.put
1 parent 2cea941 commit ecab230

File tree

3 files changed

+43
-22
lines changed

3 files changed

+43
-22
lines changed

Lib/concurrent/interpreters/_queues.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ def put_nowait(self, obj, *, unbounditems=None):
233233
unboundop = -1
234234
else:
235235
unboundop, = _serialize_unbound(unbounditems)
236-
_queues.put(self._id, obj, unboundop)
236+
_queues.put(self._id, obj, unboundop, False)
237237

238238
def get(self, block=True, timeout=None, *):
239239
"""Return the next object from the queue.
@@ -271,7 +271,7 @@ def get_nowait(self):
271271
is the same as get().
272272
"""
273273
try:
274-
obj, unboundop = _queues.get(self._id)
274+
obj, unboundop = _queues.get(self._id, False)
275275
except QueueEmpty:
276276
raise # re-raise
277277
if unboundop is not None:

Modules/_interpqueuesmodule.c

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,7 +1132,7 @@ queue_destroy(_queues *queues, int64_t qid)
11321132
// Push an object onto the queue.
11331133
static int
11341134
queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
1135-
xidata_fallback_t fallback)
1135+
xidata_fallback_t fallback, int block)
11361136
{
11371137
PyThreadState *tstate = PyThreadState_Get();
11381138

@@ -1145,7 +1145,9 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
11451145
assert(queue != NULL);
11461146

11471147
// Wait for the queue to have space
1148-
PyEvent_Wait(&queue->space_available);
1148+
if (block == 1) {
1149+
PyEvent_Wait(&queue->space_available);
1150+
}
11491151

11501152
// Convert the object to cross-interpreter data.
11511153
_PyXIData_t *xidata = _PyXIData_New();
@@ -1179,7 +1181,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
11791181
// XXX Support a "wait" mutex?
11801182
static int
11811183
queue_get(_queues *queues, int64_t qid,
1182-
PyObject **res, int *p_unboundop)
1184+
PyObject **res, int *p_unboundop, int block)
11831185
{
11841186
int err;
11851187
*res = NULL;
@@ -1194,7 +1196,9 @@ queue_get(_queues *queues, int64_t qid,
11941196
assert(queue != NULL);
11951197

11961198
// Wait for the queue to have some value
1197-
PyEvent_Wait(&queue->has_item);
1199+
if (block == 1) {
1200+
PyEvent_Wait(&queue->has_item);
1201+
}
11981202

11991203
// Pop off the next item from the queue.
12001204
_PyXIData_t *data = NULL;
@@ -1627,13 +1631,14 @@ _interpqueues.put
16271631
obj: object
16281632
unboundop as unboundarg: int = -1
16291633
fallback as fallbackarg: int = -1
1634+
block: bool = True
16301635
16311636
Add the object's data to the queue.
16321637
[clinic start generated code]*/
16331638

16341639
static PyObject *
16351640
_interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
1636-
int unboundarg, int fallbackarg)
1641+
int unboundarg, int fallbackarg, int block)
16371642
/*[clinic end generated code: output=2e0b31c6eaec29c9 input=4906550ab5c73be3]*/
16381643
{
16391644
struct _queuedefaults defaults = {-1, -1};
@@ -1653,7 +1658,7 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
16531658
}
16541659

16551660
/* Queue up the object. */
1656-
int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
1661+
int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback, block);
16571662
// This is the only place that raises QueueFull.
16581663
if (handle_queue_error(err, module, qid)) {
16591664
return NULL;
@@ -1665,19 +1670,20 @@ _interpqueues_put_impl(PyObject *module, int64_t qid, PyObject *obj,
16651670
/*[clinic input]
16661671
_interpqueues.get
16671672
qid: qidarg
1673+
block: bool = True
16681674
16691675
Return the (object, unbound op) from the front of the queue.
16701676
16711677
If there is nothing to receive then raise QueueEmpty.
16721678
[clinic start generated code]*/
16731679

16741680
static PyObject *
1675-
_interpqueues_get_impl(PyObject *module, int64_t qid)
1681+
_interpqueues_get_impl(PyObject *module, int64_t qid, int block)
16761682
/*[clinic end generated code: output=b0988a0e29194f05 input=c5bccbc409ad0190]*/
16771683
{
16781684
PyObject *obj = NULL;
16791685
int unboundop = 0;
1680-
int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
1686+
int err = queue_get(&_globals.queues, qid, &obj, &unboundop, block);
16811687
// This is the only place that raises QueueEmpty.
16821688
if (handle_queue_error(err, module, qid)) {
16831689
return NULL;

Modules/clinic/_interpqueuesmodule.c.h

Lines changed: 27 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)