diff --git a/flixopt/batched.py b/flixopt/batched.py index 13996e9f8..03b509d89 100644 --- a/flixopt/batched.py +++ b/flixopt/batched.py @@ -1368,7 +1368,7 @@ def size_maximum_all(self) -> xr.DataArray: coords={self.dim_name: self._ids_index}, ) - @cached_property + @property def dim_name(self) -> str: """Dimension name for this data container.""" return 'flow' @@ -1831,6 +1831,17 @@ def imbalance_elements(self) -> list[Bus]: """Bus objects that allow imbalance.""" return [b for b in self._buses if b.allows_imbalance] + @cached_property + def balance_coefficients(self) -> dict[tuple[str, str], float]: + """Sparse (bus_id, flow_id) -> +1/-1 coefficients for bus balance.""" + coefficients = {} + for bus in self._buses: + for f in bus.inputs.values(): + coefficients[(bus.label_full, f.label_full)] = 1.0 + for f in bus.outputs.values(): + coefficients[(bus.label_full, f.label_full)] = -1.0 + return coefficients + def validate(self) -> None: """Validate all buses (config + DataArray checks). @@ -1852,9 +1863,19 @@ def validate(self) -> None: class ComponentsData: """Batched data container for components with status.""" - def __init__(self, components_with_status: list[Component], all_components: list[Component]): + def __init__( + self, + components_with_status: list[Component], + all_components: list[Component], + flows_data: FlowsData, + effect_ids: list[str], + timestep_duration: xr.DataArray | float, + ): self._components_with_status = components_with_status self._all_components = all_components + self._flows_data = flows_data + self._effect_ids = effect_ids + self._timestep_duration = timestep_duration self.elements: ElementContainer = ElementContainer(components_with_status) @property @@ -1869,6 +1890,110 @@ def dim_name(self) -> str: def all_components(self) -> list[Component]: return self._all_components + @cached_property + def with_prevent_simultaneous(self) -> list[Component]: + """Generic components (non-Storage, non-Transmission) with prevent_simultaneous_flows. + + Storage and Transmission handle their own prevent_simultaneous constraints + in StoragesModel and TransmissionsModel respectively. + """ + from .components import Storage, Transmission + + return [ + c + for c in self._all_components + if c.prevent_simultaneous_flows and not isinstance(c, (Storage, Transmission)) + ] + + @cached_property + def status_params(self) -> dict[str, StatusParameters]: + """Dict of component_id -> StatusParameters.""" + return {c.label: c.status_parameters for c in self._components_with_status} + + @cached_property + def previous_status_dict(self) -> dict[str, xr.DataArray]: + """Dict of component_id -> previous_status DataArray.""" + result = {} + for c in self._components_with_status: + prev = self._get_previous_status_for_component(c) + if prev is not None: + result[c.label] = prev + return result + + def _get_previous_status_for_component(self, component) -> xr.DataArray | None: + """Get previous status for a single component (OR of flow statuses). + + Args: + component: The component to get previous status for. + + Returns: + DataArray of previous status, or None if no flows have previous status. + """ + from .config import CONFIG + from .modeling import ModelingUtilitiesAbstract + + previous_status = [] + for flow in component.flows.values(): + if flow.previous_flow_rate is not None: + prev = ModelingUtilitiesAbstract.to_binary( + values=xr.DataArray( + [flow.previous_flow_rate] if np.isscalar(flow.previous_flow_rate) else flow.previous_flow_rate, + dims='time', + ), + epsilon=CONFIG.Modeling.epsilon, + dims='time', + ) + previous_status.append(prev) + + if not previous_status: + return None + + # Combine flow statuses using OR (any flow active = component active) + max_len = max(da.sizes['time'] for da in previous_status) + padded = [ + da.assign_coords(time=range(-da.sizes['time'], 0)).reindex(time=range(-max_len, 0), fill_value=0) + for da in previous_status + ] + return xr.concat(padded, dim='flow').any(dim='flow').astype(int) + + @cached_property + def status_data(self) -> StatusData: + """StatusData instance for component status.""" + return StatusData( + params=self.status_params, + dim_name=self.dim_name, + effect_ids=self._effect_ids, + timestep_duration=self._timestep_duration, + previous_states=self.previous_status_dict, + ) + + @cached_property + def flow_mask(self) -> xr.DataArray: + """(component, flow) mask: 1 if flow belongs to component.""" + from .features import MaskHelpers + + membership = MaskHelpers.build_flow_membership( + self._components_with_status, + lambda c: list(c.flows.values()), + ) + return MaskHelpers.build_mask( + row_dim='component', + row_ids=self.element_ids, + col_dim='flow', + col_ids=self._flows_data.element_ids, + membership=membership, + ) + + @cached_property + def flow_count(self) -> xr.DataArray: + """(component,) number of flows per component.""" + counts = [len(c.inputs) + len(c.outputs) for c in self._components_with_status] + return xr.DataArray( + counts, + dims=['component'], + coords={'component': self.element_ids}, + ) + def validate(self) -> None: """Validate generic components (config checks only). @@ -1885,8 +2010,10 @@ def validate(self) -> None: class ConvertersData: """Batched data container for converters.""" - def __init__(self, converters: list[LinearConverter]): + def __init__(self, converters: list[LinearConverter], flow_ids: list[str], timesteps: pd.DatetimeIndex): self._converters = converters + self._flow_ids = flow_ids + self._timesteps = timesteps self.elements: ElementContainer = ElementContainer(converters) @property @@ -1907,6 +2034,208 @@ def with_piecewise(self) -> list[LinearConverter]: """Converters with piecewise_conversion.""" return [c for c in self._converters if c.piecewise_conversion] + # === Linear Conversion Properties === + + @cached_property + def factor_element_ids(self) -> list[str]: + """Element IDs for converters with linear conversion factors.""" + return [c.label for c in self.with_factors] + + @cached_property + def max_equations(self) -> int: + """Maximum number of conversion equations across all converters.""" + if not self.with_factors: + return 0 + return max(len(c.conversion_factors) for c in self.with_factors) + + @cached_property + def equation_mask(self) -> xr.DataArray: + """(converter, equation_idx) mask: 1 if equation exists, 0 otherwise.""" + max_eq = self.max_equations + mask_data = np.zeros((len(self.factor_element_ids), max_eq)) + + for i, conv in enumerate(self.with_factors): + for eq_idx in range(len(conv.conversion_factors)): + mask_data[i, eq_idx] = 1.0 + + return xr.DataArray( + mask_data, + dims=['converter', 'equation_idx'], + coords={'converter': self.factor_element_ids, 'equation_idx': list(range(max_eq))}, + ) + + @cached_property + def signed_coefficients(self) -> dict[tuple[str, str], float | xr.DataArray]: + """Sparse (converter_id, flow_id) -> signed coefficient mapping. + + Returns a dict where keys are (converter_id, flow_id) tuples and values + are the signed coefficients (positive for inputs, negative for outputs). + + For converters with multiple equations, values are DataArrays with an + equation_idx dimension. + """ + from collections import defaultdict + + max_eq = self.max_equations + all_flow_ids_set = set(self._flow_ids) + + # Collect signed coefficients per (converter, flow) across equations + intermediate: dict[tuple[str, str], list[tuple[int, float | xr.DataArray]]] = defaultdict(list) + + for conv in self.with_factors: + flow_map = {fl.label: fl.label_full for fl in conv.flows.values()} + # +1 for inputs, -1 for outputs + flow_signs = {f.label_full: 1.0 for f in conv.inputs.values() if f.label_full in all_flow_ids_set} + flow_signs.update({f.label_full: -1.0 for f in conv.outputs.values() if f.label_full in all_flow_ids_set}) + + for eq_idx, conv_factors in enumerate(conv.conversion_factors): + for flow_label, coeff in conv_factors.items(): + flow_id = flow_map.get(flow_label) + sign = flow_signs.get(flow_id, 0.0) if flow_id else 0.0 + if sign != 0.0: + intermediate[(conv.label, flow_id)].append((eq_idx, coeff * sign)) + + # Stack each (converter, flow) pair's per-equation values into a DataArray + result: dict[tuple[str, str], float | xr.DataArray] = {} + eq_coords = list(range(max_eq)) + + for key, entries in intermediate.items(): + # Build a list indexed by equation_idx (0.0 where equation doesn't use this flow) + per_eq: list[float | xr.DataArray] = [0.0] * max_eq + for eq_idx, val in entries: + per_eq[eq_idx] = val + result[key] = stack_along_dim(per_eq, dim='equation_idx', coords=eq_coords) + + return result + + @cached_property + def n_equations_per_converter(self) -> xr.DataArray: + """(converter,) number of conversion equations per converter.""" + return xr.DataArray( + [len(c.conversion_factors) for c in self.with_factors], + dims=['converter'], + coords={'converter': self.factor_element_ids}, + ) + + # === Piecewise Conversion Properties === + + @cached_property + def piecewise_element_ids(self) -> list[str]: + """Element IDs for converters with piecewise conversion.""" + return [c.label for c in self.with_piecewise] + + @cached_property + def piecewise_segment_counts_dict(self) -> dict[str, int]: + """Dict mapping converter_id -> number of segments.""" + return {c.label: len(list(c.piecewise_conversion.piecewises.values())[0]) for c in self.with_piecewise} + + @cached_property + def piecewise_max_segments(self) -> int: + """Maximum segment count across all converters.""" + if not self.with_piecewise: + return 0 + return max(self.piecewise_segment_counts_dict.values()) + + @cached_property + def piecewise_segment_mask(self) -> xr.DataArray: + """(converter, segment) mask: 1=valid, 0=padded.""" + from .features import PiecewiseBuilder + + _, mask = PiecewiseBuilder.collect_segment_info( + self.piecewise_element_ids, self.piecewise_segment_counts_dict, self.dim_name + ) + return mask + + @cached_property + def piecewise_flow_breakpoints(self) -> dict[str, tuple[xr.DataArray, xr.DataArray]]: + """Dict mapping flow_id -> (starts, ends) padded DataArrays.""" + from .features import PiecewiseBuilder + + # Collect all flow ids that appear in piecewise conversions + all_flow_ids: set[str] = set() + for conv in self.with_piecewise: + for flow_label in conv.piecewise_conversion.piecewises: + flow_id = conv.flows[flow_label].label_full + all_flow_ids.add(flow_id) + + result = {} + for flow_id in all_flow_ids: + breakpoints: dict[str, tuple[list[float], list[float]]] = {} + for conv in self.with_piecewise: + # Check if this converter has this flow + found = False + for flow_label, piecewise in conv.piecewise_conversion.piecewises.items(): + if conv.flows[flow_label].label_full == flow_id: + starts = [p.start for p in piecewise] + ends = [p.end for p in piecewise] + breakpoints[conv.label] = (starts, ends) + found = True + break + if not found: + # This converter doesn't have this flow - use NaN + breakpoints[conv.label] = ( + [np.nan] * self.piecewise_max_segments, + [np.nan] * self.piecewise_max_segments, + ) + + # Get time coordinates for time-varying breakpoints + time_coords = self._timesteps + starts, ends = PiecewiseBuilder.pad_breakpoints( + self.piecewise_element_ids, + breakpoints, + self.piecewise_max_segments, + self.dim_name, + time_coords=time_coords, + ) + result[flow_id] = (starts, ends) + + return result + + @cached_property + def piecewise_segment_counts_array(self) -> xr.DataArray | None: + """(converter,) - number of segments per converter with piecewise conversion.""" + if not self.with_piecewise: + return None + counts = [len(list(c.piecewise_conversion.piecewises.values())[0]) for c in self.with_piecewise] + return xr.DataArray( + counts, + dims=[self.dim_name], + coords={self.dim_name: self.piecewise_element_ids}, + ) + + @cached_property + def piecewise_breakpoints(self) -> xr.Dataset | None: + """Dataset with (converter, segment, flow) or (converter, segment, flow, time) breakpoints. + + Variables: + - starts: segment start values + - ends: segment end values + + When breakpoints are time-varying, an additional 'time' dimension is included. + """ + if not self.with_piecewise: + return None + + # Collect all flows + all_flows = list(self.piecewise_flow_breakpoints.keys()) + + # Build a list of DataArrays for each flow, then combine with xr.concat + starts_list = [] + ends_list = [] + for flow_id in all_flows: + starts_da, ends_da = self.piecewise_flow_breakpoints[flow_id] + # Add 'flow' as a new coordinate + starts_da = starts_da.expand_dims(flow=[flow_id]) + ends_da = ends_da.expand_dims(flow=[flow_id]) + starts_list.append(starts_da) + ends_list.append(ends_da) + + # Concatenate along 'flow' dimension + starts_combined = xr.concat(starts_list, dim='flow') + ends_combined = xr.concat(ends_list, dim='flow') + + return xr.Dataset({'starts': starts_combined, 'ends': ends_combined}) + def validate(self) -> None: """Validate all converters (config checks, no DataArray operations needed).""" for converter in self._converters: @@ -1916,8 +2245,9 @@ def validate(self) -> None: class TransmissionsData: """Batched data container for transmissions.""" - def __init__(self, transmissions: list[Transmission]): + def __init__(self, transmissions: list[Transmission], flow_ids: list[str]): self._transmissions = transmissions + self._flow_ids = flow_ids self.elements: ElementContainer = ElementContainer(transmissions) @property @@ -1938,6 +2268,114 @@ def balanced(self) -> list[Transmission]: """Transmissions with balanced flow sizes.""" return [t for t in self._transmissions if t.balanced] + @cached_property + def bidirectional_ids(self) -> list[str]: + """Element IDs for bidirectional transmissions.""" + return [t.label for t in self.bidirectional] + + @cached_property + def balanced_ids(self) -> list[str]: + """Element IDs for balanced transmissions.""" + return [t.label for t in self.balanced] + + # === Flow Masks for Batched Selection === + + def _build_flow_mask(self, transmission_ids: list[str], flow_getter) -> xr.DataArray: + """Build (transmission, flow) mask: 1 if flow belongs to transmission. + + Args: + transmission_ids: List of transmission labels to include. + flow_getter: Function that takes a transmission and returns its flow label_full. + """ + all_flow_ids = self._flow_ids + mask_data = np.zeros((len(transmission_ids), len(all_flow_ids))) + + for t_idx, t_id in enumerate(transmission_ids): + t = next(t for t in self._transmissions if t.label == t_id) + flow_id = flow_getter(t) + if flow_id in all_flow_ids: + f_idx = all_flow_ids.index(flow_id) + mask_data[t_idx, f_idx] = 1.0 + + return xr.DataArray( + mask_data, + dims=[self.dim_name, 'flow'], + coords={self.dim_name: transmission_ids, 'flow': all_flow_ids}, + ) + + @cached_property + def in1_mask(self) -> xr.DataArray: + """(transmission, flow) mask: 1 if flow is in1 for transmission.""" + return self._build_flow_mask(self.element_ids, lambda t: t.in1.label_full) + + @cached_property + def out1_mask(self) -> xr.DataArray: + """(transmission, flow) mask: 1 if flow is out1 for transmission.""" + return self._build_flow_mask(self.element_ids, lambda t: t.out1.label_full) + + @cached_property + def in2_mask(self) -> xr.DataArray: + """(transmission, flow) mask for bidirectional: 1 if flow is in2.""" + return self._build_flow_mask(self.bidirectional_ids, lambda t: t.in2.label_full) + + @cached_property + def out2_mask(self) -> xr.DataArray: + """(transmission, flow) mask for bidirectional: 1 if flow is out2.""" + return self._build_flow_mask(self.bidirectional_ids, lambda t: t.out2.label_full) + + @cached_property + def balanced_in1_mask(self) -> xr.DataArray: + """(transmission, flow) mask for balanced: 1 if flow is in1.""" + return self._build_flow_mask(self.balanced_ids, lambda t: t.in1.label_full) + + @cached_property + def balanced_in2_mask(self) -> xr.DataArray: + """(transmission, flow) mask for balanced: 1 if flow is in2.""" + return self._build_flow_mask(self.balanced_ids, lambda t: t.in2.label_full) + + # === Loss Properties === + + @cached_property + def relative_losses(self) -> xr.DataArray: + """(transmission, [time, ...]) relative losses. 0 if None.""" + if not self._transmissions: + return xr.DataArray() + values = [] + for t in self._transmissions: + loss = t.relative_losses if t.relative_losses is not None else 0 + values.append(loss) + return stack_along_dim(values, self.dim_name, self.element_ids) + + @cached_property + def absolute_losses(self) -> xr.DataArray: + """(transmission, [time, ...]) absolute losses. 0 if None.""" + if not self._transmissions: + return xr.DataArray() + values = [] + for t in self._transmissions: + loss = t.absolute_losses if t.absolute_losses is not None else 0 + values.append(loss) + return stack_along_dim(values, self.dim_name, self.element_ids) + + @cached_property + def has_absolute_losses_mask(self) -> xr.DataArray: + """(transmission,) bool mask for transmissions with absolute losses.""" + if not self._transmissions: + return xr.DataArray() + has_abs = [t.absolute_losses is not None and np.any(t.absolute_losses != 0) for t in self._transmissions] + return xr.DataArray( + has_abs, + dims=[self.dim_name], + coords={self.dim_name: self.element_ids}, + ) + + @cached_property + def transmissions_with_abs_losses(self) -> list[str]: + """Element IDs for transmissions with absolute losses.""" + return [ + t.label for t in self._transmissions if t.absolute_losses is not None and np.any(t.absolute_losses != 0) + ] + def validate(self) -> None: """Validate all transmissions (config + DataArray checks). @@ -2063,7 +2501,13 @@ def components(self) -> ComponentsData: if self._components is None: all_components = list(self._fs.components.values()) components_with_status = [c for c in all_components if c.status_parameters is not None] - self._components = ComponentsData(components_with_status, all_components) + self._components = ComponentsData( + components_with_status, + all_components, + flows_data=self.flows, + effect_ids=list(self._fs.effects.keys()), + timestep_duration=self._fs.timestep_duration, + ) return self._components @property @@ -2073,7 +2517,7 @@ def converters(self) -> ConvertersData: from .components import LinearConverter converters = [c for c in self._fs.components.values() if isinstance(c, LinearConverter)] - self._converters = ConvertersData(converters) + self._converters = ConvertersData(converters, flow_ids=self.flows.element_ids, timesteps=self._fs.timesteps) return self._converters @property @@ -2083,7 +2527,7 @@ def transmissions(self) -> TransmissionsData: from .components import Transmission transmissions = [c for c in self._fs.components.values() if isinstance(c, Transmission)] - self._transmissions = TransmissionsData(transmissions) + self._transmissions = TransmissionsData(transmissions, flow_ids=self.flows.element_ids) return self._transmissions def _reset(self) -> None: diff --git a/flixopt/elements.py b/flixopt/elements.py index 372eaffc2..5f529dcbd 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -5,7 +5,6 @@ from __future__ import annotations import logging -from collections import defaultdict from functools import cached_property from typing import TYPE_CHECKING @@ -22,7 +21,6 @@ fast_notnull, sparse_multiply_sum, sparse_weighted_sum, - stack_along_dim, ) from .interface import InvestParameters, StatusParameters from .modeling import ModelingUtilitiesAbstract @@ -1693,15 +1691,7 @@ def create_constraints(self) -> None: logger.debug('BusesModel: no buses, skipping balance constraints') return - # Build sparse coefficients: +1 for inputs, -1 for outputs - coefficients: dict[tuple[str, str], float] = {} - for bus in self.elements.values(): - for f in bus.inputs.values(): - coefficients[(bus.label_full, f.label_full)] = 1.0 - for f in bus.outputs.values(): - coefficients[(bus.label_full, f.label_full)] = -1.0 - - balance = sparse_multiply_sum(flow_rate, coefficients, sum_dim=flow_dim, group_dim=bus_dim) + balance = sparse_multiply_sum(flow_rate, self.data.balance_coefficients, sum_dim=flow_dim, group_dim=bus_dim) if self.buses_with_imbalance: imbalance_ids = [b.label_full for b in self.buses_with_imbalance] @@ -1802,7 +1792,6 @@ def __init__( super().__init__(model, data) self._logger = logging.getLogger('flixopt') self._flows_model = flows_model - self._all_components = data.all_components self._logger.debug(f'ComponentsModel initialized: {len(self.element_ids)} with status') self.create_variables() self.create_constraints() @@ -1815,76 +1804,6 @@ def components(self) -> list[Component]: """List of components with status (alias for elements.values()).""" return list(self.elements.values()) - @cached_property - def _components_with_prevent_simultaneous(self) -> list[Component]: - """Generic components (non-Storage, non-Transmission) with prevent_simultaneous_flows. - - Storage and Transmission handle their own prevent_simultaneous constraints - in StoragesModel and TransmissionsModel respectively. - """ - from .components import Storage, Transmission - - return [ - c - for c in self._all_components - if c.prevent_simultaneous_flows and not isinstance(c, (Storage, Transmission)) - ] - - # --- Cached Properties --- - - @cached_property - def _status_params(self) -> dict[str, StatusParameters]: - """Dict of component_id -> StatusParameters.""" - return {c.label: c.status_parameters for c in self.components} - - @cached_property - def _previous_status_dict(self) -> dict[str, xr.DataArray]: - """Dict of component_id -> previous_status DataArray.""" - result = {} - for c in self.components: - prev = self._get_previous_status_for_component(c) - if prev is not None: - result[c.label] = prev - return result - - @cached_property - def _status_data(self): - """StatusData instance for component status.""" - from .batched import StatusData - - return StatusData( - params=self._status_params, - dim_name=self.dim_name, - effect_ids=list(self.model.flow_system.effects.keys()), - timestep_duration=self.model.timestep_duration, - previous_states=self._previous_status_dict, - ) - - @cached_property - def _flow_mask(self) -> xr.DataArray: - """(component, flow) mask: 1 if flow belongs to component.""" - membership = MaskHelpers.build_flow_membership( - self.components, - lambda c: list(c.flows.values()), - ) - return MaskHelpers.build_mask( - row_dim='component', - row_ids=self.element_ids, - col_dim='flow', - col_ids=self._flows_model.element_ids, - membership=membership, - ) - - @cached_property - def _flow_count(self) -> xr.DataArray: - """(component,) number of flows per component.""" - counts = [len(c.inputs) + len(c.outputs) for c in self.components] - return xr.DataArray( - counts, - dims=['component'], - coords={'component': self.element_ids}, - ) - def create_variables(self) -> None: """Create batched component status variable with component dimension.""" if not self.components: @@ -1905,8 +1824,8 @@ def create_constraints(self) -> None: comp_status = self[ComponentVarName.STATUS] flow_status = self._flows_model[FlowVarName.STATUS] - mask = self._flow_mask - n_flows = self._flow_count + mask = self.data.flow_mask + n_flows = self.data.flow_count # Sum of flow statuses for each component: (component, time, ...) flow_sum = sparse_weighted_sum(flow_status, mask, sum_dim='flow', group_dim='component') @@ -1976,32 +1895,6 @@ def previous_status_batched(self) -> xr.DataArray | None: return xr.concat(previous_arrays, dim=self.dim_name) - def _get_previous_status_for_component(self, component) -> xr.DataArray | None: - """Get previous status for a single component (OR of flow statuses). - - Args: - component: The component to get previous status for. - - Returns: - DataArray of previous status, or None if no flows have previous status. - """ - previous_status = [] - for flow in component.flows.values(): - prev = self._flows_model.get_previous_status(flow) - if prev is not None: - previous_status.append(prev) - - if not previous_status: - return None - - # Combine flow statuses using OR (any flow active = component active) - max_len = max(da.sizes['time'] for da in previous_status) - padded = [ - da.assign_coords(time=range(-da.sizes['time'], 0)).reindex(time=range(-max_len, 0), fill_value=0) - for da in previous_status - ] - return xr.concat(padded, dim='flow').any(dim='flow').astype(int) - # === Status Variables (cached_property) === @cached_property @@ -2010,7 +1903,7 @@ def active_hours(self) -> linopy.Variable | None: if not self.components: return None - sd = self._status_data + sd = self.data.status_data dim = self.dim_name total_hours = self.model.temporal_weight.sum(self.model.temporal_dims) @@ -2032,7 +1925,7 @@ def active_hours(self) -> linopy.Variable | None: @cached_property def startup(self) -> linopy.Variable | None: """(component, time, ...) - binary startup variable.""" - ids = self._status_data.with_startup_tracking + ids = self.data.status_data.with_startup_tracking if not ids: return None return self.add_variables(ComponentVarName.STARTUP, dims=None, element_ids=ids, binary=True) @@ -2040,7 +1933,7 @@ def startup(self) -> linopy.Variable | None: @cached_property def shutdown(self) -> linopy.Variable | None: """(component, time, ...) - binary shutdown variable.""" - ids = self._status_data.with_startup_tracking + ids = self.data.status_data.with_startup_tracking if not ids: return None return self.add_variables(ComponentVarName.SHUTDOWN, dims=None, element_ids=ids, binary=True) @@ -2048,7 +1941,7 @@ def shutdown(self) -> linopy.Variable | None: @cached_property def inactive(self) -> linopy.Variable | None: """(component, time, ...) - binary inactive variable.""" - ids = self._status_data.with_downtime_tracking + ids = self.data.status_data.with_downtime_tracking if not ids: return None return self.add_variables(ComponentVarName.INACTIVE, dims=None, element_ids=ids, binary=True) @@ -2056,13 +1949,13 @@ def inactive(self) -> linopy.Variable | None: @cached_property def startup_count(self) -> linopy.Variable | None: """(component, period, scenario) - startup count.""" - ids = self._status_data.with_startup_limit + ids = self.data.status_data.with_startup_limit if not ids: return None return self.add_variables( ComponentVarName.STARTUP_COUNT, lower=0, - upper=self._status_data.startup_limit, + upper=self.data.status_data.startup_limit, dims=('period', 'scenario'), element_ids=ids, ) @@ -2070,7 +1963,7 @@ def startup_count(self) -> linopy.Variable | None: @cached_property def uptime(self) -> linopy.Variable | None: """(component, time, ...) - consecutive uptime duration.""" - sd = self._status_data + sd = self.data.status_data if not sd.with_uptime_tracking: return None from .features import StatusBuilder @@ -2092,7 +1985,7 @@ def uptime(self) -> linopy.Variable | None: @cached_property def downtime(self) -> linopy.Variable | None: """(component, time, ...) - consecutive downtime duration.""" - sd = self._status_data + sd = self.data.status_data if not sd.with_downtime_tracking: return None from .features import StatusBuilder @@ -2135,7 +2028,7 @@ def constraint_complementary(self) -> None: return StatusBuilder.add_complementary_constraint( self.model, - self._status_sel(self._status_data.with_downtime_tracking), + self._status_sel(self.data.status_data.with_downtime_tracking), self.inactive, ComponentVarName.Constraint.COMPLEMENTARY, ) @@ -2146,7 +2039,7 @@ def constraint_switch_transition(self) -> None: return StatusBuilder.add_switch_transition_constraint( self.model, - self._status_sel(self._status_data.with_startup_tracking), + self._status_sel(self.data.status_data.with_startup_tracking), self.startup, self.shutdown, ComponentVarName.Constraint.SWITCH_TRANSITION, @@ -2168,8 +2061,8 @@ def constraint_switch_initial(self) -> None: if self.startup is None: return dim = self.dim_name - previous_status = self._status_data._previous_states - ids = [eid for eid in self._status_data.with_startup_tracking if eid in previous_status] + previous_status = self.data.status_data._previous_states + ids = [eid for eid in self.data.status_data.with_startup_tracking if eid in previous_status] if not ids: return @@ -2189,7 +2082,7 @@ def constraint_startup_count(self) -> None: """Constrain startup_count == sum(startup) over temporal dims.""" if self.startup_count is None: return - startup_subset = self.startup.sel({self.dim_name: self._status_data.with_startup_limit}) + startup_subset = self.startup.sel({self.dim_name: self.data.status_data.with_startup_limit}) StatusBuilder.add_startup_count_constraint( self.model, self.startup_count, @@ -2202,8 +2095,8 @@ def constraint_cluster_cyclic(self) -> None: """Constrain status[0] == status[-1] for cyclic cluster mode.""" if self.model.flow_system.clusters is None: return - params = self._status_data._params - cyclic_ids = [eid for eid in self._status_data.ids if params[eid].cluster_mode == 'cyclic'] + params = self.data.status_data._params + cyclic_ids = [eid for eid in self.data.status_data.ids if params[eid].cluster_mode == 'cyclic'] if not cyclic_ids: return StatusBuilder.add_cluster_cyclic_constraint( @@ -2257,7 +2150,7 @@ def add_effect_contributions(self, effects_model) -> None: """ dim = self.dim_name dt = self.model.timestep_duration - sd = self._status_data + sd = self.data.status_data # === Temporal: status * effects_per_active_hour * dt === if self.status is not None: @@ -2278,7 +2171,7 @@ def add_effect_contributions(self, effects_model) -> None: def constraint_prevent_simultaneous(self) -> None: """Create mutual exclusivity constraints for components with prevent_simultaneous_flows.""" _add_prevent_simultaneous_constraints( - self._components_with_prevent_simultaneous, self._flows_model, self.model, 'prevent_simultaneous' + self.data.with_prevent_simultaneous, self._flows_model, self.model, 'prevent_simultaneous' ) # === Variable accessor properties === @@ -2341,77 +2234,6 @@ def __init__( self.create_variables() self.create_constraints() - # === Linear Conversion Properties (from LinearConvertersModel) === - - @cached_property - def _factor_element_ids(self) -> list[str]: - """Element IDs for converters with linear conversion factors.""" - return [c.label for c in self.converters_with_factors] - - @cached_property - def _max_equations(self) -> int: - """Maximum number of conversion equations across all converters.""" - if not self.converters_with_factors: - return 0 - return max(len(c.conversion_factors) for c in self.converters_with_factors) - - @cached_property - def _equation_mask(self) -> xr.DataArray: - """(converter, equation_idx) mask: 1 if equation exists, 0 otherwise.""" - max_eq = self._max_equations - mask_data = np.zeros((len(self._factor_element_ids), max_eq)) - - for i, conv in enumerate(self.converters_with_factors): - for eq_idx in range(len(conv.conversion_factors)): - mask_data[i, eq_idx] = 1.0 - - return xr.DataArray( - mask_data, - dims=['converter', 'equation_idx'], - coords={'converter': self._factor_element_ids, 'equation_idx': list(range(max_eq))}, - ) - - @cached_property - def _signed_coefficients(self) -> dict[tuple[str, str], float | xr.DataArray]: - """Sparse (converter_id, flow_id) -> signed coefficient mapping. - - Returns a dict where keys are (converter_id, flow_id) tuples and values - are the signed coefficients (positive for inputs, negative for outputs). - For converters with multiple equations, values are DataArrays with an - equation_idx dimension. - """ - max_eq = self._max_equations - all_flow_ids_set = set(self._flows_model.element_ids) - - # Collect signed coefficients per (converter, flow) across equations - intermediate: dict[tuple[str, str], list[tuple[int, float | xr.DataArray]]] = defaultdict(list) - - for conv in self.converters_with_factors: - flow_map = {fl.label: fl.label_full for fl in conv.flows.values()} - # +1 for inputs, -1 for outputs - flow_signs = {f.label_full: 1.0 for f in conv.inputs.values() if f.label_full in all_flow_ids_set} - flow_signs.update({f.label_full: -1.0 for f in conv.outputs.values() if f.label_full in all_flow_ids_set}) - - for eq_idx, conv_factors in enumerate(conv.conversion_factors): - for flow_label, coeff in conv_factors.items(): - flow_id = flow_map.get(flow_label) - sign = flow_signs.get(flow_id, 0.0) if flow_id else 0.0 - if sign != 0.0: - intermediate[(conv.label, flow_id)].append((eq_idx, coeff * sign)) - - # Stack each (converter, flow) pair's per-equation values into a DataArray - result: dict[tuple[str, str], float | xr.DataArray] = {} - eq_coords = list(range(max_eq)) - - for key, entries in intermediate.items(): - # Build a list indexed by equation_idx (0.0 where equation doesn't use this flow) - per_eq: list[float | xr.DataArray] = [0.0] * max_eq - for eq_idx, val in entries: - per_eq[eq_idx] = val - result[key] = stack_along_dim(per_eq, dim='equation_idx', coords=eq_coords) - - return result - def create_linear_constraints(self) -> None: """Create batched linear conversion factor constraints. @@ -2424,23 +2246,19 @@ def create_linear_constraints(self) -> None: if not self.converters_with_factors: return + d = self.data # ConvertersData flow_rate = self._flows_model[FlowVarName.RATE] # Sparse sum: only multiplies non-zero (converter, flow) pairs - flow_sum = sparse_multiply_sum(flow_rate, self._signed_coefficients, sum_dim='flow', group_dim='converter') + flow_sum = sparse_multiply_sum(flow_rate, d.signed_coefficients, sum_dim='flow', group_dim='converter') # Build valid mask: True where converter HAS that equation - n_equations_per_converter = xr.DataArray( - [len(c.conversion_factors) for c in self.converters_with_factors], - dims=['converter'], - coords={'converter': self._factor_element_ids}, - ) equation_indices = xr.DataArray( - list(range(self._max_equations)), + list(range(d.max_equations)), dims=['equation_idx'], - coords={'equation_idx': list(range(self._max_equations))}, + coords={'equation_idx': list(range(d.max_equations))}, ) - valid_mask = equation_indices < n_equations_per_converter + valid_mask = equation_indices < d.n_equations_per_converter self.add_constraints( flow_sum == 0, @@ -2450,135 +2268,6 @@ def create_linear_constraints(self) -> None: logger.debug(f'ConvertersModel created linear constraints for {len(self.converters_with_factors)} converters') - # === Piecewise Conversion Properties (from ComponentsModel) === - - @cached_property - def _piecewise_element_ids(self) -> list[str]: - """Element IDs for converters with piecewise conversion.""" - return [c.label for c in self.converters_with_piecewise] - - @cached_property - def _piecewise_segment_counts(self) -> dict[str, int]: - """Dict mapping converter_id -> number of segments.""" - return { - c.label: len(list(c.piecewise_conversion.piecewises.values())[0]) for c in self.converters_with_piecewise - } - - @cached_property - def _piecewise_max_segments(self) -> int: - """Maximum segment count across all converters.""" - if not self.converters_with_piecewise: - return 0 - return max(self._piecewise_segment_counts.values()) - - @cached_property - def _piecewise_segment_mask(self) -> xr.DataArray: - """(converter, segment) mask: 1=valid, 0=padded.""" - _, mask = self._PiecewiseBuilder.collect_segment_info( - self._piecewise_element_ids, self._piecewise_segment_counts, self._piecewise_dim_name - ) - return mask - - @cached_property - def _piecewise_dim_name(self) -> str: - """Dimension name for piecewise converters.""" - return 'converter' - - @cached_property - def _piecewise_flow_breakpoints(self) -> dict[str, tuple[xr.DataArray, xr.DataArray]]: - """Dict mapping flow_id -> (starts, ends) padded DataArrays.""" - # Collect all flow ids that appear in piecewise conversions - all_flow_ids: set[str] = set() - for conv in self.converters_with_piecewise: - for flow_label in conv.piecewise_conversion.piecewises: - flow_id = conv.flows[flow_label].label_full - all_flow_ids.add(flow_id) - - result = {} - for flow_id in all_flow_ids: - breakpoints: dict[str, tuple[list[float], list[float]]] = {} - for conv in self.converters_with_piecewise: - # Check if this converter has this flow - found = False - for flow_label, piecewise in conv.piecewise_conversion.piecewises.items(): - if conv.flows[flow_label].label_full == flow_id: - starts = [p.start for p in piecewise] - ends = [p.end for p in piecewise] - breakpoints[conv.label] = (starts, ends) - found = True - break - if not found: - # This converter doesn't have this flow - use NaN - breakpoints[conv.label] = ( - [np.nan] * self._piecewise_max_segments, - [np.nan] * self._piecewise_max_segments, - ) - - # Get time coordinates from model for time-varying breakpoints - time_coords = self.model.flow_system.timesteps - starts, ends = self._PiecewiseBuilder.pad_breakpoints( - self._piecewise_element_ids, - breakpoints, - self._piecewise_max_segments, - self._piecewise_dim_name, - time_coords=time_coords, - ) - result[flow_id] = (starts, ends) - - return result - - @cached_property - def piecewise_segment_counts(self) -> xr.DataArray | None: - """(converter,) - number of segments per converter with piecewise conversion.""" - if not self.converters_with_piecewise: - return None - counts = [len(list(c.piecewise_conversion.piecewises.values())[0]) for c in self.converters_with_piecewise] - return xr.DataArray( - counts, - dims=[self._piecewise_dim_name], - coords={self._piecewise_dim_name: self._piecewise_element_ids}, - ) - - @cached_property - def piecewise_segment_mask(self) -> xr.DataArray | None: - """(converter, segment) - 1=valid segment, 0=padded.""" - if not self.converters_with_piecewise: - return None - return self._piecewise_segment_mask - - @cached_property - def piecewise_breakpoints(self) -> xr.Dataset | None: - """Dataset with (converter, segment, flow) or (converter, segment, flow, time) breakpoints. - - Variables: - - starts: segment start values - - ends: segment end values - - When breakpoints are time-varying, an additional 'time' dimension is included. - """ - if not self.converters_with_piecewise: - return None - - # Collect all flows - all_flows = list(self._piecewise_flow_breakpoints.keys()) - - # Build a list of DataArrays for each flow, then combine with xr.concat - starts_list = [] - ends_list = [] - for flow_id in all_flows: - starts_da, ends_da = self._piecewise_flow_breakpoints[flow_id] - # Add 'flow' as a new coordinate - starts_da = starts_da.expand_dims(flow=[flow_id]) - ends_da = ends_da.expand_dims(flow=[flow_id]) - starts_list.append(starts_da) - ends_list.append(ends_da) - - # Concatenate along 'flow' dimension - starts_combined = xr.concat(starts_list, dim='flow') - ends_combined = xr.concat(ends_list, dim='flow') - - return xr.Dataset({'starts': starts_combined, 'ends': ends_combined}) - def create_variables(self) -> None: """Create all batched variables for converters (piecewise variables).""" self._create_piecewise_variables() @@ -2597,14 +2286,15 @@ def _create_piecewise_variables(self) -> dict[str, linopy.Variable]: if not self.converters_with_piecewise: return {} + d = self.data # ConvertersData base_coords = self.model.get_coords(['time', 'period', 'scenario']) self._piecewise_variables = self._PiecewiseBuilder.create_piecewise_variables( self.model, - self._piecewise_element_ids, - self._piecewise_max_segments, - self._piecewise_dim_name, - self._piecewise_segment_mask, + d.piecewise_element_ids, + d.piecewise_max_segments, + d.dim_name, + d.piecewise_segment_mask, base_coords, ConverterVarName.PIECEWISE_PREFIX, ) @@ -2628,7 +2318,7 @@ def _create_piecewise_constraints(self) -> None: ) # Create batched coupling constraints for all piecewise flows - bp = self.piecewise_breakpoints # Dataset with (converter, segment, flow) dims + bp = self.data.piecewise_breakpoints # Dataset with (converter, segment, flow) dims if bp is None: return @@ -2710,114 +2400,6 @@ def __init__( self.transmissions, self._flows_model, self.model, 'transmission|prevent_simultaneous' ) - # === Flow Mapping Properties === - - @cached_property - def _bidirectional(self) -> list: - """List of transmissions that are bidirectional.""" - return [t for t in self.transmissions if t.in2 is not None] - - @cached_property - def _bidirectional_ids(self) -> list[str]: - """Element IDs for bidirectional transmissions.""" - return [t.label for t in self._bidirectional] - - @cached_property - def _balanced(self) -> list: - """List of transmissions with balanced=True.""" - return [t for t in self.transmissions if t.balanced] - - @cached_property - def _balanced_ids(self) -> list[str]: - """Element IDs for balanced transmissions.""" - return [t.label for t in self._balanced] - - # === Flow Masks for Batched Selection === - - def _build_flow_mask(self, transmission_ids: list[str], flow_getter) -> xr.DataArray: - """Build (transmission, flow) mask: 1 if flow belongs to transmission. - - Args: - transmission_ids: List of transmission labels to include. - flow_getter: Function that takes a transmission and returns its flow label_full. - """ - all_flow_ids = self._flows_model.element_ids - mask_data = np.zeros((len(transmission_ids), len(all_flow_ids))) - - for t_idx, t_id in enumerate(transmission_ids): - t = next(t for t in self.transmissions if t.label == t_id) - flow_id = flow_getter(t) - if flow_id in all_flow_ids: - f_idx = all_flow_ids.index(flow_id) - mask_data[t_idx, f_idx] = 1.0 - - return xr.DataArray( - mask_data, - dims=[self.dim_name, 'flow'], - coords={self.dim_name: transmission_ids, 'flow': all_flow_ids}, - ) - - @cached_property - def _in1_mask(self) -> xr.DataArray: - """(transmission, flow) mask: 1 if flow is in1 for transmission.""" - return self._build_flow_mask(self.element_ids, lambda t: t.in1.label_full) - - @cached_property - def _out1_mask(self) -> xr.DataArray: - """(transmission, flow) mask: 1 if flow is out1 for transmission.""" - return self._build_flow_mask(self.element_ids, lambda t: t.out1.label_full) - - @cached_property - def _in2_mask(self) -> xr.DataArray: - """(transmission, flow) mask for bidirectional: 1 if flow is in2.""" - return self._build_flow_mask(self._bidirectional_ids, lambda t: t.in2.label_full) - - @cached_property - def _out2_mask(self) -> xr.DataArray: - """(transmission, flow) mask for bidirectional: 1 if flow is out2.""" - return self._build_flow_mask(self._bidirectional_ids, lambda t: t.out2.label_full) - - # === Loss Properties === - - @cached_property - def _relative_losses(self) -> xr.DataArray: - """(transmission, [time, ...]) relative losses. 0 if None.""" - if not self.transmissions: - return xr.DataArray() - values = [] - for t in self.transmissions: - loss = t.relative_losses if t.relative_losses is not None else 0 - values.append(loss) - return stack_along_dim(values, self.dim_name, self.element_ids) - - @cached_property - def _absolute_losses(self) -> xr.DataArray: - """(transmission, [time, ...]) absolute losses. 0 if None.""" - if not self.transmissions: - return xr.DataArray() - values = [] - for t in self.transmissions: - loss = t.absolute_losses if t.absolute_losses is not None else 0 - values.append(loss) - return stack_along_dim(values, self.dim_name, self.element_ids) - - @cached_property - def _has_absolute_losses_mask(self) -> xr.DataArray: - """(transmission,) bool mask for transmissions with absolute losses.""" - if not self.transmissions: - return xr.DataArray() - has_abs = [t.absolute_losses is not None and np.any(t.absolute_losses != 0) for t in self.transmissions] - return xr.DataArray( - has_abs, - dims=[self.dim_name], - coords={self.dim_name: self.element_ids}, - ) - - @cached_property - def _transmissions_with_abs_losses(self) -> list[str]: - """Element IDs for transmissions with absolute losses.""" - return [t.label for t in self.transmissions if t.absolute_losses is not None and np.any(t.absolute_losses != 0)] - def create_variables(self) -> None: """No variables needed for transmissions (constraint-only model).""" pass @@ -2838,21 +2420,22 @@ def create_constraints(self) -> None: con = TransmissionVarName.Constraint flow_rate = self._flows_model[FlowVarName.RATE] + d = self.data # TransmissionsData # === Direction 1: All transmissions (batched) === # Use masks to batch flow selection: (flow_rate * mask).sum('flow') -> (transmission, time, ...) - in1_rate = (flow_rate * self._in1_mask).sum('flow') - out1_rate = (flow_rate * self._out1_mask).sum('flow') - rel_losses = self._relative_losses - abs_losses = self._absolute_losses + in1_rate = (flow_rate * d.in1_mask).sum('flow') + out1_rate = (flow_rate * d.out1_mask).sum('flow') + rel_losses = d.relative_losses + abs_losses = d.absolute_losses # Build the efficiency expression: in1 * (1 - rel_losses) - abs_losses_term efficiency_expr = in1_rate * (1 - rel_losses) # Add absolute losses term if any transmission has them - if self._transmissions_with_abs_losses: + if d.transmissions_with_abs_losses: flow_status = self._flows_model[FlowVarName.STATUS] - in1_status = (flow_status * self._in1_mask).sum('flow') + in1_status = (flow_status * d.in1_mask).sum('flow') efficiency_expr = efficiency_expr - in1_status * abs_losses # out1 == in1 * (1 - rel_losses) - in1_status * abs_losses @@ -2862,20 +2445,20 @@ def create_constraints(self) -> None: ) # === Direction 2: Bidirectional transmissions only (batched) === - if self._bidirectional: - in2_rate = (flow_rate * self._in2_mask).sum('flow') - out2_rate = (flow_rate * self._out2_mask).sum('flow') - rel_losses_bidir = self._relative_losses.sel({self.dim_name: self._bidirectional_ids}) - abs_losses_bidir = self._absolute_losses.sel({self.dim_name: self._bidirectional_ids}) + if d.bidirectional: + in2_rate = (flow_rate * d.in2_mask).sum('flow') + out2_rate = (flow_rate * d.out2_mask).sum('flow') + rel_losses_bidir = d.relative_losses.sel({self.dim_name: d.bidirectional_ids}) + abs_losses_bidir = d.absolute_losses.sel({self.dim_name: d.bidirectional_ids}) # Build the efficiency expression for direction 2 efficiency_expr_2 = in2_rate * (1 - rel_losses_bidir) # Add absolute losses for bidirectional if any have them - bidir_with_abs = [t.label for t in self._bidirectional if t.label in self._transmissions_with_abs_losses] + bidir_with_abs = [t.label for t in d.bidirectional if t.label in d.transmissions_with_abs_losses] if bidir_with_abs: flow_status = self._flows_model[FlowVarName.STATUS] - in2_status = (flow_status * self._in2_mask).sum('flow') + in2_status = (flow_status * d.in2_mask).sum('flow') efficiency_expr_2 = efficiency_expr_2 - in2_status * abs_losses_bidir # out2 == in2 * (1 - rel_losses) - in2_status * abs_losses @@ -2885,14 +2468,11 @@ def create_constraints(self) -> None: ) # === Balanced constraints: in1.size == in2.size (batched) === - if self._balanced: + if d.balanced: flow_size = self._flows_model[FlowVarName.SIZE] - # Build masks for balanced transmissions only - in1_size_mask = self._build_flow_mask(self._balanced_ids, lambda t: t.in1.label_full) - in2_size_mask = self._build_flow_mask(self._balanced_ids, lambda t: t.in2.label_full) - in1_size_batched = (flow_size * in1_size_mask).sum('flow') - in2_size_batched = (flow_size * in2_size_mask).sum('flow') + in1_size_batched = (flow_size * d.balanced_in1_mask).sum('flow') + in2_size_batched = (flow_size * d.balanced_in2_mask).sum('flow') self.add_constraints( in1_size_batched == in2_size_batched,