Skip to content

Commit b33c811

Browse files
author
Giulio Leone
authored
perf: use deque for InMemoryTaskMessageQueue FIFO operations (#2165)
1 parent 7c02248 commit b33c811

File tree

2 files changed

+7
-5
lines changed

2 files changed

+7
-5
lines changed

src/mcp/shared/experimental/tasks/message_queue.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313

1414
from abc import ABC, abstractmethod
15+
from collections import deque
1516
from dataclasses import dataclass, field
1617
from datetime import datetime, timezone
1718
from typing import Any, Literal
@@ -151,13 +152,13 @@ class InMemoryTaskMessageQueue(TaskMessageQueue):
151152
"""
152153

153154
def __init__(self) -> None:
154-
self._queues: dict[str, list[QueuedMessage]] = {}
155+
self._queues: dict[str, deque[QueuedMessage]] = {}
155156
self._events: dict[str, anyio.Event] = {}
156157

157-
def _get_queue(self, task_id: str) -> list[QueuedMessage]:
158+
def _get_queue(self, task_id: str) -> deque[QueuedMessage]:
158159
"""Get or create the queue for a task."""
159160
if task_id not in self._queues:
160-
self._queues[task_id] = []
161+
self._queues[task_id] = deque()
161162
return self._queues[task_id]
162163

163164
async def enqueue(self, task_id: str, message: QueuedMessage) -> None:
@@ -172,7 +173,7 @@ async def dequeue(self, task_id: str) -> QueuedMessage | None:
172173
queue = self._get_queue(task_id)
173174
if not queue:
174175
return None
175-
return queue.pop(0)
176+
return queue.popleft()
176177

177178
async def peek(self, task_id: str) -> QueuedMessage | None:
178179
"""Return the next message without removing it."""

tests/experimental/tasks/test_message_queue.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tests for TaskMessageQueue and InMemoryTaskMessageQueue."""
22

3+
from collections import deque
34
from datetime import datetime, timezone
45

56
import anyio
@@ -270,7 +271,7 @@ async def is_empty_with_injection(tid: str) -> bool:
270271
if call_count == 2 and tid == task_id:
271272
# Before second check, inject a message - this simulates a message
272273
# arriving between event creation and the double-check
273-
queue._queues[task_id] = [QueuedMessage(type="request", message=make_request())]
274+
queue._queues[task_id] = deque([QueuedMessage(type="request", message=make_request())])
274275
return await original_is_empty(tid)
275276

276277
queue.is_empty = is_empty_with_injection # type: ignore[method-assign]

0 commit comments

Comments
 (0)