diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 1545b399cb2..3d0857461dd 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -3,6 +3,7 @@ import enum import functools import logging +import math import re import sys import warnings @@ -38,9 +39,7 @@ HamiltonLiquidHandler, ) from pylabrobot.liquid_handling.backends.hamilton.common import fill_in_defaults -from pylabrobot.liquid_handling.backends.hamilton.planning import group_by_x_batch_by_xy from pylabrobot.liquid_handling.channel_positioning import ( - MIN_SPACING_EDGE, get_tight_single_resource_liquid_op_offsets, get_wide_single_resource_liquid_op_offsets, ) @@ -49,6 +48,12 @@ HamiltonLiquidClass, get_star_liquid_class, ) +from pylabrobot.liquid_handling.pipette_batch_scheduling import ( + ChannelBatch, + log_batches, + plan_batches, + validate_channel_selections, +) from pylabrobot.liquid_handling.standard import ( Drop, DropTipRack, @@ -1369,15 +1374,19 @@ def __init__( self._pip_channel_information: Optional[List[PipChannelInformation]] = None self._default_1d_symbology: Barcode1DSymbology = "Code 128 (Subset B and C)" + self._x_grouping_tolerance_mm: float = 0.1 self._setup_done = False def _min_spacing_between(self, i: int, j: int) -> float: """Return the firmware-safe minimum Y spacing between channels *i* and *j*. - Uses max() of both channels' spacings for firmware safety (conservative). - For adjacent channels, ceiling-rounded to 0.1mm. - For non-adjacent channels, the sum of all intermediate adjacent-pair spacings. + For each adjacent pair, takes max() of both channels' spacings and ceiling-rounds + to 0.1mm. For non-adjacent channels, sums these per-pair spacings. + + TODO: migrate to radii model (spacing[i]/2 + spacing[j]/2) to match + compute_channel_offsets. Current max() model is conservative but inconsistent + with channel_positioning.py's diameter-based abstraction. """ lo, hi = min(i, j), max(i, j) if hi - lo == 1: @@ -1827,21 +1836,14 @@ async def channel_request_y_minimum_spacing(self, channel_idx: int) -> float: return self.y_drive_increment_to_mm(resp["yc"][1]) async def channels_request_y_minimum_spacing(self) -> List[float]: - """Query the minimum Y spacing for all channels in parallel. - - Each channel is addressed on its own module (P1, P2, ...), so the queries - can run concurrently. + """Query all channels for their minimum Y spacing in parallel. Returns: - A list of exact (unrounded) minimum Y spacings in mm, one per channel, - indexed by channel number. + A list of minimum Y spacings in mm, one per channel. """ return list( await asyncio.gather( - *( - self.channel_request_y_minimum_spacing(channel_idx=idx) - for idx in range(self.num_channels) - ) + *(self.channel_request_y_minimum_spacing(i) for i in range(self.num_channels)) ) ) @@ -2107,240 +2109,209 @@ class PressureLLDMode(enum.Enum): LIQUID = 0 FOAM = 1 - async def _move_to_traverse_height( - self, channels: Optional[List[int]] = None, traverse_height: Optional[float] = None - ): - """Move channels to a specified traverse height, if given, otherwise move to full Z safety. + async def execute_batched( + self, + func: Callable[[ChannelBatch], Awaitable[T]], + batches: List[ChannelBatch], + min_traverse_height_during_command: Optional[float] = None, + ) -> List[T]: + """Execute a Z-axis callback across pre-planned batches with X/Y positioning. + + Handles inter-batch safety: raises channels between batches, moves X when the + X group changes, and positions Y before calling *func*. On error or + KeyboardInterrupt, channels are moved to Z safety before re-raising. Args: - channels: Channels to move. If None, all channels are moved. - traverse_height: Absolute Z position in mm. If None, move to full Z safety. + func: Async callback that receives a ``ChannelBatch`` and performs Z-axis work + (e.g. liquid level detection, z-touch probing). Must not move X or Y. + batches: Pre-planned batches from ``plan_batches()``. + min_traverse_height_during_command: Absolute Z height (mm) for inter-batch + channel raises. ``None`` uses full Z safety. + + Returns: + List of results from each batch callback, in batch order. """ - if traverse_height is None: + log_batches(batches) + results: List[T] = [] + try: + prev_batch: Optional[ChannelBatch] = None + for batch in batches: + if prev_batch is not None: + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_batch.channels} + ) + + if prev_batch is None or not math.isclose(batch.x_position, prev_batch.x_position): + await self.move_channel_x(0, batch.x_position) + + await self.position_channels_in_y_direction(batch.y_positions) + results.append(await func(batch)) + prev_batch = batch + + except Exception: # firmware errors, RuntimeError, etc. await self.move_all_channels_in_z_safety() - else: - if channels is None: - channels = list(range(self.num_channels)) - await self.position_channels_in_z_direction( - {channel: traverse_height for channel in channels} - ) + raise + except BaseException: # KeyboardInterrupt, SystemExit — still must raise channels + await self.move_all_channels_in_z_safety() + raise + + return results - async def _probe_liquid_heights_batch( + async def _prepare_batched( self, containers: List[Container], - use_channels: List[int], - lld_mode: LLDMode = LLDMode.GAMMA, - search_speed: float = 10.0, - n_replicates: int = 1, - ) -> List[float]: - """Helper for probe_liquid_heights that performs a single batch of liquid level detection using a set of channels. + use_channels: Optional[List[int]] = None, + resource_offsets: Optional[List[Coordinate]] = None, + x_grouping_tolerance: Optional[float] = None, + min_traverse_height_at_beginning_of_command: Optional[float] = None, + ) -> Tuple[List[int], List[float], List[ChannelBatch]]: + """Validate channels, verify tips, position Z, resolve targets, plan batches. + + Shared setup for any batched channel operation (probing, aspirate, + dispense). Returns everything the caller needs to define its callback + and call ``execute_batched``. - Assumes channels are moved to the appropriate traverse height before calling, and does not move channels after completion. + Returns: + (use_channels, tip_lengths, batches). """ + if x_grouping_tolerance is None: + x_grouping_tolerance = self._x_grouping_tolerance_mm + use_channels = validate_channel_selections( + containers=containers, + num_channels=self.num_channels, + use_channels=use_channels, + ) + + # Verify tips and query tip lengths + tip_presence = await self.request_tip_presence() + if not all(tip_presence[idx] for idx in use_channels): + raise RuntimeError("All specified channels must have tips attached.") tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] - detect_func: Callable[..., Any] - if lld_mode == self.LLDMode.GAMMA: - detect_func = self._move_z_drive_to_liquid_surface_using_clld + # Z pre-positioning + idle_channels = sorted(set(range(self.num_channels)) - set(use_channels)) + if min_traverse_height_at_beginning_of_command is not None: + await asyncio.gather( + *[ + self.move_channel_stop_disk_z(channel_idx=ch_idx, z=self.MAXIMUM_CHANNEL_Z_POSITION) + for ch_idx in idle_channels + ] + ) + await self.position_channels_in_z_direction( + {ch: min_traverse_height_at_beginning_of_command for ch in use_channels} + ) else: - detect_func = self._search_for_surface_using_plld + await self.move_all_channels_in_z_safety() + + # Plan batches directly from containers (per-batch spread, no-go-zone aware). + batches = plan_batches( + use_channels=use_channels, + containers=containers, + channel_spacings=self._channels_minimum_y_spacing, + wrt_resource=self.deck, + x_tolerance=x_grouping_tolerance, + resource_offsets=resource_offsets, + ) + + return use_channels, tip_lengths, batches + + async def _run_lld_on_channel_batch( + self, + batch: ChannelBatch, + containers: List[Container], + tip_lengths: List[float], + z_cavity_bottom: List[float], + z_top: List[float], + lld_mode: List[LLDMode], + search_speed: float, + n_replicates: int, + ) -> Dict[int, List[Optional[float]]]: + """Per-batch liquid level detection. Override to substitute simulated sensing. + + *lld_mode* is indexed by original container position (``batch.indices[i]``), + so channels within a single batch may use different detection modes concurrently. + + Returns absolute heights keyed by job index (``batch.indices[i]``), so duplicate + channels across batches don't collide. One list per job, with ``None`` entries for + replicates where no liquid was detected. The caller subtracts ``z_cavity_bottom`` + to get heights relative to container bottom. + """ + + def _detect_func(mode: "STARBackend.LLDMode") -> Callable[..., Any]: + return ( + self._move_z_drive_to_liquid_surface_using_clld + if mode == self.LLDMode.GAMMA + else self._search_for_surface_using_plld + ) - # Compute Z search bounds for this batch batch_lowest_immers = [ - container.get_absolute_location("c", "c", "cavity_bottom").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - for container, tip_len in zip(containers, tip_lengths) + z_cavity_bottom[i] + tip_lengths[i] - self.DEFAULT_TIP_FITTING_DEPTH for i in batch.indices ] batch_start_pos = [ - container.get_absolute_location("c", "c", "t").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - + 5 - for container, tip_len in zip(containers, tip_lengths) + z_top[i] + tip_lengths[i] - self.DEFAULT_TIP_FITTING_DEPTH + self.SEARCH_START_CLEARANCE_MM + for i in batch.indices ] - absolute_heights_measurements: Dict[int, List[Optional[float]]] = { - idx: [] for idx in range(len(use_channels)) - } + measurements: Dict[int, List[Optional[float]]] = {orig_idx: [] for orig_idx in batch.indices} - # Run n_replicates detection loop for this batch for _ in range(n_replicates): - errors = await asyncio.gather( + results = await asyncio.gather( *[ - detect_func( + _detect_func(lld_mode[orig_idx])( channel_idx=channel, lowest_immers_pos=lip, start_pos_search=sps, channel_speed=search_speed, ) - for channel, lip, sps in zip(use_channels, batch_lowest_immers, batch_start_pos) + for channel, lip, sps, orig_idx in zip( + batch.channels, batch_lowest_immers, batch_start_pos, batch.indices + ) ], return_exceptions=True, ) - # Get heights for ALL channels, handling failures for channels with no liquid current_absolute_liquid_heights = await self.request_pip_height_last_lld() - for idx, (channel_idx, error) in enumerate(zip(use_channels, errors)): - if isinstance(error, STARFirmwareError): - error_msg = str(error).lower() + for local_idx, (ch_idx, result) in enumerate(zip(batch.channels, results)): + orig_idx = batch.indices[local_idx] + if isinstance(result, STARFirmwareError): + error_msg = str(result).lower() if "no liquid level found" in error_msg or "no liquid was present" in error_msg: height = None msg = ( - f"Operation {idx} (channel {channel_idx}): No liquid detected. Could be because there is " - f"no liquid in container {containers[idx].name} or liquid level " + f"Channel {ch_idx}: No liquid detected. Could be because there is " + f"no liquid in container {containers[orig_idx].name} or liquid level " f"is too low." ) - if lld_mode == self.LLDMode.GAMMA: + if lld_mode[orig_idx] == self.LLDMode.GAMMA: msg += " Consider using pressure-based LLD if liquid is believed to exist." logger.warning(msg) else: - raise error - elif isinstance(error, Exception): - raise error + raise result + elif isinstance(result, Exception): + raise result else: - height = current_absolute_liquid_heights[channel_idx] - absolute_heights_measurements[idx].append(height) - - # Compute liquid heights relative to well bottom - relative_to_well: List[float] = [] - inconsistent_ops: List[str] = [] - - for idx, container in enumerate(containers): - measurements = absolute_heights_measurements[idx] - valid = [m for m in measurements if m is not None] - cavity_bottom = container.get_absolute_location("c", "c", "cavity_bottom").z - - if len(valid) == 0: - relative_to_well.append(0.0) - elif len(valid) == len(measurements): - relative_to_well.append(sum(valid) / len(valid) - cavity_bottom) - else: - inconsistent_ops.append( - f"Operation {idx}: {len(valid)}/{len(measurements)} replicates detected liquid" - ) - - if inconsistent_ops: - raise RuntimeError( - "Inconsistent liquid detection across replicates. " - "This may indicate liquid levels near the detection limit:\n" + "\n".join(inconsistent_ops) - ) - - return relative_to_well - - def _get_maximum_minimum_spacing_between_channels(self, use_channels: List[int]) -> float: - """Get the maximum of the set of minimum spacing requirements between the channels being used""" - sorted_channels = sorted(use_channels) - max_channel_spacing = max( - self._min_spacing_between(hi, lo) for hi, lo in zip(sorted_channels[1:], sorted_channels[:-1]) - ) - return max_channel_spacing - - def _compute_channels_in_resource_locations( - self, - resources: Sequence[Resource], - use_channels: List[int], - offsets: Optional[List[Coordinate]], - ) -> List[Coordinate]: - """Compute absolute locations of resources with given offsets.""" - - # If no offset is provided but we can fit all channels inside a single resource, - # compute the offsets to make that happen using wide spacing. - if offsets is None: - if len(set(resources)) == 1 and len(use_channels) == len(set(use_channels)): - container_size_y = resources[0].get_absolute_size_y() - # For non-consecutive channels (e.g. [0,1,2,5,6,7]), we must account for - # phantom intermediate channels (3,4) that physically exist between them. - # Compute offsets for the full channel range (min to max), then pick only - # the offsets corresponding to the actual channels being used. - max_channel_spacing = self._get_maximum_minimum_spacing_between_channels(use_channels) - num_channels_in_span = max(use_channels) - min(use_channels) + 1 - min_required = MIN_SPACING_EDGE * 2 + (num_channels_in_span - 1) * max_channel_spacing - if container_size_y >= min_required: - all_offsets = get_wide_single_resource_liquid_op_offsets( - resource=resources[0], - num_channels=num_channels_in_span, - min_spacing=max_channel_spacing, - ) - min_ch = min(use_channels) - offsets = [all_offsets[ch - min_ch] for ch in use_channels] - # else: container too small to fit all channels — fall back to center offsets. - # Y sub-batching will serialize channels that can't coexist. - - offsets = offsets or [Coordinate.zero()] * len(resources) - - # Compute positions for all resources - resource_locations = [ - resource.get_location_wrt(self.deck, x="c", y="c", z="b") + offset - for resource, offset in zip(resources, offsets) - ] + height = current_absolute_liquid_heights[ch_idx] + measurements[orig_idx].append(height) - return resource_locations - - async def execute_batched( # TODO: any hamilton liquid handler - self, - func: Callable[[List[int]], Awaitable[None]], - resources: List[Container], - use_channels: Optional[List[int]] = None, - resource_offsets: Optional[List[Coordinate]] = None, - min_traverse_height_during_command: Optional[float] = None, - ): - if use_channels is None: - use_channels = list(range(len(resources))) - - # precompute locations and batches - locations = self._compute_channels_in_resource_locations( - resources, use_channels, resource_offsets - ) - x_batches = group_by_x_batch_by_xy( - locations=locations, - use_channels=use_channels, - min_spacing_between_channels=self._min_spacing_between, - ) - - # loop over batches. keep track of channels used in previous batch to ensure they are raised to traverse height before next batch - prev_channels: Optional[List[int]] = None - - try: - for x_value, x_batch in x_batches.items(): - if prev_channels is not None: - await self._move_to_traverse_height( - channels=prev_channels, traverse_height=min_traverse_height_during_command - ) - await self.move_channel_x(0, x_value) - - for y_batch in x_batch: - if prev_channels is not None: - await self._move_to_traverse_height( - channels=prev_channels, traverse_height=min_traverse_height_during_command - ) - await self.position_channels_in_y_direction( - {use_channels[idx]: locations[idx].y for idx in y_batch}, - ) - - await func(y_batch) - - prev_channels = [use_channels[idx] for idx in y_batch] - except Exception: - await self.move_all_channels_in_z_safety() - raise - except BaseException: - await self.move_all_channels_in_z_safety() - raise + return measurements async def probe_liquid_heights( self, containers: List[Container], use_channels: Optional[List[int]] = None, resource_offsets: Optional[List[Coordinate]] = None, - lld_mode: LLDMode = LLDMode.GAMMA, + lld_mode: Union[LLDMode, List[LLDMode], None] = None, search_speed: float = 10.0, n_replicates: int = 1, # Traverse height parameters (None = full Z safety, float = absolute Z position in mm) min_traverse_height_at_beginning_of_command: Optional[float] = None, min_traverse_height_during_command: Optional[float] = None, z_position_at_end_of_command: Optional[float] = None, + x_grouping_tolerance: Optional[float] = None, # Deprecated move_to_z_safety_after: Optional[bool] = None, ) -> List[float]: @@ -2350,107 +2321,140 @@ async def probe_liquid_heights( container positions and sensing the liquid surface. Heights are measured from the bottom of each container's cavity. + Uses ``plan_batches`` for X/Y partitioning with per-batch container spread + (respecting no-go zones), then ``execute_batched`` to iterate batches with + Z safety. + Args: containers: List of Container objects to probe, one per channel. use_channels: Channel indices to use for probing (0-indexed). - resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single - containers with odd channel counts to avoid center dividers. Defaults to container centers. - lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. - Defaults to capacitive. + resource_offsets: Optional XYZ offsets from container centers. When not provided, + ``plan_batches`` auto-spreads channels targeting the same container. + lld_mode: Detection mode. Either a single ``LLDMode`` applied to all containers + (deprecated, removed in v1b1) or a list of ``LLDMode``s (one per container) + allowing mixed GAMMA/PRESSURE within one call. ``None`` (default) applies + GAMMA to all containers. search_speed: Z-axis search speed in mm/s. Default 10.0 mm/s. n_replicates: Number of measurements per channel. Default 1. min_traverse_height_at_beginning_of_command: Absolute Z height (mm) to move involved - channels to before the first batch. None (default) uses full Z safety. + channels to before the first batch. Must clear all deck obstacles since channels + travel laterally at this height. None (default) uses full Z safety. min_traverse_height_during_command: Absolute Z height (mm) to move involved channels to - between batches (X groups and Y sub-batches). None (default) uses full Z safety. + between batches. None (default) uses full Z safety. z_position_at_end_of_command: Absolute Z height (mm) to move involved channels to after probing. None (default) uses full Z safety. - + x_grouping_tolerance: Containers within this X distance (mm) are grouped and probed + together. Defaults to ``_x_grouping_tolerance_mm`` (0.1 mm). + move_to_z_safety_after: Deprecated. Use ``z_position_at_end_of_command`` instead. Returns: Mean of measured liquid heights for each container (mm from cavity bottom). Raises: - RuntimeError: If channels lack tips. - - Notes: - - All specified channels must have tips attached - - Containers at different X positions are probed in sequential groups (single X carriage) - - For single containers with no-go zones, Y-offsets are computed to avoid - obstructed regions (e.g. center dividers in troughs) + ValueError: If ``use_channels`` is empty, contains out-of-range indices, + or if input list lengths don't match. + RuntimeError: If any specified channel lacks a tip. """ if move_to_z_safety_after is not None: warnings.warn( - "The 'move_to_z_safety_after' parameter is deprecated and will be removed in a future release. " - "Use 'z_position_at_end_of_command' with an appropriate Z height instead. If not set, " - "the default behavior will be to move to full Z safety after the command.", + "The 'move_to_z_safety_after' parameter is deprecated and will be removed in a " + "future release. Use 'z_position_at_end_of_command' with an appropriate Z height " + "instead. If not set, the default behavior will be to move to full Z safety after " + "the command.", DeprecationWarning, + stacklevel=2, ) - # Validate parameters. - if use_channels is None: - use_channels = list(range(len(containers))) - if len(use_channels) == 0: - raise ValueError("use_channels must not be empty.") - if not all(0 <= ch < self.num_channels for ch in use_channels): - raise ValueError( - f"All use_channels must be integers in range [0, {self.num_channels - 1}], " - f"got {use_channels}." - ) - - if lld_mode not in {self.LLDMode.GAMMA, self.LLDMode.PRESSURE}: - raise ValueError(f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {lld_mode}") + if n_replicates < 1: + raise ValueError(f"n_replicates must be >= 1, got {n_replicates}.") - if not len(containers) == len(use_channels): - raise ValueError( - "Length of containers and use_channels must match, " - f"got lengths {len(containers)}, {len(use_channels)}." + if lld_mode is None: + lld_mode = [self.LLDMode.GAMMA] * len(containers) + elif isinstance(lld_mode, self.LLDMode): + warnings.warn( + "Passing a single LLDMode to probe_liquid_heights is deprecated and will be " + "removed in v1b1. Pass a list of LLDModes (one per container) instead.", + DeprecationWarning, + stacklevel=2, ) + lld_mode = [lld_mode] * len(containers) + elif not isinstance(lld_mode, list): + raise TypeError(f"lld_mode must be List[LLDMode], got {type(lld_mode).__name__}") - # Validate resource_offsets length (if provided) to avoid silent truncation in downstream zips. - if resource_offsets is not None and len(resource_offsets) != len(containers): + if len(lld_mode) != len(containers): raise ValueError( - "Length of resource_offsets must match the length of containers and use_channels, " - f"got lengths {len(resource_offsets)} (resource_offsets) and " - f"{len(containers)} (containers/use_channels)." + f"lld_mode list length must match containers: got {len(lld_mode)} LLD modes " + f"for {len(containers)} containers." ) - # Make sure we have tips on all channels and know their lengths - tip_presence = await self.request_tip_presence() - if not all(tip_presence[idx] for idx in use_channels): - raise RuntimeError("All specified channels must have tips attached.") + for m in lld_mode: + if m not in (self.LLDMode.GAMMA, self.LLDMode.PRESSURE): + raise ValueError(f"Unsupported lld_mode: {m!r}") - # Move channels to traverse height - await self._move_to_traverse_height( - channels=use_channels, traverse_height=min_traverse_height_at_beginning_of_command - ) - - result_by_operation: Dict[int, float] = {} + z_cavity_bottom = [ + r.get_location_wrt(self.deck, "c", "c", "cavity_bottom").z for r in containers + ] + z_top = [r.get_location_wrt(self.deck, "c", "c", "t").z for r in containers] - async def func(batch: List[int]): - liquid_heights = await self._probe_liquid_heights_batch( - containers=[containers[idx] for idx in batch], - use_channels=[use_channels[idx] for idx in batch], + use_channels, tip_lengths, batches = await self._prepare_batched( + containers=containers, + use_channels=use_channels, + resource_offsets=resource_offsets, + x_grouping_tolerance=x_grouping_tolerance, + min_traverse_height_at_beginning_of_command=min_traverse_height_at_beginning_of_command, + ) + batch_results = await self.execute_batched( + func=lambda b: self._run_lld_on_channel_batch( + batch=b, + containers=containers, + tip_lengths=tip_lengths, + z_cavity_bottom=z_cavity_bottom, + z_top=z_top, lld_mode=lld_mode, search_speed=search_speed, n_replicates=n_replicates, - ) - for idx, height in zip(batch, liquid_heights): - result_by_operation[idx] = height - - await self.execute_batched( - func=func, - resources=containers, - use_channels=use_channels, - resource_offsets=resource_offsets, + ), + batches=batches, min_traverse_height_during_command=min_traverse_height_during_command, ) - await self._move_to_traverse_height( - channels=use_channels, - traverse_height=z_position_at_end_of_command, - ) + absolute_heights_measurements: Dict[int, List[Optional[float]]] = {} + for batch_measurements in batch_results: + for orig_idx, heights in batch_measurements.items(): + absolute_heights_measurements.setdefault(orig_idx, []).extend(heights) - return [result_by_operation[idx] for idx in range(len(containers))] + # Compute liquid heights relative to well bottom + relative_to_well: List[float] = [] + inconsistent_channels: List[str] = [] + + for idx, (ch, container) in enumerate(zip(use_channels, containers)): + measurements = absolute_heights_measurements[idx] + valid = [m for m in measurements if m is not None] + cavity_bottom = z_cavity_bottom[idx] + + if len(valid) == 0: + relative_to_well.append(0.0) + elif len(valid) == len(measurements): + relative_to_well.append(sum(valid) / len(valid) - cavity_bottom) + else: + inconsistent_channels.append( + f"Channel {ch}: {len(valid)}/{len(measurements)} replicates detected liquid" + ) + + if inconsistent_channels: + raise RuntimeError( + "Inconsistent liquid detection across replicates. " + "This may indicate liquid levels near the detection limit:\n" + + "\n".join(inconsistent_channels) + ) + + if z_position_at_end_of_command is not None: + await self.position_channels_in_z_direction( + {ch: z_position_at_end_of_command for ch in use_channels} + ) + else: + await self.move_all_channels_in_z_safety() + + return relative_to_well async def probe_liquid_volumes( self, @@ -2460,7 +2464,9 @@ async def probe_liquid_volumes( lld_mode: LLDMode = LLDMode.GAMMA, search_speed: float = 10.0, n_replicates: int = 3, - move_to_z_safety_after: bool = True, + z_position_at_end_of_command: Optional[float] = None, + # Deprecated + move_to_z_safety_after: Optional[bool] = None, ) -> List[float]: """Probe liquid volumes in containers by measuring heights and converting to volumes. @@ -2475,7 +2481,9 @@ async def probe_liquid_volumes( lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. Defaults to capacitive. search_speed: Z-axis search speed in mm/s. Default 10.0 mm/s. n_replicates: Number of measurements per channel. Default 3. - move_to_z_safety_after: Whether to move channels to safe Z height after probing. Default True. + z_position_at_end_of_command: Absolute Z height (mm) to move involved channels to after + probing. None (default) uses full Z safety. + move_to_z_safety_after: Deprecated. Use ``z_position_at_end_of_command`` instead. Returns: Volumes in each container (uL). @@ -2488,6 +2496,16 @@ async def probe_liquid_volumes( - All containers must support height-volume functions. Volume calculation uses Container.compute_volume_from_height() """ + if move_to_z_safety_after is not None: + warnings.warn( + "The 'move_to_z_safety_after' parameter is deprecated and will be removed in a " + "future release. Use 'z_position_at_end_of_command' with an appropriate Z height " + "instead. If not set, the default behavior will be to move to full Z safety after " + "the command.", + DeprecationWarning, + stacklevel=2, + ) + if any(not resource.supports_compute_height_volume_functions() for resource in containers): raise ValueError( "probe_liquid_volumes can only be used with containers that support height<->volume functions." @@ -2500,7 +2518,7 @@ async def probe_liquid_volumes( lld_mode=lld_mode, search_speed=search_speed, n_replicates=n_replicates, - move_to_z_safety_after=move_to_z_safety_after, + z_position_at_end_of_command=z_position_at_end_of_command, ) return [ @@ -2939,7 +2957,7 @@ async def aspirate( containers=[op.resource for op in ops], use_channels=use_channels, resource_offsets=[op.offset for op in ops], - move_to_z_safety_after=False, + z_position_at_end_of_command=100, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. @@ -3301,7 +3319,7 @@ async def dispense( containers=[op.resource for op in ops], use_channels=use_channels, resource_offsets=[op.offset for op in ops], - move_to_z_safety_after=False, + z_position_at_end_of_command=100, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. @@ -10754,8 +10772,9 @@ async def clld_probe_y_position_using_channel( # Machine-compatibility check of calculated parameters assert 0 <= max_y_search_pos_increments <= 13_714, ( - "Maximum y search position must be between \n0 and" - + f"{STARBackend.y_drive_increment_to_mm(13_714) + 9} mm, is {max_y_search_pos_increments} mm" + "Maximum y search position must be between 0 and " + + f"{STARBackend.y_drive_increment_to_mm(13_714):.1f} mm, " + + f"is {max_y_search_pos:.1f} mm" ) assert 20 <= channel_speed_increments <= 8_000, ( f"LLD search speed must be between \n{STARBackend.y_drive_increment_to_mm(20)}" @@ -11487,6 +11506,7 @@ async def request_tip_len_on_channel(self, channel_idx: int) -> float: MAXIMUM_CHANNEL_Z_POSITION = 334.7 # mm (= z-drive increment 31_200) MINIMUM_CHANNEL_Z_POSITION = 99.98 # mm (= z-drive increment 9_320) DEFAULT_TIP_FITTING_DEPTH = 8 # mm, for 10, 50, 300, 1000 ul Hamilton tips + SEARCH_START_CLEARANCE_MM = 5 # mm above container top for LLD search start position async def ztouch_probe_z_height_using_channel( self, @@ -11678,10 +11698,10 @@ async def get_channels_y_positions(self) -> Dict[int, float]: y_positions = [round(y / 10, 2) for y in resp["ry"]] # sometimes there is (likely) a floating point error and channels are reported to be - # less than their minimum spacing apart (typically 9 mm). (When you set channels using - # position_channels_in_y_direction, it will raise an error.) The minimum y is 6mm, - # so we fix that first (in case that value is misreported). Then, we traverse the - # list in reverse and enforce pairwise minimum spacing. + # closer together than the minimum required spacing. (When you set channels using + # position_channels_in_y_direction, it will raise an error.) We first ensure the last + # channel is not reported in front of the known minimum Y position, then traverse the + # list in reverse and enforce the per-channel minimum spacing. min_y = self.extended_conf.left_arm_min_y_position if y_positions[-1] < min_y - 0.2: raise RuntimeError( @@ -11694,9 +11714,9 @@ async def get_channels_y_positions(self) -> Dict[int, float]: y_positions[-1] = min_y for i in range(len(y_positions) - 2, -1, -1): - spacing = self._min_spacing_between(i, i + 1) - if y_positions[i] - y_positions[i + 1] < spacing: - y_positions[i] = y_positions[i + 1] + spacing + min_diff = self._min_spacing_between(i, i + 1) + if y_positions[i] - y_positions[i + 1] < min_diff: + y_positions[i] = y_positions[i + 1] + min_diff return {channel_idx: y for channel_idx, y in enumerate(y_positions)} @@ -11722,37 +11742,30 @@ async def position_channels_in_y_direction(self, ys: Dict[int, float], make_spac channel_locations[channel_idx] = y if make_space: + # For the channels to the back of `back_channel`, make sure the space between them + # meets the per-pair minimum. We start with the channel closest to `back_channel`, and + # make sure the channel behind it is spaced correctly, updating if needed. use_channels = list(ys.keys()) back_channel = min(use_channels) - front_channel = max(use_channels) + for channel_idx in range(back_channel, 0, -1): + pair_spacing = self._min_spacing_between(channel_idx - 1, channel_idx) + if (channel_locations[channel_idx - 1] - channel_locations[channel_idx]) < pair_spacing: + channel_locations[channel_idx - 1] = channel_locations[channel_idx] + pair_spacing - # Position channels in between used channels + # Position intermediate channels between back_channel and front_channel. + front_channel = max(use_channels) for intermediate_ch in range(back_channel + 1, front_channel): if intermediate_ch not in ys: - channel_locations[intermediate_ch] = channel_locations[ - intermediate_ch - 1 - ] - self._min_spacing_between(intermediate_ch - 1, intermediate_ch) - - # For the channels to the back of `back_channel`, make sure the space between them is - # >=9mm. We start with the channel closest to `back_channel`, and make sure the - # channel behind it is at least 9mm, updating if needed. Iterating from the front (closest - # to `back_channel`) to the back (channel 0), all channels are put at the correct location. - # This order matters because the channel in front of any channel may have been moved in the - # previous iteration. - # Note that if a channel is already spaced at >=9mm, it is not moved. - for channel_idx in range(back_channel, 0, -1): - spacing = self._min_spacing_between(channel_idx - 1, channel_idx) - if (channel_locations[channel_idx - 1] - channel_locations[channel_idx]) < spacing: - channel_locations[channel_idx - 1] = channel_locations[channel_idx] + spacing + pair_spacing = self._min_spacing_between(intermediate_ch - 1, intermediate_ch) + channel_locations[intermediate_ch] = channel_locations[intermediate_ch - 1] - pair_spacing # Similarly for the channels to the front of `front_channel`, make sure they are all - # spaced >= channel_minimum_y_spacing (usually 9mm) apart. This time, we iterate from - # back (closest to `front_channel`) to the front (lh.backend.num_channels - 1), and - # put each channel >= channel_minimum_y_spacing before the one behind it. + # spaced by the per-pair minimum. This time, we iterate from back (closest to + # `front_channel`) to the front (lh.backend.num_channels - 1). for channel_idx in range(front_channel, self.num_channels - 1): - spacing = self._min_spacing_between(channel_idx, channel_idx + 1) - if (channel_locations[channel_idx] - channel_locations[channel_idx + 1]) < spacing: - channel_locations[channel_idx + 1] = channel_locations[channel_idx] - spacing + pair_spacing = self._min_spacing_between(channel_idx, channel_idx + 1) + if (channel_locations[channel_idx] - channel_locations[channel_idx + 1]) < pair_spacing: + channel_locations[channel_idx + 1] = channel_locations[channel_idx] - pair_spacing # Quick checks before movement. if channel_locations[0] > 650: @@ -11838,7 +11851,10 @@ async def pierce_foil( offsets = get_wide_single_resource_liquid_op_offsets( resource=well, num_channels=len(piercing_channels), - min_spacing=self._get_maximum_minimum_spacing_between_channels(piercing_channels), + min_spacing=max( + self._min_spacing_between(lo, hi) + for lo, hi in zip(sorted(piercing_channels)[:-1], sorted(piercing_channels)[1:]) + ), ) else: offsets = get_tight_single_resource_liquid_op_offsets( diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index 8bd0f16da19..28624096ba8 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -12,6 +12,7 @@ MachineConfiguration, STARBackend, ) +from pylabrobot.resources.container import Container from pylabrobot.resources.well import Well _DEFAULT_MACHINE_CONFIGURATION = MachineConfiguration( @@ -213,6 +214,10 @@ async def channel_request_y_minimum_spacing(self, channel_idx: int) -> float: ) return self._channels_minimum_y_spacing[channel_idx] + async def channels_request_y_minimum_spacing(self) -> List[float]: + """Return mock per-channel minimum Y spacings for all channels.""" + return list(self._channels_minimum_y_spacing) + async def move_channel_y(self, channel: int, y: float): print(f"moving channel {channel} to y: {y}") @@ -314,22 +319,33 @@ async def request_tip_len_on_channel(self, channel_idx: int) -> float: async def position_channels_in_y_direction(self, ys, make_space=True): print("positioning channels in y:", ys, "make_space:", make_space) - async def probe_liquid_heights( - self, - containers, - use_channels=None, - resource_offsets=None, - lld_mode=None, - search_speed=10.0, - n_replicates=1, - min_traverse_height_at_beginning_of_command=None, - min_traverse_height_during_command=None, - z_position_at_end_of_command=None, - move_to_z_safety_after=None, - ) -> List[float]: - """Return liquid heights from the volume tracker using each container's - height-from-volume function. No physical probing in simulation.""" - return [c.compute_height_from_volume(c.tracker.get_used_volume()) for c in containers] - async def request_pip_height_last_lld(self): return list(range(12)) + + async def _run_lld_on_channel_batch( + self, + batch, + containers: List[Container], + tip_lengths: List[float], + z_cavity_bottom: List[float], + z_top: List[float], + lld_mode: List["STARBackend.LLDMode"], + search_speed: float, + n_replicates: int, + ) -> Dict[int, List[Optional[float]]]: + """Simulate LLD by computing absolute heights from each container's volume tracker. + + Empty containers report the cavity-bottom Z (relative height 0). Non-empty + containers report ``cavity_bottom + compute_height_from_volume(volume)`` so the + parent ``probe_liquid_heights`` can subtract ``z_cavity_bottom`` consistently. + """ + measurements: Dict[int, List[Optional[float]]] = {} + for orig_idx in batch.indices: + container = containers[orig_idx] + volume = container.tracker.get_used_volume() + if volume == 0: + absolute_height = z_cavity_bottom[orig_idx] + else: + absolute_height = z_cavity_bottom[orig_idx] + container.compute_height_from_volume(volume) + measurements[orig_idx] = [absolute_height] * n_replicates + return measurements diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index b478ff7b637..2bb9a8b0152 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -1,5 +1,6 @@ # mypy: disable-error-code="attr-defined,method-assign" +import contextlib import unittest import unittest.mock from typing import Literal, cast @@ -39,7 +40,10 @@ UnknownHamiltonError, parse_star_fw_string, ) -from .STAR_chatterbox import _DEFAULT_EXTENDED_CONFIGURATION, _DEFAULT_MACHINE_CONFIGURATION +from .STAR_chatterbox import ( + _DEFAULT_EXTENDED_CONFIGURATION, + _DEFAULT_MACHINE_CONFIGURATION, +) class TestSTARResponseParsing(unittest.TestCase): @@ -1632,8 +1636,8 @@ async def test_position_channels_make_space_spreads_wider_at_18mm(self): self.assertNotEqual(cmd_9mm, cmd_18mm) -class STARTestBase(unittest.IsolatedAsyncioTestCase): - """Shared setup for probe/batch/helper tests.""" +class TestProbeLiquidHeights(unittest.IsolatedAsyncioTestCase): + """Tests for probe_liquid_heights: detection dispatch, replicates, error handling.""" async def asyncSetUp(self): self.STAR = STARBackend(read_timeout=1) @@ -1671,179 +1675,73 @@ def _put_tips_on_channels(self, channels): tip = self.tip_rack.get_tip("A1") self.lh.update_head_state({ch: tip for ch in channels}) + def _standard_mocks(self, detect_side_effect=None): + """Return a context manager stack with standard mocks for probe_liquid_heights.""" + mocks = {} + + if detect_side_effect is None: + detect_side_effect = unittest.mock.AsyncMock(return_value=None) + mocks["detect"] = unittest.mock.patch.object( + self.STAR, "_move_z_drive_to_liquid_surface_using_clld", detect_side_effect + ) + mocks["plld"] = unittest.mock.patch.object( + self.STAR, + "_search_for_surface_using_plld", + new_callable=unittest.mock.AsyncMock, + return_value=None, + ) + mocks["pip_height"] = unittest.mock.patch.object( + self.STAR, + "request_pip_height_last_lld", + new_callable=unittest.mock.AsyncMock, + return_value=list(range(12)), + ) + mocks["tip_len"] = unittest.mock.patch.object( + self.STAR, + "request_tip_len_on_channel", + new_callable=unittest.mock.AsyncMock, + return_value=59.9, + ) + mocks["tip_presence"] = unittest.mock.patch.object( + self.STAR, + "request_tip_presence", + new_callable=unittest.mock.AsyncMock, + return_value={i: True for i in range(8)}, + ) + mocks["z_safety"] = unittest.mock.patch.object( + self.STAR, + "move_all_channels_in_z_safety", + new_callable=unittest.mock.AsyncMock, + ) + mocks["move_x"] = unittest.mock.patch.object( + self.STAR, + "move_channel_x", + new_callable=unittest.mock.AsyncMock, + ) + mocks["pos_y"] = unittest.mock.patch.object( + self.STAR, + "position_channels_in_y_direction", + new_callable=unittest.mock.AsyncMock, + ) + mocks["backmost_y"] = unittest.mock.patch.object( + self.STAR.extended_conf, + "pip_maximal_y_position", + 606.5, + ) + return mocks -class TestMoveToTraverseHeight(STARTestBase): - async def test_none_calls_z_safety(self): - with unittest.mock.patch.object( - self.STAR, "move_all_channels_in_z_safety", new_callable=unittest.mock.AsyncMock - ) as mock_z_safety: - await self.STAR._move_to_traverse_height(channels=[0, 1], traverse_height=None) - mock_z_safety.assert_awaited_once() - - async def test_float_calls_position_z(self): - with unittest.mock.patch.object( - self.STAR, "position_channels_in_z_direction", new_callable=unittest.mock.AsyncMock - ) as mock_pos_z: - await self.STAR._move_to_traverse_height(channels=[0, 2], traverse_height=245.0) - mock_pos_z.assert_awaited_once_with({0: 245.0, 2: 245.0}) - - -class TestComputeChannelsInResourceLocations(STARTestBase): - async def test_explicit_offsets(self): - wells = [self.plate.get_item("A1"), self.plate.get_item("B1")] - offsets = [Coordinate(0, 0, 0), Coordinate(0, 0, 0)] - locs = self.STAR._compute_channels_in_resource_locations( - resources=wells, use_channels=[0, 1], offsets=offsets - ) - self.assertEqual(len(locs), 2) - self.assertAlmostEqual(locs[0].x, 298.3, places=1) - self.assertAlmostEqual(locs[0].y, 145.7, places=1) - self.assertAlmostEqual(locs[1].x, 298.3, places=1) - self.assertAlmostEqual(locs[1].y, 136.7, places=1) - - async def test_none_offsets_single_resource(self): - """Same resource twice: both channels get the well center (offset auto-calc falls back to zero for small wells).""" - well = self.plate.get_item("A1") - locs = self.STAR._compute_channels_in_resource_locations( - resources=[well, well], use_channels=[0, 1], offsets=None - ) - self.assertEqual(len(locs), 2) - self.assertAlmostEqual(locs[0].x, 298.3, places=1) - self.assertAlmostEqual(locs[0].y, 145.7, places=1) - self.assertAlmostEqual(locs[1].x, 298.3, places=1) - self.assertAlmostEqual(locs[1].y, 145.7, places=1) - - async def test_none_offsets_different_resources(self): - """Different resources with no offsets: each gets its own center.""" - well_a1 = self.plate.get_item("A1") - well_b1 = self.plate.get_item("B1") - locs = self.STAR._compute_channels_in_resource_locations( - resources=[well_a1, well_b1], use_channels=[0, 1], offsets=None - ) - self.assertEqual(len(locs), 2) - self.assertAlmostEqual(locs[0].x, 298.3, places=1) - self.assertAlmostEqual(locs[0].y, 145.7, places=1) - self.assertAlmostEqual(locs[1].x, 298.3, places=1) - self.assertAlmostEqual(locs[1].y, 136.7, places=1) - - -class TestExecuteBatched(STARTestBase): - async def test_single_batch(self): - well = self.plate.get_item("A1") - calls = [] - - async def func(batch): - calls.append(batch) - - self._put_tips_on_channels([0, 1]) - - with ( - unittest.mock.patch.object(self.STAR, "move_channel_x", new_callable=unittest.mock.AsyncMock), - unittest.mock.patch.object( - self.STAR, "position_channels_in_y_direction", new_callable=unittest.mock.AsyncMock - ), - unittest.mock.patch.object( - self.STAR, "move_all_channels_in_z_safety", new_callable=unittest.mock.AsyncMock - ), - ): - await self.STAR.execute_batched( - func=func, - resources=[well, well], - use_channels=[0, 1], - ) - - all_indices = [i for call in calls for i in call] - self.assertEqual(sorted(all_indices), [0, 1]) - - async def test_different_x_groups(self): - well_a1 = self.plate.get_item("A1") - well_a2 = self.plate.get_item("A2") - calls = [] - - async def func(batch): - calls.append(list(batch)) - - self._put_tips_on_channels([0, 1]) - - with ( - unittest.mock.patch.object(self.STAR, "move_channel_x", new_callable=unittest.mock.AsyncMock), - unittest.mock.patch.object( - self.STAR, "position_channels_in_y_direction", new_callable=unittest.mock.AsyncMock - ), - unittest.mock.patch.object( - self.STAR, "move_all_channels_in_z_safety", new_callable=unittest.mock.AsyncMock - ), - ): - await self.STAR.execute_batched( - func=func, - resources=[well_a1, well_a2], - use_channels=[0, 1], - resource_offsets=[Coordinate.zero(), Coordinate.zero()], - ) - - all_indices = [i for call in calls for i in call] - self.assertEqual(sorted(all_indices), [0, 1]) - - async def test_traverse_height(self): - well_a1 = self.plate.get_item("A1") - well_a2 = self.plate.get_item("A2") - - async def func(batch): - pass - - self._put_tips_on_channels([0, 1]) - - with ( - unittest.mock.patch.object(self.STAR, "move_channel_x", new_callable=unittest.mock.AsyncMock), - unittest.mock.patch.object( - self.STAR, "position_channels_in_y_direction", new_callable=unittest.mock.AsyncMock - ), - unittest.mock.patch.object( - self.STAR, "_move_to_traverse_height", new_callable=unittest.mock.AsyncMock - ) as mock_traverse, - ): - await self.STAR.execute_batched( - func=func, - resources=[well_a1, well_a2], - use_channels=[0, 1], - resource_offsets=[Coordinate.zero(), Coordinate.zero()], - min_traverse_height_during_command=200.0, - ) - - if mock_traverse.await_count > 0: - for call in mock_traverse.call_args_list: - self.assertEqual(call.kwargs.get("traverse_height"), 200.0) - - -class TestProbeLiquidHeightsBatch(STARTestBase): async def test_single_well_returns_height(self): well = self.plate.get_item("A1") self._put_tips_on_channels([0]) - with ( - unittest.mock.patch.object( - self.STAR, - "_move_z_drive_to_liquid_surface_using_clld", - new_callable=unittest.mock.AsyncMock, - return_value=None, - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - result = await self.STAR._probe_liquid_heights_batch(containers=[well], use_channels=[0]) + mocks = self._standard_mocks() + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) + result = await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0]) # request_pip_height_last_lld returns list(range(12)), so channel 0 gets height 0. - # relative = 0 - cavity_bottom_z = 0 - 186.65 = -186.65 + # relative = 0 - cavity_bottom_z self.assertEqual(len(result), 1) self.assertAlmostEqual(result[0], 0 - well.get_absolute_location("c", "c", "cavity_bottom").z) @@ -1852,26 +1750,11 @@ async def test_n_replicates(self): self._put_tips_on_channels([0]) mock_detect = unittest.mock.AsyncMock(return_value=None) - with ( - unittest.mock.patch.object( - self.STAR, "_move_z_drive_to_liquid_surface_using_clld", mock_detect - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - await self.STAR._probe_liquid_heights_batch( - containers=[well], use_channels=[0], n_replicates=3 - ) + mocks = self._standard_mocks(detect_side_effect=mock_detect) + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) + await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0], n_replicates=3) self.assertEqual(mock_detect.await_count, 3) @@ -1894,26 +1777,13 @@ async def test_no_liquid_detected_returns_zero(self): async def raise_error(**kwargs): raise error - with ( - unittest.mock.patch.object( - self.STAR, - "_move_z_drive_to_liquid_surface_using_clld", - unittest.mock.AsyncMock(side_effect=raise_error), - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - result = await self.STAR._probe_liquid_heights_batch(containers=[well], use_channels=[0]) + mocks = self._standard_mocks( + detect_side_effect=unittest.mock.AsyncMock(side_effect=raise_error) + ) + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) + result = await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0]) self.assertEqual(result[0], 0.0) @@ -1942,58 +1812,46 @@ async def side_effect(**kwargs): return None raise error - with ( - unittest.mock.patch.object( - self.STAR, - "_move_z_drive_to_liquid_surface_using_clld", - unittest.mock.AsyncMock(side_effect=side_effect), - ), - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): + mocks = self._standard_mocks( + detect_side_effect=unittest.mock.AsyncMock(side_effect=side_effect) + ) + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) with self.assertRaises(RuntimeError): - await self.STAR._probe_liquid_heights_batch( - containers=[well], use_channels=[0], n_replicates=2 - ) + await self.STAR.probe_liquid_heights(containers=[well], use_channels=[0], n_replicates=2) async def test_pressure_lld_mode(self): well = self.plate.get_item("A1") self._put_tips_on_channels([0]) - with ( - unittest.mock.patch.object( - self.STAR, - "_search_for_surface_using_plld", - new_callable=unittest.mock.AsyncMock, - return_value=None, - ) as mock_plld, - unittest.mock.patch.object( - self.STAR, - "request_pip_height_last_lld", - new_callable=unittest.mock.AsyncMock, - return_value=list(range(12)), - ), - unittest.mock.patch.object( - self.STAR, - "request_tip_len_on_channel", - new_callable=unittest.mock.AsyncMock, - return_value=59.9, - ), - ): - await self.STAR._probe_liquid_heights_batch( + mocks = self._standard_mocks() + with contextlib.ExitStack() as stack: + entered = {k: stack.enter_context(v) for k, v in mocks.items()} + await self.STAR.probe_liquid_heights( containers=[well], use_channels=[0], lld_mode=self.STAR.LLDMode.PRESSURE, ) - mock_plld.assert_awaited_once() + entered["plld"].assert_awaited_once() + + async def test_duplicate_channels_serialize_measurements(self): + """Same physical channel probing two wells in one call: results don't collide.""" + well_a = self.plate.get_item("A1") + well_b = self.plate.get_item("B1") + self._put_tips_on_channels([0]) + + mocks = self._standard_mocks() + with contextlib.ExitStack() as stack: + for v in mocks.values(): + stack.enter_context(v) + result = await self.STAR.probe_liquid_heights( + containers=[well_a, well_b], + use_channels=[0, 0], + ) + + # Two jobs, one result each, keyed by job index not channel. + self.assertEqual(len(result), 2) + self.assertAlmostEqual(result[0], 0 - well_a.get_absolute_location("c", "c", "cavity_bottom").z) + self.assertAlmostEqual(result[1], 0 - well_b.get_absolute_location("c", "c", "cavity_bottom").z) diff --git a/pylabrobot/liquid_handling/backends/hamilton/planning.py b/pylabrobot/liquid_handling/backends/hamilton/planning.py deleted file mode 100644 index dfba2fa0a5f..00000000000 --- a/pylabrobot/liquid_handling/backends/hamilton/planning.py +++ /dev/null @@ -1,71 +0,0 @@ -from typing import Callable, Dict, List - -from pylabrobot.liquid_handling.channel_positioning import MIN_SPACING_BETWEEN_CHANNELS -from pylabrobot.resources import Coordinate - - -def _default_min_spacing_between(channel1: int, channel2: int) -> float: - """Default minimum spacing between two channels, based on their indices.""" - return MIN_SPACING_BETWEEN_CHANNELS * abs(channel1 - channel2) - - -def group_by_x_batch_by_xy( - locations: List[Coordinate], - use_channels: List[int], - min_spacing_between_channels: Callable[[int, int], float] = _default_min_spacing_between, -) -> Dict[float, List[List[int]]]: - if len(use_channels) == 0: - raise ValueError("use_channels must not be empty.") - if len(locations) == 0: - raise ValueError("locations must not be empty.") - if len(locations) != len(use_channels): - raise ValueError("locations and use_channels must have the same length.") - - # Move channels to traverse height - x_pos, y_pos = zip(*[(loc.x, loc.y) for loc in locations]) - - # Start with a list of indices for each operation. The order is the order of operations as given in the input parameters. - # We will then turn this list of indices into batches of indices that can be executed together, based on their X and Y positions and channel numbers. - indices = list(range(len(locations))) - - # Sort indices by x position. - indices = sorted(indices, key=lambda i: x_pos[i]) - - # Group indices by x position (rounding to 0.1mm to avoid floating point splitting of same-position containers) - # Note that since the indices were already sorted by x position, the groups will also be sorted by x position. - x_groups: Dict[float, List[int]] = {} - for i in indices: - x_rounded = round(x_pos[i], 1) - x_groups.setdefault(x_rounded, []).append(i) - - # Within each x group, sort channels from back (lowest channel index) to front (highest channel index) - for x_group_indices in x_groups.values(): - x_group_indices.sort(key=lambda i: use_channels[i]) - - # Within each x group, batch by y position while respecting minimum y spacing constraint - y_batches: dict[float, List[List[int]]] = {} # x position (group) -> list of batches of indices - for x_group, x_group_indices in x_groups.items(): - y_batches_for_this_x: List[List[int]] = [] # batches of indices for this x group - for i in x_group_indices: - y = y_pos[i] - - # find the first batch that this index can be added to without violating the minimum y spacing constraint - # if no batch is found, create a new batch with this index - for batch in y_batches_for_this_x: - index_min_y = min(batch, key=lambda i: y_pos[i]) - # check min spacing - if y_pos[index_min_y] - y < min_spacing_between_channels( - use_channels[i], use_channels[index_min_y] - ): - continue - # check if channel is already used in this batch - if use_channels[i] in [use_channels[j] for j in batch]: - continue - batch.append(i) - break - else: - y_batches_for_this_x.append([i]) - - y_batches[x_group] = y_batches_for_this_x - - return y_batches diff --git a/pylabrobot/liquid_handling/backends/hamilton/planning_tests.py b/pylabrobot/liquid_handling/backends/hamilton/planning_tests.py deleted file mode 100644 index d13304655c0..00000000000 --- a/pylabrobot/liquid_handling/backends/hamilton/planning_tests.py +++ /dev/null @@ -1,129 +0,0 @@ -import unittest - -from pylabrobot.resources import Coordinate - -from .planning import group_by_x_batch_by_xy - - -class TestGroupByXBatchByXY(unittest.TestCase): - """Tests for group_by_x_batch_by_xy.""" - - def test_single_location(self): - locations = [Coordinate(100.0, 200.0, 0)] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0]) - self.assertEqual(result, {100.0: [[0]]}) - - def test_same_x_different_y_fits_in_one_batch(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 180.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - # y diff = 20 >= 9*1, fits in one batch - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_same_x_too_close_y_splits_batches(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 195.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - # y diff = 5 < 9*1, separate batches - self.assertEqual(result, {100.0: [[0], [1]]}) - - def test_different_x_groups(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(200.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - self.assertEqual(result, {100.0: [[0]], 200.0: [[1]]}) - - def test_x_rounding(self): - locations = [ - Coordinate(100.04, 200.0, 0), - Coordinate(100.02, 180.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - # Both round to 100.0 - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_two_channels_same_x(self): - locations = [Coordinate(100.0, 200.0, 0), Coordinate(100.0, 180.0, 0)] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1]) - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_empty_use_channels_raises(self): - with self.assertRaises(ValueError): - group_by_x_batch_by_xy( - locations=[Coordinate(100.0, 200.0, 0)], - use_channels=[], - ) - - def test_non_adjacent_channels(self): - locations = [ - Coordinate(100.0, 300.0, 0), - Coordinate(100.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 5]) - # y diff = 100 >= 9*(5-0) = 45, fits in one batch - self.assertEqual(result, {100.0: [[0, 1]]}) - - def test_non_adjacent_channels_too_close(self): - locations = [ - Coordinate(100.0, 240.0, 0), - Coordinate(100.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 5]) - # y diff = 40 < 9*(5-0) = 45, separate batches - self.assertEqual(result, {100.0: [[0], [1]]}) - - def test_sorted_by_x(self): - locations = [ - Coordinate(300.0, 200.0, 0), - Coordinate(100.0, 200.0, 0), - Coordinate(200.0, 200.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1, 2]) - self.assertEqual(result, {100.0: [[1]], 200.0: [[2]], 300.0: [[0]]}) - - def test_multiple_batches_in_one_x_group(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 197.0, 0), - Coordinate(100.0, 194.0, 0), - Coordinate(100.0, 191.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 1, 2, 3]) - # Each consecutive pair has y diff = 3 < 9, so each in its own batch - self.assertEqual(result, {100.0: [[0], [1], [2], [3]]}) - - def test_duplicate_channels_split_into_separate_batches(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 180.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 0]) - self.assertEqual(result, {100.0: [[0], [1]]}) - - def test_duplicate_channels_three_ops(self): - locations = [ - Coordinate(100.0, 200.0, 0), - Coordinate(100.0, 180.0, 0), - Coordinate(100.0, 160.0, 0), - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[0, 0, 0]) - self.assertEqual(result, {100.0: [[0], [1], [2]]}) - - def test_channels_sorted_by_channel_index_within_x_group(self): - locations = [ - Coordinate(100.0, 180.0, 0), # channel 2 - Coordinate(100.0, 200.0, 0), # channel 0 - ] - result = group_by_x_batch_by_xy(locations=locations, use_channels=[2, 0]) - # Channel 0 (index 1) sorted before channel 2 (index 0) - self.assertEqual(result, {100.0: [[1, 0]]}) - - -if __name__ == "__main__": - unittest.main() diff --git a/pylabrobot/liquid_handling/channel_positioning.py b/pylabrobot/liquid_handling/channel_positioning.py index 535dc8a8681..047ea7c48f0 100644 --- a/pylabrobot/liquid_handling/channel_positioning.py +++ b/pylabrobot/liquid_handling/channel_positioning.py @@ -1,7 +1,7 @@ import logging import math import warnings -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from pylabrobot.liquid_handling.errors import ChannelsDoNotFitError from pylabrobot.resources.container import Container @@ -298,10 +298,11 @@ def compute_channel_offsets( spread: str = "wide", channel_spacings: Optional[List[float]] = None, ) -> List[Coordinate]: - """Compute Y offsets for positioning pipette channels in a resource. + """Compute Y offsets for positioning consecutive channels (0..num_channels-1) in a resource. - Single entry point for all channel positioning logic. Handles containers with no-go zones - (distributing channels across compartments) and plain resources (wide/tight spread). + Handles container geometry: compartments with no-go zones, wide/tight spread. + Assumes channels are consecutively indexed — for non-consecutive channel selections + (e.g. [0, 2, 5]), use ``compute_nonconsecutive_channel_offsets`` instead. Args: resource: The target resource (Container, Trough, Well, etc.). @@ -448,6 +449,96 @@ def compute_channel_offsets( return _centers_to_offsets(centers, resource) +# --------------------------------------------------------------------------- +# Non-consecutive channel offsets +# --------------------------------------------------------------------------- + + +def compute_nonconsecutive_channel_offsets( + container: Container, + use_channels: List[int], + channel_spacings: List[float], +) -> Optional[List[Coordinate]]: + """Compute Y offsets for non-consecutive channel selections targeting one container. + + Wraps ``compute_channel_offsets``: fills in phantom channels for gaps in the + channel sequence (e.g. [0, 2, 5] → physical span 0..5), attempts to fit the + full span, and falls back to splitting into consecutive sub-groups when it + doesn't fit. + + Args: + container: The target container. + use_channels: Channel indices being used (e.g. [0, 2, 5]). Need not be + consecutive — gaps are filled with phantom channels. + channel_spacings: Per-channel occupancy diameters in mm. Must have at least + ``max(use_channels) + 1`` entries. + + Returns: + List of Y offsets (one per entry in *use_channels*), or None if even a + single pair of adjacent channels can't fit in the container. + """ + + if len(use_channels) == 0: + return [] + + ch_hi = max(use_channels) + if len(channel_spacings) < ch_hi + 1: + raise ValueError( + f"channel_spacings list must have at least {ch_hi + 1} entries " + f"(max channel index is {ch_hi}), got {len(channel_spacings)}." + ) + + def _try_group(channels: List[int]) -> Optional[List[Coordinate]]: + """Try to fit channels into the container, returning None if too narrow.""" + g_lo, g_hi = min(channels), max(channels) + spacing = max(channel_spacings[g_lo : g_hi + 1]) + num_physical = g_hi - g_lo + 1 + min_required = MIN_SPACING_EDGE * 2 + (num_physical - 1) * spacing + if container.get_absolute_size_y() < min_required: + return None + try: + all_offsets = compute_channel_offsets( + resource=container, + num_channels=num_physical, + spread="wide", + channel_spacings=[spacing] * num_physical, + ) + except (ChannelsDoNotFitError, ValueError): + return None + return [all_offsets[ch - g_lo] for ch in channels] + + # Try the full span first (all channels including phantoms fit) + full = _try_group(use_channels) + if full is not None: + return full + + # Full span doesn't fit. Split at gaps in the sorted channel sequence + # into consecutive sub-groups and compute offsets for each independently. + sorted_chs = sorted(use_channels) + groups: List[List[int]] = [[sorted_chs[0]]] + for i in range(1, len(sorted_chs)): + if sorted_chs[i] == sorted_chs[i - 1] + 1: + groups[-1].append(sorted_chs[i]) + else: + groups.append([sorted_chs[i]]) + + # If there's only one consecutive group and it didn't fit above, container is too small + if len(groups) == 1: + return None + + # Compute offsets per sub-group + ch_to_offset: Dict[int, Coordinate] = {} + for group in groups: + group_offsets = _try_group(group) + if group_offsets is None: + return None # even a sub-group doesn't fit + for ch, offset in zip(group, group_offsets): + ch_to_offset[ch] = offset + + # Return in the original use_channels order + return [ch_to_offset[ch] for ch in use_channels] + + # --------------------------------------------------------------------------- # Deprecated wrappers (remove in v1) # --------------------------------------------------------------------------- diff --git a/pylabrobot/liquid_handling/channel_positioning_tests.py b/pylabrobot/liquid_handling/channel_positioning_tests.py index 78f9fa6612e..13e6ca7ba2a 100644 --- a/pylabrobot/liquid_handling/channel_positioning_tests.py +++ b/pylabrobot/liquid_handling/channel_positioning_tests.py @@ -1,4 +1,5 @@ import unittest +from unittest.mock import MagicMock, patch from pylabrobot.liquid_handling.channel_positioning import ( _centers_to_offsets, @@ -9,6 +10,7 @@ _resolve_channel_spacings, _space_needed, compute_channel_offsets, + compute_nonconsecutive_channel_offsets, required_spacing_between, ) from pylabrobot.liquid_handling.errors import ChannelsDoNotFitError @@ -402,5 +404,68 @@ def test_wide_vs_tight_gap_difference(self): self.assertGreaterEqual(wide_gap, tight_gap - 0.01) +class TestComputeSingleContainerOffsets(unittest.TestCase): + S = [9.0] * 8 + + def _mock_container(self, size_y: float): + c = MagicMock(spec=["get_absolute_size_y"]) + c.get_absolute_size_y.return_value = size_y + return c + + @patch("pylabrobot.liquid_handling.channel_positioning.compute_channel_offsets") + def test_even_span_no_center_offset(self, mock_offsets): + mock_offsets.return_value = [Coordinate(0, 4.5, 0), Coordinate(0, -4.5, 0)] + result = compute_nonconsecutive_channel_offsets(self._mock_container(50.0), [0, 1], self.S) + assert result is not None + self.assertAlmostEqual(result[0].y, 4.5) + self.assertAlmostEqual(result[1].y, -4.5) + + @patch("pylabrobot.liquid_handling.channel_positioning.compute_channel_offsets") + def test_odd_span_passes_through_offsets(self, mock_offsets): + mock_offsets.return_value = [ + Coordinate(0, 9.0, 0), + Coordinate(0, 0.0, 0), + Coordinate(0, -9.0, 0), + ] + # No additional shift; compute_channel_offsets handles no-go zones directly + result = compute_nonconsecutive_channel_offsets(self._mock_container(50.0), [0, 1, 2], self.S) + assert result is not None + self.assertAlmostEqual(result[0].y, 9.0) + + def test_container_too_small_returns_none(self): + self.assertIsNone( + compute_nonconsecutive_channel_offsets(self._mock_container(10.0), [0, 1], self.S) + ) + + @patch("pylabrobot.liquid_handling.channel_positioning.compute_channel_offsets") + def test_non_consecutive_uses_full_physical_span(self, mock_offsets): + mock_offsets.return_value = [ + Coordinate(0, 10.0, 0), + Coordinate(0, 0.0, 0), + Coordinate(0, -10.0, 0), + ] + result = compute_nonconsecutive_channel_offsets(self._mock_container(50.0), [0, 2], self.S) + assert result is not None + self.assertEqual(len(result), 2) + mock_offsets.assert_called_once_with( + resource=unittest.mock.ANY, num_channels=3, spread="wide", channel_spacings=[9.0] * 3 + ) + + @patch("pylabrobot.liquid_handling.channel_positioning.compute_channel_offsets") + def test_mixed_spacing_uses_effective(self, mock_offsets): + mock_offsets.return_value = [ + Coordinate(0, 18.0, 0), + Coordinate(0, 0.0, 0), + Coordinate(0, -18.0, 0), + ] + result = compute_nonconsecutive_channel_offsets( + self._mock_container(100.0), [0, 2], [9.0, 9.0, 18.0] + ) + self.assertIsNotNone(result) + mock_offsets.assert_called_once_with( + resource=unittest.mock.ANY, num_channels=3, spread="wide", channel_spacings=[18.0] * 3 + ) + + if __name__ == "__main__": unittest.main() diff --git a/pylabrobot/liquid_handling/pipette_batch_scheduling.py b/pylabrobot/liquid_handling/pipette_batch_scheduling.py new file mode 100644 index 00000000000..a6f35e576b6 --- /dev/null +++ b/pylabrobot/liquid_handling/pipette_batch_scheduling.py @@ -0,0 +1,446 @@ +"""Plan the fewest X/Y moves that position each channel at its target. + +Multi-channel heads share one X carriage, enforce minimum pairwise Y spacing, strictly +descending Y by channel index, and per-container geometry (including no-go zones). +Given a list of (channel, target container) assignments, this module groups them into batches +where each batch is a set of channels whose targets can all be reached in one X/Y move. +A Z-axis operation (e.g. LLD probe, aspirate, dispense, ...) is then supplied as a callback +by the caller. + +This is formally a Minimum Exact Cover problem (equivalently, Set Partitioning in OR terminology, +or minimum hypergraph coloring in graph theory): pairwise constraints alone reduce to +graph coloring; container fit with no-go zones is k-ary, making it hypergraph coloring. +Hence the enumerate-then-partition pipeline rather than 2-ary graph coloring. Intended +for n <= ~16 channels; planning is O(2^n * n^2) in the worst case, with the branch-and- +bound partition solver typically fast on the structured instances this module sees. + + batches = plan_batches( + use_channels=[0, 1, 2, 5, 6, 7], + containers=[w0, w1, w2, w5, w6, w7], + channel_spacings=backend._channels_minimum_y_spacing, + wrt_resource=backend.deck, + x_tolerance=0.1, + ) + await backend.execute_batched(func=my_z_callback, batches=batches) +""" + +import itertools +import logging +import math +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Collection, Dict, FrozenSet, List, Optional, Tuple + +from pylabrobot.liquid_handling.channel_positioning import ( + compute_nonconsecutive_channel_offsets, +) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.resource import Resource + +logger = logging.getLogger(__name__) + +# --- Data types --- + + +@dataclass +class ChannelBatch: + """A group of channels that can operate simultaneously. + + ``y_positions`` contains entries for active channels and any phantom channels + between non-consecutive active members. + """ + + x_position: float + indices: List[int] + channels: List[int] + y_positions: Dict[int, float] = field(default_factory=dict) # includes phantoms + + +def log_batches( + batches: List[ChannelBatch], + use_channels: Optional[List[int]] = None, + containers: Optional[List["Container"]] = None, +) -> None: + """Log a tree view of the batch execution plan. + + Groups batches by X position and shows Y batches nested within each X group. + Active channels are marked with ``*``, phantoms with a space. + + Args: + batches: Output from ``plan_batches()``. + use_channels: Channel indices (parallel with *containers*). If omitted, + container names are not shown next to active channels. + containers: Container objects (parallel with *use_channels*). If omitted, + container names are not shown next to active channels. + """ + + ch_to_container = ( + dict(zip(use_channels, containers)) + if use_channels is not None and containers is not None + else {} + ) + + x_groups = [ + (x_key, list(group)) for x_key, group in itertools.groupby(batches, key=lambda b: b.x_position) + ] + + lines = ["plan:"] + for xg_i, (x_key, xg_batches) in enumerate(x_groups): + is_last_xg = xg_i == len(x_groups) - 1 + xg_branch = "└" if is_last_xg else "├" + xg_cont = " " if is_last_xg else "│" + lines.append(f" {xg_branch}── x-group {xg_i + 1} (x={x_key:.1f} mm)") + for yb_i, b in enumerate(xg_batches): + is_last_yb = yb_i == len(xg_batches) - 1 + yb_branch = "└" if is_last_yb else "├" + yb_cont = " " if is_last_yb else "│" + lines.append(f" {xg_cont} {yb_branch}── y-batch {yb_i + 1}") + for ch in sorted(b.y_positions.keys()): + is_last_ch = ch == max(b.y_positions.keys()) + ch_branch = "└" if is_last_ch else "├" + active = "*" if ch in b.channels else " " + container_name = f" ({ch_to_container[ch].name})" if ch in ch_to_container else "" + lines.append( + f" {xg_cont} {yb_cont} {ch_branch}── {active}ch{ch}: y={b.y_positions[ch]:.1f} mm{container_name}" + ) + logger.info("\n".join(lines)) + + +# --- Spacing helpers --- + + +def _span_required(spacings: List[float], ch_lo: int, ch_hi: int) -> float: + """Minimum total Y distance required between channels ch_lo and ch_hi. + + Sums the rounded pairwise spacing for each adjacent pair in the range, + matching what the firmware enforces. + """ + return sum(math.ceil(max(spacings[ch], spacings[ch + 1]) * 10) / 10 for ch in range(ch_lo, ch_hi)) + + +# --- Batch partitioning --- + + +def _interpolate_phantoms( + channels: List[int], y_positions: Dict[int, float], spacings: List[float] +) -> Dict[int, float]: + """Return Y positions with phantom channels filled in between non-consecutive batch members. + + Each phantom is placed using the conservative max-spacing model (via ``_span_required``), + so non-uniform spacings are respected (e.g. a wide channel only widens its own gaps). + """ + result = dict(y_positions) + sorted_chs = sorted(channels) + for k in range(len(sorted_chs) - 1): + ch_lo, ch_hi = sorted_chs[k], sorted_chs[k + 1] + for phantom in range(ch_lo + 1, ch_hi): + if phantom not in result: + result[phantom] = result[ch_lo] - _span_required(spacings, ch_lo, phantom) + return result + + +# --- Input validation and position computation --- + + +def validate_channel_selections( + containers: List[Container], + num_channels: int, + use_channels: Optional[List[int]] = None, +) -> List[int]: + """Validate and normalize channel selection. + + If *use_channels* is ``None``, defaults to ``[0, 1, ..., len(containers)-1]``. + + Returns: + Validated list of channel indices. + + Raises: + ValueError: If channels are empty, out of range, or if *containers* + and *use_channels* have different lengths. + """ + if use_channels is None: + use_channels = list(range(len(containers))) + if len(use_channels) == 0: + raise ValueError("use_channels must not be empty.") + if not all(0 <= ch < num_channels for ch in use_channels): + raise ValueError( + f"All use_channels must be integers in range [0, {num_channels - 1}], got {use_channels}." + ) + if len(containers) != len(use_channels): + raise ValueError( + f"Length of containers and use_channels must match, " + f"got {len(containers)} and {len(use_channels)}." + ) + return use_channels + + +# --- Validity predicate & enumeration --- + + +def is_valid_batch( + job_indices: Collection[int], + use_channels: List[int], + containers: List[Container], + channel_spacings: List[float], + wrt_resource: Resource, + x_tolerance: float, + resource_offsets: Optional[List[Coordinate]] = None, +) -> Optional[Dict[int, Coordinate]]: + """Validate a candidate batch against all physical constraints, cheapest first. + + Checks, in order: + 1. Unique channels — O(k), prunes most candidates instantly. + 2. Same X — O(k), uses container centers (X isn't shifted by auto-spread). + 3. Container fit — per container, calls + :func:`compute_nonconsecutive_channel_offsets` to see if the channels + targeting it can coexist (respecting no-go zones). Skipped when + *resource_offsets* is provided. This is the expensive check. + 4. Y spacing between adjacent channels — O(k) on resolved positions. + + Returns: + Dict mapping each job index to its resolved absolute Coordinate if the + batch is valid; ``None`` otherwise. + """ + jobs = list(job_indices) + if not jobs: + return {} + + # 1. Unique channels. + if len({use_channels[j] for j in jobs}) != len(jobs): + return None + + centers = [containers[j].get_location_wrt(wrt_resource, x="c", y="c", z="b") for j in jobs] + + # 2. Same X (container center + any user X offset; auto-spread only moves Y). + if len(jobs) > 1: + xs = [ + centers[k].x + (resource_offsets[jobs[k]].x if resource_offsets is not None else 0.0) + for k in range(len(jobs)) + ] + if max(xs) - min(xs) > x_tolerance: + return None + + # 3. Container fit → per-channel Y offset (skipped when user supplies offsets). + y_offsets: Dict[int, float] = {j: 0.0 for j in jobs} + if resource_offsets is None: + by_container: Dict[int, List[int]] = defaultdict(list) + for j in jobs: + by_container[id(containers[j])].append(j) + for cjobs in by_container.values(): + c = containers[cjobs[0]] + if len(cjobs) == 1 and not getattr(c, "no_go_zones", ()): + continue # single channel, no no-go zones — center is fine. + offsets = compute_nonconsecutive_channel_offsets( + c, + [use_channels[j] for j in cjobs], + channel_spacings, + ) + if offsets is None: + return None + for j, off in zip(cjobs, offsets): + y_offsets[j] = off.y + + # Resolve absolute targets. + targets: Dict[int, Coordinate] = {} + for k, j in enumerate(jobs): + ox = resource_offsets[j].x if resource_offsets is not None else 0.0 + oy = resource_offsets[j].y if resource_offsets is not None else y_offsets[j] + targets[j] = Coordinate(centers[k].x + ox, centers[k].y + oy, 0.0) + + if len(jobs) == 1: + return targets + + # 4. Y spacing between adjacent channels. + jobs_sorted = sorted(jobs, key=lambda j: use_channels[j]) + for lo, hi in zip(jobs_sorted, jobs_sorted[1:]): + required = _span_required(channel_spacings, use_channels[lo], use_channels[hi]) + if targets[lo].y - targets[hi].y < required - 1e-9: + return None + + return targets + + +def _build_channel_batch( + job_indices: Collection[int], + resolved: Dict[int, Coordinate], + use_channels: List[int], + channel_spacings: List[float], +) -> ChannelBatch: + """Assemble a :class:`ChannelBatch` from validated job indices and resolved positions.""" + indices = sorted(job_indices, key=lambda i: use_channels[i]) + channels = [use_channels[i] for i in indices] + y_positions: Dict[int, float] = {use_channels[i]: resolved[i].y for i in indices} + y_positions = _interpolate_phantoms(channels, y_positions, channel_spacings) + x_position = sum(resolved[i].x for i in indices) / len(indices) + return ChannelBatch( + x_position=x_position, + indices=indices, + channels=channels, + y_positions=y_positions, + ) + + +def enumerate_valid_batches( + use_channels: List[int], + containers: List[Container], + channel_spacings: List[float], + wrt_resource: Resource, + x_tolerance: float, + resource_offsets: Optional[List[Coordinate]] = None, +) -> List[ChannelBatch]: + """Enumerate every valid batch by backtracking, returning ChannelBatch objects. + + Jobs are visited in channel-ascending order; an invalid candidate prunes its subtree. + """ + n = len(use_channels) + order = sorted(range(n), key=lambda i: use_channels[i]) + result: List[ChannelBatch] = [] + + def backtrack(start: int, current: List[int]): + if current: + resolved = is_valid_batch( + current, + use_channels, + containers, + channel_spacings, + wrt_resource, + x_tolerance, + resource_offsets, + ) + if resolved is None: + return + result.append(_build_channel_batch(current, resolved, use_channels, channel_spacings)) + for pos in range(start, n): + backtrack(pos + 1, current + [order[pos]]) + + backtrack(0, []) + return result + + +def minimum_exact_cover( + n_jobs: int, + batches: List[ChannelBatch], +) -> List[ChannelBatch]: + """Pick the fewest batches that partition ``{0..n_jobs-1}`` (branch-and-bound). + + Returned list is a subset of *batches*. Assumes every job appears in at + least one batch (caller's responsibility) so a partition always exists. + """ + if n_jobs == 0: + return [] + + by_min: Dict[int, List[Tuple[FrozenSet[int], ChannelBatch]]] = defaultdict(list) + for b in batches: + js = frozenset(b.indices) + by_min[min(js)].append((js, b)) + # Try larger batches first at each pivot — finds a tight bound sooner. + for bucket in by_min.values(): + bucket.sort(key=lambda js_b: len(js_b[0]), reverse=True) + + all_jobs = frozenset(range(n_jobs)) + best: List[List[ChannelBatch]] = [batches[: n_jobs + 1]] # sentinel: unreachable length + + def recurse(remaining: FrozenSet[int], current: List[ChannelBatch]): + if not remaining: + if len(current) < len(best[0]): + best[0] = list(current) + return + if len(current) + 1 >= len(best[0]): + return + pivot = min(remaining) + for js, b in by_min[pivot]: + if js.issubset(remaining): + current.append(b) + recurse(remaining - js, current) + current.pop() + + recurse(all_jobs, []) + return best[0] + + +def plan_batches( + use_channels: List[int], + containers: List[Container], + channel_spacings: List[float], + wrt_resource: Resource, + x_tolerance: float, + resource_offsets: Optional[List[Coordinate]] = None, +) -> List[ChannelBatch]: + """Container-aware, optimal batch planning (respects no-go zones). + + Decides the per-container spread *per candidate batch* by asking + :func:`compute_nonconsecutive_channel_offsets` whether the channels + targeting each container physically coexist in it given its geometry and + no-go zones. Pairs that behavior with minimum exact-cover partition + (:func:`minimum_exact_cover`) so the returned plan uses the fewest batches. + + Args: + use_channels: Channel indices being used (parallel to *containers*). + containers: Target Container objects, one per channel. + channel_spacings: Per-channel minimum Y spacing (mm). + wrt_resource: Reference resource for computing absolute positions. + x_tolerance: Positions within this tolerance share a batch (pairwise). + resource_offsets: Optional explicit XYZ offsets from container centers. + When provided, auto-spread and no-go-zone checks are skipped (user is + authoritative). + + Returns: + Flat list of ChannelBatch sorted by ascending X position. + """ + if x_tolerance <= 0: + raise ValueError(f"x_tolerance must be > 0, got {x_tolerance}.") + if len(use_channels) != len(containers): + raise ValueError( + f"use_channels and containers must have the same length, " + f"got {len(use_channels)} and {len(containers)}." + ) + if len(use_channels) == 0: + raise ValueError("use_channels must not be empty.") + if resource_offsets is not None and len(resource_offsets) != len(use_channels): + raise ValueError( + f"resource_offsets length must match use_channels, " + f"got {len(resource_offsets)} and {len(use_channels)}." + ) + max_ch = max(use_channels) + if len(channel_spacings) < max_ch + 1: + raise ValueError( + f"channel_spacings list must have at least {max_ch + 1} entries " + f"(max channel index is {max_ch}), got {len(channel_spacings)}." + ) + + valid = enumerate_valid_batches( + use_channels, + containers, + channel_spacings, + wrt_resource, + x_tolerance, + resource_offsets, + ) + + # Every job must appear in at least one valid batch, else no partition exists. + covered = set().union(*(b.indices for b in valid)) if valid else set() + missing = set(range(len(use_channels))) - covered + if missing: + raise ValueError( + f"No valid batch contains job(s) {sorted(missing)} " + f"(channel(s) {[use_channels[j] for j in sorted(missing)]}); " + f"container(s) cannot accommodate them." + ) + + partition = minimum_exact_cover(len(use_channels), valid) + + # Snap batches sharing an X bucket to one representative x_position so downstream + # consumers can compare by equality instead of tolerance. Each batch's x_position + # starts as the mean of its own jobs' x-coords, so averages across batches in the + # same bucket drift within x_tolerance; collapse them to a shared value here. + buckets: Dict[int, List[ChannelBatch]] = defaultdict(list) + for b in partition: + buckets[math.floor(b.x_position / x_tolerance)].append(b) + for group in buckets.values(): + shared_x = sum(b.x_position for b in group) / len(group) + for b in group: + b.x_position = shared_x + + partition.sort(key=lambda b: b.x_position) + return partition diff --git a/pylabrobot/liquid_handling/pipette_batch_scheduling_tests.py b/pylabrobot/liquid_handling/pipette_batch_scheduling_tests.py new file mode 100644 index 00000000000..0ef97cc23d9 --- /dev/null +++ b/pylabrobot/liquid_handling/pipette_batch_scheduling_tests.py @@ -0,0 +1,394 @@ +"""Tests for pipette_batch_scheduling module.""" + +import unittest +from typing import Iterable, List +from unittest.mock import MagicMock, patch + +from pylabrobot.liquid_handling.pipette_batch_scheduling import ( + ChannelBatch, + _span_required, + enumerate_valid_batches, + is_valid_batch, + minimum_exact_cover, + plan_batches, +) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.resource import Resource + + +def _mock_container( + cx: float, cy: float, size_y: float = 3.0, name: str = "well", no_go_zones: Iterable = () +) -> Container: + """Build a mock Container at (cx, cy). + + Default ``size_y=3.0`` mm is below ``2 * MIN_SPACING_EDGE = 4mm`` so + ``compute_nonconsecutive_channel_offsets`` returns ``None`` without reaching + ``compute_channel_offsets`` (which reads real Container internals). Tests + that need multi-channel spread pass a larger ``size_y`` and ``@patch`` the + offsets function. + """ + c = MagicMock(spec=Container) + c.name = name + c.get_absolute_size_y.return_value = size_y + c.get_location_wrt = MagicMock(return_value=Coordinate(cx, cy, 0)) + c.no_go_zones = list(no_go_zones) + return c + + +def _mock_deck() -> Resource: + return MagicMock(spec=Resource) + + +def _containers(xs: List[float], ys: List[float]) -> List[Container]: + return [_mock_container(x, y) for x, y in zip(xs, ys)] + + +def _stub_batch(indices: Iterable[int]) -> ChannelBatch: + """Build a ChannelBatch with only indices populated (cover-algorithm tests).""" + return ChannelBatch( + x_position=0.0, + indices=list(indices), + channels=list(indices), + y_positions={}, + ) + + +def _index_sets(batches: List[ChannelBatch]) -> List[frozenset]: + return [frozenset(b.indices) for b in batches] + + +DECK = _mock_deck() + + +class TestSpanRequired(unittest.TestCase): + """Rounded pairwise spacing sums.""" + + SPACINGS = [8.98, 8.98, 17.96, 17.96] + + def test_uniform(self): + # max(8.98, 8.98) = 8.98 -> ceil(89.8)/10 = 9.0 + self.assertAlmostEqual(_span_required(self.SPACINGS, 0, 1), 9.0) + + def test_mixed(self): + # max(8.98, 17.96) = 17.96 -> ceil(179.6)/10 = 18.0 + self.assertAlmostEqual(_span_required(self.SPACINGS, 1, 2), 18.0) + + +class TestIsValidBatch(unittest.TestCase): + """Container-aware batch-level validity predicate.""" + + S = [9.0] * 8 + + def test_empty_and_singleton_valid(self): + c = _mock_container(100.0, 200.0) + self.assertEqual(is_valid_batch([], [0], [c], self.S, DECK, x_tolerance=0.1), {}) + result = is_valid_batch([0], [0], [c], self.S, DECK, x_tolerance=0.1) + assert result is not None + self.assertAlmostEqual(result[0].x, 100.0) + self.assertAlmostEqual(result[0].y, 200.0) + + def test_duplicate_channels_invalid(self): + cs = _containers([100.0, 100.0], [200.0, 180.0]) + self.assertIsNone(is_valid_batch([0, 1], [3, 3], cs, self.S, DECK, x_tolerance=0.1)) + + def test_x_tolerance_respected(self): + cs = _containers([100.0, 100.08], [209.0, 200.0]) + self.assertIsNotNone(is_valid_batch([0, 1], [0, 1], cs, self.S, DECK, x_tolerance=0.1)) + cs = _containers([100.0, 100.2], [209.0, 200.0]) + self.assertIsNone(is_valid_batch([0, 1], [0, 1], cs, self.S, DECK, x_tolerance=0.1)) + + def test_spacing_boundary(self): + cs = _containers([100.0, 100.0], [209.0, 200.0]) + self.assertIsNotNone(is_valid_batch([0, 1], [0, 1], cs, self.S, DECK, x_tolerance=0.1)) + cs = _containers([100.0, 100.0], [208.9, 200.0]) + self.assertIsNone(is_valid_batch([0, 1], [0, 1], cs, self.S, DECK, x_tolerance=0.1)) + + def test_monotone_y_enforced(self): + # Higher channel must be at lower Y. + cs = _containers([100.0, 100.0], [200.0, 209.0]) + self.assertIsNone(is_valid_batch([0, 1], [0, 1], cs, self.S, DECK, x_tolerance=0.1)) + + def test_pairwise_sum_not_uniform_product(self): + # ch0→ch3 with mixed [8.98, 8.98, 17.96, 17.96]: 9+18+18 = 45mm pairwise. + sp = [8.98, 8.98, 17.96, 17.96] + cs = _containers([100.0, 100.0], [245.0, 200.0]) # 45mm apart + self.assertIsNotNone(is_valid_batch([0, 1], [0, 3], cs, sp, DECK, x_tolerance=0.1)) + cs = _containers([100.0, 100.0], [244.9, 200.0]) # 44.9mm — short + self.assertIsNone(is_valid_batch([0, 1], [0, 3], cs, sp, DECK, x_tolerance=0.1)) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.compute_nonconsecutive_channel_offsets" + ) + def test_container_fit_failure_rejects_batch(self, mock_offsets): + mock_offsets.return_value = None + c = _mock_container(100.0, 200.0, size_y=3.0) + self.assertIsNone(is_valid_batch([0, 1], [0, 1], [c, c], self.S, DECK, x_tolerance=0.1)) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.compute_nonconsecutive_channel_offsets" + ) + def test_container_fit_sets_spread_positions(self, mock_offsets): + mock_offsets.return_value = [Coordinate(0, 4.5, 0), Coordinate(0, -4.5, 0)] + c = _mock_container(100.0, 200.0, size_y=50.0) + resolved = is_valid_batch([0, 1], [0, 1], [c, c], self.S, DECK, x_tolerance=0.1) + assert resolved is not None + self.assertAlmostEqual(resolved[0].y, 204.5) + self.assertAlmostEqual(resolved[1].y, 195.5) + + def test_resource_offsets_override_auto_spread(self): + # When explicit offsets are provided, compute_nonconsecutive_channel_offsets is not called. + c = _mock_container(100.0, 200.0) + offsets = [Coordinate(0, 10.0, 0), Coordinate(0, -10.0, 0)] + resolved = is_valid_batch( + [0, 1], + [0, 1], + [c, c], + self.S, + DECK, + x_tolerance=0.1, + resource_offsets=offsets, + ) + assert resolved is not None + self.assertAlmostEqual(resolved[0].y, 210.0) + self.assertAlmostEqual(resolved[1].y, 190.0) + + +class TestEnumerateValidBatches(unittest.TestCase): + """Backtracking enumeration.""" + + S = [9.0] * 8 + + def test_singletons_always_present(self): + cs = _containers([100.0, 200.0, 300.0], [200.0, 200.0, 200.0]) + batches = enumerate_valid_batches([0, 1, 2], cs, self.S, DECK, x_tolerance=0.1) + keys = _index_sets(batches) + for i in range(3): + self.assertIn(frozenset([i]), keys) + + def test_enumerates_all_compatible_subsets(self): + # 3 channels, all pairwise compatible at 9mm spacing. + cs = _containers([100.0] * 3, [218.0, 209.0, 200.0]) + batches = enumerate_valid_batches([0, 1, 2], cs, self.S, DECK, x_tolerance=0.1) + expected = { + frozenset([0]), + frozenset([1]), + frozenset([2]), + frozenset([0, 1]), + frozenset([1, 2]), + frozenset([0, 2]), + frozenset([0, 1, 2]), + } + self.assertEqual(set(_index_sets(batches)), expected) + + def test_excludes_invalid_pair(self): + cs = _containers([100.0] * 3, [210.0, 205.0, 200.0]) # 0↔1 gap only 5mm + batches = enumerate_valid_batches([0, 1, 2], cs, self.S, DECK, x_tolerance=0.1) + keys = set(_index_sets(batches)) + self.assertNotIn(frozenset([0, 1]), keys) + self.assertNotIn(frozenset([0, 1, 2]), keys) + + def test_different_x_never_share(self): + cs = _containers([100.0, 200.0], [209.0, 200.0]) + batches = enumerate_valid_batches([0, 1], cs, self.S, DECK, x_tolerance=0.1) + self.assertNotIn(frozenset([0, 1]), _index_sets(batches)) + + +class TestMinimumExactCover(unittest.TestCase): + """Branch-and-bound minimum partition.""" + + def test_single_job(self): + cover = minimum_exact_cover(1, [_stub_batch([0])]) + self.assertEqual(_index_sets(cover), [frozenset([0])]) + + def test_prefers_larger_batch(self): + batches = [_stub_batch(ix) for ix in ([0], [1], [2], [0, 1], [0, 1, 2])] + cover = minimum_exact_cover(3, batches) + self.assertEqual(_index_sets(cover), [frozenset([0, 1, 2])]) + + def test_forces_two_when_no_triple_valid(self): + batches = [_stub_batch(ix) for ix in ([0], [1], [2], [0, 1], [1, 2])] + cover = minimum_exact_cover(3, batches) + self.assertEqual(len(cover), 2) + self.assertEqual(frozenset().union(*_index_sets(cover)), frozenset([0, 1, 2])) + + def test_falls_back_to_singletons(self): + batches = [_stub_batch([0]), _stub_batch([1])] + self.assertEqual(len(minimum_exact_cover(2, batches)), 2) + + def test_greedy_largest_first_beaten_by_branch_and_bound(self): + # Greedy takes {0,1,2,3} first, then {4},{5} → 3 batches. + # Optimum is {0,1,2} + {3,4,5} = 2 batches. + batches = [ + _stub_batch(ix) for ix in ([0, 1, 2, 3], [0, 1, 2], [3, 4, 5], [0], [1], [2], [3], [4], [5]) + ] + cover = minimum_exact_cover(6, batches) + self.assertEqual(len(cover), 2) + self.assertEqual(frozenset().union(*_index_sets(cover)), frozenset(range(6))) + + +class TestPlanBatches(unittest.TestCase): + """End-to-end planning: X grouping, Y batching, phantoms, errors.""" + + S = [9.0] * 8 + + def test_spacing_boundary(self): + batches = plan_batches( + [0, 1], _containers([100.0] * 2, [209.0, 200.0]), self.S, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + batches = plan_batches( + [0, 1], _containers([100.0] * 2, [208.9, 200.0]), self.S, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 2) + + def test_same_y_serializes(self): + batches = plan_batches( + [0, 1, 2], _containers([100.0] * 3, [200.0] * 3), self.S, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 3) + + def test_x_tolerance_boundary(self): + batches = plan_batches( + [0, 1], _containers([100.0, 100.05], [270.0, 261.0]), self.S, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + batches = plan_batches( + [0, 1], _containers([100.0, 100.2], [270.0, 270.0]), self.S, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 2) + + def test_x_groups_sorted_ascending(self): + batches = plan_batches( + [0, 1, 2], _containers([300.0, 100.0, 200.0], [270.0] * 3), self.S, DECK, x_tolerance=0.1 + ) + xs = [b.x_position for b in batches] + self.assertEqual(xs, sorted(xs)) + + def test_empty_raises(self): + with self.assertRaises(ValueError): + plan_batches([], [], self.S, DECK, x_tolerance=0.1) + + def test_mismatched_lengths_raises(self): + with self.assertRaises(ValueError): + plan_batches([0, 1], _containers([100.0], [200.0]), self.S, DECK, x_tolerance=0.1) + + def test_duplicate_channels_serialized(self): + batches = plan_batches( + [0, 0], _containers([100.0] * 2, [200.0] * 2), self.S, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 2) + + def test_duplicate_channels_three_ops(self): + batches = plan_batches( + [0, 0, 0], _containers([100.0] * 3, [200.0] * 3), self.S, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 3) + + def test_phantoms_interpolated_at_spacing(self): + # 6 channels in one batch with non-consecutive channel indices. + batches = plan_batches( + [0, 1, 2, 5, 6, 7], + _containers([100.0] * 6, [300.0, 291.0, 282.0, 255.0, 246.0, 237.0]), + [9.0] * 8, + DECK, + x_tolerance=0.1, + ) + self.assertEqual(len(batches), 1) + y = batches[0].y_positions + self.assertIn(3, y) + self.assertIn(4, y) + self.assertAlmostEqual(y[3], 282.0 - 9.0) + self.assertAlmostEqual(y[4], 282.0 - 18.0) + + def test_phantoms_only_within_batch(self): + # Two separate batches — no phantoms bridging them. + batches = plan_batches( + [0, 3], _containers([100.0] * 2, [200.0, 250.0]), [9.0] * 4, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 2) + for batch in batches: + self.assertEqual(len(batch.y_positions), 1) + + def test_indices_map_back_correctly(self): + use_channels = [3, 7, 0] + batches = plan_batches( + use_channels, + _containers([100.0] * 3, [261.0, 237.0, 270.0]), + [9.0] * 8, + DECK, + x_tolerance=0.1, + ) + all_indices = [idx for b in batches for idx in b.indices] + self.assertEqual(sorted(all_indices), [0, 1, 2]) + for batch in batches: + for idx, ch in zip(batch.indices, batch.channels): + self.assertEqual(use_channels[idx], ch) + + def test_mixed_spacing_boundary(self): + # 1mL (ch0,1) + 5mL (ch2,3): 18mm required between ch1 and ch2. + sp = [8.98, 8.98, 17.96, 17.96] + batches = plan_batches( + [1, 2], _containers([100.0] * 2, [217.9, 200.0]), sp, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 2) + batches = plan_batches( + [1, 2], _containers([100.0] * 2, [218.0, 200.0]), sp, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + + def test_mixed_phantoms_use_pairwise_spacing(self): + sp = [8.98, 8.98, 17.96, 17.96] + batches = plan_batches( + [0, 3], _containers([100.0] * 2, [245.0, 200.0]), sp, DECK, x_tolerance=0.1 + ) + self.assertEqual(len(batches), 1) + y = batches[0].y_positions + # ch0→ch1: 9.0, ch1→ch2: 18.0 + self.assertAlmostEqual(y[1], 245.0 - 9.0) + self.assertAlmostEqual(y[2], 245.0 - 9.0 - 18.0) + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.compute_nonconsecutive_channel_offsets" + ) + def test_auto_spread_same_container(self, mock_offsets): + mock_offsets.return_value = [Coordinate(0, 4.5, 0), Coordinate(0, -4.5, 0)] + trough = _mock_container(100.0, 200.0, size_y=50.0, name="trough") + batches = plan_batches([0, 1], [trough, trough], self.S, DECK, x_tolerance=0.1) + self.assertEqual(len(batches), 1) + y = batches[0].y_positions + self.assertAlmostEqual(y[0], 204.5) + self.assertAlmostEqual(y[1], 195.5) + + def test_narrow_container_serializes(self): + # size_y=3 means compute_nonconsecutive_channel_offsets returns None → singletons only. + well = _mock_container(100.0, 200.0, size_y=3.0, name="narrow") + batches = plan_batches([0, 1], [well, well], self.S, DECK, x_tolerance=0.1) + self.assertEqual(len(batches), 2) + + +class TestPlanBatchesNoGoZones(unittest.TestCase): + """Per-batch container-fit decisions let plan_batches batch subsets that a + fixed up-front spread cannot (the motivating case for no-go zones).""" + + S = [9.0] * 8 + + @patch( + "pylabrobot.liquid_handling.pipette_batch_scheduling.compute_nonconsecutive_channel_offsets" + ) + def test_fits_pair_but_not_triple_in_container(self, mock_offsets): + # Container fits any adjacent pair but not three channels at once. + def fake_offsets(container, channels, spacings): + if len(channels) <= 2 and channels == sorted(channels) and channels[-1] - channels[0] <= 1: + return [Coordinate(0, 4.5, 0), Coordinate(0, -4.5, 0)][: len(channels)] + return None + + mock_offsets.side_effect = fake_offsets + c = _mock_container(100.0, 200.0, size_y=12.0, name="small") + batches = plan_batches([0, 1, 2], [c, c, c], self.S, DECK, x_tolerance=0.1) + self.assertEqual(len(batches), 2) + self.assertEqual(sorted(len(b.channels) for b in batches), [1, 2]) + + +if __name__ == "__main__": + unittest.main()