diff --git a/bin/ibw_to_mat.py b/bin/ibw_to_mat.py
new file mode 100755
index 0000000..bcacaf1
--- /dev/null
+++ b/bin/ibw_to_mat.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+
+import argparse
+import os
+import sys
+
+import numpy as np
+import scipy.io as sio
+
+script_dir = os.path.dirname(os.path.abspath(__file__))
+vendor_dir = os.path.join(script_dir, "..", "vendor")
+sys.path.insert(0, vendor_dir)
+
+from ibwpy import ibwpy
+
+
+def create_parser():
+ parser = argparse.ArgumentParser(
+ description="Combine Igor Binary Wave files into a single matlab file that "
+ "can be read by ASCAM."
+ )
+ parser.add_argument("current", help="Current .ibw file.")
+ parser.add_argument("--command", help="Command Voltage .ibw file.")
+ parser.add_argument("--piezo", help="Piezo Voltage .ibw file.")
+ parser.add_argument(
+ "--sampling-rate",
+ dest="sampling_rate",
+ default=25000,
+ help="Piezo Voltage .ibw file.",
+ )
+ parser.add_argument("--output", help="Output filename.")
+
+ return parser
+
+
+def main():
+ parser = create_parser()
+ args = parser.parse_args()
+
+ # Load the three IBW files
+ current_data = ibwpy.load(args.current).array
+ if args.command:
+ command_data = ibwpy.load(args.command).array
+ if args.piezo:
+ piezo_data = ibwpy.load(args.piezo).array
+
+ # Sampling parameters
+ num_points = current_data.shape[0] # Number of data points per sweep
+ num_sweeps = current_data.shape[1] # Number of sweeps
+
+ # Create the time vector
+ time_vector = np.arange(num_points) / args.sampling_rate
+
+ # Create a dictionary to hold the data for ASCAM
+ ascam_data = {
+ "c001_Time": time_vector # Time variable
+ }
+
+ # Populate the dictionary with data for each sweep
+ for i in range(num_sweeps):
+ # For each sweep, assign the corresponding data for Ipatch, Piezo, and Command
+ ascam_data[f"c002_Ipatch_{i + 1}"] = current_data[:, i]
+ if args.piezo:
+ ascam_data[f"c003_Piezo_{i + 1}"] = piezo_data[:, i]
+ if args.command:
+ ascam_data[f"c004_Command_{i + 1}"] = command_data[:, i]
+
+ # Save the data to a .mat file
+ sio.savemat(args.output, ascam_data)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/vendor/ibwpy/LICENSE b/vendor/ibwpy/LICENSE
new file mode 100644
index 0000000..34eeee2
--- /dev/null
+++ b/vendor/ibwpy/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2022 Hiroaki Takahashi
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/ibwpy/README.md b/vendor/ibwpy/README.md
new file mode 100644
index 0000000..74d9a4c
--- /dev/null
+++ b/vendor/ibwpy/README.md
@@ -0,0 +1,80 @@
+# ibwpy
+Read and write [Igor Pro](https://www.wavemetrics.com/) files (Igor binary wave) with Python
+
+## Installation
+### Install with pip (using Git, recommended)
+1. Install ibwpy with pip
+```bash
+$ python -m pip install git+https://github.com/MiLL4U/ibwpy.git
+```
+
+### Install with pip (without Git)
+1. download a wheel package (*.whl) from [Releases](https://github.com/MiLL4U/ibwpy/releases)
+
+2. Install ibwpy with pip
+```bash
+$ python -m pip install ibwpy-x.y.z-py3-none-any.whl
+```
+(replace x.y.z with the version of ibwpy which you downloaded)
+
+### Install with git clone
+1. Clone this repository
+
+```bash
+$ git clone https://github.com/MiLL4U/ibwpy.git
+```
+
+2. Go into the repository
+
+```bash
+$ cd ibwpy
+```
+
+3. Install ibwpy with setup.py
+
+```bash
+$ python setup.py install
+```
+
+## Examples
+### Read
+Read wave from ibw file:
+```python
+import ibwpy as ip
+test_wave = ip.load("test_wave.ibw")
+print(test_wave)
+```
+
+### Make
+Make new wave from Numpy array
+```python
+import numpy as np
+
+arr_1 = np.array([[1., 2., 3.],
+ [1.5, 2.5, 3.5]])
+wave_1 = ip.from_nparray(arr_1, 'wave1')
+# wave1 (IgorBinaryWave)
+# [[1. 2. 3. ]
+# [1.5 2.5 3.5]]
+```
+
+### Calculation
+Treat wave as NumPy array:
+```python
+arr_2 = np.ones((2, 3))
+print(arr_2)
+# [[1. 1. 1.]
+# [1. 1. 1.]]
+
+wave_1 = wave_1 + arr_2
+print(wave_1)
+# wave1 (IgorBinaryWave)
+# [[2. 3. 4. ]
+# [2.5 3.5 4.5]]
+```
+
+### Save
+Save wave as ibw file
+```python
+wave_1.save("wave1.ibw")
+```
\ No newline at end of file
diff --git a/vendor/ibwpy/ibwpy/__init__.py b/vendor/ibwpy/ibwpy/__init__.py
new file mode 100644
index 0000000..f953de1
--- /dev/null
+++ b/vendor/ibwpy/ibwpy/__init__.py
@@ -0,0 +1,38 @@
+"""
+IBWPy
+=====
+
+Provides
+ 1. Functions for reading/writing Igor binary wave (*.ibw) directly
+ 2. Interfaces to edit ibw files as NumPy array
+"""
+
+from typing import List, Tuple, Union
+
+import numpy as np
+
+from .constants import DEFAULT_DTYPE, IBWDType
+from .igorbinarywave import BinaryWave5, BinaryWave5Loader
+from .waveheader import BinaryWaveHeader5
+
+
+def make(shape: Union[List[int], Tuple[int, ...]],
+ name: str, dtype: IBWDType = DEFAULT_DTYPE) -> BinaryWave5:
+ shape_tuple = tuple(shape)
+ header = BinaryWaveHeader5(shape=shape_tuple, name=name, dtype=dtype)
+ zeros = np.zeros(shape, dtype=dtype)
+ res = BinaryWave5(ibw_header=header, wave_values=zeros)
+ return res
+
+
+def from_nparray(array: np.ndarray, name: str) -> BinaryWave5:
+ header = BinaryWaveHeader5(shape=array.shape, name=name,
+ dtype=str(array.dtype))
+ res = BinaryWave5(ibw_header=header, wave_values=array)
+ return res
+
+
+def load(path: str) -> BinaryWave5:
+ loader = BinaryWave5Loader(path)
+ res = loader.load()
+ return res
diff --git a/vendor/ibwpy/ibwpy/commonfunc.py b/vendor/ibwpy/ibwpy/commonfunc.py
new file mode 100644
index 0000000..4eff190
--- /dev/null
+++ b/vendor/ibwpy/ibwpy/commonfunc.py
@@ -0,0 +1,9 @@
+from .constants import TEXT_ENCODE, TEXT_ENCODE_2ND
+
+
+def decode_unicode(text_buf: bytes):
+ try:
+ res = text_buf.decode(TEXT_ENCODE)
+ except UnicodeDecodeError:
+ res = text_buf.decode(TEXT_ENCODE_2ND)
+ return res
diff --git a/vendor/ibwpy/ibwpy/constants.py b/vendor/ibwpy/ibwpy/constants.py
new file mode 100644
index 0000000..77844a3
--- /dev/null
+++ b/vendor/ibwpy/ibwpy/constants.py
@@ -0,0 +1,17 @@
+import datetime
+
+from typing_extensions import Literal
+
+IBWPY_VERSION = "1.0.4"
+
+IBWDType = Literal['float32', 'float64', 'int8', 'int16', 'int32']
+DEFAULT_DTYPE: IBWDType = 'float32'
+
+WAVE_HEADER_SIZE = 320
+
+DATETIME_OFFSET = datetime.datetime(1904, 1, 1, 0, 0, 0)
+MAX_WAVE_NAME_LENGTH = 31
+
+TEXT_ENCODE = 'utf-8'
+TEXT_ENCODE_2ND = 'shift_jis'
+DEFAULT_EOL = '\r'
diff --git a/vendor/ibwpy/ibwpy/igorbinarywave.py b/vendor/ibwpy/ibwpy/igorbinarywave.py
new file mode 100644
index 0000000..f6bf6c8
--- /dev/null
+++ b/vendor/ibwpy/ibwpy/igorbinarywave.py
@@ -0,0 +1,419 @@
+from __future__ import annotations
+
+import datetime
+import re
+import struct
+from copy import deepcopy
+from functools import reduce
+from typing import List, Optional, Tuple, Union
+
+import numpy as np
+
+from .commonfunc import decode_unicode
+from .constants import DEFAULT_EOL, TEXT_ENCODE, WAVE_HEADER_SIZE
+from .waveheader import BinaryWaveHeader5, BinaryWaveHeader5Loader
+
+
+class BinaryWave5:
+ def __init__(self,
+ ibw_header: BinaryWaveHeader5,
+ wave_values: np.ndarray,
+ data_unit: str = '',
+ axes_unit: Optional[List[str]] = None,
+ dependency_formula: str = '',
+ note: str = '',
+ # axes_label: List[List[str]] = None,
+ ) -> None:
+ self.__header = ibw_header
+ if self.__header.dtype != wave_values.dtype:
+ raise TypeError(
+ 'Data type of wave_values ({})'
+ 'does not match with ibw_header ({})'
+ .format(wave_values.dtype, self.__header.dtype))
+ self.__values = wave_values
+ self.__data_unit = data_unit
+ self.__axes_unit = axes_unit if axes_unit \
+ else ['' for _ in range(wave_values.ndim)]
+
+ dependency_formula = self.__convert_eol(dependency_formula)
+ self.__dependency_formula = dependency_formula
+ self.__header.formula_size = self.formula_size
+
+ note = self.__convert_eol(note)
+ self.__note = note
+ self.__header.note_size = self.note_size
+
+ def __update_modify_time(self):
+ self.__header.update_modify_time()
+
+ def __str__(self) -> str:
+ name = '{} (IgorBinaryWave)\n'.format(self.name)
+ return name + str(self.__values)
+
+ def __add__(self, other: Union[BinaryWave5, np.ndarray, int, float,
+ List[int], List[float]]) -> BinaryWave5:
+ if isinstance(other, BinaryWave5):
+ other = other.__values
+ res_array = self.array + other
+ res = deepcopy(self)
+ res.__values = res_array
+
+ res.__update_dtype()
+ res.__update_modify_time()
+ return res
+
+ def __sub__(self, other: Union[BinaryWave5, np.ndarray, int, float,
+ List[int], List[float]]) -> BinaryWave5:
+ if isinstance(other, BinaryWave5):
+ other = other.__values
+ res_array = self.array - other
+ res = deepcopy(self)
+ res.__values = res_array
+
+ res.__update_dtype()
+ res.__update_modify_time()
+ return res
+
+ def __mul__(self, other: Union[BinaryWave5, np.ndarray, int, float,
+ List[int], List[float]]) -> BinaryWave5:
+ if isinstance(other, BinaryWave5):
+ other = other.__values
+ res_array = self.array * other
+ res = deepcopy(self)
+ res.__values = res_array
+
+ res.__update_dtype()
+ res.__update_modify_time()
+ return res
+
+ def __truediv__(self, other: Union[BinaryWave5, np.ndarray, int, float,
+ List[int], List[float]]) -> BinaryWave5:
+ if isinstance(other, BinaryWave5):
+ other = other.__values
+ res_array = self.array / other
+ res = deepcopy(self)
+ res.__values = res_array
+
+ res.__update_dtype()
+ res.__update_modify_time()
+ return res
+
+ def __len__(self) -> int:
+ return len(self.__values)
+
+ def __getitem__(self, key) -> np.ndarray:
+ return self.__values[key]
+
+ def __setitem__(self, key, value) -> BinaryWave5:
+ res = self.__values.copy()
+ res[key] = value
+ self.__values = res
+
+ self.__update_dtype()
+ self.__update_modify_time()
+ return self
+
+ @property
+ def name(self) -> str:
+ return self.__header.name
+
+ def rename(self, name: str) -> BinaryWave5:
+ self.__header.rename(name)
+
+ self.__update_modify_time()
+ return self
+
+ @property
+ def shape(self) -> Tuple[int, ...]:
+ return self.__header.shape
+
+ def reshape(self, shape: Union[List[int], Tuple[int, ...]]) -> BinaryWave5:
+ shape_tuple = tuple(shape)
+ self.__header.reshape(shape_tuple)
+ self.__values = self.__values.reshape(shape_tuple)
+
+ # following informations will initialized
+ self.__axes_unit = ['' for i in range(self.__values.ndim)]
+ # TODO: support dimension label
+ """
+ self.__axes_label = ...
+ """
+
+ self.__update_modify_time()
+ return self
+
+ @property
+ def dtype(self) -> str:
+ return self.__header.dtype
+
+ def __update_dtype(self) -> BinaryWave5:
+ self.__header.set_dtype(str(self.__values.dtype))
+ return self
+
+ def change_dtype(self, dtype: str) -> BinaryWave5:
+ if not self.__header.is_valid_dtype(dtype):
+ raise TypeError("invalid data type")
+ self.__values = self.__values.astype(dtype)
+
+ self.__update_dtype()
+ self.__update_modify_time()
+ return self
+
+ @property
+ def array(self) -> np.ndarray:
+ return self.__values
+
+ def set_values(self,
+ values: Union[np.ndarray, BinaryWave5, int, float]
+ ) -> BinaryWave5:
+ if isinstance(values, BinaryWave5):
+ set_values = values.array
+ elif isinstance(values, np.ndarray):
+ set_values = values
+ elif isinstance(values, (int, float)):
+ set_values = np.full(self.shape, values)
+ else:
+ raise TypeError('IgorBinaryWave or NumPy array is required')
+ if set_values.shape != tuple(self.shape):
+ raise ValueError('shape of array does not match to original shape')
+ self.__values = set_values
+
+ self.__update_dtype()
+ self.__update_modify_time()
+ return self
+
+ @property
+ def ndim(self) -> int:
+ return self.__header.ndim
+
+ @property
+ def dependency_formula(self) -> str:
+ return self.__dependency_formula
+
+ @property
+ def formula_buf(self) -> bytes:
+ return bytes(self.__dependency_formula, encoding=TEXT_ENCODE)
+
+ @property
+ def formula_size(self) -> int:
+ return len(self.formula_buf)
+
+ def __convert_eol(self, string, to=DEFAULT_EOL):
+ return re.sub(r'\r\n|\r|\n', to, string)
+
+ def set_dependency_formula(self, formula: str) -> BinaryWave5:
+ if not isinstance(formula, str):
+ raise TypeError('a string is required')
+ formula = self.__convert_eol(formula)
+ self.__dependency_formula = formula
+ self.__header.formula_size = self.formula_size
+
+ self.__update_modify_time()
+ return self
+
+ @property
+ def data_unit(self) -> str:
+ return self.__data_unit
+
+ def set_data_unit(self, unit: str) -> BinaryWave5:
+ if not isinstance(unit, str):
+ raise TypeError('a string is required as unit')
+
+ self.__header.set_data_unit(unit)
+ self.__data_unit = unit
+
+ self.__update_modify_time()
+ return self
+
+ @property
+ def axes_unit(self) -> Tuple[str, ...]:
+ return tuple(self.__axes_unit)
+
+ def set_axis_unit(self, axis_index: int, unit: str) -> BinaryWave5:
+ if not isinstance(unit, str):
+ raise TypeError('a string is required as unit')
+
+ self.__header.set_axis_unit(axis_index, unit)
+ self.__axes_unit[axis_index] = unit
+
+ self.__update_modify_time()
+ return self
+
+ def set_axis_scale(self, axis_index: int,
+ start: Union[float, int],
+ delta: Union[float, int]) -> BinaryWave5:
+ start = float(start)
+ delta = float(delta)
+ self.__header.set_axis_scale(axis_index, start, delta)
+
+ self.__update_modify_time()
+ return self
+
+ def axis_scale(self, axis_index: int) -> Tuple[float, float]:
+ return self.__header.axis_scale(axis_index)
+
+ def calculated_axis_wave(self, axis_index: int) -> np.ndarray:
+ return self.__header.calculated_axis_wave(axis_index)
+
+ @property
+ def data_scale(self) -> Union[Tuple[float, float], None]:
+ return self.__header.data_scale
+
+ def set_data_scale(self,
+ max_: Union[float, int],
+ min_: Union[float, int]) -> BinaryWave5:
+ max_ = float(max_)
+ min_ = float(min_)
+ self.__header.set_data_scale(max_, min_)
+
+ self.__update_modify_time()
+ return self
+
+ @property
+ def note(self) -> str:
+ return self.__note
+
+ @property
+ def note_buf(self) -> bytes:
+ return bytes(self.__note, encoding=TEXT_ENCODE)
+
+ @property
+ def note_size(self) -> int:
+ return len(self.note_buf)
+
+ def set_note(self, note: str) -> BinaryWave5:
+ if not isinstance(note, str):
+ raise TypeError('a string is required')
+ note = self.__convert_eol(note)
+ self.__note = note
+ self.__header.note_size = self.note_size
+
+ self.__update_modify_time()
+ return self
+
+ @property
+ def creation_time(self) -> datetime.datetime:
+ return self.__header.creation_time
+
+ def set_creation_time(self, time: datetime.datetime) -> BinaryWave5:
+ if not isinstance(time, datetime.datetime):
+ raise TypeError("datetime.datetime object is required")
+ self.__header.set_creation_time(time)
+ return self
+
+ def __update_creation_time(self) -> BinaryWave5:
+ self.__header.update_creation_time()
+ return self
+
+ @property
+ def modify_time(self) -> datetime.datetime:
+ return self.__header.modify_time
+
+ def __initialize_modify_time(self) -> BinaryWave5:
+ self.__header.initialize_modify_time()
+ return self
+
+ def duplicate(self, name: str) -> BinaryWave5:
+ res = deepcopy(self)
+ res.rename(name)
+ res.__update_creation_time()
+ res.__initialize_modify_time()
+ return res
+
+ def save(self, path: Optional[str] = None) -> None:
+ header_buf = self.__header.buffer
+ values_buf = self.__values.tobytes(order='F')
+
+ dependency_formula_buf = self.formula_buf
+ note_buf = self.note_buf
+
+ if not self.__header.data_unit:
+ ex_data_unit_buf = bytes(self.data_unit, encoding=TEXT_ENCODE)
+ else:
+ ex_data_unit_buf = b''
+
+ short_dim_units = self.__header.axes_unit
+ dim_units = self.axes_unit
+ ex_dim_units_bufs = [bytes(dim_unit, encoding=TEXT_ENCODE)
+ if not short_dim_unit else b''
+ for dim_unit, short_dim_unit
+ in zip(dim_units, short_dim_units)]
+ ex_dim_units_buf = reduce(lambda x, y: x + y, ex_dim_units_bufs)
+ # TODO: support dimension label
+ # dimension_label_bufs = ...
+
+ buffer = header_buf + values_buf \
+ + dependency_formula_buf + note_buf \
+ + ex_data_unit_buf + ex_dim_units_buf
+
+ if path is None:
+ path = self.name + ".ibw"
+ with open(path, mode='wb') as f:
+ f.write(buffer)
+
+
+class BinaryWave5Loader:
+ BIN_HEADER_SIZE = 64
+
+ def __init__(self, path: str) -> None:
+ self.path = path
+
+ def __has_valid_checksum(self) -> bool:
+ with open(self.path, mode='rb') as f:
+ header_buf = f.read(self.BIN_HEADER_SIZE + WAVE_HEADER_SIZE)
+ values = np.array(struct.unpack("192h", header_buf))
+ checksum = np.sum(values, dtype=np.int16)
+
+ return checksum == 0
+
+ def load(self) -> BinaryWave5:
+ if not self.__has_valid_checksum():
+ raise ValueError('bad checksum')
+
+ header_loader = BinaryWaveHeader5Loader()
+ with open(self.path, mode='rb') as f:
+ bin_header_buf = f.read(self.BIN_HEADER_SIZE)
+ wave_header_buf = f.read(WAVE_HEADER_SIZE)
+ header = header_loader.load_from_buffer(
+ bin_header_buf, wave_header_buf)
+ section_sizes = header.section_sizes
+
+ values_buf = f.read(section_sizes['value_size'])
+ values = np.frombuffer(values_buf, dtype=header.dtype)
+ values_array = np.reshape(values, list(reversed(header.shape))).T
+
+ dependency_formula_buf = f.read(section_sizes['formula_size'])
+ dependency_formula = decode_unicode(dependency_formula_buf)
+
+ note_buf = f.read(section_sizes['note_size'])
+ note = decode_unicode(note_buf)
+
+ ex_data_unit = decode_unicode(
+ f.read(section_sizes['ex_data_unit_size']))
+ data_unit = header.data_unit if header.data_unit else ex_data_unit
+
+ ex_axes_unit = [decode_unicode(f.read(size))
+ for size in section_sizes['ex_axes_unit_size']]
+ axes_unit = [short_unit if short_unit else ex_unit
+ for short_unit, ex_unit
+ in zip(header.axes_unit, ex_axes_unit)]
+
+ # TODO: support dimension label
+ if section_sizes['axes_label_size'] != (0, 0, 0, 0):
+ print('Warning: axis labels are not supported')
+
+ # TODO: delete when dimension labels are supported
+ header.initialize_axis_label_size()
+
+ """
+ axes_label_buf = [f.read(size)
+ for size in section_sizes['axes_label_size']]
+ """
+
+ res = BinaryWave5(ibw_header=header,
+ wave_values=values_array,
+ data_unit=data_unit,
+ axes_unit=axes_unit,
+ dependency_formula=dependency_formula,
+ note=note)
+ return res
diff --git a/vendor/ibwpy/ibwpy/typeddicts.py b/vendor/ibwpy/ibwpy/typeddicts.py
new file mode 100644
index 0000000..1707ded
--- /dev/null
+++ b/vendor/ibwpy/ibwpy/typeddicts.py
@@ -0,0 +1,36 @@
+import datetime
+from typing import Tuple, Union
+
+from typing_extensions import TypedDict
+
+from .constants import IBWDType
+
+
+class BinaryHeaderValues(TypedDict):
+ formula_size: int
+ note_size: int
+ data_unit_size: int
+ axes_unit_size: Tuple[int, int, int, int]
+ axes_label_size: Tuple[int, int, int, int]
+
+
+class WaveHeaderValues(TypedDict):
+ creation_datetime: datetime.datetime
+ mod_datetime: datetime.datetime
+ dtype: IBWDType
+ name: str
+ shape: Tuple[int, ...]
+ axes_delta: Tuple[float, float, float, float]
+ axes_start: Tuple[float, float, float, float]
+ data_unit: str
+ axes_unit: Tuple[str, str, str, str]
+ data_scale: Union[Tuple[float, float], None]
+
+
+class SectionSizes(TypedDict):
+ value_size: int
+ formula_size: int
+ note_size: int
+ ex_data_unit_size: int
+ ex_axes_unit_size: Tuple[int, int, int, int]
+ axes_label_size: Tuple[int, int, int, int]
diff --git a/vendor/ibwpy/ibwpy/waveheader.py b/vendor/ibwpy/ibwpy/waveheader.py
new file mode 100644
index 0000000..78cf448
--- /dev/null
+++ b/vendor/ibwpy/ibwpy/waveheader.py
@@ -0,0 +1,561 @@
+from __future__ import annotations
+
+import datetime
+import struct
+from functools import reduce
+from typing import List, Optional, Tuple, Union, cast
+
+import numpy as np
+
+from .commonfunc import decode_unicode
+from .constants import (DATETIME_OFFSET, DEFAULT_DTYPE, MAX_WAVE_NAME_LENGTH,
+ TEXT_ENCODE, WAVE_HEADER_SIZE, IBWDType)
+from .typeddicts import BinaryHeaderValues, SectionSizes, WaveHeaderValues
+
+
+class BinaryWaveHeader5:
+ DEFAULT_AXES_UNIT = ('', '', '', '')
+ DEFAULT_AXES_START = (0., 0., 0., 0.)
+ DEFAULT_AXES_DELTA = (1., 1., 1., 1.)
+ DEFAULT_AXES_LABEL_SIZE = (0, 0, 0, 0)
+ DTYPE_IDS = {'float32': 2, 'float64': 4,
+ 'int8': 8, 'int16': 0x10, 'int32': 0x20}
+ DTYPE_BYTES = {'float32': 4, 'float64': 8,
+ 'int8': 1, 'int16': 2, 'int32': 4}
+ VALID_DTYPES = ['float32', 'float64', 'int8', 'int16', 'int32']
+
+ MAX_WFM_SIZE = 2 ** 32 // 2 - 1
+
+ MAX_NDIM = 4
+
+ def __init__(
+ self, shape: Tuple[int, ...], name: str, dtype: str = DEFAULT_DTYPE,
+ formula_size: int = 0, note_size: int = 0,
+ data_unit: Union[str, int] = '', # unit or size (if extended unit)
+ axes_unit: Optional[
+ Tuple[Union[str, int], Union[str, int], # default: '' x4
+ Union[str, int], Union[str, int]]] = None,
+ axes_start: Optional[
+ Tuple[float, float, float, float]] = None, # default: 0. x 4
+ axes_delta: Optional[
+ Tuple[float, float, float, float]] = None, # default: 1. x 4
+ axes_label_size: Optional[
+ Tuple[int, int, int, int]] = None, # default: 0 x 4
+ creation_time: datetime.datetime = datetime.datetime.now(),
+ modify_time: datetime.datetime = DATETIME_OFFSET,
+ data_scale: Optional[Tuple[float, float]] = None) -> None:
+ if not self.is_valid_name(name):
+ raise ValueError('invalid name')
+ self.__name = name
+ if not self.is_valid_dtype(dtype):
+ raise ValueError('invalid data type')
+ self.__dtype = dtype
+ if not self.__is_valid_shape(shape):
+ raise ValueError('invalid shape')
+ self.__shape = shape
+
+ self.formula_size = formula_size # must updated in loading/writing wave
+ self.note_size = note_size # must updated in loading/writing wave
+ self.__data_unit = data_unit
+ self.__axes_unit = axes_unit if axes_unit else self.DEFAULT_AXES_UNIT
+ """
+
+ data_unit stores a data unit string when (data unit length) <= 3 or
+ the size of a data unit string when (data unit length) > 3.
+ It is the same in axes_unit (list of dimension units).
+ """
+ self.__axes_start = axes_start if axes_start \
+ else self.DEFAULT_AXES_START
+ self.__axes_delta = axes_delta if axes_delta \
+ else self.DEFAULT_AXES_DELTA
+ self.__axes_label_size = axes_label_size if axes_label_size \
+ else self.DEFAULT_AXES_LABEL_SIZE
+ self.__creation_time = creation_time
+ self.__modify_time = modify_time
+ self.__data_scale = data_scale
+
+ @property
+ def name(self) -> str:
+ return self.__name
+
+ @classmethod
+ def is_valid_name(cls, name: str) -> bool:
+ if not isinstance(name, str):
+ raise TypeError('name must be a string')
+ ub_removed = name.replace('_', '')
+ if not ub_removed.encode(TEXT_ENCODE).isalnum():
+ raise ValueError('all characters in name must be '
+ 'alphabet, digit, or underscore')
+ if not name[0].isalpha():
+ raise ValueError('name must start with an alphabet')
+ if len(name) > MAX_WAVE_NAME_LENGTH:
+ raise ValueError(
+ 'max length of name is {}'.format(MAX_WAVE_NAME_LENGTH))
+ return True
+
+ def rename(self, name: str) -> BinaryWaveHeader5:
+ if not self.is_valid_name(name):
+ raise ValueError('invalid name')
+ self.__name = name
+ return self
+
+ @property
+ def shape(self) -> Tuple[int, ...]:
+ return self.__shape
+
+ def __is_valid_shape(self, shape: Tuple[int, ...]) -> bool:
+ if not isinstance(shape, tuple):
+ raise TypeError(
+ 'shape must be passed as a tuple of positive integer(s)')
+ if not all([isinstance(size, int) and size > 0 for size in shape]):
+ raise ValueError(
+ 'shape must be passed as a tuple of positive integer(s)')
+ if len(shape) > self.MAX_NDIM:
+ raise ValueError(
+ 'max number of dimension is {}'.format(self.MAX_NDIM))
+ return True
+
+ def reshape(self, shape: Tuple[int, ...]) -> BinaryWaveHeader5:
+ if not self.__is_valid_shape(shape):
+ raise ValueError('invalid shape')
+ if reduce(lambda x, y: x * y, shape) != self.__npnts:
+ raise ValueError('new shape does not match to original number '
+ 'of data points ({} points)'.format(self.__npnts))
+ self.__shape = shape
+
+ # following informations will initialized
+ self.__axes_unit = self.DEFAULT_AXES_UNIT
+ self.__axes_start = self.DEFAULT_AXES_START
+ self.__axes_delta = self.DEFAULT_AXES_DELTA
+ self.__axes_label_size = self.DEFAULT_AXES_LABEL_SIZE
+
+ return self
+
+ @property
+ def ndim(self) -> int:
+ return len(self.__shape)
+
+ @property
+ def dtype(self) -> str:
+ return self.__dtype
+
+ def is_valid_dtype(self, dtype: str) -> bool:
+ if not isinstance(dtype, str):
+ raise TypeError('data type must be passed as string')
+ if dtype not in self.VALID_DTYPES:
+ raise TypeError('invalid data type')
+ return True
+
+ def set_dtype(self, dtype: str) -> BinaryWaveHeader5:
+ if not self.is_valid_dtype(dtype):
+ raise TypeError('invalid data type')
+ self.__dtype = dtype
+ return self
+
+ @property
+ def data_unit(self) -> Union[str, None]:
+ res = self.__data_unit if isinstance(self.__data_unit, str) else None
+ return res
+
+ def set_data_unit(self, unit: str) -> BinaryWaveHeader5:
+ self.__data_unit = len(unit) if len(unit) > 3 else unit
+ return self
+
+ @property
+ def axes_unit(self) -> List[Union[str, None]]:
+ res = [unit if isinstance(unit, str) else None
+ for unit in self.__axes_unit]
+ return res
+
+ def __is_valid_axis_index(self, axis_index: int) -> bool:
+ if not isinstance(axis_index, int):
+ raise TypeError('axis_index must be passed as an integer')
+ if not 0 <= axis_index < self.ndim:
+ raise KeyError(
+ 'this wave has only {} dimensions'.format(self.ndim))
+ return True
+
+ def set_axis_unit(self, axis_index: int, unit: str) -> BinaryWaveHeader5:
+ if not self.__is_valid_axis_index(axis_index):
+ raise ValueError('invalid axis_index')
+ units = list(self.__axes_unit)
+ units[axis_index] = len(unit) if len(unit) > 3 else unit
+ res = cast(Tuple[Union[str, int], Union[str, int],
+ Union[str, int], Union[str, int]], tuple(units))
+ self.__axes_unit = res
+ return self
+
+ def calculated_axis_wave(self, axis_index: int) -> np.ndarray:
+ if not self.__is_valid_axis_index(axis_index):
+ raise ValueError('invalid axis_index')
+ size = self.shape[axis_index]
+ start = self.__axes_start[axis_index]
+ step = self.__axes_delta[axis_index]
+
+ res = np.full(size, start)
+ increments = np.arange(size) * step
+
+ res += increments
+ return res
+
+ def set_axis_scale(self, axis_index: int,
+ start: float, delta: float) -> BinaryWaveHeader5:
+ if not self.__is_valid_axis_index(axis_index):
+ raise ValueError('invalid axis_index')
+ if not (isinstance(start, float) and isinstance(delta, float)):
+ raise TypeError('invalid argument type')
+ starts = list(self.__axes_start)
+ deltas = list(self.__axes_delta)
+
+ starts[axis_index] = start
+ deltas[axis_index] = delta
+
+ res_starts = cast(Tuple[float, float, float, float], tuple(starts))
+ res_deltas = cast(Tuple[float, float, float, float], tuple(deltas))
+
+ self.__axes_start = res_starts
+ self.__axes_delta = res_deltas
+ return self
+
+ def axis_scale(self, axis_index: int) -> Tuple[float, float]:
+ if not self.__is_valid_axis_index(axis_index):
+ raise ValueError('invalid axis_index')
+ return (self.__axes_start[axis_index], self.__axes_delta[axis_index])
+
+ def initialize_axis_label_size(self) -> BinaryWaveHeader5:
+ self.__axes_label_size = self.DEFAULT_AXES_LABEL_SIZE
+ return self
+
+ @property
+ def data_scale(self) -> Union[Tuple[float, float], None]:
+ return self.__data_scale
+
+ def set_data_scale(self, max_: float, min_: float) -> BinaryWaveHeader5:
+ if not (isinstance(max_, float) and isinstance(min_, float)):
+ raise TypeError('invalid argument type')
+ self.__data_scale = (max_, min_)
+ return self
+
+ def __datetime_to_num(self, time: datetime.datetime) -> int:
+ res = time - DATETIME_OFFSET
+ return int(res.total_seconds())
+
+ @property
+ def creation_time(self) -> datetime.datetime:
+ return self.__creation_time
+
+ def set_creation_time(self, time: datetime.datetime) -> BinaryWaveHeader5:
+ self.__creation_time = time
+ return self
+
+ def update_creation_time(self) -> BinaryWaveHeader5:
+ self.__creation_time = datetime.datetime.now()
+ return self
+
+ @property
+ def modify_time(self) -> datetime.datetime:
+ return self.__modify_time
+
+ def initialize_modify_time(self) -> BinaryWaveHeader5:
+ self.__modify_time = DATETIME_OFFSET
+ return self
+
+ def update_modify_time(self) -> BinaryWaveHeader5:
+ self.__modify_time = datetime.datetime.now()
+ return self
+
+ @property
+ def __npnts(self) -> int:
+ return reduce(lambda x, y: x * y, self.__shape)
+
+ @property
+ def __type_size(self) -> int:
+ return self.DTYPE_BYTES[self.__dtype]
+
+ @property
+ def __ex_data_unit_size(self) -> int:
+ res = self.__data_unit if isinstance(self.__data_unit, int) else 0
+ return res
+
+ @property
+ def __ex_axes_unit_size(self) -> Tuple[int, int, int, int]:
+ res = tuple(unit if isinstance(unit, int) else 0
+ for unit in self.__axes_unit)
+ res = cast(Tuple[int, int, int, int], res)
+ return res
+
+ @property
+ def section_sizes(self) -> SectionSizes:
+ value_size = self.__npnts * self.__type_size
+
+ res: SectionSizes = {
+ 'value_size': value_size,
+ 'formula_size': self.formula_size,
+ 'note_size': self.note_size,
+ 'ex_data_unit_size': self.__ex_data_unit_size,
+ 'ex_axes_unit_size': self.__ex_axes_unit_size,
+ 'axes_label_size': self.__axes_label_size}
+
+ return res
+
+ @property
+ def buffer(self) -> bytes:
+ binary_header_buf = self.__binary_header_buffer()
+ wave_header_buf = self.__wave_header_buffer()
+
+ header_buf = bytearray(binary_header_buf + wave_header_buf)
+ checksum = int(self.__checksum(header_buf) * (-1))
+ checksum_buf = checksum.to_bytes(2, byteorder='little', signed=True)
+ header_buf[2:4] = checksum_buf
+
+ return bytes(header_buf)
+
+ def __binary_header_buffer(self) -> bytes:
+ version = 5
+ checksum = 0 # temporal value
+ wfm_size = WAVE_HEADER_SIZE + self.__type_size * self.__npnts
+ if wfm_size > self.MAX_WFM_SIZE:
+ raise ValueError(
+ "array size exceeds the ibw file limit "
+ f"({self.MAX_WFM_SIZE - WAVE_HEADER_SIZE:,} bytes)")
+ formula_size = self.formula_size
+ note_size = self.note_size
+ data_eunits_size = self.__ex_data_unit_size
+ dim_eunits_size = self.__ex_axes_unit_size
+ dim_elabels_size = self.__axes_label_size
+ s_indices_size = 0 # used in text wave
+ options_size_1 = 0 # reserved
+ options_size_2 = 0 # reserved
+
+ values = (
+ version, checksum, wfm_size,
+ formula_size, note_size,
+ data_eunits_size, *dim_eunits_size, *dim_elabels_size,
+ s_indices_size, options_size_1, options_size_2)
+ res = struct.pack('2h15i', *values)
+
+ return res
+
+ def __wave_header_buffer(self) -> bytes:
+ next_ = 0 # pointer (no meaning in python)
+ creation_time = self.__datetime_to_num(self.__creation_time)
+ mod_time = self.__datetime_to_num(self.__modify_time)
+ npnts = self.__npnts
+ type_ = self.DTYPE_IDS[self.__dtype]
+ d_lock = 0 # reserved
+
+ values_1 = (
+ next_, creation_time, mod_time,
+ npnts, type_, d_lock)
+ buffer_1 = struct.pack("iIIihh", *values_1)
+
+ whpad1 = bytes('', encoding=TEXT_ENCODE) # reserved
+ wh_version = 1
+ name = bytes(self.__name, encoding=TEXT_ENCODE)
+ whpad2 = 0 # reserved
+ data_folder = 0 # pointer (no meaning in python)
+
+ values_2 = (
+ whpad1, wh_version, name, whpad2, data_folder)
+ buffer_2 = struct.pack("6sh32sii", *values_2)
+
+ n_dim = [0, 0, 0, 0]
+ for index, size in enumerate(self.__shape):
+ n_dim[index] = size
+ sf_a = self.__axes_delta
+ sf_b = self.__axes_start
+
+ values_3 = (
+ *n_dim, *sf_a, *sf_b)
+ buffer_3 = struct.pack("4i4d4d", *values_3)
+
+ data_unit = bytes(self.data_unit if self.data_unit else '',
+ encoding=TEXT_ENCODE)
+ dim_units = [bytes(unit if unit else '', encoding=TEXT_ENCODE)
+ for unit in self.axes_unit]
+ if self.data_scale:
+ fs_valid = 1
+ top_full_scale = self.data_scale[0]
+ bot_full_scale = self.data_scale[1]
+ else:
+ fs_valid = 0
+ top_full_scale = 0
+ bot_full_scale = 0
+ whpad3 = 0 # reserved
+
+ values_4 = (
+ data_unit, *dim_units, fs_valid, whpad3,
+ top_full_scale, bot_full_scale)
+ buffer_4 = struct.pack("4s4s4s4s4shhdd", *values_4)
+
+ values_5 = tuple(0 for i in range(26)) # pointers and reserved
+ buffer_5 = struct.pack("i4i4ii16i", *values_5)
+
+ values_6 = tuple(0 for i in range(11)) # private to igor
+ buffer_6 = struct.pack("hhh??iihhii", *values_6) # format: hhhcciihhii
+
+ res = buffer_1 + buffer_2 + buffer_3 + buffer_4 + buffer_5 + buffer_6
+ return res
+
+ def __checksum(self, buffer: bytes) -> int:
+ values = np.array(struct.unpack("192h", buffer))
+ checksum = np.sum(values, dtype=np.int16)
+ return checksum
+
+
+class BinaryWaveHeader5Loader:
+ WAVETYPES = {0: 'text', 1: 'complex', 2: 'float32',
+ 4: 'float64', 8: 'int8', 0x10: 'int16', 0x20: 'int32',
+ 0x40: 'unsigned'}
+ DEFAULT_WAVE_NAME = 'wave'
+
+ def __init__(self) -> None:
+ pass
+
+ def load_from_buffer(self, bin_header: bytes,
+ wave_header: bytes) -> BinaryWaveHeader5:
+ version = struct.unpack("h", bin_header[0:2])[0]
+ if version != 5:
+ if version == 7:
+ print('Warning: Got version 7 Igor binary wave file. '
+ 'Long wave name and/or long dimension labels '
+ 'will ignored.')
+ else:
+ raise TypeError(
+ 'only version 5 Igor binary wave files '
+ 'are supported (got version {})'.format(version))
+
+ binh_values = self.__unpack_binary_header(bin_header)
+ waveh_values = self.__unpack_wave_header(wave_header)
+
+ name = waveh_values['name']
+ if name == ':wave name too long:':
+ name = self.DEFAULT_WAVE_NAME
+ print('Warning: Long wave name is not supported. '
+ 'Wave name is set to default ({}).'.format(name))
+
+ data_unit_size = binh_values['data_unit_size']
+ data_unit_str = waveh_values['data_unit']
+ data_unit = data_unit_size if data_unit_size != 0 else data_unit_str
+ data_unit = cast(Union[str, int], data_unit)
+
+ axes_unit_size = binh_values['axes_unit_size']
+ axes_unit_str = waveh_values['axes_unit']
+ axes_unit = tuple(axis_unit_size if axis_unit_size != 0 else axis_unit
+ for axis_unit_size, axis_unit
+ in zip(axes_unit_size, axes_unit_str))
+ axes_unit = cast(Tuple[Union[str, int], Union[str, int],
+ Union[str, int], Union[str, int]],
+ axes_unit)
+
+ header = BinaryWaveHeader5(
+ shape=waveh_values['shape'], name=name,
+ dtype=waveh_values['dtype'],
+ formula_size=binh_values['formula_size'],
+ note_size=binh_values['note_size'],
+ data_unit=data_unit, axes_unit=axes_unit,
+ axes_start=waveh_values['axes_start'],
+ axes_delta=waveh_values['axes_delta'],
+ axes_label_size=binh_values['axes_label_size'],
+ creation_time=waveh_values['creation_datetime'],
+ modify_time=waveh_values['mod_datetime'],
+ data_scale=waveh_values['data_scale']
+ )
+
+ return header
+
+ def __unpack_binary_header(self, bin_header: bytes
+ ) -> BinaryHeaderValues:
+ # 64 bytes
+ values = struct.unpack("2h15i", bin_header)
+
+ # size of the dependency formula, if any.
+ formula_size = cast(int, values[3])
+ note_size = cast(int, values[4]) # size of the note text
+ # size of optional extended data unit
+ data_unit_size = cast(int, values[5])
+ # sizes of optional extended dimension units
+ axes_unit_size = cast(Tuple[int, int, int, int], values[6:10])
+ # sizes of optional dimension labels
+ axes_label_size = cast(Tuple[int, int, int, int], values[10:14])
+
+ # size of string indicies if text wave
+ string_indice_size = cast(int, values[14])
+ is_text_wave = string_indice_size != 0
+ if is_text_wave:
+ raise TypeError('text wave is not supported')
+
+ res: BinaryHeaderValues = {
+ 'formula_size': formula_size,
+ 'note_size': note_size,
+ 'data_unit_size': data_unit_size,
+ 'axes_unit_size': axes_unit_size,
+ 'axes_label_size': axes_label_size}
+
+ return res
+
+ def __unpack_wave_header(self, wave_header: bytes) -> WaveHeaderValues:
+
+ # 1st section (20 bytes)
+ header_1 = wave_header[0:20]
+ values_1 = struct.unpack("iIIihh", header_1)
+ creation_datetime = self.__num_to_datetime(
+ values_1[1]) # datetime of creation
+ # datetime of last modification
+ mod_datetime = self.__num_to_datetime(values_1[2])
+ npnts = values_1[3] # total number of points
+ # data type of wave
+ dtype = cast(IBWDType, self.WAVETYPES[values_1[4]])
+ if dtype in ('text', 'complex', 'unsigned'):
+ raise TypeError('{} wave is not supported'.format(dtype))
+
+ # 2nd section (48 bytes)
+ header_2 = wave_header[20:68]
+ values_2 = struct.unpack("6sh32sii", header_2)
+ name = cast(str, decode_unicode(values_2[2].split(b'\x00', 1)[0]))
+
+ # 3rd section (80 bytes)
+ header_3 = wave_header[68:148]
+ values_3 = struct.unpack("4i4d4d", header_3)
+ shape = cast(
+ Tuple[int, ...], tuple(size for size in values_3[0:4] if size != 0))
+ points_num = reduce(lambda x, y: x * y, shape)
+ if not points_num == npnts:
+ raise ValueError(
+ 'number of points (npnts) does not match with shape')
+ axes_delta = cast(Tuple[float, float, float, float], values_3[4:8])
+ axes_start = cast(Tuple[float, float, float, float], values_3[8:12])
+
+ # 4th section (40 bytes)
+ header_4 = wave_header[148:188]
+ values_4 = struct.unpack("4s4s4s4s4shhdd", header_4)
+ data_unit = cast(str, decode_unicode(values_4[0].split(b'\x00', 1)[0]))
+ axes_unit = cast(Tuple[str, str, str, str],
+ tuple([decode_unicode(unit.split(b'\x00', 1)[0])
+ for unit in values_4[1:5]]))
+ has_data_scale = bool(values_4[5])
+
+ if has_data_scale:
+ data_scale = cast(Union[Tuple[float, float], None],
+ (values_4[7], values_4[8]))
+ else:
+ data_scale = None
+
+ # skip following headers (104 + 28 bytes)
+ # format: i4i4ii16i, hhhcciihhii
+
+ res: WaveHeaderValues = {
+ 'creation_datetime': creation_datetime,
+ 'mod_datetime': mod_datetime,
+ 'dtype': dtype,
+ 'name': name,
+ 'shape': shape,
+ 'axes_delta': axes_delta,
+ 'axes_start': axes_start,
+ 'data_unit': data_unit,
+ 'axes_unit': axes_unit,
+ 'data_scale': data_scale}
+ return res
+
+ def __num_to_datetime(self, num: int) -> datetime.datetime:
+ return DATETIME_OFFSET + datetime.timedelta(seconds=num)
diff --git a/vendor/ibwpy/requirements.txt b/vendor/ibwpy/requirements.txt
new file mode 100644
index 0000000..1e557c6
--- /dev/null
+++ b/vendor/ibwpy/requirements.txt
@@ -0,0 +1,2 @@
+numpy>=1.20.3
+typing-extensions>=4.0.0
\ No newline at end of file