|
28 | 28 | import time |
29 | 29 | import typing |
30 | 30 | import logging |
| 31 | +from typing import Callable, Any, Dict, Union |
31 | 32 |
|
32 | 33 | import serial |
33 | 34 |
|
|
132 | 133 | # Bit 7: Lower limit switch: 0 = open, 1 = closed |
133 | 134 |
|
134 | 135 |
|
135 | | -AXIS_MAPPER = { |
136 | | - 1: "X", |
137 | | - 2: "Y", |
138 | | - 3: "Z", |
139 | | -} |
140 | | - |
141 | | - |
142 | | -def parse_info(info: bytes) -> typing.Mapping[str, str]: |
143 | | - info = info.decode().strip() |
| 136 | +def parse_info(info: list) -> dict[str, dict[str, Union[typing.Optional[str], Any]]]: |
144 | 137 |
|
145 | 138 | items = [] |
146 | | - for line in info.split('\r'): |
147 | | - items.append(line[:33].strip()) |
148 | | - items.append(line[33:].strip()) |
| 139 | + for line in info: |
| 140 | + if line in [b"", b"\n", b"\r", b"\r\n"]: |
| 141 | + continue |
| 142 | + items.append(line[:33].strip().decode()) |
| 143 | + items.append(line[33:].strip().decode()) |
149 | 144 |
|
150 | | - pattern = "(?P<name>.*):\s*((?P<value>\S+)(\s+)?(?P<command>[[].*[]])?(\s+)?(?P<units>.*)?)$" |
| 145 | + pattern = r"(?P<name>.*):\s*((?P<value>\S+)(\s+)?(?P<command>[\[].*[\]])?(\s+)?(?P<units>.*)?)$" |
151 | 146 |
|
152 | 147 | settings = {} |
153 | 148 | for item in items: |
@@ -188,35 +183,35 @@ def __init__(self, port: str, baudrate: int, timeout: float) -> None: |
188 | 183 | ) |
189 | 184 | self._lock = threading.RLock() |
190 | 185 |
|
191 | | - with self._lock: |
192 | | - # We do not use the general get_description() here because |
193 | | - # if this is not a ProScan device it would never reach the |
194 | | - # '\rEND\r' that signals the end of the description. |
195 | | - self.axis_info = {} |
196 | | - try: |
197 | | - for i, axis in AXIS_MAPPER.items(): |
198 | | - self.command(f"INFO {axis}".encode()) |
199 | | - answer = self._serial.read_all() |
200 | | - if len(answer) == 0: |
201 | | - _logger.info(f"Axis {axis} not present") |
202 | | - continue |
203 | | - _logger.info(f"Axis {axis} present") |
204 | | - self.axis_info[axis] = parse_info(answer) |
205 | | - |
206 | | - except: |
207 | | - print( |
208 | | - "Unable to read configuration. Is ASI controller connected?" |
209 | | - ) |
210 | | - return |
211 | | - # parse config responce which tells us what devices are present |
212 | | - # on this controller. |
213 | | - |
| 186 | + # We do not use the general get_description() here because |
| 187 | + # if this is not a ProScan device it would never reach the |
| 188 | + # '\rEND\r' that signals the end of the description. |
| 189 | + self._axis_info = {} |
| 190 | + self._axis_mapper = {} |
| 191 | + i = 1 |
| 192 | + try: |
| 193 | + for axis in ["X", "Y", "Z"]: |
| 194 | + self.command(bytes(f"INFO {axis}", "ascii")) |
| 195 | + answer = self.read_multiline() |
| 196 | + if answer == [b""]: # no axis present |
| 197 | + _logger.info(f"Axis {axis} not present") |
| 198 | + continue |
| 199 | + _logger.info(f"Axis {axis} present") |
| 200 | + self._axis_info[axis] = parse_info(answer) |
| 201 | + self._axis_mapper[i] = axis |
| 202 | + except: |
| 203 | + print( |
| 204 | + "Unable to read configuration. Is ASI controller connected?" |
| 205 | + ) |
| 206 | + return |
| 207 | + # parse config responce which tells us what devices are present |
| 208 | + # on this controller. |
214 | 209 |
|
215 | 210 | def is_busy(self): |
216 | 211 | pass |
217 | 212 |
|
218 | 213 | def get_number_axes(self): |
219 | | - return 2 |
| 214 | + return len(self._axis_info) |
220 | 215 |
|
221 | 216 | def command(self, command: bytes) -> None: |
222 | 217 | """Send command to device.""" |
@@ -289,45 +284,45 @@ def move_command(self, command: bytes) -> None: |
289 | 284 | # other process to check position |
290 | 285 | # self.get_command(command) |
291 | 286 |
|
292 | | - def move_by_relative_position(self, axis: bytes, delta: float) -> None: |
| 287 | + def move_by_relative_position(self, axis: int, delta: float) -> None: |
293 | 288 | """Send a relative movement command to stated axis""" |
294 | | - axisname = AXIS_MAPPER[axis] |
295 | | - self.move_command(bytes(f"MOVREL {axisname}={str(delta)}", "ascii")) |
| 289 | + axis_name = self._axis_mapper[axis] |
| 290 | + self.move_command(bytes(f"MOVREL {axis_name}={str(delta)}", "ascii")) |
296 | 291 | self.wait_for_motor_stop(axis) |
297 | 292 |
|
298 | | - def move_to_absolute_position(self, axis: bytes, pos: float) -> None: |
| 293 | + def move_to_absolute_position(self, axis: int, pos: float) -> None: |
299 | 294 | """Send a relative movement command to stated axis""" |
300 | | - axisname = AXIS_MAPPER[axis] |
301 | | - self.move_command(bytes(f"MOVE {axisname}={str(pos)}", "ascii")) |
| 295 | + axis_name = self._axis_mapper[axis] |
| 296 | + self.move_command(bytes(f"MOVE {axis_name}={str(pos)}", "ascii")) |
302 | 297 | self.wait_for_motor_stop(axis) |
303 | 298 |
|
304 | | - def move_to_limit(self, axis: bytes, speed: int): |
305 | | - axisname = AXIS_MAPPER[axis] |
306 | | - self.get_command(bytes(f"SPIN {axisname}={speed}", "ascii")) |
| 299 | + def move_to_limit(self, axis: int, speed: int): |
| 300 | + axis_name = self._axis_mapper[axis] |
| 301 | + self.get_command(bytes(f"SPIN {axis_name}={speed}", "ascii")) |
307 | 302 |
|
308 | | - def motor_moving(self, axis: bytes) -> int: |
309 | | - axisname = AXIS_MAPPER[axis] |
310 | | - reply = self.get_command(bytes(f"RDSTAT {axisname}", "ascii")) |
| 303 | + def motor_moving(self, axis: int) -> int: |
| 304 | + axis_name = self._axis_mapper[axis] |
| 305 | + reply = self.get_command(bytes(f"RDSTAT {axis_name}", "ascii")) |
311 | 306 | flags = int(reply.strip()[3:]) |
312 | 307 | return flags & 1 |
313 | 308 |
|
314 | | - def set_speed(self, axis: bytes, speed: int) -> None: |
315 | | - axisname = AXIS_MAPPER[axis] |
316 | | - self.get_command(bytes(f"SPEED {axisname}={speed}", "ascii")) |
| 309 | + def set_speed(self, axis: int, speed: int) -> None: |
| 310 | + axis_name = self._axis_mapper[axis] |
| 311 | + self.get_command(bytes(f"SPEED {axis_name}={speed}", "ascii")) |
317 | 312 |
|
318 | | - def wait_for_motor_stop(self, axis: bytes): |
319 | | - # give axis a chnace to start maybe? |
| 313 | + def wait_for_motor_stop(self, axis: int): |
| 314 | + # give axis a chance to start maybe? |
320 | 315 | time.sleep(0.2) |
321 | 316 | while self.motor_moving(axis): |
322 | 317 | time.sleep(0.1) |
323 | 318 |
|
324 | | - def reset_position(self, axis: bytes): |
325 | | - axisname = AXIS_MAPPER[axis] |
326 | | - self.get_command(bytes(f"HERE {axisname}=0", "ascii")) |
| 319 | + def reset_position(self, axis: int): |
| 320 | + axis_name = self._axis_mapper[axis] |
| 321 | + self.get_command(bytes(f"HERE {axis_name}=0", "ascii")) |
327 | 322 |
|
328 | | - def get_absolute_position(self, axis: bytes) -> float: |
329 | | - axisname = AXIS_MAPPER[axis] |
330 | | - position = self.get_command(bytes(f"WHERE {axisname}", "ascii")) |
| 323 | + def get_absolute_position(self, axis: int) -> float: |
| 324 | + axis_name = self._axis_mapper[axis] |
| 325 | + position = self.get_command(bytes(f"WHERE {axis_name}", "ascii")) |
331 | 326 | if position[3:4] == b"N": |
332 | 327 | print(f"Error: {position} : {LUDL_ERRORS[int(position[4:6])]}") |
333 | 328 | else: |
@@ -357,7 +352,7 @@ def changed_timeout(self, new_timeout: float): |
357 | 352 |
|
358 | 353 |
|
359 | 354 | class _ASIStageAxis(microscope.abc.StageAxis): |
360 | | - def __init__(self, dev_conn: _ASIMotionController, axis: str) -> None: |
| 355 | + def __init__(self, dev_conn: _ASIMotionController, axis: int) -> None: |
361 | 356 | super().__init__() |
362 | 357 | self._dev_conn = dev_conn |
363 | 358 | self._axis = axis |
@@ -432,10 +427,44 @@ def __init__(self, conn: _ASIMotionController, **kwargs) -> None: |
432 | 427 | self._dev_conn = conn |
433 | 428 | self._axes = { |
434 | 429 | str(i): _ASIStageAxis(self._dev_conn, i) |
435 | | - for i in range(1, 3) # self._dev_conn.get_number_axes() + 1) |
| 430 | + for i in range(1, self._dev_conn.get_number_axes() + 1) |
436 | 431 | } |
| 432 | + |
437 | 433 | self.homed = False |
438 | 434 |
|
| 435 | + def _add_settings(self, settings) -> None: |
| 436 | + """INFO command returns a list of settings that is parsed into a dict. This function takes that dict and |
| 437 | + adds settings consequently to the stage object. |
| 438 | + """ |
| 439 | + for axis_name, axis_settings in settings.items: |
| 440 | + for setting_name, setting_params in axis_settings.items: |
| 441 | + if setting_params['value'].isdigit(): |
| 442 | + value = int(setting_params['value']) |
| 443 | + dtype = "int" |
| 444 | + elif setting_params['value'].replace('.', '', 1).isdigit(): |
| 445 | + value = float(setting_params['value']) |
| 446 | + dtype = "float" |
| 447 | + else: |
| 448 | + value = setting_params['value'] |
| 449 | + dtype = "str" # It might be a enum but we dont know |
| 450 | + |
| 451 | + if setting_params['command'] is not None: |
| 452 | + set_function: Callable[[Any], bytes] = lambda x: self._dev_conn.get_command( |
| 453 | + bytes(f"{setting_params['command']} {axis_name}={x}", "ascii")) |
| 454 | + read_only = False |
| 455 | + else: |
| 456 | + set_function = None |
| 457 | + read_only = True |
| 458 | + |
| 459 | + self.add_setting( |
| 460 | + name=f"{setting_name}_{axis_name}", |
| 461 | + dtype=dtype, |
| 462 | + get_func=lambda: self._dev_conn.get_command( |
| 463 | + bytes(f"{setting_params['command']} {axis_name}", "ascii")), |
| 464 | + set_func=set_function, |
| 465 | + readonly=read_only, |
| 466 | + ) |
| 467 | + |
439 | 468 | def _do_shutdown(self) -> None: |
440 | 469 | pass |
441 | 470 |
|
@@ -537,7 +566,7 @@ class ASIMS2000(microscope.abc.Controller): |
537 | 566 | """ |
538 | 567 |
|
539 | 568 | def __init__( |
540 | | - self, port: str, baudrate: int = 9600, timeout: float = 0.5, **kwargs |
| 569 | + self, port: str, baudrate: int = 9600, timeout: float = 0.5, **kwargs |
541 | 570 | ) -> None: |
542 | 571 | super().__init__(**kwargs) |
543 | 572 | self._conn = _ASIMotionController(port, baudrate, timeout) |
|
0 commit comments