From 773ad8d937f922f38f3d83a40d3148ff9db00443 Mon Sep 17 00:00:00 2001 From: John Griffiths Date: Mon, 17 Nov 2025 17:02:56 -0500 Subject: [PATCH 1/3] added functionality for serial port triggers backend --- eegnb/devices/eeg.py | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/eegnb/devices/eeg.py b/eegnb/devices/eeg.py index b1518fbec..34f0e2cc1 100644 --- a/eegnb/devices/eeg.py +++ b/eegnb/devices/eeg.py @@ -108,6 +108,9 @@ def initialize_backend(self): self._muse_get_recent() elif self.backend == "kernelflow": self._init_kf() + elif self.backend == "serialport": + #self._init_serial() + pass def _get_backend(self, device_name): if device_name in brainflow_devices: @@ -116,6 +119,8 @@ def _get_backend(self, device_name): return "muselsl" elif device_name in ["kernelflow"]: return "kernelflow" + elif device_name in ["biosemi"]: + return "serialport" ##################### @@ -512,7 +517,36 @@ def _kf_sendeventinfo(self, event_info): event_info_pack = json.dumps(event_info).encode("utf-8") msg = struct.pack("!I", len(event_info_pack)) + event_info_pack self.kf_socket.sendall(msg) - + + + + ########################### + # Serial Port Function # + ########################### + + def _serial_push_sample(self, timestamp, marker): + + # Send trigger + serial_obj = self.serial + pulse_ms = 5 + clearandflush=True + + code = marker + + if not (0 <= code <= 255): raise ValueError("code must be 045255") + + serial_obj.write(bytes([code])) + if clearandflush: + serial_obj.flush() # push immediately + sleep(pulse_ms / 1000.0) + serial_obj.write(b"\x00") # clear back to zero + serial_obj.flush() + + def _start_serial(self): + pass + + + ################################# # Highlevel device functions # @@ -534,6 +568,9 @@ def start(self, fn, duration=None): self._start_muse(duration) elif self.backend == "kernelflow": self._start_kf() + elif self.backend == "serialport": + self._start_serial() + def push_sample(self, marker, timestamp, marker_name=None): """ @@ -549,6 +586,9 @@ def push_sample(self, marker, timestamp, marker_name=None): self._muse_push_sample(marker=marker, timestamp=timestamp) elif self.backend == "kernelflow": self._kf_push_sample(marker=marker,timestamp=timestamp, marker_name=marker_name) + elif self.backend == "serialport": + self._serial_push_sample(marker=marker, timestamp=timestamp) + def stop(self): if self.backend == "brainflow": From db1d658ab2e7d5b1f713cce33a45ea11255c7e96 Mon Sep 17 00:00:00 2001 From: John Griffiths Date: Mon, 17 Nov 2025 18:14:52 -0500 Subject: [PATCH 2/3] added function to create serial port object --- eegnb/devices/eeg.py | 50 +++++++++++++++++++++++++++--------------- eegnb/devices/utils.py | 5 ++++- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/eegnb/devices/eeg.py b/eegnb/devices/eeg.py index 34f0e2cc1..679ae40e0 100644 --- a/eegnb/devices/eeg.py +++ b/eegnb/devices/eeg.py @@ -19,6 +19,8 @@ from muselsl import stream, list_muses, record, constants as mlsl_cnsts from pylsl import StreamInfo, StreamOutlet, StreamInlet, resolve_byprop +from serial import Serial, EIGHTBITS, PARITY_NONE, STOPBITS_ONE + from eegnb.devices.utils import ( get_openbci_usb, create_stim_array, @@ -109,8 +111,8 @@ def initialize_backend(self): elif self.backend == "kernelflow": self._init_kf() elif self.backend == "serialport": - #self._init_serial() - pass + self._init_serial(self.serial_port) + def _get_backend(self, device_name): if device_name in brainflow_devices: @@ -524,26 +526,38 @@ def _kf_sendeventinfo(self, event_info): # Serial Port Function # ########################### - def _serial_push_sample(self, timestamp, marker): - - # Send trigger - serial_obj = self.serial - pulse_ms = 5 - clearandflush=True - code = marker - - if not (0 <= code <= 255): raise ValueError("code must be 045255") + def _init_serial(self, serial_port): + self.serial = self._serial_open_port(serial_port) + - serial_obj.write(bytes([code])) + def _serial_push_sample(self, marker, clearandflush=True, pulse_ms=5): + + if not (0 <= marker <= 255): raise ValueError("marker code must be 045255") + self.serial.write(bytes([marker])) if clearandflush: - serial_obj.flush() # push immediately + self.serial.flush() # push immediately sleep(pulse_ms / 1000.0) - serial_obj.write(b"\x00") # clear back to zero - serial_obj.flush() + self.serial.write(b"\x00") # clear back to zero + self.serial.flush() + + + + def _serial_open_port(self,PORT_ID="COM4", BAUD=115200): + """ + PORT_ID = "COM4" # Example of a stimulus delivery computer USB out port name + # on linux it should be sth like # PORT_ID = "/dev/ttyUSB0" # Linux + # on macOS it should be sth like # PORT_ID = "/dev/tty.usbserial-XXXX" # macOS + BAUD = 115200 # This matches ActiView's serial settings + """ + my_serial = Serial(PORT_ID, BAUD, bytesize=EIGHTBITS, parity=PARITY_NONE, + stopbits=STOPBITS_ONE, timeout=0, write_timeout=0) + + print("\nOpened Serial Port %s\n" %PORT_ID) + + return my_serial + - def _start_serial(self): - pass @@ -587,7 +601,7 @@ def push_sample(self, marker, timestamp, marker_name=None): elif self.backend == "kernelflow": self._kf_push_sample(marker=marker,timestamp=timestamp, marker_name=marker_name) elif self.backend == "serialport": - self._serial_push_sample(marker=marker, timestamp=timestamp) + self._serial_push_sample(marker=marker) def stop(self): diff --git a/eegnb/devices/utils.py b/eegnb/devices/utils.py index de5f3f9b0..38579196b 100644 --- a/eegnb/devices/utils.py +++ b/eegnb/devices/utils.py @@ -27,7 +27,8 @@ "notion2": BoardShim.get_eeg_names(BoardIds.NOTION_2_BOARD.value), "crown": BoardShim.get_eeg_names(BoardIds.CROWN_BOARD.value), "freeeeg32": [f"eeg_{i}" for i in range(0, 32)], - "kernelflow": [] + "kernelflow": [], + "biosemi": [], } BRAINFLOW_CHANNELS = { @@ -58,6 +59,7 @@ "crown": BoardShim.get_eeg_channels(BoardIds.CROWN_BOARD.value), "freeeeg32": BoardShim.get_eeg_channels(BoardIds.FREEEEG32_BOARD.value), "kernelflow": [], + "biosemi": [], } SAMPLE_FREQS = { @@ -81,6 +83,7 @@ "crown": BoardShim.get_sampling_rate(BoardIds.CROWN_BOARD.value), "freeeeg32": BoardShim.get_sampling_rate(BoardIds.FREEEEG32_BOARD.value), "kernelflow": [], + "biosemi": [], } From b78a2923208e349000de14a00ca613ae613b165e Mon Sep 17 00:00:00 2001 From: John Griffiths Date: Mon, 17 Nov 2025 19:05:15 -0500 Subject: [PATCH 3/3] fine tuned object construction and cmdline outputs --- eegnb/devices/eeg.py | 19 +++++++++++-------- eegnb/experiments/Experiment.py | 13 ++++++------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/eegnb/devices/eeg.py b/eegnb/devices/eeg.py index 679ae40e0..4a71b2451 100644 --- a/eegnb/devices/eeg.py +++ b/eegnb/devices/eeg.py @@ -111,7 +111,7 @@ def initialize_backend(self): elif self.backend == "kernelflow": self._init_kf() elif self.backend == "serialport": - self._init_serial(self.serial_port) + self._init_serial() def _get_backend(self, device_name): @@ -527,8 +527,11 @@ def _kf_sendeventinfo(self, event_info): ########################### - def _init_serial(self, serial_port): - self.serial = self._serial_open_port(serial_port) + def _init_serial(self): + if self.serial_port: # if a port name is supplied, open a serial port. + self.serial = self._serial_open_port(PORT_ID=self.serial_port) + # (otherwise, don't open; assuming serial obj will be + # manually added) def _serial_push_sample(self, marker, clearandflush=True, pulse_ms=5): @@ -542,13 +545,13 @@ def _serial_push_sample(self, marker, clearandflush=True, pulse_ms=5): self.serial.flush() - def _serial_open_port(self,PORT_ID="COM4", BAUD=115200): """ PORT_ID = "COM4" # Example of a stimulus delivery computer USB out port name - # on linux it should be sth like # PORT_ID = "/dev/ttyUSB0" # Linux - # on macOS it should be sth like # PORT_ID = "/dev/tty.usbserial-XXXX" # macOS - BAUD = 115200 # This matches ActiView's serial settings + # on windows it should be sth like # PORT_ID = "COM4" + # on linux it should be sth like # PORT_ID = "/dev/ttyUSB0" + # on macOS it should be sth like # PORT_ID = "/dev/tty.usbserial-XXXX" + BAUD = 115200 # -> This matches BioSemi ActiView's serial settings """ my_serial = Serial(PORT_ID, BAUD, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=0, write_timeout=0) @@ -583,7 +586,7 @@ def start(self, fn, duration=None): elif self.backend == "kernelflow": self._start_kf() elif self.backend == "serialport": - self._start_serial() + pass def push_sample(self, marker, timestamp, marker_name=None): diff --git a/eegnb/experiments/Experiment.py b/eegnb/experiments/Experiment.py index 217aef5dd..e1bf34d46 100644 --- a/eegnb/experiments/Experiment.py +++ b/eegnb/experiments/Experiment.py @@ -101,8 +101,8 @@ def setup(self, instructions=True): # Checking for EEG to setup the EEG stream if self.eeg: - # If no save_fn passed, generate a new unnamed save file - if self.save_fn is None: + # If no save_fn passed, and data is being streamed, generate a new unnamed save file + if self.save_fn is None and self.eeg.backend not in ['serialport', 'kernelflow']: # Generating a random int for the filename random_id = random.randint(1000,10000) # Generating save function @@ -226,13 +226,12 @@ def iti_with_jitter(): # Setup the experiment, alternatively could get rid of this line, something to think about self.setup(instructions) - print("Wait for the EEG-stream to start...") - # Start EEG Stream, wait for signal to settle, and then pull timestamp for start point if self.eeg: - self.eeg.start(self.save_fn, duration=self.record_duration + 5) - - print("EEG Stream started") + if self.eeg.backend not in ['serialport']: + print("Wait for the EEG-stream to start...") + self.eeg.start(self.save_fn, duration=self.record_duration + 5) + print("EEG Stream started") # Run trial until a key is pressed or experiment duration has expired. start = time()