Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
42e09fc
first bunch of structured concurrency conversions
burnpanck Apr 10, 2026
940d270
large second bunch
burnpanck Apr 10, 2026
ff323fa
many more backends@
burnpanck Apr 12, 2026
65ba7ac
last batch of first pass of refactoring
burnpanck Apr 12, 2026
977b256
test suite passes, no more unexpected warnings
burnpanck Apr 12, 2026
6f9676c
fixed a number of manual retry/timeout loops
burnpanck Apr 12, 2026
8c7bbff
a few smaller fixes
burnpanck Apr 13, 2026
e2bae32
migrated almost all remaining uses of asyncio primitives
burnpanck Apr 13, 2026
a97a0fc
removed quite a few manual timeout loops
burnpanck Apr 13, 2026
42b1edb
tests now cannot run under unittest, we need pytest
burnpanck Apr 13, 2026
d424921
ensure all cleanup actions are shielded from cancellation
burnpanck Apr 13, 2026
ee3a0b5
removed a leftover from early experimentation
burnpanck Apr 13, 2026
eb786e7
Merge branch 'main' into feature/structured-concurrency
burnpanck Apr 13, 2026
d9a4a22
fix py3.9 compatibility, and improve double-entry detection in contex…
burnpanck Apr 14, 2026
a6ba49c
fix typos
burnpanck Apr 14, 2026
038d06b
fix missing trio dependency
burnpanck Apr 14, 2026
59c4788
Merge branch 'main' into feature/structured-concurrency
burnpanck Apr 20, 2026
ee0a973
fix sphinx build
burnpanck Apr 20, 2026
a8faed5
formatting
burnpanck Apr 20, 2026
6a65482
fix lint errors
burnpanck Apr 20, 2026
b3aab73
fix linter issues (uncovered some real bugs)
burnpanck Apr 21, 2026
c650996
Merge branch 'main' into feature/structured-concurrency
burnpanck Apr 21, 2026
20842ed
fix py3.9 compatibility
burnpanck Apr 21, 2026
cb06508
Fix OT2 simulator
burnpanck May 6, 2026
d090294
Merge branch 'main' into feature/structured-concurrency
burnpanck May 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/contributor_guide/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
contributing
how-to-open-source
contributing-to-docs
structured-concurrency
```
<hr>

Expand Down
84 changes: 84 additions & 0 deletions docs/contributor_guide/structured-concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Structured Concurrency in PyLabRobot

## API

In PyLabRobot, all asynchronous resources expose the `pylabrobot.concurrency.AsyncResource` API: Resources are usable exactly within the body of `async with resource:`.
What exactly *usable* means may depend on the resource though,
as some functionality *may* be available outside the `async with` block too.
Unless that is specified by the API for a specific resource, you should not rely on it.

### Implementing `AsyncResource`

When implementing `AsyncResource` for a new class, you should not write `__aenter__` and `__aexit__` directly, as this is difficult to get right.
Instead, you should implement the `_lifespan` async context manager.
It is often most convenient to do so in terms of a `contextlib.AsyncExitStack`,
so the default implementation of `_lifespan` does that and delegates to a `_enter_lifespan(stack)` coroutine.
There is no `_exit_lifespan` (because separate enter and exit calls are the antithesis of structured concurrency),
instead, all cleanup is registered with the `stack`.

### Legacy `setup`/`stop` calls

For historical reasons and to support certain interactive use-cases,
we still expose a `setup`/`stop` API in subclasses of `Machine`.
Note however that, with this API, you give away control over the scope of the async work: For example, there is no way to reliably catch all errors in background tasks, or to handle cancellation of tasks consistently. Do not use that in production scripts.

## Testing

Previous testing within PLR relied on `unittest.IsolatedAsyncioTestCase`.
Unfortunately, the `unittest` paradigm is fundamentally incompatible with structured concurrency.
There is no structured scope enclosing the tests, and all attempts to work around this failed.

Instead, we provide `pylabrobot.testing.concurrency.AnyioTestBase`.
This is *not* a `unittest.TestCase` on purpose, in order not to trigger `pytest`'s
`unittest` compatibility mode. It *does* however reimplement the asserts from `unittest`,
as to streamline test conversion.
Test cases can be left as-is, but the `setUp`/`asyncSetUp` / `tearDown`/`asyncTearDown` logic needs to be replaced by a `_lifespan` or `_enter_lifespan` implementation (it is a `AsyncResource` itself).

### Gotchas:
- `unittest.AsyncMock` creates `async` methods that do never yield.
This is a problem if they are used in a tight loop, with no other yield point;
leading to a deadlock. This appears in the wild in reader loops of I/O plumbing,
so we provide `pylabrobot.testing.mock_io.MockIO` as a more focussed alternative.

## Notes from the refactor:
- Timeout semantics may have changed slightly. Usually, that's the case because previous
timeout semantics are often confusing or ill specified (because without structured concurrency,
it's very hard to implement good timeout semantics). We tried to stay as close as possible to the previous semantics. That said, going forward, one `timeout` arguments should always be a trigger to take a step back and think about semantics: Is it supposed to be a timeout on the full operation? Then, *don't* put a `timeout` argument at all! Users are better served by wrapping
*the whole operation* with `with anyio.fail_after`. If the timeout somehow applies to sub-parts,
then be very careful in specifying to what they apply (and what is being done if timeouts fail).

## Limitations:
- The Opentrons thermocycler USB backend is `asyncio`-only.

## Issues found during the refactor

### Unstructured start/stop behaviours that might be better off as context manager
- `shake` and `stop_shaking` on Agilent Biotek.

### Inconsistent "turn-off" behaviout of various machines.
Most machines seem to turn off any ongoing actions and go back to some form of "parking position", but other machines don't:
- Tecan EVO has a number of arms that one could park; currently, we don't.


## TODOs in the refactor

### References to `setup`
- Developer docs
- Many error messages
- `.setup_done()` calls

### References to `unittest`
- Async tests now *require* pytest - let's remove all calls to `unittest.main()`

### Check for other signs that are frowned upon with structured concurrency:
- Anything involving `time.time()` or `time.monotonic()` - should at least be `anyio.current_time()`, but often is a sign for a busy-loop or manual timeout handling.
- Check for use of `threading`.
- Check for use of `asyncio` - avoid raw `asyncio` APIs, should all be converted to `anyio` or something else that is loop-agnostic.

### Verification checks for changes already made
- `_enter_lifespan` extra arguments other than `stack` should be *keyword-only*!
- Have a look at all `stack.push_async_callback`, especially for `cleanup()` functions - these could often in fact be sync.
- Verify that all cleanup logic has cancellation-shielding in place where necessary.

### Things to watch out for
- We never ever catch a cancellation without re-raising. In basic `asyncio`, that might be ok, but in structured concurrency, it never is.
25 changes: 14 additions & 11 deletions pylabrobot/arms/precise_flex/precise_flex_backend.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import warnings
from abc import ABC
from typing import Dict, List, Literal, Optional, Union

import anyio

from pylabrobot.arms.backend import (
AccessPattern,
HorizontalAccess,
Expand All @@ -12,6 +13,7 @@
from pylabrobot.arms.precise_flex.coords import ElbowOrientation, PreciseFlexCartesianCoords
from pylabrobot.arms.precise_flex.error_codes import ERROR_CODES
from pylabrobot.arms.precise_flex.joints import PFAxis
from pylabrobot.concurrency import AsyncExitStackWithShielding
from pylabrobot.io.socket import Socket
from pylabrobot.resources import Coordinate, Rotation

Expand Down Expand Up @@ -54,6 +56,7 @@ def __init__(
self.timeout = timeout
self._has_rail = has_rail
self._is_dual_gripper = is_dual_gripper

if is_dual_gripper:
warnings.warn(
"Dual gripper support is experimental and may not work as expected.", UserWarning
Expand Down Expand Up @@ -90,21 +93,21 @@ def _convert_to_cartesian_array(
)
return arr

async def setup(self, skip_home: bool = False):
"""Initialize the PreciseFlex backend."""
await self.io.setup()
async def _enter_lifespan(self, stack: AsyncExitStackWithShielding, *, skip_home: bool = False):
await super()._enter_lifespan(stack)

await stack.enter_async_context(self.io)
stack.push_shielded_async_callback(self.exit)

await self.set_response_mode("pc")
await self.power_on_robot()
await self.attach(1)
if not skip_home:
await self.home()

async def stop(self):
"""Stop the PreciseFlex backend."""
await self.detach()
await self.power_off_robot()
await self.exit()
await self.io.stop()
# push_async_callback executes in reverse order!
stack.push_shielded_async_callback(self.power_off_robot)
stack.push_shielded_async_callback(self.detach)

async def set_speed(self, speed_percent: float):
"""Set the speed percentage of the arm's movement (0-100)."""
Expand Down Expand Up @@ -1591,7 +1594,7 @@ async def wait_for_eom(self) -> None:
some other means. Does not reply until the robot has stopped.
"""
await self.send_command("waitForEom")
await asyncio.sleep(0.2) # Small delay to ensure command is fully processed
await anyio.sleep(0.2) # Small delay to ensure command is fully processed

async def zero_torque(self, enable: bool, axis_mask: int = 1) -> None:
"""Sets or clears zero torque mode for the selected robot.
Expand Down
34 changes: 24 additions & 10 deletions pylabrobot/arms/precise_flex/precise_flex_backend_tests.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
import unittest
from typing import Dict
from unittest.mock import AsyncMock, patch

import anyio

from pylabrobot.arms.backend import HorizontalAccess, VerticalAccess
from pylabrobot.arms.precise_flex.coords import ElbowOrientation, PreciseFlexCartesianCoords
from pylabrobot.arms.precise_flex.joints import PFAxis
from pylabrobot.arms.precise_flex.precise_flex_backend import PreciseFlexBackend, PreciseFlexError
from pylabrobot.io.socket import Socket # Import Socket for mocking
from pylabrobot.resources import Coordinate, Rotation
from pylabrobot.testing.concurrency import AnyioTestBase


class PreciseFlexBackendHardwareTests(unittest.IsolatedAsyncioTestCase):
class TestPreciseFlexBackendHardware(AnyioTestBase):
"""Integration tests for PreciseFlex robot - RUNS ON ACTUAL HARDWARE"""


class PreciseFlexBackendTests(unittest.IsolatedAsyncioTestCase):
class TestPreciseFlexBackend(AnyioTestBase):
"""Unit tests for PreciseFlexBackend"""

def setUp(self):
async def _enter_lifespan(self, stack):
await super()._enter_lifespan(stack)

self.mock_socket_instance = AsyncMock(spec=Socket)
self.mock_socket_instance.read.return_value = b""
self.mock_socket_instance.readline.return_value = b""
self.mock_socket_instance.write.return_value = None
self.mock_socket_instance.setup.return_value = None # Configure setup to return None
self.mock_socket_instance._writer = AsyncMock() # Mock the _writer attribute

# Patch the Socket class where it's used in PreciseFlexBackend
Expand All @@ -31,9 +34,10 @@ def setUp(self):
return_value=self.mock_socket_instance,
)
self.MockSocketClass = patcher.start() # Store the mock of the class
self.addCleanup(patcher.stop)
stack.push_async_callback(lambda: anyio.to_thread.run_sync(patcher.stop))

self.backend = PreciseFlexBackend(has_rail=False, host="localhost", port=10100)

# self.backend.io is already self.mock_socket_instance because of the patch

async def test_init(self):
Expand Down Expand Up @@ -92,25 +96,35 @@ async def test_setup(self):
b"0 OK\r\n", # power_on_robot
b"0 OK\r\n", # attach
b"0 OK\r\n", # home
b"0 attach\r\n", # detach
b"0 hp\r\n", # power_off_robot
b"0 exit\r\n", # exit
]
await self.backend.setup()
self.mock_socket_instance.setup.assert_called_once()
async with self.backend:
pass
self.mock_socket_instance.__aenter__.assert_called_once()

self.mock_socket_instance.write.assert_any_call(b"mode 0\n")
self.mock_socket_instance.write.assert_any_call(b"hp 1 20\n")
self.mock_socket_instance.write.assert_any_call(b"attach 1\n")
self.mock_socket_instance.write.assert_any_call(b"home\n")

async def test_stop(self):
self.mock_socket_instance.readline.side_effect = [
b"0 OK\r\n", # set_mode
b"0 OK\r\n", # power_on_robot
b"0 OK\r\n", # attach
b"0 OK\r\n", # home
b"0 attach\r\n", # detach
b"0 hp\r\n", # power_off_robot
b"0 exit\r\n", # exit
]
await self.backend.stop()
async with self.backend:
pass
self.mock_socket_instance.write.assert_any_call(b"attach 0\n")
self.mock_socket_instance.write.assert_any_call(b"hp 0\n")
self.mock_socket_instance.write.assert_any_call(b"exit\n")
self.mock_socket_instance.stop.assert_called_once()
self.mock_socket_instance.__aexit__.assert_called_once()

async def test_set_speed(self):
self.mock_socket_instance.readline.return_value = b"0 Speed 1 50.0\r\n"
Expand Down
6 changes: 3 additions & 3 deletions pylabrobot/arms/scara_tests.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import unittest
from unittest.mock import AsyncMock, MagicMock

from pylabrobot.arms.backend import SCARABackend
from pylabrobot.arms.precise_flex.coords import PreciseFlexCartesianCoords
from pylabrobot.arms.scara import ExperimentalSCARA
from pylabrobot.resources import Coordinate, Rotation
from pylabrobot.testing.concurrency import AnyioTestBase


class TestExperimentalSCARA(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
class TestExperimentalSCARA(AnyioTestBase):
async def _enter_lifespan(self, stack):
self.mock_backend = MagicMock(spec=SCARABackend)
for method_name in [
"move_to",
Expand Down
37 changes: 18 additions & 19 deletions pylabrobot/barcode_scanners/keyence/keyence_backend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
import time

import anyio

try:
import serial
Expand All @@ -14,6 +14,7 @@
BarcodeScannerBackend,
BarcodeScannerError,
)
from pylabrobot.concurrency import AsyncExitStackWithShielding
from pylabrobot.io.serial import Serial
from pylabrobot.resources.barcode import Barcode

Expand Down Expand Up @@ -51,26 +52,27 @@ def __init__(
rtscts=False,
)

async def setup(self):
await self.io.setup()
async def _enter_lifespan(self, stack: AsyncExitStackWithShielding):
await super()._enter_lifespan(stack)
await stack.enter_async_context(self.io)
await self.initialize()

async def initialize(self):
"""Initialize the Keyence barcode scanner."""

deadline = time.time() + self.init_timeout
while time.time() < deadline:
response = await self.send_command("RMOTOR")
if response.strip() == "MOTORON":
logger.info("Barcode scanner motor is ON.")
break
elif response.strip() == "MOTOROFF":
raise BarcodeScannerError("Failed to initialize Keyence barcode scanner: Motor is off.")
await asyncio.sleep(self.poll_interval)
else:
try:
with anyio.fail_after(self.init_timeout):
while True:
response = await self.send_command("RMOTOR")
if response.strip() == "MOTORON":
logger.info("Barcode scanner motor is ON.")
break
elif response.strip() == "MOTOROFF":
raise BarcodeScannerError("Failed to initialize Keyence barcode scanner: Motor is off.")
await anyio.sleep(self.poll_interval)
except TimeoutError as e:
raise BarcodeScannerError(
"Failed to initialize Keyence barcode scanner: Timeout waiting for motor to turn on."
)
) from e

async def send_command(self, command: str) -> str:
"""Send a command to the barcode scanner and return the response.
Expand All @@ -80,9 +82,6 @@ async def send_command(self, command: str) -> str:
response = await self.io.read()
return response.decode(self.serial_messaging_encoding).strip()

async def stop(self):
await self.io.stop()

async def scan_barcode(self) -> Barcode:
data = await self.send_command("LON")
if data.startswith("NG"):
Expand Down
6 changes: 3 additions & 3 deletions pylabrobot/centrifuge/centrifuge_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pylabrobot.centrifuge.backend import CentrifugeBackend, LoaderBackend
from pylabrobot.centrifuge.chatterbox import CentrifugeChatterboxBackend, LoaderChatterboxBackend
from pylabrobot.resources import Coordinate, Cor_96_wellplate_360ul_Fb
from pylabrobot.testing.concurrency import AnyioTestBase


class CentrifugeTests(unittest.IsolatedAsyncioTestCase):
Expand All @@ -28,8 +29,8 @@ def test_serialization(self):
self.assertEqual(deserialized, centrifuge)


class CentrifugeLoaderResourceModelTests(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
class CentrifugeLoaderResourceModelTests(AnyioTestBase):
async def _enter_lifespan(self, stack):
self.mock_centrifuge_backend = unittest.mock.MagicMock(spec=CentrifugeBackend)
self.mock_loader_backend = unittest.mock.MagicMock(spec=LoaderBackend)
self.centrifuge = Centrifuge(
Expand All @@ -45,7 +46,6 @@ async def asyncSetUp(self):
child_location=Coordinate.zero(),
)
self.plate = Cor_96_wellplate_360ul_Fb(name="plate")
return await super().asyncSetUp()

async def test_go_to_bucket(self):
self.assertIsNone(self.centrifuge.at_bucket)
Expand Down
15 changes: 7 additions & 8 deletions pylabrobot/centrifuge/chatterbox.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from pylabrobot.centrifuge.backend import CentrifugeBackend, LoaderBackend
from pylabrobot.concurrency import AsyncExitStackWithShielding


class CentrifugeChatterboxBackend(CentrifugeBackend):
async def setup(self):
async def _enter_lifespan(self, stack: AsyncExitStackWithShielding):
await super()._enter_lifespan(stack)
print("Setting up")

async def stop(self):
print("Stopping")
stack.callback(lambda: print("Stopping"))

async def open_door(self):
print("Opening door")
Expand Down Expand Up @@ -40,11 +40,10 @@ async def spin(self, g: float, duration: float, acceleration: float):


class LoaderChatterboxBackend(LoaderBackend):
async def setup(self):
async def _enter_lifespan(self, stack: AsyncExitStackWithShielding):
await super()._enter_lifespan(stack)
print("Setting up")

async def stop(self):
print("Stopping")
stack.callback(lambda: print("Stopping"))

async def load(self):
print("Loading")
Expand Down
Loading
Loading