diff --git a/README.md b/README.md index c24a1de..f9bc68e 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,15 @@ Fundamental algorithms from the field of fire science and fire safety engineering, for computations with Python. -Documentation is available here: [![FireSciPy Documentation](https://img.shields.io/badge/docs-online-brightgreen)](https://FireDynamics.github.io/FireSciPy/) +[![FireSciPy Documentation](https://img.shields.io/badge/docs-online-brightgreen)](https://FireDynamics.github.io/FireSciPy/) +[![PyPI version](https://badge.fury.io/py/firescipy.svg)](https://pypi.org/project/firescipy/) + + +## Installation + +``` +pip install firescipy +``` Examples are available in Jupyter notebooks in the [FireSciPy repo on GitHub](https://github.com/FireDynamics/FireSciPy). diff --git a/docs/index.rst b/docs/index.rst index 06e1d62..ba8725b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,3 +31,4 @@ functions, and utility modules used in FireSciPy. reference/pyrolysis reference/handcalculation reference/utils + reference/instruments diff --git a/docs/reference/instruments.rst b/docs/reference/instruments.rst new file mode 100644 index 0000000..c29a144 --- /dev/null +++ b/docs/reference/instruments.rst @@ -0,0 +1,46 @@ +Instruments Module +================== + +This subpackage provides parsers for reading experimental instrument export +files commonly used in fire science and fire safety engineering research. +Supported formats include thermal analysis (STA, DSC, TGA) and calorimetry +(MCC, Cone Calorimeter) instruments. + +A generic reader with automatic file type detection is available via +:func:`firescipy.instruments.read_instrument_file`. + + +Reader +------ + +.. automodule:: firescipy.instruments.reader + :members: + :undoc-members: + :show-inheritance: + + +Netzsch STA / DSC / TGA +------------------------ + +.. automodule:: firescipy.instruments.netzsch_sta + :members: + :undoc-members: + :show-inheritance: + + +DEATAK MCC +---------- + +.. automodule:: firescipy.instruments.deatak_mcc + :members: + :undoc-members: + :show-inheritance: + + +Netzsch Cone Calorimeter +------------------------ + +.. automodule:: firescipy.instruments.netzsch_cone + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/tutorials/pyrolysis/mock_kinetics_computation.rst b/docs/tutorials/pyrolysis/mock_kinetics_computation.rst index 2a4ab6a..edabdbf 100644 --- a/docs/tutorials/pyrolysis/mock_kinetics_computation.rst +++ b/docs/tutorials/pyrolysis/mock_kinetics_computation.rst @@ -183,12 +183,12 @@ Looping over the above dictionary, a temperature program is created for each nom t_array = fsp.utils.series_to_numpy(hr_model["Time"]) T_array = fsp.utils.series_to_numpy(hr_model["Temperature"]) # Get overview over temperature resolution - ΔT = temp_model[1] - temp_model[0] + ΔT = T_array[1] - T_array[0] print(f"Temperature resolution ({hr_label}): ΔT = {ΔT} K") # Compute conversion for decelerating reaction (n-th order) t_sol, alpha_sol = fsp.pyrolysis.modeling.solve_kinetics( - t_array=time_model, - T_array=temp_model, + t_array=t_array, + T_array=T_array, A=A, E=E, alpha0=alpha0, diff --git a/pyproject.toml b/pyproject.toml index cf7d156..9416127 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "firescipy" -version = "0.0.6" +version = "0.0.9" description = "FireSciPy: Fundamental algorithms from the field of fire science, for computations with Python." readme = "README.md" keywords = ["Fire Safety Engineering", "fire", "pyrolysis", "kinetics", "FDS"] @@ -32,6 +32,12 @@ dependencies = [ [tool.hatch.build.targets.wheel] packages = ["src/firescipy"] +[project.optional-dependencies] +test = ["pytest>=7.0"] + +[tool.pytest.ini_options] +testpaths = ["tests"] + [project.urls] Repository = "https://github.com/FireDynamics/FireSciPy" Documentation = "https://firedynamics.github.io/FireSciPy/" diff --git a/src/firescipy/__init__.py b/src/firescipy/__init__.py index b1baa2a..3de4f06 100644 --- a/src/firescipy/__init__.py +++ b/src/firescipy/__init__.py @@ -7,6 +7,7 @@ from . import pyrolysis from . import constants from . import handcalculation +from . import instruments from importlib.metadata import PackageNotFoundError, version diff --git a/src/firescipy/instruments/__init__.py b/src/firescipy/instruments/__init__.py new file mode 100644 index 0000000..18b4de1 --- /dev/null +++ b/src/firescipy/instruments/__init__.py @@ -0,0 +1,8 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +from .reader import read_instrument_file, detect_file_type, SUPPORTED_TYPES +from .netzsch_sta import read_netzsch_sta_file +from .deatak_mcc import read_deatak_mcc_file +from .netzsch_cone import read_netzsch_cone_file diff --git a/src/firescipy/instruments/base.py b/src/firescipy/instruments/base.py new file mode 100644 index 0000000..8780fe0 --- /dev/null +++ b/src/firescipy/instruments/base.py @@ -0,0 +1,123 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +from pathlib import Path + + +class InstrumentFile: + """ + Generic file loader for laboratory text exports. + + Responsibilities + ---------------- + - read raw bytes + - decode using fallback encodings + - repair known character issues + - expose text and lines + - provide small helpers for line-based inspection + """ + + def __init__( + self, + file_path, + encodings=None, + replacements=None, + strip_bom=True, + ): + # Store the file path as a Path object for reliable cross-platform handling. + self.file_path = Path(file_path) + + # List of encodings to try in order. Many lab instruments export files + # with Windows-specific encodings (cp1252, latin1) rather than UTF-8. + # The first encoding that decodes the file without errors is used. + self.encodings = encodings or [ + "utf-8", + "cp1252", + "latin1", + "utf-16", + "utf-16-le", + "utf-16-be", + ] + + # Character repairs applied after decoding. Some instruments export + # special characters (e.g. degree sign °) using byte sequences that + # do not survive encoding conversion cleanly. These replacements fix + # the most common cases. The order matters: more specific patterns + # (e.g. "°C") must come before broader ones (e.g. "°") to avoid + # partial replacements. + self.replacements = replacements or { + "›": "°", + "›C": "°C", # cp1252: byte › decodes to › before C → °C + "›": "°", # cp1252: byte › decodes to › standalone → ° + "�C": "°C", # UTF-8 mojibake for degree-Celsius + "�": "°", # Unicode replacement character U+FFFD → degree sign + } + + # If True, strip the Byte Order Mark (BOM) that some editors and + # instruments prepend to UTF-8 or UTF-16 files. + self.strip_bom = strip_bom + + # These attributes are populated by read(). + self.raw_bytes = None # original file content as bytes + self.text = None # decoded and repaired full text + self.lines = None # text split into individual lines + self.used_encoding = None # whichever encoding succeeded + + def read(self): + # Read the entire file as raw bytes first, before any decoding. + self.raw_bytes = self.file_path.read_bytes() + + # Try each encoding in order until one succeeds. + self.text, self.used_encoding = self._decode_bytes(self.raw_bytes) + + # Fix known character encoding artefacts in the decoded text. + self.text = self._repair_text(self.text) + + # Remove a leading BOM character if present (common in UTF-8/16 files). + if self.strip_bom: + self.text = self.text.lstrip("") + + # Split into lines for line-by-line processing by the parsers. + self.lines = self.text.splitlines() + return self + + def _decode_bytes(self, raw_bytes): + last_error = None + + # Try each candidate encoding and return on the first success. + for enc in self.encodings: + try: + return raw_bytes.decode(enc), enc + except UnicodeDecodeError as exc: + last_error = exc + + # None of the encodings worked — raise a descriptive error. + raise ValueError( + f"Could not decode file '{self.file_path}' with tried encodings: {self.encodings}" + ) from last_error + + def _repair_text(self, text): + # Apply each search-and-replace pair from the replacements dict. + for old, new in self.replacements.items(): + text = text.replace(old, new) + return text + + def preview(self, start=0, stop=10): + # Quick inspection helper: return a slice of lines without printing all. + if self.lines is None: + raise RuntimeError("Call read() first.") + return self.lines[start:stop] + + def find_first_line(self, startswith=None, contains=None): + # Search for the first line that matches a prefix or a substring. + # Returns the line index, or None if no match is found. + if self.lines is None: + raise RuntimeError("Call read() first.") + + for idx, line in enumerate(self.lines): + if startswith is not None and line.startswith(startswith): + return idx + if contains is not None and contains in line: + return idx + return None diff --git a/src/firescipy/instruments/deatak_mcc.py b/src/firescipy/instruments/deatak_mcc.py new file mode 100644 index 0000000..844905e --- /dev/null +++ b/src/firescipy/instruments/deatak_mcc.py @@ -0,0 +1,257 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +from io import StringIO + +import pandas as pd + +from .base import InstrumentFile +from .helpers import split_line, strip_empty_edges, row_has_numeric_content, try_convert_to_float + + +def read_deatak_mcc_file(file_path): + """ + Convenience wrapper around DeatakMCCParser. + """ + parser = DeatakMCCParser(file_path=file_path) + return parser.parse() + + +class DeatakMCCParser: + """ + Parser for DEATAK MCC export files. + + Expected structure + ------------------ + - metadata lines up top + - table header line starting after '@' + - data rows below the table header + + Example + ------- + File Name: Wood_4mg_45Kmin_R1.txt + Version: 8.3.7.3 + ... + @ + Time (s) Temperature (C) HRR (W/g) + + 0.000 74.821 -1.830 + ... + """ + + DECIMAL_MAP = { + "COMMA": ",", + "POINT": ".", + "DOT": ".", + } + + SEPARATOR_MAP = { + "TAB": "\t", + } + + # Metadata fields whose values should be converted to numbers. + # All other fields are kept as strings. + NUMERIC_METADATA_KEYS = { + "Sample Mass (mg)", + "Sample Cup Mass (mg)", + "End Total Mass (mg)", + "Heating Rate (C/s)", + "Combuster Temperature (C)", + "N2 Flow Rate (cc/min)", + "O2 Flow Rate (cc/min)", + "T Correction Coefficients", + "Time Shift (s)", + "Baseline Flow", + "Baseline O2", + } + + def __init__(self, file_path, instrument_file=None): + """ + Parameters + ---------- + file_path : str or Path + Path to the DEATAK export file. + instrument_file : InstrumentFile, optional + Pre-loaded InstrumentFile instance. If None, one is created. + """ + self.file_path = file_path + # Use an existing InstrumentFile if provided, otherwise create one. + self.file = instrument_file or InstrumentFile(file_path).read() + + self.meta = dict() # will hold all metadata key-value pairs + self.data_df = None # will hold the measurement table as a DataFrame + + # These are set while scanning the file for the '@' separator line. + self.table_header_idx = None # line index of the column name row + self.table_header_line = None # the column name row as a string + + def parse(self): + """ + Parse metadata and data table. + + Returns + ------- + meta : dict + Parsed metadata. + data_df : pandas.DataFrame + Parsed measurement table. + """ + self._parse_metadata_and_find_table_header() + self._parse_data_table() + + # Record which encoding was used so the caller can inspect it. + self.meta["USED_ENCODING"] = self.file.used_encoding + + return self.meta, self.data_df + + def _parse_metadata_and_find_table_header(self): + """ + Parse metadata block and locate the table header line. + """ + for idx, raw_line in enumerate(self.file.lines): + line = raw_line.strip() + + # Skip blank lines. + if not line: + continue + + # Try to parse the current line as a metadata key-value pair. + parsed = self._parse_metadata_line(line) + if parsed is not None: + key, value = parsed + self.meta[key] = value + + # The '@' line signals the end of the metadata block. + # The column header is on the very next line. + header_indicator = "@" + if line.startswith(header_indicator): + self.table_header_idx = idx + 1 + self.table_header_line = self.file.lines[idx + 1] + break + + # Convert numeric metadata fields from strings to numbers. + self._postprocess_metadata() + + if self.table_header_idx is None: + raise ValueError(f"Could not find table header line starting with '{header_indicator}'.") + + def _parse_metadata_line(self, line): + """ + Parse one metadata line of the form: + KEY:\\tVALUE + + Returns + ------- + tuple[str, str | list[str] | None] or None + """ + # Split on the column separator (tab) and remove trailing empty cells. + column_sep = self._get_column_separator() + cells = split_line(line, sep=column_sep) + cells = strip_empty_edges(cells) + + if not cells: + return None + + first_cell = cells[0] + + # Only lines containing ":" are treated as metadata. + if ":" not in first_cell: + return None + + # Split "KEY: value" into key and its first value fragment. + key, first_value = first_cell.split(":", maxsplit=1) + key = key.strip() + first_value = first_value.strip() + + # Collect all non-empty value fragments (some fields span multiple cells). + values = [] + if first_value: + values.append(first_value) + + if len(cells) > 1: + values.extend(cell.strip() for cell in cells[1:] if cell.strip()) + + # Store as None, a single string, or a list depending on how many + # value fragments were found. + if len(values) == 0: + value = None + elif len(values) == 1: + value = values[0] + else: + value = values + + return key, value + + def _postprocess_metadata(self): + """ + Convert selected metadata values to numeric types where appropriate. + """ + decimal_sep = self._get_decimal_separator() + + # Only process the keys listed in NUMERIC_METADATA_KEYS. + for key in self.NUMERIC_METADATA_KEYS: + if key not in self.meta: + continue + + value = self.meta[key] + # try_convert_to_float handles both single strings and lists. + self.meta[key] = try_convert_to_float(value, decimal_sep) + + def _parse_data_table(self): + """ + Parse the measurement table below the table header. + """ + if self.table_header_idx is None or self.table_header_line is None: + raise RuntimeError("Table header information is missing.") + + decimal_sep = self._get_decimal_separator() + column_sep = self._get_column_separator() + column_names = self._get_column_names(column_sep) + + # Take all lines after the column header row and join them back into + # a single string so pandas can read it like a file. + table_lines = self.file.lines[self.table_header_idx + 1:] + table_text = "\n".join(table_lines) + + data_df = pd.read_csv( + StringIO(table_text), + sep=column_sep, + decimal=decimal_sep, + header=None, # column names are provided manually via 'names' + names=column_names, + engine="python", + skip_blank_lines=True, + ) + + # Clean up any stray '#' prefixes that some exports add to column names. + data_df.columns = [col.lstrip("#").strip() for col in data_df.columns] + # Drop columns that are entirely empty (padding artefact in some exports). + data_df = data_df.dropna(axis=1, how="all") + + self.data_df = data_df + + def _get_decimal_separator(self): + """ + Determine decimal separator from metadata. + """ + # Look for a "DECIMAL" entry in metadata; fall back to "POINT" (i.e. "."). + decimal_token = str(self.meta.get("DECIMAL", "POINT")).upper() + return self.DECIMAL_MAP.get(decimal_token, ".") + + def _get_column_separator(self): + """ + Determine column separator from metadata. + """ + # Look for a "SEPARATOR" entry in metadata; fall back to tab. + separator_token = str(self.meta.get("SEPARATOR", "SEMICOLON")).upper() + return self.SEPARATOR_MAP.get(separator_token, "\t") + + def _get_column_names(self, column_sep): + """ + Parse and clean table column names. + """ + column_names = split_line(self.table_header_line, sep=column_sep) + # Remove any leading '#' characters from column names. + column_names = [name.lstrip("#").strip() for name in column_names] + return column_names diff --git a/src/firescipy/instruments/helpers.py b/src/firescipy/instruments/helpers.py new file mode 100644 index 0000000..0414dfe --- /dev/null +++ b/src/firescipy/instruments/helpers.py @@ -0,0 +1,67 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + + +def split_line(line, sep=";"): + """ + Split a line into stripped cells. + """ + # Split on the separator and remove leading/trailing whitespace from each cell. + return [cell.strip() for cell in line.split(sep)] + + +def strip_empty_edges(cells): + """ + Remove empty cells from the end of a row. + """ + # Many instrument exports pad rows with trailing empty fields (e.g. ";;;"). + # Remove them so column counts are not inflated. + while cells and cells[-1] == "": + cells.pop() + return cells + + +def row_has_numeric_content(cells, decimal="."): + """ + Heuristic: check whether at least one cell looks numeric. + """ + for cell in cells: + # Normalise the decimal separator to "." before attempting conversion, + # since European locales often use "," as the decimal character. + candidate = cell.replace(decimal, ".").replace(",", ".") + try: + float(candidate) + return True # at least one numeric cell found + except ValueError: + continue # not numeric, try the next cell + return False + + +def is_mostly_empty(cells): + # Returns True if all cells are empty strings (blank row). + non_empty = [cell for cell in cells if cell != ""] + return len(non_empty) == 0 + + +def try_convert_to_float(value, decimal=","): + # If the value is already a list, apply the conversion to each element. + if isinstance(value, list): + return [try_convert_to_float(v, decimal) for v in value] + + # Non-string values (e.g. int, float, None) are returned unchanged. + if not isinstance(value, str): + return value + + candidate = value.strip() + + # Replace the locale-specific decimal separator with "." so Python's + # float() can parse it (e.g. "3,14" → "3.14"). + if decimal != ".": + candidate = candidate.replace(decimal, ".") + + try: + return float(candidate) + except ValueError: + # The string is not a number — return it as-is. + return value diff --git a/src/firescipy/instruments/netzsch_cone.py b/src/firescipy/instruments/netzsch_cone.py new file mode 100644 index 0000000..8910ba2 --- /dev/null +++ b/src/firescipy/instruments/netzsch_cone.py @@ -0,0 +1,193 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +from io import StringIO + +import pandas as pd + +from .base import InstrumentFile +from .helpers import split_line, strip_empty_edges, try_convert_to_float + + +def read_netzsch_cone_file(file_path): + """ + Convenience wrapper around NetzschConeParser. + """ + parser = NetzschConeParser(file_path=file_path) + return parser.parse() + + +class NetzschConeParser: + """ + Parser for NETZSCH Cone Calorimeter CSV export files. + + Expected structure + ------------------ + - Row 0: "General information" label in column 0, measurement column names + starting from column 2 + - Row 1: measurement units starting from column 2 (columns 0-1 are empty) + - Rows 2+: metadata key in column 0, metadata value in column 1, + measurement values starting from column 2 + + Unlike the NetzschSTA and Deatak parsers, metadata and measurement data + are stored side by side in each row rather than in sequential blocks. + + Example + ------- + General information;;time (s);O2 (%);HRR/a;... + ;;;;;;kW/m²;... + Test;;0;20,952;0,0;... + Standard used;ISO 5660-1;1;20,954;0,0;... + Date of test;27.03.2025;2;20,953;0,0;... + ... + """ + + SEPARATOR = ";" + DECIMAL = "," + + # latin-1 is tried first because the Cone export encodes special characters + # (² and °) using byte sequences that are best handled at the latin-1 level. + _ENCODINGS = ["latin-1", "utf-8", "cp1252", "utf-16", "utf-16-le", "utf-16-be"] + + # The Cone export stores U+FFFD as a placeholder for ² and °. When decoded + # as latin-1, the three UTF-8 bytes \xef\xbf\xbd of U+FFFD appear as the + # three-character mojibake sequence �. The replacements below undo this: + # \xef\xbf\xbdC (�C) must be handled before \xef\xbf\xbd (�) alone so + # that °C and ² are recovered correctly. + _REPLACEMENTS = { + "\x9b": "°", + "\xef\xbf\xbdC": "°C", # �C → °C (degree-Celsius, e.g. in column headers) + "\xef\xbf\xbd": "²", # � → ² (superscript-2, e.g. kW/m²) + } + + # Metadata fields whose values should be converted to numbers. + # All other fields are kept as strings. + NUMERIC_METADATA_KEYS = { + "Heat flux (kW/m²)", + "Nominal duct flow rate (l/s)", + "Sampling interval (s)", + "Separation (mm)", + "E (MJ/kg)", + "Initial mass (g)", + "Thickness (mm)", + "Surface area (cm²)", + "C-factor (SI units)", + "OD correction factor", + "Duct diameter (m)", + "O2 delay time (s)", + "CO2 delay time (s)", + "CO delay time (s)", + "Test start time (s)", + "Time to ignition (s)", + "Time to flameout (s)", + "User EOT time (s)", + "Ambient temperature (°C)", + "Barometric pressure (Pa)", + "Relative humidity (%)", + } + + def __init__(self, file_path, instrument_file=None): + """ + Parameters + ---------- + file_path : str or Path + Path to the NETZSCH Cone export file. + instrument_file : InstrumentFile, optional + Pre-loaded InstrumentFile instance. If None, one is created. + """ + self.file_path = file_path + # Use an existing InstrumentFile if provided, otherwise create one with + # the Cone-specific encoding list and character replacements. + self.file = instrument_file or InstrumentFile( + file_path, + encodings=self._ENCODINGS, + replacements=self._REPLACEMENTS, + ).read() + + self.meta = dict() # will hold all metadata key-value pairs + self.units = dict() # will hold the unit string for each measurement column + self.data_df = None # will hold the measurement table as a DataFrame + + self._column_names = None # set during header parsing + + def parse(self): + """ + Parse metadata, units, and measurement table. + + Returns + ------- + meta : dict + Parsed metadata. + data_df : pandas.DataFrame + Parsed measurement table. + """ + self._parse_header_and_units() + self._parse_data_rows() + self._postprocess_metadata() + + # Record which encoding was used so the caller can inspect it. + self.meta["USED_ENCODING"] = self.file.used_encoding + return self.meta, self.data_df + + def _parse_header_and_units(self): + """ + Read column names from row 0 and units from row 1. + """ + header_cells = split_line(self.file.lines[0], sep=self.SEPARATOR) + unit_cells = split_line(self.file.lines[1], sep=self.SEPARATOR) + + # The first two columns of row 0 are "General information" and an empty + # label for the metadata value column — skip them to get measurement names. + col_names = strip_empty_edges(header_cells[2:]) + self._column_names = col_names + + # Pair each column name with its unit string (may be empty for some columns). + for name, unit in zip(col_names, unit_cells[2:]): + self.units[name] = unit + + def _parse_data_rows(self): + """ + Parse metadata from columns 0-1 and measurement values from columns 2+ + for each row starting at row 2. + """ + data_rows = [] + + for raw_line in self.file.lines[2:]: + cells = split_line(raw_line, sep=self.SEPARATOR) + + if not cells: + continue + + # Columns 0 and 1 contain metadata (key and value). + # Later rows may have empty metadata columns — those are skipped. + key = cells[0] if len(cells) > 0 else "" + value = cells[1] if len(cells) > 1 else "" + if key: + # Store None for fields that have a key but no value. + self.meta[key] = value or None + + # Columns 2 onwards contain the measurement data for this time step. + if len(cells) > 2: + n = len(self._column_names) + # Convert each cell to float using the locale decimal separator. + row = [ + try_convert_to_float(c, self.DECIMAL) + for c in cells[2:2 + n] + ] + data_rows.append(row) + + self.data_df = pd.DataFrame(data_rows, columns=self._column_names) + # Drop columns that are entirely empty (unused channels in the export). + self.data_df = self.data_df.dropna(axis=1, how="all") + + def _postprocess_metadata(self): + """ + Convert selected metadata values to numeric types where appropriate. + """ + # Only process the keys listed in NUMERIC_METADATA_KEYS. + for key in self.NUMERIC_METADATA_KEYS: + if key not in self.meta: + continue + # try_convert_to_float handles both single strings and lists. + self.meta[key] = try_convert_to_float(self.meta[key], self.DECIMAL) diff --git a/src/firescipy/instruments/netzsch_sta.py b/src/firescipy/instruments/netzsch_sta.py new file mode 100644 index 0000000..6d40ca8 --- /dev/null +++ b/src/firescipy/instruments/netzsch_sta.py @@ -0,0 +1,256 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +from io import StringIO + +import pandas as pd + +from .base import InstrumentFile +from .helpers import split_line, strip_empty_edges, row_has_numeric_content, try_convert_to_float + + +def read_netzsch_sta_file(file_path): + """ + Convenience wrapper around NetzschSTAParser. + """ + parser = NetzschSTAParser(file_path=file_path) + return parser.parse() + + +class NetzschSTAParser: + """ + Parser for NETZSCH STA/DSC/TGA ASCII export files. + + Expected structure + ------------------ + - metadata lines starting with '#' + - table header line starting with '##' + - data rows below the table header + + Example + ------- + #EXPORTTYPE: ;DATA ALL ;;; + #DECIMAL: ;COMMA ;;; + #SEPARATOR: ;SEMICOLON;;; + ... + ##Temp./°C;Time/min;DSC/(mW/mg);Mass/% + + 29,958;0;7,67E-02;99,99772 + ... + """ + + # Maps the keyword found in the DECIMAL metadata field to the actual character. + DECIMAL_MAP = { + "COMMA": ",", + "POINT": ".", + "DOT": ".", + } + + # Maps the keyword found in the SEPARATOR metadata field to the actual character. + SEPARATOR_MAP = { + "SEMICOLON": ";", + "COMMA": ",", + "TAB": "\t", + } + + # Metadata fields whose values should be converted to numbers. + # All other fields are kept as strings. + NUMERIC_METADATA_KEYS = { + "SAMPLE MASS /mg", + "REFERENCE MASS /mg", + "SAMPLE CRUCIBLE MASS /mg", + "REFERENCE CRUCIBLE MASS /mg", + } + + def __init__(self, file_path, instrument_file=None): + """ + Parameters + ---------- + file_path : str or Path + Path to the NETZSCH export file. + instrument_file : InstrumentFile, optional + Pre-loaded InstrumentFile instance. If None, one is created. + """ + self.file_path = file_path + # Use an existing InstrumentFile if provided, otherwise create one. + self.file = instrument_file or InstrumentFile(file_path).read() + + self.meta = dict() # will hold all metadata key-value pairs + self.data_df = None # will hold the measurement table as a DataFrame + + # These are set while scanning for the '##' column header line. + self.table_header_idx = None # line index of the '##' row + self.table_header_line = None # the '##' row content (without the '##' prefix) + + def parse(self): + """ + Parse metadata and data table. + + Returns + ------- + meta : dict + Parsed metadata. + data_df : pandas.DataFrame + Parsed measurement table. + """ + self._parse_metadata_and_find_table_header() + self._parse_data_table() + + # Record which encoding was used so the caller can inspect it. + self.meta["USED_ENCODING"] = self.file.used_encoding + + return self.meta, self.data_df + + def _parse_metadata_and_find_table_header(self): + """ + Parse metadata block and locate the table header line. + """ + for idx, raw_line in enumerate(self.file.lines): + line = raw_line.strip() + + # Skip blank lines. + if not line: + continue + + # A line starting with '##' is the column header — stop scanning metadata. + if line.startswith("##"): + self.table_header_idx = idx + # Strip the leading '##' to get the raw column name string. + self.table_header_line = line[2:].strip() + break + + # A line starting with a single '#' is a metadata line. + if line.startswith("#"): + parsed = self._parse_metadata_line(line) + if parsed is not None: + key, value = parsed + self.meta[key] = value + + # Convert numeric metadata fields from strings to numbers. + self._postprocess_metadata() + + if self.table_header_idx is None: + raise ValueError("Could not find table header line starting with '##'.") + + def _parse_metadata_line(self, line): + """ + Parse one metadata line of the form: + #KEY: ;VALUE ;;; + + Returns + ------- + tuple[str, str | list[str] | None] or None + """ + # Remove the leading '#' characters before processing. + cleaned = line.lstrip("#").strip() + + # Split on the column separator and remove trailing empty filler cells. + column_sep = self._get_column_separator() + cells = split_line(cleaned, sep=column_sep) + cells = strip_empty_edges(cells) + + if not cells: + return None + + first_cell = cells[0] + + # Only lines containing ":" are treated as metadata. + if ":" not in first_cell: + return None + + # Split "KEY: value" into key and its first value fragment. + key, first_value = first_cell.split(":", maxsplit=1) + key = key.strip() + first_value = first_value.strip() + + # Collect all non-empty value fragments (some fields span multiple cells). + values = [] + if first_value: + values.append(first_value) + + if len(cells) > 1: + values.extend(cell.strip() for cell in cells[1:] if cell.strip()) + + # Store as None, a single string, or a list depending on how many + # value fragments were found. + if len(values) == 0: + value = None + elif len(values) == 1: + value = values[0] + else: + value = values + + return key, value + + def _postprocess_metadata(self): + """ + Convert selected metadata values to numeric types where appropriate. + """ + decimal_sep = self._get_decimal_separator() + + # Only process the keys listed in NUMERIC_METADATA_KEYS. + for key in self.NUMERIC_METADATA_KEYS: + if key not in self.meta: + continue + + value = self.meta[key] + self.meta[key] = try_convert_to_float(value, decimal_sep) + + def _parse_data_table(self): + """ + Parse the measurement table below the table header. + """ + if self.table_header_idx is None or self.table_header_line is None: + raise RuntimeError("Table header information is missing.") + + decimal_sep = self._get_decimal_separator() + column_sep = self._get_column_separator() + column_names = self._get_column_names(column_sep) + + # Take all lines after the column header row and join them back into + # a single string so pandas can read it like a file. + table_lines = self.file.lines[self.table_header_idx + 1:] + table_text = "\n".join(table_lines) + + data_df = pd.read_csv( + StringIO(table_text), + sep=column_sep, + decimal=decimal_sep, + header=None, # column names are provided manually via 'names' + names=column_names, + engine="python", + skip_blank_lines=True, + ) + + # Clean up any stray '#' prefixes that some exports add to column names. + data_df.columns = [col.lstrip("#").strip() for col in data_df.columns] + # Drop columns that are entirely empty (padding artefact in some exports). + data_df = data_df.dropna(axis=1, how="all") + + self.data_df = data_df + + def _get_decimal_separator(self): + """ + Determine decimal separator from metadata. + """ + # Look for a "DECIMAL" entry in metadata; fall back to "POINT" (i.e. "."). + decimal_token = str(self.meta.get("DECIMAL", "POINT")).upper() + return self.DECIMAL_MAP.get(decimal_token, ".") + + def _get_column_separator(self): + """ + Determine column separator from metadata. + """ + # Look for a "SEPARATOR" entry in metadata; fall back to semicolon. + separator_token = str(self.meta.get("SEPARATOR", "SEMICOLON")).upper() + return self.SEPARATOR_MAP.get(separator_token, ";") + + def _get_column_names(self, column_sep): + """ + Parse and clean table column names. + """ + column_names = split_line(self.table_header_line, sep=column_sep) + # Remove any leading '#' characters from column names. + column_names = [name.lstrip("#").strip() for name in column_names] + return column_names diff --git a/src/firescipy/instruments/reader.py b/src/firescipy/instruments/reader.py new file mode 100644 index 0000000..42d8b62 --- /dev/null +++ b/src/firescipy/instruments/reader.py @@ -0,0 +1,111 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +# Import the individual instrument parsers from the same package. +# Each parser knows how to read one specific file format. +from .base import InstrumentFile +from .netzsch_sta import read_netzsch_sta_file +from .deatak_mcc import read_deatak_mcc_file +from .netzsch_cone import read_netzsch_cone_file + +# Central registry: maps a human-readable instrument name to its parser function. +# To add support for a new instrument, add a new entry here and provide +# a corresponding parser module in this package. +FILE_TYPE_PARSERS = { + "Netzsch STA": read_netzsch_sta_file, + "Deatak MCC": read_deatak_mcc_file, + "Netzsch Cone": read_netzsch_cone_file, +} + +# Convenience list of all supported instrument names, derived from the registry. +SUPPORTED_TYPES = list(FILE_TYPE_PARSERS.keys()) + + +def detect_file_type(file_path): + """ + Detect instrument file type by inspecting file content. + + Returns a key from FILE_TYPE_PARSERS, or None if the type is unknown. + """ + # Read the file using the generic loader (handles encoding automatically). + f = InstrumentFile(file_path).read() + + # An empty file cannot be identified. + if not f.lines: + return None + + # Look at the very first line — each format has a unique marker there. + first_line = f.lines[0].strip() + + # Netzsch STA / DSC / TGA exports always start with "#EXPORTTYPE". + if first_line.startswith("#EXPORTTYPE"): + return "Netzsch STA" + + # Netzsch Cone Calorimeter exports always start with "General information". + if first_line.startswith("General information"): + return "Netzsch Cone" + + # DEATAK MCC exports do not have a fixed first line, but always contain + # a line with only "@" that separates metadata from measurement data. + for line in f.lines: + if line.strip() == "@": + return "Deatak MCC" + + # None of the known markers were found — file type is unknown. + return None + + +def read_instrument_file(file_path, file_type=None): + """ + Load an instrument export file, optionally with a manually specified type. + + Auto-detection is attempted when file_type is None. If detection fails and + no file_type is provided, a ValueError is raised with the list of supported + types. + + Parameters + ---------- + file_path : str or Path + Path to the instrument export file. + file_type : str, optional + Force a specific parser. Must be one of SUPPORTED_TYPES. + If None, the file type is detected automatically. + + Returns + ------- + file_type : str + The detected or provided file type. + meta : dict + Parsed metadata. + df : pandas.DataFrame + Parsed measurement table. + + Raises + ------ + ValueError + If the file type cannot be detected and no file_type is given, + or if the provided file_type is not in SUPPORTED_TYPES. + """ + # If no file type was provided by the caller, try to detect it automatically. + if file_type is None: + file_type = detect_file_type(file_path) + + # If detection also failed, stop and tell the user what went wrong. + if file_type is None: + raise ValueError( + f"Could not detect instrument type for '{file_path}'. " + f"Specify manually via file_type. Supported: {SUPPORTED_TYPES}" + ) + + # Guard against a typo or unsupported type passed in manually. + if file_type not in FILE_TYPE_PARSERS: + raise ValueError( + f"Unknown file_type '{file_type}'. Supported: {SUPPORTED_TYPES}" + ) + + # Look up the correct parser from the registry and run it. + meta, df = FILE_TYPE_PARSERS[file_type](file_path) + + # Return the file type alongside the data so the caller knows what was loaded. + return file_type, meta, df diff --git a/src/firescipy/pyrolysis/__init__.py b/src/firescipy/pyrolysis/__init__.py index adf28b8..fb1144c 100644 --- a/src/firescipy/pyrolysis/__init__.py +++ b/src/firescipy/pyrolysis/__init__.py @@ -3,6 +3,6 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. -from .kinetics import initialize_investigation_skeleton, add_isothermal_tga, add_constant_heating_rate_tga, combine_repetitions, differential_conversion, integral_conversion, compute_conversion, compute_conversion_levels, KAS_Ea, compute_Ea_KAS +from .kinetics import initialize_investigation_skeleton, add_isothermal_tga, add_constant_heating_rate_tga, combine_repetitions, differential_conversion, integral_conversion, compute_conversion, compute_conversion_levels, Ea_KAS, compute_Ea_KAS, Ea_Friedman, compute_Ea_Friedman from .modeling import create_linear_temp_program, reaction_rate, solve_kinetics, get_reaction_model diff --git a/src/firescipy/pyrolysis/kinetics.py b/src/firescipy/pyrolysis/kinetics.py index e765aa9..34030a2 100644 --- a/src/firescipy/pyrolysis/kinetics.py +++ b/src/firescipy/pyrolysis/kinetics.py @@ -445,8 +445,11 @@ def compute_conversion(database, condition="all", setup="constant_heating_rate") Returns ------- None - Updates the database in place by adding conversion data under - each condition. + Updates the database in place by adding a ``conversion`` DataFrame + under each condition. The DataFrame contains ``Time``, + ``Temperature_Avg``, the signal column, and ``Alpha``. + For differential data, ``dAlpha_dt`` is also included, computed + directly from the raw signal without numerical differentiation. """ # Validate setup @@ -488,21 +491,36 @@ def process_condition(cond): combined_data = database["experiments"]["TGA"][setup][cond]["combined"] time = combined_data["Time"] temp_avg = combined_data["Temperature_Avg"] - mass_avg = combined_data["Mass_Avg"] + # Get signal column name dynamically from general_info + signal_name = database["general_info"].get("signal", {}).get("name", "Signal") + signal_col = f"{signal_name}_Avg" + signal_avg = combined_data[signal_col] # Compute conversion based on data type if data_type == "integral": - alpha = integral_conversion(mass_avg) # Optionally pass m_0 and m_f here + alpha = integral_conversion(signal_avg) # Optionally pass m_0 and m_f here + + # Store the conversion data back in the database + conversion_data = pd.DataFrame({ + "Time": time, + "Temperature_Avg": temp_avg, + signal_col: signal_avg, + "Alpha": alpha}) + elif data_type == "differential": - alpha = differential_conversion(mass_avg) # Placeholder for differential logic - - # Store the conversion data back in the database - conversion_data = pd.DataFrame({ - "Time": time, - "Temperature_Avg": temp_avg, - "Mass_Avg": mass_avg, - "Alpha": alpha}) + alpha = differential_conversion(time, signal_avg) + Delta_H_total = trapezoid(signal_avg, time) # needed for exact dα/dt + dAlpha_dt = signal_avg / Delta_H_total + + # Store the conversion data back in the database + conversion_data = pd.DataFrame({ + "Time": time, + "Temperature_Avg": temp_avg, + signal_col: signal_avg, + "Alpha": alpha, + "dAlpha_dt": dAlpha_dt}) + database["experiments"]["TGA"][setup][cond]["conversion"] = conversion_data # Process each condition @@ -541,8 +559,10 @@ def compute_conversion_levels(database, desired_levels=None, setup="constant_hea Returns ------- None - Updates the database in place by adding the interpolated - conversion levels under each condition and setup. + Updates the database in place by adding a ``conversion_fractions`` + DataFrame under each condition. The DataFrame contains ``Time``, + ``Temperature_Avg``, and ``Alpha`` at each desired conversion level. + For differential data, ``dAlpha_dt`` is also interpolated and included. """ # Validate setup @@ -579,20 +599,10 @@ def compute_conversion_levels(database, desired_levels=None, setup="constant_hea # Helper function to process a single condition def process_condition(cond): - # # Check if combined data exists - # if "combined" not in database["experiments"]["TGA"][setup][cond]: - # raise KeyError(f" * No 'combined' data found for condition '{cond}' under '{setup}' setup.") - -# # Check for data type -# data_type = database["experiments"]["TGA"][setup][cond].get("data_type") -# if data_type not in {"integral", "differential"}: -# raise ValueError(f" * Invalid or missing data type for condition '{cond}' under '{setup}' setup.") - # Check if conversion data exists if "conversion" not in database["experiments"]["TGA"][setup][cond]: raise KeyError(f" * Conversion data is missing for condition '{cond}'. Run `compute_conversion` first.") - # Fetch conversion data conversion_data = database["experiments"]["TGA"][setup][cond]["conversion"] time = conversion_data["Time"] @@ -609,19 +619,218 @@ def process_condition(cond): new_time = np.interp(desired_levels, alpha_avg, time) new_temp = np.interp(desired_levels, alpha_avg, temp_avg) - # Store the conversion levels back in the database - conversion_fractions = pd.DataFrame({ + # dAlpha_dt is only present for differential data and needed for the Friedman method + data_type = database["experiments"]["TGA"][setup][cond].get("data_type") + conversion_fractions_data = { "Time": new_time, "Temperature_Avg": new_temp, - "Alpha": desired_levels}) - database["experiments"]["TGA"][setup][cond]["conversion_fractions"] = conversion_fractions + "Alpha": desired_levels} + if data_type == "differential": + conversion_fractions_data["dAlpha_dt"] = np.interp(desired_levels, alpha_avg, conversion_data["dAlpha_dt"]) + database["experiments"]["TGA"][setup][cond]["conversion_fractions"] = pd.DataFrame(conversion_fractions_data) # Process each condition for cond in conditions_to_process: process_condition(cond) -def KAS_Ea(temperature, heating_rate, B=1.92, C=1.0008): +def Ea_Friedman(temperature, dalpha_dt): + """ + Friedman method for estimating the activation energy at a given + conversion level :math:`E_{\\alpha}`. + + This is a differential isoconversional method based on a linear + regression of :math:`\\ln(d\\alpha/dt)` against :math:`1/T` across + experiments conducted under different temperature programs. + + The Friedman equation is presented below. + + .. math:: + + \ln\left(\frac{d\alpha}{dt}\right)_{\alpha,i} + = \ln\left[f(\alpha)A_\alpha\right] - \frac{E_\alpha}{R T_{\alpha,i}} + + Formula 3.3 in: Vyazovkin et al. (2011). ICTAC Kinetics Committee + recommendations for performing kinetic computations on thermal analysis data + *Thermochimica Acta*, 520(1–2), 1–19. + https://doi.org/10.1016/j.tca.2011.03.034 + + Parameters + ---------- + temperature : array-like + Sample temperatures in Kelvin. + dalpha_dt : array-like + Change in conversion (dalpha/dt) in 1 per second. + + Returns + ------- + tuple + A tuple containing: + + - slope (float): Slope of the linear fit. + - intercept (float): Intercept of the linear fit. + - Ea_i (float): Apparent activation energy in J/mol. + - fit_points (tuple): Data points (x, y) used for the linear fit. + """ + + # Ensure numpy arrays + temperature = series_to_numpy(temperature) + dalpha_dt = series_to_numpy(dalpha_dt) + + # Input validation + if len(temperature) != len(dalpha_dt): + raise ValueError("temperature and dalpha_dt must have the same length.") + + # Prepare x and y data for the linear fit + data_x = 1/temperature + data_y = np.log(dalpha_dt) + fit_points = (data_x, data_y) + + # Perform the linear fit + popt, pcov = curve_fit(linear_model, + data_x, data_y, + maxfev=10000) + + # Extract the fitted parameters + m_fit, b_fit = popt + + # Calculate estimate of (Ea_i), in J/mol. + Ea_i = -(m_fit * GAS_CONSTANT) + + # TODO: consider to use namedtuple or dataclass + return popt, Ea_i, fit_points + + +def compute_Ea_Friedman(database, data_keys=["experiments", "TGA", "constant_heating_rate"]): + """ + Wrapper function to easily compute activation energies using the Friedman differential isoconversional method. + + This function applies the Friedman method to interpolated conversion data + (as prepared by :func:`compute_conversion_levels`) and estimates the + activation energy (:math:`E_a`) at each conversion level. It computes + linear regression statistics, including RMSE and R², and stores the results + in a DataFrame. + + The final DataFrame is stored in the database at the specified location + defined by `data_keys`. + + This function requires that :func:`compute_conversion_levels` has been + called beforehand to prepare the input data. + + Parameters + ---------- + database : dict + The main data structure storing all experimental data. + Must follow the format initialized by + :func:`initialize_investigation_skeleton`. + data_keys: list + Keys that define the path to the relevant dataset inside the database. + For example: ["experiments", "TGA", "constant_heating_rate"]. + + Returns + ------- + None + Adds a DataFrame named ``Ea_results_Friedman`` to the corresponding location + in the database. The DataFrame contains: + + - ``Conversion``: Conversion level α + - ``Ea``: Activation energy in J/mol + - ``m_fit``: Slope of the linear fit + - ``b_fit``: Intercept of the linear fit + - ``R_squared``: Coefficient of determination + - ``RMSE``: Root mean square error + - ``x{i}``, ``y{i}``: x and y fit points per temperature program i + """ + + # Safely access the dataset + dataset = get_nested_value(database, data_keys) + if dataset is None: + raise ValueError(f"Dataset not found at the specified keys: {data_keys}") + + # Check that all conditions use differential data, required for the Friedman method + for key in dataset.keys(): + if dataset[key].get("data_type") == "integral": + raise ValueError( + f"Condition '{key}' has integral raw data. The Friedman method requires " + "differential raw data. Differentiate and smooth your data manually, then " + "add it as a new entry with data_type='differential'.") + + # Safely access the parent dictionary to store results + store_Ea = get_nested_value(database, data_keys[:-1]) + if store_Ea is None: + raise ValueError(f"Unable to store results; parent keys not found: {data_keys[:-1]}") + + # Sort the keys of the dataset based on the set values + set_value_keys = sorted(dataset.keys(), key=lambda x: dataset[x]["set_value"]["value"]) + + # Get conversion levels from the first condition (same for all) + conversion_levels = dataset[set_value_keys[0]]["conversion_fractions"]["Alpha"] + + # Prepare data collection. + Ea = list() + m = list() + b = list() + r_squared = list() + rmse = list() + + # Prepare placeholders for x and y values + xy_data = {f"x{i+1}": [] for i in range(len(set_value_keys))} + xy_data.update({f"y{i+1}": [] for i in range(len(set_value_keys))}) + + # Iterate through conversion levels and compute results + for conv_id, conversion_level in enumerate(conversion_levels): + conversion_temperatures = list() + conversion_dAlpha_dts = list() + for set_value_key in set_value_keys: + conv_temp = dataset[set_value_key]["conversion_fractions"]["Temperature_Avg"].iloc[conv_id] + conv_dAlpha_dt = dataset[set_value_key]["conversion_fractions"]["dAlpha_dt"].iloc[conv_id] + conversion_temperatures.append(conv_temp) + conversion_dAlpha_dts.append(conv_dAlpha_dt) + + # Compute activation energy + popt, Ea_i, fit_points = Ea_Friedman(conversion_temperatures, conversion_dAlpha_dts) + Ea.append(Ea_i) + + # Extract and store the fitted parameters. + m_fit, b_fit = popt + m.append(m_fit) + b.append(b_fit) + + # Generate y-values from the fitted model. + data_x, data_y = fit_points + y_fit = linear_model(data_x, m_fit, b_fit) + + # Calculate residuals. + residuals_i = calculate_residuals(data_y, y_fit) + + # Calculate R-squared. + r_squared_i = calculate_R_squared(residuals_i, data_y) + r_squared.append(r_squared_i) + + # Calculate RMSE. + rmse_i = calculate_RMSE(residuals_i) + rmse.append(rmse_i) + + # Store x and y values dynamically + for i, (x_val, y_val) in enumerate(zip(data_x, data_y)): + xy_data[f"x{i+1}"].append(x_val) + xy_data[f"y{i+1}"].append(y_val) + + # Combine results + Ea_results = pd.DataFrame( + {"Conversion": conversion_levels, + "Ea": np.array(Ea), + "m_fit": np.array(m), + "b_fit": np.array(b), + "R_squared": np.array(r_squared), + "RMSE": np.array(rmse), + **xy_data}) + + # Store results. + store_Ea["Ea_results_Friedman"] = Ea_results + + +def Ea_KAS(temperature, heating_rate, B=1.92, C=1.0008): """ Kissinger–Akahira–Sunose method (KAS), with Starink improvement by default. Estimates the activation energy for a given level of conversion @@ -725,7 +934,7 @@ def compute_Ea_KAS(database, data_keys=["experiments", "TGA", "constant_heating_ Keys that define the path to the relevant dataset inside the database. For example: ["experiments", "TGA", "constant_heating_rate"]. **kwargs - Additional arguments passed to :func:`KAS_Ea`. + Additional arguments passed to :func:`Ea_KAS`. For example, to override the default Starink constants (B, C). Returns @@ -781,7 +990,7 @@ def compute_Ea_KAS(database, data_keys=["experiments", "TGA", "constant_heating_ conversion_temperatures.append(conv_temp) # Compute activation energy - popt, Ea_i, fit_points = KAS_Ea(conversion_temperatures, set_values, **kwargs) + popt, Ea_i, fit_points = Ea_KAS(conversion_temperatures, set_values, **kwargs) Ea.append(Ea_i) # Extract and store the fitted parameters. diff --git a/src/firescipy/utils.py b/src/firescipy/utils.py index c21d555..ce7dda5 100644 --- a/src/firescipy/utils.py +++ b/src/firescipy/utils.py @@ -2,6 +2,10 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. +import os +import subprocess +import warnings + import numpy as np import pandas as pd @@ -325,3 +329,329 @@ def dynamic_local_change_simplification(time, temperature, basis_tolerance=0.2): retained_indices.append(len(temperature) - 1) return np.array(retained_indices) + + +def get_git_commit_hash(path_to_git_repo): + """ + Retrieve the short and long commit hashes of the current HEAD in a local Git repository. + + This function runs ``git rev-parse`` in the specified repository directory and + returns both the short and full (long) forms of the current HEAD commit hash. + Note that uncommitted changes in the working tree are not reflected in these hashes. + + Parameters + ---------- + path_to_git_repo : str or os.PathLike + Path to the local clone of a Git repository. + + Returns + ------- + rev_short : str + Short form of the current HEAD commit hash (e.g., ``a1b2c3d``). + rev_long : str + Full (long) form of the current HEAD commit hash (e.g., ``a1b2c3d4e5f6...``). + + Raises + ------ + RuntimeError + If the path is not a Git repository, Git is not installed, or HEAD + cannot be resolved. + """ + + try: + rev_short = subprocess.check_output( + ["git", "rev-parse", "--short", "HEAD"], + cwd=path_to_git_repo + ).strip().decode() + + rev_long = subprocess.check_output( + ["git", "rev-parse", "HEAD"], + cwd=path_to_git_repo + ).strip().decode() + + except FileNotFoundError as e: + raise RuntimeError( + "Git does not seem to be installed or is not available on the PATH." + ) from e + + except subprocess.CalledProcessError as e: + raise RuntimeError( + f"Path '{path_to_git_repo}' is not a valid Git repository or HEAD is not defined." + ) from e + + return rev_short, rev_long + + +def get_window_points(phi, delta_phi): + """ + Compute the number of data points required to cover a given window width. + + Useful for determining the window size for smoothing operations where the + desired window is specified in physical units (e.g. temperature in K or + time in s) rather than in number of points. + + The result is always an odd number (required by most smoothing filters) + and at least 5. + + Parameters + ---------- + phi : array-like + The independent variable series (e.g. temperature or time). + delta_phi : float + Desired window width in the same units as phi. + + Returns + ------- + int + Odd number of data points covering the requested window width. + """ + + # Estimate the typical spacing between consecutive data points. + d_phi = np.median(np.diff(phi)) + + # Convert the desired window width to a number of points. + win_pts = int(round(delta_phi / d_phi)) + win_pts = max(win_pts, 5) + + # Smoothing filters (e.g. Savitzky-Golay) require an odd window size. + if win_pts % 2 == 0: + win_pts += 1 + + return win_pts + + +def get_peak_value(x_values, y_values): + """ + Find the peak (maximum) of a data series and return its coordinates and index. + + Parameters + ---------- + x_values : array-like or pd.Series + The independent variable (e.g. time or temperature). + y_values : array-like or pd.Series + The dependent variable whose maximum is sought. + + Returns + ------- + x_peak : float + The x-value at the peak. + y_peak : float + The y-value at the peak. + peak_idx : int + The index of the peak in the input arrays. + """ + + x_values = series_to_numpy(x_values) + y_values = series_to_numpy(y_values) + + peak_idx = np.argmax(y_values) + + x_peak = x_values[peak_idx] + y_peak = y_values[peak_idx] + + return x_peak, y_peak, peak_idx + + +def cubic_max_exact(coeffs, x1, x2): + """ + Find the exact maximum of a cubic polynomial on the interval [x1, x2]. + + Evaluates the polynomial at its critical points (where the derivative is + zero) and at both endpoints, then returns the location and value of the + global maximum on that interval. + + Parameters + ---------- + coeffs : array-like + Polynomial coefficients in descending order, as returned by + ``numpy.polyfit`` with degree 3. + x1 : float + Left boundary of the search interval. + x2 : float + Right boundary of the search interval. + + Returns + ------- + x_max : float + The x-value at which the polynomial reaches its maximum. + y_max : float + The maximum value of the polynomial on [x1, x2]. + """ + + p = np.poly1d(coeffs) + dp = np.polyder(coeffs) # first derivative (quadratic) + + # Critical points are where the first derivative is zero. + crit_x = np.roots(dp) + crit_x = crit_x[np.isreal(crit_x)].real + + # Keep only critical points strictly inside the interval. + crit_x = crit_x[(crit_x > x1) & (crit_x < x2)] + + # Evaluate at critical points and both endpoints, return the maximum. + xs = np.array([x1, x2, *crit_x]) + ys = p(xs) + + i = np.argmax(ys) + return xs[i], ys[i] + + +def interpolate_experiment_data(time, temp, signal_1, signal_2=None, + T_step=0.5, mode="temperature_grid", + nominal_heating_rate=None, non_monotonic="raise"): + """ + Interpolate experimental data to achieve a uniform temperature or time spacing. + + Accepts up to two signal columns, for example for STA data + such as mass and heat flow. + + Two processing modes are available: + + 1) ``"temperature_grid"``: + A temperature grid with spacing ``T_step`` is created and used + for interpolation. This assumes strictly increasing temperature. + + 2) ``"nominal_rate"``: + Time is treated as the independent variable and a time grid is + built from the nominal heating rate and the desired temperature + step size ``T_step``. + + Parameters + ---------- + time : array-like or pd.Series + Time series from the experimental data. + temp : array-like or pd.Series + Temperature series from the experimental data. + signal_1 : array-like or pd.Series + First signal from the experimental data, e.g. mass (TGA). + signal_2 : array-like or pd.Series, optional + Second signal from the experimental data, e.g. heat flow (STA). + T_step : float + Desired temperature step size in Kelvin, e.g. 0.5 K. + mode : str + Interpolation mode: ``"nominal_rate"`` or ``"temperature_grid"``. + nominal_heating_rate : float, optional + Nominal heating rate in K/min. Required when ``mode="nominal_rate"``. + non_monotonic : str + How to handle non-monotonic temperature data in ``"temperature_grid"`` + mode. Options are ``"raise"``, ``"warn"``, or ``"ignore"``. + + Returns + ------- + tuple + ``(new_time, new_temp, new_signal_1)`` or + ``(new_time, new_temp, new_signal_1, new_signal_2)`` if signal_2 + is provided. + + Raises + ------ + ValueError + If ``T_step`` is not positive, input arrays have inconsistent lengths, + ``non_monotonic`` receives an unknown value, or the mode is unknown. + """ + + time = series_to_numpy(time) + temp = series_to_numpy(temp) + signal_1 = series_to_numpy(signal_1) + + if signal_2 is not None: + signal_2 = series_to_numpy(signal_2) + + if T_step <= 0: + raise ValueError("T_step must be greater than zero.") + + if not (len(time) == len(temp) == len(signal_1)): + raise ValueError("time, temp, and signal_1 must have the same length.") + + if signal_2 is not None and len(signal_2) != len(time): + raise ValueError("signal_2 must have the same length as time and temp.") + + if mode == "nominal_rate": + + if not np.all(np.diff(time) > 0): + raise ValueError( + "Time data is not strictly monotonically increasing. " + "Time-based interpolation may be unreliable." + ) + + # Convert heating rate from K/min to K/s to match time in seconds. + beta = nominal_heating_rate / 60 + t_step = T_step / beta + + t_start = time[0] + t_end = time[-1] + new_time = np.arange(t_start, t_end, t_step) + + new_temp = np.interp(new_time, time, temp) + new_signal_1 = np.interp(new_time, time, signal_1) + + if signal_2 is not None: + new_signal_2 = np.interp(new_time, time, signal_2) + return new_time, new_temp, new_signal_1, new_signal_2 + + return new_time, new_temp, new_signal_1 + + elif mode == "temperature_grid": + + if not np.all(np.diff(temp) > 0): + msg = ( + "Temperature data is not strictly monotonically increasing. " + "Temperature-based interpolation may be unreliable." + ) + if non_monotonic == "raise": + raise ValueError(msg) + elif non_monotonic == "warn": + warnings.warn(msg, stacklevel=2) + elif non_monotonic == "ignore": + pass + else: + raise ValueError(f"Unknown option for non_monotonic: {non_monotonic!r}") + + T_start = temp[0] + T_end = temp[-1] + new_temp = np.arange(T_start, T_end, T_step) + + new_time = np.interp(new_temp, temp, time) + new_signal_1 = np.interp(new_temp, temp, signal_1) + + if signal_2 is not None: + new_signal_2 = np.interp(new_temp, temp, signal_2) + return new_time, new_temp, new_signal_1, new_signal_2 + + return new_time, new_temp, new_signal_1 + + else: + raise ValueError(f"Mode {mode!r} is unknown. Use 'nominal_rate' or 'temperature_grid'.") + + +def format_export_df(data_frame, column_decimals): + """ + Create a formatted export copy of a pandas DataFrame. + + Each specified column is converted to a fixed-decimal string + representation, which ensures consistent formatting when writing + results to CSV or Markdown tables. + + Parameters + ---------- + data_frame : pd.DataFrame + The DataFrame to format. It is not modified in place. + column_decimals : dict + Mapping of column name to the number of decimal places, e.g. + ``{"Temperature (K)": 2, "HRR (kW/m²)": 1}``. + + Returns + ------- + pd.DataFrame + A copy of ``data_frame`` with the specified columns formatted + as strings. + """ + + export_df = data_frame.copy() + for col, ndigits in column_decimals.items(): + if col in export_df.columns: + export_df[col] = export_df[col].map( + lambda x, n=ndigits: f"{x:.{n}f}" if pd.notna(x) else "" + ) + + return export_df diff --git a/tests/fixtures/Deatak_MCC.txt b/tests/fixtures/Deatak_MCC.txt new file mode 100644 index 0000000..1a22b0e --- /dev/null +++ b/tests/fixtures/Deatak_MCC.txt @@ -0,0 +1,15 @@ +File Name: Deatak_MCC.txt +Version: 8.3.7.3 +Sample ID: Wood +Sample Mass (mg): 2.0000 +Heating Rate (C/s): 1.0000 +T Correction Coefficients: 0.308738730 0.013615500 0.000000000 +Time Shift (s): 14.0 +Pre-Test Comments: +@ +Time (s) Temperature (C) HRR (W/g) Heating Rate (C/s) Combustor Temp (C) +0.000 88.247 0.202 -0.054 900.419 +0.501 88.201 -0.605 -0.076 900.440 +1.001 88.185 0.025 -0.062 900.422 +1.501 88.159 -1.972 -0.042 900.418 +2.001 88.125 -3.749 -0.061 900.445 diff --git a/tests/fixtures/Netzsch_Cone.csv b/tests/fixtures/Netzsch_Cone.csv new file mode 100644 index 0000000..c9f3743 --- /dev/null +++ b/tests/fixtures/Netzsch_Cone.csv @@ -0,0 +1,8 @@ +General information;;time (s);HRR/a;Mass +;;;kW/m�;g +Standard used;ISO 5660-1;0;0,0;74,22 +Heat flux (kW/m�);75;1;0,5;74,20 +Initial mass (g);74,22;2;1,2;74,18 +Time to ignition (s);13;3;15,4;73,90 +Ambient temperature (�C);25,6;4;42,1;73,50 +Non-scrubbed?;;5;38,7;73,10 diff --git a/tests/fixtures/Netzsch_STA.csv b/tests/fixtures/Netzsch_STA.csv new file mode 100644 index 0000000..0f2ba5d --- /dev/null +++ b/tests/fixtures/Netzsch_STA.csv @@ -0,0 +1,21 @@ +#EXPORTTYPE: ;DATA ALL ;;; +#FORMAT: ;NETZSCH5 ;;; +#FTYPE: ;ASCII ;;; +#DECIMAL: ;COMMA ;;; +#SEPARATOR: ;SEMICOLON ;;; +#INSTRUMENT: ;NETZSCH STA 449F5 ;;; +#REMARK: ; ;;; +#SAMPLE MASS /mg: ;5,28;;; +#REFERENCE MASS /mg: ;0;;; +#DSC RANGE /�V: ;5000;;; +#EXO: ;-1;;; +#RANGE: ;25�C/5,0(K/min)/600�C ;;; +#SEGMENT: ;S1/1 ;;; +#SEG. 1: ;25�C/5,0(K/min)/600�C ;;; +;;;; +##Temp./�C;Time/min;DSC/(mW/mg);Mass/%;Sensit./(uV/mW) +29,958;0;7,67E-02;99,99772;1,06173 +30,458;1,57633;8,44E-02;99,22316;1,06235 +30,958;1,92678;9,49E-02;99,33139;1,06296 +31,458;2,18789;0,10395;99,38054;1,06358 +31,958;2,40647;0,11035;99,31273;1,06419 diff --git a/tests/test_instruments.py b/tests/test_instruments.py new file mode 100644 index 0000000..caed831 --- /dev/null +++ b/tests/test_instruments.py @@ -0,0 +1,181 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +# The 'fixtures/' folder contains small, purpose-built example files for each +# supported instrument format. They are kept minimal (a few metadata lines and +# ~5 data rows) but cover all relevant cases: string, numeric, list, and empty +# metadata fields, as well as the special characters (² and °C) that some +# instruments encode in a non-standard way. +# These files serve as stable, known inputs so that tests can verify the parser +# output against expected values without depending on real measurement data. + +from pathlib import Path + +import pytest + +from firescipy.instruments import ( + detect_file_type, + read_instrument_file, + read_deatak_mcc_file, + read_netzsch_sta_file, + read_netzsch_cone_file, + SUPPORTED_TYPES, +) + +FIXTURES = Path(__file__).parent / "fixtures" + + +# --------------------------------------------------------------------------- +# Supported types registry +# --------------------------------------------------------------------------- + +def test_supported_types_contains_all_instruments(): + assert "Netzsch STA" in SUPPORTED_TYPES + assert "Deatak MCC" in SUPPORTED_TYPES + assert "Netzsch Cone" in SUPPORTED_TYPES + + +# --------------------------------------------------------------------------- +# Auto-detection +# --------------------------------------------------------------------------- + +def test_detect_netzsch_sta(): + assert detect_file_type(FIXTURES / "Netzsch_STA.csv") == "Netzsch STA" + + +def test_detect_deatak_mcc(): + assert detect_file_type(FIXTURES / "Deatak_MCC.txt") == "Deatak MCC" + + +def test_detect_netzsch_cone(): + assert detect_file_type(FIXTURES / "Netzsch_Cone.csv") == "Netzsch Cone" + + +# --------------------------------------------------------------------------- +# Generic reader — auto-detection path +# --------------------------------------------------------------------------- + +def test_read_instrument_file_returns_correct_type(): + for fname, expected_type in [ + ("Netzsch_STA.csv", "Netzsch STA"), + ("Deatak_MCC.txt", "Deatak MCC"), + ("Netzsch_Cone.csv", "Netzsch Cone"), + ]: + file_type, _, _ = read_instrument_file(FIXTURES / fname) + assert file_type == expected_type + + +def test_read_instrument_file_manual_override(): + # Explicitly specifying the type should work even when detection would also succeed. + file_type, meta, df = read_instrument_file( + FIXTURES / "Deatak_MCC.txt", file_type="Deatak MCC" + ) + assert file_type == "Deatak MCC" + assert df is not None + + +def test_read_instrument_file_unknown_type_raises(): + with pytest.raises(ValueError, match="Unknown file_type"): + read_instrument_file(FIXTURES / "Deatak_MCC.txt", file_type="Unknown Device") + + +# --------------------------------------------------------------------------- +# Deatak MCC parser +# --------------------------------------------------------------------------- + +class TestDeatakMCC: + def setup_method(self): + self.meta, self.df = read_deatak_mcc_file(FIXTURES / "Deatak_MCC.txt") + + def test_dataframe_shape(self): + assert self.df.shape == (5, 5) + + def test_dataframe_columns(self): + assert "Time (s)" in self.df.columns + assert "Temperature (C)" in self.df.columns + assert "HRR (W/g)" in self.df.columns + + def test_numeric_metadata(self): + # Scalar float fields + assert self.meta["Sample Mass (mg)"] == pytest.approx(2.0) + assert self.meta["Heating Rate (C/s)"] == pytest.approx(1.0) + assert self.meta["Time Shift (s)"] == pytest.approx(14.0) + + def test_list_metadata(self): + # T Correction Coefficients must be a list of floats, not strings. + coeffs = self.meta["T Correction Coefficients"] + assert isinstance(coeffs, list) + assert all(isinstance(v, float) for v in coeffs) + assert coeffs == pytest.approx([0.30873873, 0.0136155, 0.0]) + + def test_none_metadata(self): + assert self.meta["Pre-Test Comments"] is None + + def test_string_metadata(self): + assert self.meta["Sample ID"] == "Wood" + + def test_used_encoding_present(self): + assert "USED_ENCODING" in self.meta + + +# --------------------------------------------------------------------------- +# Netzsch STA parser +# --------------------------------------------------------------------------- + +class TestNetzschSTA: + def setup_method(self): + self.meta, self.df = read_netzsch_sta_file(FIXTURES / "Netzsch_STA.csv") + + def test_dataframe_shape(self): + assert self.df.shape == (5, 5) + + def test_numeric_metadata(self): + assert self.meta["SAMPLE MASS /mg"] == pytest.approx(5.28) + assert self.meta["REFERENCE MASS /mg"] == pytest.approx(0.0) + + def test_none_metadata(self): + assert self.meta["REMARK"] is None + + def test_decimal_and_separator_detected(self): + assert self.meta["DECIMAL"] == "COMMA" + assert self.meta["SEPARATOR"] == "SEMICOLON" + + def test_data_uses_correct_decimal(self): + # Values were stored with comma as decimal — must be parsed as floats. + assert self.df.dtypes["Temp./°C"] == "float64" + + def test_used_encoding_present(self): + assert "USED_ENCODING" in self.meta + + +# --------------------------------------------------------------------------- +# Netzsch Cone parser +# --------------------------------------------------------------------------- + +class TestNetzschCone: + def setup_method(self): + self.meta, self.df = read_netzsch_cone_file(FIXTURES / "Netzsch_Cone.csv") + + def test_dataframe_shape(self): + assert self.df.shape == (6, 3) + + def test_dataframe_columns(self): + assert list(self.df.columns) == ["time (s)", "HRR/a", "Mass"] + + def test_numeric_metadata_with_special_chars(self): + # These keys contain ² and °C — tests that special characters are + # correctly recovered from the file encoding. + assert self.meta["Heat flux (kW/m²)"] == pytest.approx(75.0) + assert self.meta["Ambient temperature (°C)"] == pytest.approx(25.6) + assert self.meta["Initial mass (g)"] == pytest.approx(74.22) + assert self.meta["Time to ignition (s)"] == pytest.approx(13.0) + + def test_none_metadata(self): + assert self.meta["Non-scrubbed?"] is None + + def test_string_metadata(self): + assert self.meta["Standard used"] == "ISO 5660-1" + + def test_encoding_is_latin1(self): + assert self.meta["USED_ENCODING"] == "latin-1" diff --git a/tests/test_kinetics.py b/tests/test_kinetics.py new file mode 100644 index 0000000..7956bc7 --- /dev/null +++ b/tests/test_kinetics.py @@ -0,0 +1,123 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +# Round-trip tests for isoconversional kinetics: synthetic data is generated +# with known kinetic parameters via solve_kinetics, fed through the analysis +# pipeline, and the recovered Ea is compared to the known input value. +# Using modelled data gives an exact reference — something real experiments +# cannot provide. + +import numpy as np +import pandas as pd +import pytest + +from firescipy.constants import GAS_CONSTANT +from firescipy.utils import ensure_nested_dict +from firescipy.pyrolysis.kinetics import ( + initialize_investigation_skeleton, + compute_conversion_levels, + compute_Ea_Friedman, +) +from firescipy.pyrolysis.modeling import ( + create_linear_temp_program, + solve_kinetics, +) + + +# --------------------------------------------------------------------------- +# Shared model parameters +# --------------------------------------------------------------------------- + +_A = 10**10 / 60 # pre-exponential factor in 1/s +_E = 125_400 # activation energy in J/mol (125.4 kJ/mol) +_ALPHA0 = 1e-12 # small non-zero start value to avoid division by zero +_T_START = 300 # K +_T_END = 750 # K +_N_POINTS = 451 # temperature resolution: ΔT ≈ 1 K + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _build_friedman_data_structure(heating_rates): + """Build a data structure with modelled differential data for Friedman testing. + + Model data bypasses the normal raw-data pipeline + (add_constant_heating_rate_tga / combine_repetitions / compute_conversion) + and is inserted directly into the conversion level expected by + compute_conversion_levels. + """ + data_structure = initialize_investigation_skeleton( + material="Test material", + signal={"name": "HeatFlow", "unit": "W/g"}) + + for hr_label, beta in heating_rates.items(): + hr_model = create_linear_temp_program( + start_temp=_T_START, end_temp=_T_END, + beta=beta, beta_unit="K/min", steps=_N_POINTS) + + t_array = hr_model["Time"] + T_array = hr_model["Temperature"] + + t_sol, alpha_sol = solve_kinetics( + t_array=t_array, T_array=T_array, + A=_A, E=_E, alpha0=_ALPHA0, + R=GAS_CONSTANT, reaction_model='nth_order', + model_params={'n': 1.0}) + + # Numerical gradient is acceptable here because the model curve is + # smooth; the noise amplification problem only arises with real TGA data + dAlpha_dt = np.gradient(alpha_sol, t_sol) + + hr_entry = ensure_nested_dict( + data_structure, + ["experiments", "TGA", "constant_heating_rate", hr_label]) + hr_entry["set_value"] = {"value": beta, "unit": "K/min"} + hr_entry["data_type"] = "differential" + hr_entry["conversion"] = pd.DataFrame({ + "Time": t_sol, + "Temperature_Avg": T_array, + "Alpha": alpha_sol, + "dAlpha_dt": dAlpha_dt}) + + return data_structure + + +# --------------------------------------------------------------------------- +# Friedman method tests +# --------------------------------------------------------------------------- + +def test_compute_Ea_Friedman_recovers_known_Ea(): + """Friedman round-trip: modelled data with known Ea must be recovered within 1%.""" + heating_rates = {"5Kmin": 5, "10Kmin": 10, "20Kmin": 20, "40Kmin": 40} + + data_structure = _build_friedman_data_structure(heating_rates) + + conversion_levels = np.linspace(0.05, 0.95, 37) + compute_conversion_levels(data_structure, desired_levels=conversion_levels) + compute_Ea_Friedman(data_structure) + + Ea_recovered = data_structure["experiments"]["TGA"]["Ea_results_Friedman"]["Ea"].values + + assert np.allclose(Ea_recovered, _E, rtol=0.01), ( + f"Friedman Ea deviated more than 1% from input: " + f"mean={np.mean(Ea_recovered) / 1000:.2f} kJ/mol, " + f"expected={_E / 1000:.2f} kJ/mol") + + +def test_compute_Ea_Friedman_raises_for_integral_data(): + """compute_Ea_Friedman must raise ValueError when data_type is 'integral'.""" + data_structure = initialize_investigation_skeleton( + material="Test material", + signal={"name": "Mass", "unit": "mg"}) + + hr_entry = ensure_nested_dict( + data_structure, + ["experiments", "TGA", "constant_heating_rate", "10Kmin"]) + hr_entry["set_value"] = {"value": 10, "unit": "K/min"} + hr_entry["data_type"] = "integral" + + with pytest.raises(ValueError, match="integral"): + compute_Ea_Friedman(data_structure)