diff --git a/.gitignore b/.gitignore index bdc9cfa1d1..f102a0b8e4 100644 --- a/.gitignore +++ b/.gitignore @@ -173,6 +173,7 @@ cython_debug/ # Data that is downloaded data/ test_data/ +/imap_processing/tests/glows/validation_data/ /imap_processing/tests/mag/validation/L1c/T013/mag-l1b-l1c-t013-magi-burst-in.csv /imap_processing/tests/mag/validation/L1c/T013/mag-l1b-l1c-t013-mago-burst-in.csv /imap_processing/tests/mag/validation/L1c/T014/mag-l1b-l1c-t014-magi-burst-in.csv diff --git a/imap_processing/codice/codice_l1a.py b/imap_processing/codice/codice_l1a.py index 578159a55a..7d074104a6 100644 --- a/imap_processing/codice/codice_l1a.py +++ b/imap_processing/codice/codice_l1a.py @@ -30,6 +30,7 @@ from imap_processing.codice.codice_l1a_lo_species import l1a_lo_species from imap_processing.codice.utils import ( CODICEAPID, + process_by_table_id, ) from imap_processing.utils import packet_file_to_datasets @@ -79,10 +80,14 @@ def process_l1a( # noqa: PLR0912 if apid == CODICEAPID.COD_LO_SW_SPECIES_COUNTS: logger.info("Processing Lo SW Species Counts") - datasets.append(l1a_lo_species(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id(datasets_by_apid[apid], lut_file, l1a_lo_species) + ) elif apid == CODICEAPID.COD_LO_NSW_SPECIES_COUNTS: logger.info("Processing Lo NSW Species Counts") - datasets.append(l1a_lo_species(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id(datasets_by_apid[apid], lut_file, l1a_lo_species) + ) elif apid == CODICEAPID.COD_LO_SW_ANGULAR_COUNTS: logger.info("Processing Lo SW Angular Counts") datasets.append(l1a_lo_angular(datasets_by_apid[apid], lut_file)) @@ -90,10 +95,14 @@ def process_l1a( # noqa: PLR0912 logger.info("Processing Lo NSW Angular Counts") datasets.append(l1a_lo_angular(datasets_by_apid[apid], lut_file)) elif apid == CODICEAPID.COD_HI_OMNI_SPECIES_COUNTS: - datasets.append(l1a_hi_omni(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id(datasets_by_apid[apid], lut_file, l1a_hi_omni) + ) elif apid == CODICEAPID.COD_HI_SECT_SPECIES_COUNTS: logger.info("Processing Hi Sectored Species Counts") - datasets.append(l1a_hi_sectored(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id(datasets_by_apid[apid], lut_file, l1a_hi_sectored) + ) elif apid == CODICEAPID.COD_HI_PHA: logger.info("Processing Direct Events for Hi") datasets.append(l1a_direct_event(datasets_by_apid[apid], apid=apid)) @@ -105,26 +114,42 @@ def process_l1a( # noqa: PLR0912 CODICEAPID.COD_LO_NSW_PRIORITY_COUNTS, ]: logger.info(f"Processing {apid} Priority Counts") - datasets.append(l1a_lo_priority(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id(datasets_by_apid[apid], lut_file, l1a_lo_priority) + ) elif apid == CODICEAPID.COD_HI_INST_COUNTS_PRIORITIES: logger.info("Processing Hi Priority Counts") - datasets.append(l1a_hi_priority(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id(datasets_by_apid[apid], lut_file, l1a_hi_priority) + ) elif apid == CODICEAPID.COD_HI_INST_COUNTS_AGGREGATED: logger.info("Processing Hi Counters aggregated") datasets.append( - l1a_hi_counters_aggregated(datasets_by_apid[apid], lut_file) + process_by_table_id( + datasets_by_apid[apid], lut_file, l1a_hi_counters_aggregated + ) ) elif apid == CODICEAPID.COD_HI_INST_COUNTS_SINGLES: logger.info("Processing Hi Counters singles") - datasets.append(l1a_hi_counters_singles(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id( + datasets_by_apid[apid], lut_file, l1a_hi_counters_singles + ) + ) elif apid == CODICEAPID.COD_LO_INST_COUNTS_AGGREGATED: logger.info("Processing Lo Counters aggregated") datasets.append( - l1a_lo_counters_aggregated(datasets_by_apid[apid], lut_file) + process_by_table_id( + datasets_by_apid[apid], lut_file, l1a_lo_counters_aggregated + ) ) elif apid == CODICEAPID.COD_LO_INST_COUNTS_SINGLES: logger.info("Processing Lo Counters singles") - datasets.append(l1a_lo_counters_singles(datasets_by_apid[apid], lut_file)) + datasets.append( + process_by_table_id( + datasets_by_apid[apid], lut_file, l1a_lo_counters_singles + ) + ) elif apid == CODICEAPID.COD_NHK: logger.info("Processing l1a housekeeping data") cdf_attrs = ImapCdfAttributes() diff --git a/imap_processing/codice/codice_l1a_hi_counters_aggregated.py b/imap_processing/codice/codice_l1a_hi_counters_aggregated.py index ef815c347b..b694bab4fc 100644 --- a/imap_processing/codice/codice_l1a_hi_counters_aggregated.py +++ b/imap_processing/codice/codice_l1a_hi_counters_aggregated.py @@ -11,11 +11,9 @@ from imap_processing.codice.decompress import decompress from imap_processing.codice.utils import ( CoDICECompression, - ViewTabInfo, get_codice_epoch_time, get_counters_aggregated_pattern, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns @@ -23,47 +21,45 @@ def l1a_hi_counters_aggregated( - unpacked_dataset: xr.Dataset, lut_file: Path + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, ) -> xr.Dataset: """ - Process CoDICE Hi Counters aggregated L1A data. + Process a single table-ID group of CoDICE Hi Counters Aggregated L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 1: raise ValueError("Unsupported sensor ID for Hi processing.") @@ -85,8 +81,8 @@ def l1a_hi_counters_aggregated( # of active variables to reshape decompressed data. num_variables = len(non_reserved_variables) # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values compression_algorithm = CoDICECompression(view_tab_obj.compression) @@ -109,9 +105,9 @@ def l1a_hi_counters_aggregated( # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -147,12 +143,12 @@ def l1a_hi_counters_aggregated( # Add first few unique variables l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) diff --git a/imap_processing/codice/codice_l1a_hi_counters_singles.py b/imap_processing/codice/codice_l1a_hi_counters_singles.py index 9da96b6ae2..a33288ce47 100644 --- a/imap_processing/codice/codice_l1a_hi_counters_singles.py +++ b/imap_processing/codice/codice_l1a_hi_counters_singles.py @@ -11,57 +11,55 @@ from imap_processing.codice.decompress import decompress from imap_processing.codice.utils import ( CoDICECompression, - ViewTabInfo, get_codice_epoch_time, get_collapse_pattern_shape, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns logger = logging.getLogger(__name__) -def l1a_hi_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: +def l1a_hi_counters_singles( + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, +) -> xr.Dataset: """ - Process CoDICE Hi Counters singles L1A data. + Process a single table-ID group of CoDICE Hi Counters Singles L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 1: raise ValueError("Unsupported sensor ID for Hi processing.") @@ -82,8 +80,8 @@ def l1a_hi_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. compression_algorithm = CoDICECompression(view_tab_obj.compression) # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values # The decompressed data in the shape of (epoch, n). Then reshape later. decompressed_data = [ @@ -103,9 +101,9 @@ def l1a_hi_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -153,12 +151,12 @@ def l1a_hi_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. # Add first few unique variables l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) diff --git a/imap_processing/codice/codice_l1a_hi_omni.py b/imap_processing/codice/codice_l1a_hi_omni.py index d9dc24a6e3..a2bd2e1833 100644 --- a/imap_processing/codice/codice_l1a_hi_omni.py +++ b/imap_processing/codice/codice_l1a_hi_omni.py @@ -11,59 +11,57 @@ from imap_processing.codice.decompress import decompress from imap_processing.codice.utils import ( CoDICECompression, - ViewTabInfo, apply_replacements_to_attrs, get_codice_epoch_time, get_energy_info, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns logger = logging.getLogger(__name__) -def l1a_hi_omni(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: +def l1a_hi_omni( + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, +) -> xr.Dataset: """ - Process CoDICE Hi Omni L1A data. + Process a single table-ID group of CoDICE Hi Omni L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 1: raise ValueError("Unsupported sensor ID for Hi processing.") @@ -75,8 +73,8 @@ def l1a_hi_omni(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: compression_algorithm = CoDICECompression(view_tab_obj.compression) # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values # The decompressed data in the shape of (epoch, n). Then reshape later. decompressed_data = [ @@ -92,9 +90,9 @@ def l1a_hi_omni(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -261,13 +259,13 @@ def l1a_hi_omni(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # ========= Add Additional Variables =========== # Repeat spin_period and data_quality to match new epoch shape (num_epochs) l1a_dataset["spin_period"] = xr.DataArray( - np.repeat(unpacked_dataset["spin_period"].values, n_spins) + np.repeat(group_ds["spin_period"].values, n_spins) * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) l1a_dataset["data_quality"] = xr.DataArray( - np.repeat(unpacked_dataset["suspect"].values, n_spins), + np.repeat(group_ds["suspect"].values, n_spins), dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) diff --git a/imap_processing/codice/codice_l1a_hi_priority.py b/imap_processing/codice/codice_l1a_hi_priority.py index d67e17daf1..f990aa6932 100644 --- a/imap_processing/codice/codice_l1a_hi_priority.py +++ b/imap_processing/codice/codice_l1a_hi_priority.py @@ -11,60 +11,56 @@ from imap_processing.codice.decompress import decompress from imap_processing.codice.utils import ( CoDICECompression, - ViewTabInfo, get_codice_epoch_time, get_collapse_pattern_shape, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns logger = logging.getLogger(__name__) -def l1a_hi_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: +def l1a_hi_priority( + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, +) -> xr.Dataset: """ - Process CoDICE Hi Priority L1A data. + Process a single table-ID group of CoDICE Hi Priority L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # Get these values from unpacked data. These are used to - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 1: raise ValueError("Unsupported sensor ID for Hi priority processing.") @@ -72,9 +68,9 @@ def l1a_hi_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -84,9 +80,9 @@ def l1a_hi_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: logical_source_id = "imap_codice_l1a_hi-priority" compression_algorithm = CoDICECompression(view_tab_obj.compression) # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values - packet_version = unpacked_dataset["packet_version"].values[0] + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values + packet_version = group_ds["packet_version"].values[0] # The decompressed data in the shape of (epoch, n). Then reshape later. decompressed_data = [ np.frombuffer( @@ -155,12 +151,12 @@ def l1a_hi_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: ) # Add first few unique variables l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) diff --git a/imap_processing/codice/codice_l1a_hi_sectored.py b/imap_processing/codice/codice_l1a_hi_sectored.py index 1c84fded6f..b4fce45c28 100644 --- a/imap_processing/codice/codice_l1a_hi_sectored.py +++ b/imap_processing/codice/codice_l1a_hi_sectored.py @@ -11,61 +11,58 @@ from imap_processing.codice.decompress import decompress from imap_processing.codice.utils import ( CoDICECompression, - ViewTabInfo, apply_replacements_to_attrs, get_codice_epoch_time, get_collapse_pattern_shape, get_energy_info, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns logger = logging.getLogger(__name__) -def l1a_hi_sectored(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: +def l1a_hi_sectored( + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, +) -> xr.Dataset: """ - Process CoDICE Hi Sectored L1A data. + Process a single table-ID group of CoDICE Hi Sectored L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # Get these values from unpacked data. These are used to - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 1: raise ValueError("Unsupported sensor ID for Hi Sectored processing.") @@ -73,9 +70,9 @@ def l1a_hi_sectored(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -86,8 +83,8 @@ def l1a_hi_sectored(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: compression_algorithm = CoDICECompression(view_tab_obj.compression) # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values # The decompressed data in the shape of (epoch, n). Then reshape later. decompressed_data = [ @@ -274,12 +271,12 @@ def l1a_hi_sectored(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # ========= Add Additional Variables =========== l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) diff --git a/imap_processing/codice/codice_l1a_lo_counters_aggregated.py b/imap_processing/codice/codice_l1a_lo_counters_aggregated.py index a390d0dd4d..1bd323bccd 100644 --- a/imap_processing/codice/codice_l1a_lo_counters_aggregated.py +++ b/imap_processing/codice/codice_l1a_lo_counters_aggregated.py @@ -12,12 +12,10 @@ from imap_processing.codice.decompress import decompress from imap_processing.codice.utils import ( CoDICECompression, - ViewTabInfo, calculate_acq_time_per_step, get_codice_epoch_time, get_counters_aggregated_pattern, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns @@ -25,47 +23,45 @@ def l1a_lo_counters_aggregated( - unpacked_dataset: xr.Dataset, lut_file: Path + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, ) -> xr.Dataset: """ - Process CoDICE Lo Counters aggregated L1A data. + Process a single table-ID group of CoDICE Lo Counters Aggregated L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing aggregated with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 0: raise ValueError("Unsupported sensor ID for Lo processing.") @@ -97,8 +93,8 @@ def l1a_lo_counters_aggregated( spin_sector_pairs = non_reserved_variables["tcr"] esa_step = constants.NUM_ESA_STEPS # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values compression_algorithm = CoDICECompression(view_tab_obj.compression) @@ -124,13 +120,10 @@ def l1a_lo_counters_aggregated( half_spin_per_esa_step = np.concatenate( (np.array(half_spin_per_esa_step), np.full(pad_size, HALF_SPIN_FILLVAL)) ) - # TODO: Handle epoch dependent acquisition time and half spin per esa step - # For now, just tile the same array for all epochs. - # Eventually we may have data from a day where the LUT changed. If this is the - # case, we need to split the data by epoch and assign different acquisition times + # Each group shares the same table_id, so all epochs use the same LUT values. half_spin_per_esa_step = np.tile( np.asarray(half_spin_per_esa_step).astype(np.uint8), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) # Get acquisition time per esa step acquisition_time_per_step = calculate_acq_time_per_step( @@ -138,10 +131,10 @@ def l1a_lo_counters_aggregated( ) acquisition_time_per_step = np.tile( np.asarray(acquisition_time_per_step), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) # For every energy after nso_half_spin, set data to fill values - nso_half_spin = unpacked_dataset["nso_half_spin"].values + nso_half_spin = group_ds["nso_half_spin"].values nso_mask = (half_spin_per_esa_step >= nso_half_spin[:, np.newaxis]) | ( half_spin_per_esa_step == HALF_SPIN_FILLVAL ) @@ -157,9 +150,9 @@ def l1a_lo_counters_aggregated( # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -231,7 +224,7 @@ def l1a_lo_counters_aggregated( # Add first few unique variables l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) @@ -246,7 +239,7 @@ def l1a_lo_counters_aggregated( attrs=cdf_attrs.get_variable_attributes("voltage_table", check_schema=False), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) @@ -258,18 +251,18 @@ def l1a_lo_counters_aggregated( ), ) # Rename vars - unpacked_dataset = unpacked_dataset.rename( + group_ds = group_ds.rename( { k: v for k, v in [ ("rgfo_energy_step", "rgfo_esa_step"), ("nso_energy_step", "nso_esa_step"), ] - if k in unpacked_dataset + if k in group_ds } ) # These variables were added to the packet definition after 20260129, so they only - # exist in the unpacked dataset if packet_version > 1 + # exist in the dataset if packet_version > 1. # If they don't exist, initialize them with fill val arrays since they won't be # used in the NSO/RGFO masking logic but should still exist in l1a for SPDF # compliance/consistency. @@ -280,10 +273,8 @@ def l1a_lo_counters_aggregated( "nso_esa_step", ] for var in l1a_additional_vars: - if var not in unpacked_dataset: - unpacked_dataset[var] = np.full( - unpacked_dataset.sizes["epoch"], fill_value=np.nan - ) + if var not in group_ds: + group_ds[var] = np.full(group_ds.sizes["epoch"], fill_value=np.nan) # Carry over these variables from unpacked data to l1a_dataset l1a_carryover_vars = [ @@ -296,7 +287,7 @@ def l1a_lo_counters_aggregated( # Loop through them since we need to set their attrs too for var in l1a_carryover_vars: l1a_dataset[var] = xr.DataArray( - unpacked_dataset[var].values, + group_ds[var].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes(var), ) diff --git a/imap_processing/codice/codice_l1a_lo_counters_singles.py b/imap_processing/codice/codice_l1a_lo_counters_singles.py index 82b96da252..b0b29c8b60 100644 --- a/imap_processing/codice/codice_l1a_lo_counters_singles.py +++ b/imap_processing/codice/codice_l1a_lo_counters_singles.py @@ -12,58 +12,56 @@ from imap_processing.codice.decompress import decompress from imap_processing.codice.utils import ( CoDICECompression, - ViewTabInfo, calculate_acq_time_per_step, get_codice_epoch_time, get_collapse_pattern_shape, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns logger = logging.getLogger(__name__) -def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: +def l1a_lo_counters_singles( + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, +) -> xr.Dataset: """ - Process CoDICE Lo Counters singles L1A data. + Process a single table-ID group of CoDICE Lo Counters Singles L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 0: raise ValueError("Unsupported sensor ID for Hi processing.") @@ -93,8 +91,8 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. compression_algorithm = CoDICECompression(view_tab_obj.compression) # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values # The decompressed data in the shape of (epoch, n). Then reshape later. decompressed_data = [ @@ -120,13 +118,10 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. half_spin_per_esa_step = np.concatenate( (np.array(half_spin_per_esa_step), np.full(pad_size, HALF_SPIN_FILLVAL)) ) - # TODO: Handle epoch dependent acquisition time and half spin per esa step - # For now, just tile the same array for all epochs. - # Eventually we may have data from a day where the LUT changed. If this is the - # case, we need to split the data by epoch and assign different acquisition times + # Each group shares the same table_id, so all epochs use the same LUT values. half_spin_per_esa_step = np.tile( np.asarray(half_spin_per_esa_step).astype(np.uint8), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) # Get acquisition time per esa step acquisition_time_per_step = calculate_acq_time_per_step( @@ -134,7 +129,7 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. ) acquisition_time_per_step = np.tile( np.asarray(acquisition_time_per_step), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) # ========== Apply NSO/RGFO Masking =========== # After FSW changes on 20260129, The Lo L1A product contains variables that @@ -150,8 +145,8 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. # For every energy after nso_half_spin, set data to fill values # For data before 20260129 ( packet_version <=1 ) set all data to NaN where # half_spin > nso_half_spin - packet_versions = unpacked_dataset["packet_version"].values - nso_half_spin = unpacked_dataset["nso_half_spin"].values + packet_versions = group_ds["packet_version"].values + nso_half_spin = group_ds["nso_half_spin"].values # TODO handle boundary days where the FSW changed halfway through the dataset. E.g # Some packet_version = 1 and some = 2 if packet_versions[0] <= 1: @@ -165,16 +160,14 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. # nso_spin_sector and nso_esa_step for comparison. Shape (epoch, 1, 1) # to broadcast nso_spin_sector = ( - unpacked_dataset["nso_spin_sector"].values[:, np.newaxis, np.newaxis] % 12 + group_ds["nso_spin_sector"].values[:, np.newaxis, np.newaxis] % 12 ) # Mod 12 since spin sector is reported as 0-23 but we want to compare to # 0-11 bins in the data. # After modulo 12, we need to floor divide by 2 since the counters data has 6 # spin sector bins (2 spin sectors per bin). nso_spin_sector = nso_spin_sector // 2 # compare it to the 0-5 spin sector bins in the data - nso_esa_step = unpacked_dataset["nso_energy_step"].values[ - :, np.newaxis, np.newaxis - ] + nso_esa_step = group_ds["nso_energy_step"].values[:, np.newaxis, np.newaxis] num_esa_steps = counters_data.shape[1] num_spin_sectors = counters_data.shape[2] # Create arrays for spin sectors and esa steps to compare with nso values. @@ -224,9 +217,9 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -310,7 +303,7 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. # Add first few unique variables l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) @@ -325,7 +318,7 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. attrs=cdf_attrs.get_variable_attributes("voltage_table", check_schema=False), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) @@ -337,18 +330,18 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. ), ) # Rename vars - unpacked_dataset = unpacked_dataset.rename( + group_ds = group_ds.rename( { k: v for k, v in [ ("rgfo_energy_step", "rgfo_esa_step"), ("nso_energy_step", "nso_esa_step"), ] - if k in unpacked_dataset + if k in group_ds } ) # These variables were added to the packet definition after 20260129, so they only - # exist in the unpacked dataset if packet_version > 1 + # exist in the dataset if packet_version > 1. # If they don't exist, initialize them with fill val arrays since they won't be # used in the NSO/RGFO masking logic but should still exist in l1a for SPDF # compliance/consistency. @@ -359,10 +352,8 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. "nso_esa_step", ] for var in l1a_additional_vars: - if var not in unpacked_dataset: - unpacked_dataset[var] = np.full( - unpacked_dataset.sizes["epoch"], fill_value=np.nan - ) + if var not in group_ds: + group_ds[var] = np.full(group_ds.sizes["epoch"], fill_value=np.nan) # Carry over these variables from unpacked data to l1a_dataset l1a_carryover_vars = [ @@ -375,7 +366,7 @@ def l1a_lo_counters_singles(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr. # Loop through them since we need to set their attrs too for var in l1a_carryover_vars: l1a_dataset[var] = xr.DataArray( - unpacked_dataset[var].values, + group_ds[var].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes(var), ) diff --git a/imap_processing/codice/codice_l1a_lo_priority.py b/imap_processing/codice/codice_l1a_lo_priority.py index 61c5dd5f6e..34a32349a9 100644 --- a/imap_processing/codice/codice_l1a_lo_priority.py +++ b/imap_processing/codice/codice_l1a_lo_priority.py @@ -13,60 +13,57 @@ from imap_processing.codice.utils import ( CODICEAPID, CoDICECompression, - ViewTabInfo, calculate_acq_time_per_step, get_codice_epoch_time, get_collapse_pattern_shape, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns logger = logging.getLogger(__name__) -def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: +def l1a_lo_priority( + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, +) -> xr.Dataset: """ - Process CoDICE Lo Priority L1A data. + Process a single table-ID group of CoDICE Lo Priority L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - Unpacked dataset from L0 packet file. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. lut_file : Path Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - Processed L1A dataset for Hi Omni data. + Processed L1A dataset for input table-ID group. """ - # Get these values from unpacked data. These are used to - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 0: raise ValueError("Unsupported sensor ID for Lo priority processing.") @@ -82,9 +79,9 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -106,10 +103,10 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: raise ValueError("Unsupported APID for Lo priority processing.") # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values - packet_version = unpacked_dataset["packet_version"].values[0] + packet_version = group_ds["packet_version"].values[0] # The decompressed data in the shape of (epoch, n). Then reshape later. decompressed_data = [ np.frombuffer( @@ -157,13 +154,10 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: (np.array(half_spin_per_esa_step), np.full(pad_size, HALF_SPIN_FILLVAL)) ) - # TODO: Handle epoch dependent acquisition time and half spin per esa step - # For now, just tile the same array for all epochs. - # Eventually we may have data from a day where the LUT changed. If this is the - # case, we need to split the data by epoch and assign different acquisition times + # Each group shares the same table_id, so all epochs use the same LUT values. half_spin_per_esa_step = np.tile( np.asarray(half_spin_per_esa_step).astype(np.uint8), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) # Get acquisition time per esa step acquisition_time_per_step = calculate_acq_time_per_step( @@ -171,7 +165,7 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: ) acquisition_time_per_step = np.tile( np.asarray(acquisition_time_per_step), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) # ========== Apply NSO/RGFO Masking =========== # After FSW changes on 20260129, The Lo L1A product contains variables that @@ -188,8 +182,8 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # For every energy after nso_half_spin, set data to fill values # For data before 20260129 ( packet_version <=1 ) set all data to NaN where # half_spin > nso_half_spin - packet_versions = unpacked_dataset["packet_version"].values - nso_half_spin = unpacked_dataset["nso_half_spin"].values + packet_versions = group_ds["packet_version"].values + nso_half_spin = group_ds["nso_half_spin"].values # TODO handle boundary days where the FSW changed halfway through the dataset. E.g # Some packet_version = 1 and some = 2 if packet_versions[0] <= 1: @@ -206,11 +200,9 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # spin_sector dimension is half-spin indexed (0-11), so modulo 12 is # intentional to align packet NSO metadata with the data coordinates. nso_spin_sector = ( - unpacked_dataset["nso_spin_sector"].values[:, np.newaxis, np.newaxis] % 12 + group_ds["nso_spin_sector"].values[:, np.newaxis, np.newaxis] % 12 ) - nso_esa_step = unpacked_dataset["nso_energy_step"].values[ - :, np.newaxis, np.newaxis - ] + nso_esa_step = group_ds["nso_energy_step"].values[:, np.newaxis, np.newaxis] # Create arrays for spin sectors and esa steps to compare with nso values. # Shape (1, 1, spin_sector) and (1, esa_step, 1) spin_sectors = np.arange(num_spin_sectors)[np.newaxis, np.newaxis, :] @@ -327,7 +319,7 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: ) # Add first few unique variables l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) @@ -342,7 +334,7 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: attrs=cdf_attrs.get_variable_attributes("voltage_table", check_schema=False), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) @@ -354,18 +346,18 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: ), ) # Rename vars - unpacked_dataset = unpacked_dataset.rename( + group_ds = group_ds.rename( { k: v for k, v in [ ("rgfo_energy_step", "rgfo_esa_step"), ("nso_energy_step", "nso_esa_step"), ] - if k in unpacked_dataset + if k in group_ds } ) # These variables were added to the packet definition after 20260129, so they only - # exist in the unpacked dataset if packet_version > 1 + # exist in the dataset if packet_version > 1. # If they don't exist, initialize them with fill val arrays since they won't be # used in the NSO/RGFO masking logic but should still exist in l1a for SPDF # compliance/consistency. @@ -376,10 +368,8 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: "nso_esa_step", ] for var in l1a_additional_vars: - if var not in unpacked_dataset: - unpacked_dataset[var] = np.full( - unpacked_dataset.sizes["epoch"], fill_value=np.nan - ) + if var not in group_ds: + group_ds[var] = np.full(group_ds.sizes["epoch"], fill_value=np.nan) # Carry over these variables from unpacked data to l1a_dataset l1a_carryover_vars = [ @@ -392,7 +382,7 @@ def l1a_lo_priority(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # Loop through them since we need to set their attrs too for var in l1a_carryover_vars: l1a_dataset[var] = xr.DataArray( - unpacked_dataset[var].values, + group_ds[var].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes(var), ) diff --git a/imap_processing/codice/codice_l1a_lo_species.py b/imap_processing/codice/codice_l1a_lo_species.py index 93e1c2e6ca..01eb2fefde 100644 --- a/imap_processing/codice/codice_l1a_lo_species.py +++ b/imap_processing/codice/codice_l1a_lo_species.py @@ -18,60 +18,57 @@ from imap_processing.codice.utils import ( CODICEAPID, CoDICECompression, - ViewTabInfo, calculate_acq_time_per_step, get_codice_epoch_time, get_collapse_pattern_shape, - get_view_tab_info, - read_sci_lut, + get_view_tab_obj, ) from imap_processing.spice.time import met_to_ttj2000ns logger = logging.getLogger(__name__) -def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # noqa: PLR0912 +def l1a_lo_species( # noqa: PLR0912 + group_ds: xr.Dataset, + lut_file: Path, + table_id: str, + view_id: int, + apid: int, + plan_id: int, + plan_step: int, +) -> xr.Dataset: """ - L1A processing code. + Process a single table-ID group of CoDICE Lo Species L1A data. Parameters ---------- - unpacked_dataset : xarray.Dataset - The decompressed and unpacked data from the packet file. - lut_file : pathlib.Path - Path to the LUT (Lookup Table) file used for processing. + group_ds : xarray.Dataset + Dataset filtered to a single table_id. + lut_file : Path + Path to the LUT file for processing. + table_id : str + The table ID for this group. + view_id : int + View ID (uniform across the product). + apid : int + APID (uniform across the product). + plan_id : int + Plan ID (uniform across the product). + plan_step : int + Plan step (uniform across the product). Returns ------- xarray.Dataset - The processed L1A dataset for the given species product. + The processed L1A dataset for input table-ID group. """ - # Get these values from unpacked data. These are used to - # lookup in LUT table. - table_id = unpacked_dataset["table_id"].values[0] - view_id = unpacked_dataset["view_id"].values[0] - apid = unpacked_dataset["pkt_apid"].values[0] - plan_id = unpacked_dataset["plan_id"].values[0] - plan_step = unpacked_dataset["plan_step"].values[0] - logger.info( f"Processing species with - APID: {apid} / 0x{apid:X}, View ID: {view_id}, " f"Table ID: {table_id}, Plan ID: {plan_id}, Plan Step: {plan_step}" ) # ========== Get LUT Data =========== - # Read information from LUT - sci_lut_data = read_sci_lut(lut_file, table_id) - - view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) - view_tab_obj = ViewTabInfo( - apid=apid, - view_id=view_id, - sensor=view_tab_info["sensor"], - three_d_collapsed=view_tab_info["3d_collapse"], - collapse_table=view_tab_info["collapse_table"], - compression=view_tab_info["compression"], - ) + sci_lut_data, view_tab_obj = get_view_tab_obj(lut_file, table_id, view_id, apid) if view_tab_obj.sensor != 0: raise ValueError("Unsupported sensor ID for Lo species processing.") @@ -106,7 +103,7 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # are referencing is actually data that we want to toss out and fill with # fill vals. This only affects data before the LUT was updated # (table_id 3978152295). - if table_id <= 3978152295: + if int(table_id) <= 3978152295: actual_species_names = [ "junk" if name == "cnoplus" else name for name in actual_species_names ] @@ -127,10 +124,8 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: compression_algorithm = CoDICECompression(view_tab_obj.compression) # Decompress data using byte count information from decommed data - binary_data_list = unpacked_dataset["data"].values - byte_count_list = unpacked_dataset["byte_count"].values - - # The decompressed data in the shape of (epoch, n). Then reshape later. + binary_data_list = group_ds["data"].values + byte_count_list = group_ds["byte_count"].values decompressed_data = [ decompress( packet_data[:byte_count], @@ -168,23 +163,20 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: acquisition_time_per_step = calculate_acq_time_per_step( sci_lut_data["lo_stepping_tab"] ) - # Get acquisition time per esa step - # TODO: Handle epoch dependent acquisition time and half spin per esa step - # For now, just tile the same array for all epochs. - # Eventually we may have data from a day where the LUT changed. If this is the - # case, we need to split the data by epoch and assign different acquisition times + # Get acquisition time per esa step. Each group shares the same table_id, so + # all epochs within the group use the same LUT values. half_spin_per_esa_step = np.tile( np.asarray( half_spin_per_esa_step, ).astype(np.uint8), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) acquisition_time_per_step = np.tile( np.asarray(acquisition_time_per_step), - (len(unpacked_dataset["acq_start_seconds"]), 1), + (len(group_ds["acq_start_seconds"]), 1), ) # For every energy after nso_half_spin, set data to fill values - nso_half_spin = unpacked_dataset["nso_half_spin"].values + nso_half_spin = group_ds["nso_half_spin"].values nso_mask = (half_spin_per_esa_step >= nso_half_spin[:, np.newaxis]) | ( half_spin_per_esa_step == HALF_SPIN_FILLVAL ) @@ -208,9 +200,9 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # ========= Get Epoch Time Data =========== # Epoch center time and delta epoch_center, deltas = get_codice_epoch_time( - unpacked_dataset["acq_start_seconds"].values, - unpacked_dataset["acq_start_subseconds"].values, - unpacked_dataset["spin_period"].values, + group_ds["acq_start_seconds"].values, + group_ds["acq_start_subseconds"].values, + group_ds["spin_period"].values, view_tab_obj, ) @@ -288,7 +280,7 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: ) # Add first few unique variables l1a_dataset["spin_period"] = xr.DataArray( - unpacked_dataset["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, + group_ds["spin_period"].values * constants.SPIN_PERIOD_CONVERSION, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("spin_period"), ) @@ -303,7 +295,7 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: attrs=cdf_attrs.get_variable_attributes("voltage_table", check_schema=False), ) l1a_dataset["data_quality"] = xr.DataArray( - unpacked_dataset["suspect"].values, + group_ds["suspect"].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes("data_quality"), ) @@ -315,14 +307,14 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: ), ) # Rename vars - unpacked_dataset = unpacked_dataset.rename( + group_ds = group_ds.rename( { k: v for k, v in [ ("rgfo_energy_step", "rgfo_esa_step"), ("nso_energy_step", "nso_esa_step"), ] - if k in unpacked_dataset + if k in group_ds } ) # These variables were added to the packet definition after 20260129, so they only @@ -337,10 +329,8 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: "nso_esa_step", ] for var in l1a_additional_vars: - if var not in unpacked_dataset: - unpacked_dataset[var] = np.full( - unpacked_dataset.sizes["epoch"], fill_value=np.nan - ) + if var not in group_ds: + group_ds[var] = np.full(group_ds.sizes["epoch"], fill_value=np.nan) # Carry over these variables from unpacked data to l1a_dataset l1a_carryover_vars = [ @@ -354,7 +344,7 @@ def l1a_lo_species(unpacked_dataset: xr.Dataset, lut_file: Path) -> xr.Dataset: # Loop through them since we need to set their attrs too for var in l1a_carryover_vars: l1a_dataset[var] = xr.DataArray( - unpacked_dataset[var].values, + group_ds[var].values, dims=("epoch",), attrs=cdf_attrs.get_variable_attributes(var), ) diff --git a/imap_processing/codice/utils.py b/imap_processing/codice/utils.py index 200d86b2e3..0a38b34b53 100644 --- a/imap_processing/codice/utils.py +++ b/imap_processing/codice/utils.py @@ -6,11 +6,13 @@ """ import json +from collections.abc import Callable from dataclasses import dataclass from enum import IntEnum from pathlib import Path import numpy as np +import xarray as xr from imap_processing.codice import constants @@ -155,6 +157,98 @@ def get_view_tab_info(json_data: dict, view_id: int, apid: int) -> dict: return view_tab +def get_view_tab_obj( + lut_file: Path, table_id: str, view_id: int, apid: int +) -> tuple[dict, "ViewTabInfo"]: + """ + Read the SCI-LUT and build a ViewTabInfo for the given table ID. + + Parameters + ---------- + lut_file : pathlib.Path + Path to the SCI-LUT JSON file. + table_id : str + Table identifier to extract from the JSON. + view_id : int + The view ID from the packet. + apid : int + The APID from the packet. + + Returns + ------- + tuple[dict, ViewTabInfo] + The SCI-LUT data dict and a populated ViewTabInfo for the given table ID. + """ + sci_lut_data = read_sci_lut(lut_file, table_id) + view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid) + view_tab_obj = ViewTabInfo( + apid=apid, + view_id=view_id, + sensor=view_tab_info["sensor"], + three_d_collapsed=view_tab_info["3d_collapse"], + collapse_table=view_tab_info["collapse_table"], + compression=view_tab_info["compression"], + ) + return sci_lut_data, view_tab_obj + + +def process_by_table_id( + unpacked_dataset: xr.Dataset, + lut_file: Path, + process_fn: Callable[..., xr.Dataset], +) -> xr.Dataset: + """ + Split dataset by unique table_id values, process each group, and recombine. + + This is the shared wrapper logic used by all non-DE/NHK L1A processing + functions. It extracts the fields that are uniform across a packet stream + (``view_id``, ``apid``, ``plan_id``, ``plan_step``), iterates over every + unique ``table_id`` found in the dataset, filters to that group via + ``isel``, calls *process_fn* for each group, and finally concatenates the + results sorted by epoch. + + Parameters + ---------- + unpacked_dataset : xarray.Dataset + Full unpacked dataset from the L0 packet file. + lut_file : pathlib.Path + Path to the SCI-LUT JSON file passed through to *process_fn*. + process_fn : Callable + The private ``_process_xxx`` function to call for each table_id group. + It must accept the signature + ``(group_ds, lut_file, table_id, view_id, apid, plan_id, plan_step)`` + and return an ``xr.Dataset``. + + Returns + ------- + xarray.Dataset + Combined L1A dataset sorted by epoch. + """ + view_id = unpacked_dataset["view_id"].values[0] + apid = unpacked_dataset["pkt_apid"].values[0] + plan_id = unpacked_dataset["plan_id"].values[0] + plan_step = unpacked_dataset["plan_step"].values[0] + + unique_table_ids = np.unique(unpacked_dataset["table_id"].values) + processed = [ + process_fn( + unpacked_dataset.isel( + epoch=unpacked_dataset["table_id"].values == table_id + ), + lut_file, + table_id, + view_id, + apid, + plan_id, + plan_step, + ) + for table_id in unique_table_ids + ] + if len(processed) == 1: + return processed[0] + return xr.concat(processed, dim="epoch").sortby("epoch") + + def get_collapse_pattern_shape( json_data: dict, sensor_id: int, collapse_table_id: int ) -> tuple[int, ...]: @@ -384,16 +478,6 @@ def calculate_acq_time_per_step( np.ndarray Array of acquisition times per step of shape (num_esa_steps,). """ - # TODO: Handle time-varying num_steps_data length - # The num_steps_data length can change over time (e.g., 6 → 3 steps) and is not - # constant. E.g. at a day where the LUT changes we need to handle that. Update the - # computation to: - # Use the actual length of num_steps_data at each point in time instead of - # assuming a constant value - # - Make the calculation time-varying with epoch dependency - # - Ensure values are divided by their corresponding epoch in L1B processing - # - These tunable values are used to calculate acquisition time per step - # These tunable values are used to calculate acquisition time per step tunable_values = low_stepping_tab["tunable_values"] diff --git a/imap_processing/ialirt/l0/process_codice.py b/imap_processing/ialirt/l0/process_codice.py index aa13b1a05c..1675acd664 100644 --- a/imap_processing/ialirt/l0/process_codice.py +++ b/imap_processing/ialirt/l0/process_codice.py @@ -22,6 +22,7 @@ get_geometric_factor_lut, process_lo_species_intensity, ) +from imap_processing.codice.utils import process_by_table_id from imap_processing.ialirt.utils.grouping import ( _populate_instrument_header_items, find_groups, @@ -473,7 +474,7 @@ def process_codice( cod_lo_dataset = create_xarray_dataset( cod_lo_science_values, cod_lo_metadata_values, "lo" ) - l1a_lo = l1a_lo_species(cod_lo_dataset, l1a_lut_path) + l1a_lo = process_by_table_id(cod_lo_dataset, l1a_lut_path, l1a_lo_species) l1b_lo = cast( xr.Dataset, convert_to_rates( diff --git a/imap_processing/tests/codice/test_codice_l1a.py b/imap_processing/tests/codice/test_codice_l1a.py index 646058fedb..105ff30073 100644 --- a/imap_processing/tests/codice/test_codice_l1a.py +++ b/imap_processing/tests/codice/test_codice_l1a.py @@ -17,10 +17,9 @@ from imap_processing import imap_module_directory from imap_processing.cdf.utils import load_cdf, write_cdf -from imap_processing.codice import constants from imap_processing.codice.codice_l1a import process_l1a from imap_processing.codice.codice_l1a_de import l1a_direct_event -from imap_processing.codice.utils import CODICEAPID, read_sci_lut +from imap_processing.codice.utils import CODICEAPID from imap_processing.tests.codice.conftest import ( VALIDATION_FILE_DATE, VALIDATION_FILE_VERSION, @@ -179,36 +178,6 @@ def test_lo_counters_singles(mock_get_file_paths, codice_lut_path): ) -@patch("imap_data_access.processing_input.ProcessingInputCollection.get_file_paths") -@patch("imap_processing.codice.codice_l1a_lo_counters_singles.read_sci_lut") -def test_lo_counters_singles_mock_esa_steps( - mock_read_sci_lut, mock_get_file_paths, codice_lut_path -): - """Tests lo-counters-singles with mocked ESA steps.""" - mock_get_file_paths.side_effect = [ - codice_lut_path(descriptor="lo-counters-singles", data_type="l0"), - codice_lut_path(descriptor="l1a-sci-lut"), - ] - - # Load the sci lut - sci_lut = read_sci_lut( - codice_lut_path(descriptor="l1a-sci-lut")[0], table_id="3952862729" - ) - - # Modify the lo_stepping_tab to have fewer values - # This is expected in future sci luts - sci_lut["lo_stepping_tab"]["row_number"]["data"] = sci_lut["lo_stepping_tab"][ - "row_number" - ]["data"][:100] - - mock_read_sci_lut.return_value = sci_lut - - processed_data = process_l1a(dependency=ProcessingInputCollection())[0] - # Although the sci lut had fewer ESA steps, the processing should still - # produce the full number of ESA steps defined in constants. - assert processed_data.sizes["esa_step"] == constants.NUM_ESA_STEPS - - @patch("imap_data_access.processing_input.ProcessingInputCollection.get_file_paths") def test_lo_sw_priority(mock_get_file_paths, codice_lut_path): """Tests lo-sw-priority.""" diff --git a/imap_processing/tests/ialirt/unit/test_process_codice.py b/imap_processing/tests/ialirt/unit/test_process_codice.py index 28ac8109f8..9410d1183f 100644 --- a/imap_processing/tests/ialirt/unit/test_process_codice.py +++ b/imap_processing/tests/ialirt/unit/test_process_codice.py @@ -26,6 +26,7 @@ process_lo_species_intensity, ) from imap_processing.codice.decompress import decompress +from imap_processing.codice.utils import process_by_table_id from imap_processing.ialirt.l0.process_codice import ( COD_HI_COUNTER, COD_LO_COUNTER, @@ -558,7 +559,7 @@ def test_group_and_decompress_ialirt_cod_lo( np.testing.assert_array_equal(decompressed_values, test_decom_data_array) dataset = create_xarray_dataset(science_values, metadata_values, "lo") - result = l1a_lo_species(dataset, l1a_lut_path) + result = process_by_table_id(dataset, l1a_lut_path, l1a_lo_species) expected_species = [ "heplusplus",