From 4b29a49403f148ce397bfb284568071797b741eb Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 4 Apr 2024 10:13:24 +0300 Subject: [PATCH 01/11] Initial commit for seasonal_production_performance pipeline see HEA-169 --- pipelines/__init__.py | 18 ++ pipelines/assets/base.py | 22 +- .../assets/seasonal_production_performance.py | 198 ++++++++++++++++++ 3 files changed, 229 insertions(+), 9 deletions(-) create mode 100644 pipelines/assets/seasonal_production_performance.py diff --git a/pipelines/__init__.py b/pipelines/__init__.py index 56fb37f6..f3d0a10e 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -31,6 +31,16 @@ other_cash_income_label_dataframe, summary_other_cash_income_labels_dataframe, ) +from .assets.seasonal_production_performance import ( + all_hazards_labels_dataframe, + all_seasonal_production_performance_labels_dataframe, + hazards_dataframe, + hazards_dataframe_label_dataframe, + seasonal_production_performance_dataframe, + seasonal_production_performance_dataframe_label_dataframe, + summary_hazards_labels_dataframe, + summary_seasonal_production_performance_labels_dataframe, +) from .assets.wealth_characteristic import ( all_wealth_characteristic_labels_dataframe, summary_wealth_characteristic_labels_dataframe, @@ -91,6 +101,14 @@ consolidated_fixture, uploaded_baseline, imported_baseline, + seasonal_production_performance_dataframe, + hazards_dataframe, + seasonal_production_performance_dataframe_label_dataframe, + all_seasonal_production_performance_labels_dataframe, + summary_seasonal_production_performance_labels_dataframe, + hazards_dataframe_label_dataframe, + all_hazards_labels_dataframe, + summary_hazards_labels_dataframe, ], jobs=[update_metadata, import_baseline_from_fixture], resources={ diff --git a/pipelines/assets/base.py b/pipelines/assets/base.py index a3dbf734..bc65a07c 100644 --- a/pipelines/assets/base.py +++ b/pipelines/assets/base.py @@ -47,6 +47,10 @@ "groupe de richesse": "fr", "groupe socio-economique": "fr", "group socio-economique": "fr", + "year of harvest": "en", + "production year": "en", + "year": "en", + "année": "fr", } # List of labels that indicate the start of the summary columns from row 3 in the Data, Data, and Data3 worksheets @@ -58,6 +62,8 @@ "BASE DE RÉFÉRENAE", "range", "interval", + "Average", # For timeline sheet + "MOYENNE", ] @@ -320,10 +326,7 @@ def get_bss_dataframe( # The requested worksheet does not exist in the file return Output( pd.DataFrame(), - metadata={ - "worksheet": bss_sheet, - "row_count": "Worksheet not present in file", - }, + metadata={"worksheet": bss_sheet, "row_count": "Worksheet not present in file", "datapoint_count": 0}, ) # Use a 1-based index to match the Excel Row Number @@ -331,15 +334,16 @@ def get_bss_dataframe( # Set the column names to match Excel df.columns = [get_column_letter(col + 1) for col in df.columns] - # Find the last column before the summary column, which is in row 3 - end_col = get_index(SUMMARY_LABELS, df.loc[3], offset=-1) + # Find the last column before the summary column, which is in row 3 for the WB, data sheets and 4 for timeline sheet + row = 4 if bss_sheet == "Timeline" else 3 + end_col = get_index(SUMMARY_LABELS, df.loc[row], offset=-1) if end_col == df.columns[-1]: # Need the last row because get_index has offset=-1 raise ValueError(f'No cell containing any of the summary strings: {", ".join(SUMMARY_LABELS)}') if not num_summary_cols: # If the number of summary columns wasn't specified, then assume that # there is one summary column for each wealth category, from row 3. - num_summary_cols = df.loc[3, "B":end_col].dropna().nunique() + num_summary_cols = df.loc[row, "B":end_col].dropna().nunique() end_col = df.columns[df.columns.get_loc(end_col) + num_summary_cols] # Find the row index of the start of the Livelihood Activities or Wealth Group Characteristic Values @@ -360,8 +364,8 @@ def get_bss_dataframe( # Find the last row that contains a label end_row = df.index[df["A"].notna()][-1] - # Find the language based on the value in cell A3 - lang = LANGS[df.loc[3, "A"].strip().lower()] + # Find the language based on the value in cell A3, or A4 + lang = LANGS[df.loc[row, "A"].strip().lower()] # Filter to just the Wealth Group header rows and the Livelihood Activities df = pd.concat([df.loc[header_rows, :end_col], df.loc[start_row:end_row, :end_col]]) diff --git a/pipelines/assets/seasonal_production_performance.py b/pipelines/assets/seasonal_production_performance.py new file mode 100644 index 00000000..84f2cd7a --- /dev/null +++ b/pipelines/assets/seasonal_production_performance.py @@ -0,0 +1,198 @@ +import os + +import django +import pandas as pd +from dagster import AssetExecutionContext, Output, asset + +from ..configs import BSSMetadataConfig +from ..partitions import bss_files_partitions_def, bss_instances_partitions_def +from .base import ( + get_all_bss_labels_dataframe, + get_bss_dataframe, + get_bss_label_dataframe, + get_summary_bss_label_dataframe, +) + +# set the default Django settings module +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings.production") + +# Configure Django with our custom settings before importing any Django classes +django.setup() + +# Indexes of header rows in the Data3 dataframe (wealth_group_category, district, village) +HEADER_ROWS = [ + 4, +] + + +@asset(partitions_def=bss_files_partitions_def) +def seasonal_production_performance_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.DataFrame]: + """ + DataFrame asset for seasonal production performance from a BSS in 'Timeline' sheet + """ + return get_bss_dataframe( + config, + corrected_files, + "Timeline", + start_strings=["Year of harvest", "year", "Année", "Production year"], + end_strings=[ + "Chronic hazards:", + "Chronic hazards (every year):", + "Alás croniques", + "Alás chroniques", + "Aléas croniques", + "Aléas/ risques - ranking 1 à 8", + "Aleás croniques", + "Aléas chroniques", + ], + header_rows=HEADER_ROWS, + num_summary_cols=1, + ) + + +@asset(partitions_def=bss_files_partitions_def) +def seasonal_production_performance_dataframe_label_dataframe( + context: AssetExecutionContext, + config: BSSMetadataConfig, + seasonal_production_performance_dataframe: pd.DataFrame, +) -> Output[pd.DataFrame]: + """ + Dataframe of Seasonal Production Perf (Timeline) Label References + """ + return get_bss_label_dataframe( + context, + config, + seasonal_production_performance_dataframe, + "seasonal_production_performance_dataframe", + len(HEADER_ROWS), + ) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def all_seasonal_production_performance_labels_dataframe( + config: BSSMetadataConfig, seasonal_production_performance_dataframe_label_dataframe: dict[str, pd.DataFrame] +) -> Output[pd.DataFrame]: + """ + Combined dataframe of the seasonal production performance labels in use across all BSSs. + """ + return get_all_bss_labels_dataframe(config, seasonal_production_performance_dataframe_label_dataframe) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def summary_seasonal_production_performance_labels_dataframe( + config: BSSMetadataConfig, all_seasonal_production_performance_labels_dataframe: pd.DataFrame +) -> Output[pd.DataFrame]: + """ + Summary of the seasonal production performance labels in use across all BSSs. + """ + return get_summary_bss_label_dataframe(config, all_seasonal_production_performance_labels_dataframe) + + +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def seasonal_production_performance_instances( + context: AssetExecutionContext, + config: BSSMetadataConfig, + completed_bss_metadata, + seasonal_production_performance_dataframe, +) -> Output[dict]: + """ + Seasonal Production Performance instances extracted from the Timeline sheet of the BSS. + """ + df = seasonal_production_performance_dataframe + # The last column is the average, let us drop it + df = df.drop(df.columns[-1], axis=1) + df = df.melt(id_vars="Year of harvest", var_name="community", value_name="seasonal_performance") + df["Year"] = pd.to_datetime(df["Year"], errors="coerce") + + # Function to extract start and end dates + def extract_dates(year): + if pd.notna(year): + if isinstance(year, int): + start_date = pd.Timestamp(year, 1, 1) + end_date = pd.Timestamp(year, 12, 31) + elif isinstance(year, pd.Timestamp): + start_date = year + end_date = year + pd.offsets.DateOffset(years=1) - pd.offsets.Day(1) + else: # Assume it's a string with a range + start_year, end_year = map(int, year.split(" - ")) + start_date = pd.Timestamp(start_year, 1, 1) + end_date = pd.Timestamp(end_year, 12, 31) + return start_date, end_date + else: + return None, None + + # Apply the function to create new 'Start Date' and 'End Date' columns + df[["performance_year_start_date", "performance_year_end_date"]] = df["Year"].apply( + lambda x: pd.Series(extract_dates(x)) + ) + # TODO: agree on season default value, cover other cases for the year and return the expected dict + + +@asset(partitions_def=bss_files_partitions_def) +def hazards_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.DataFrame]: + """ + DataFrame asset for Hazards from a BSS in 'Timeline' sheet + """ + return get_bss_dataframe( + config, + corrected_files, + "Timeline", + start_strings=[ + "Chronic hazards:", + "Chronic hazards (every year):", + "Alás croniques", + "Alás chroniques", + "Aléas croniques", + "Aléas/ risques - ranking 1 à 8", + "Aleás croniques", + "Aléas chroniques", + ], + header_rows=HEADER_ROWS, + num_summary_cols=1, + ) + + +@asset(partitions_def=bss_files_partitions_def) +def hazards_dataframe_label_dataframe( + context: AssetExecutionContext, + config: BSSMetadataConfig, + hazards_dataframe: pd.DataFrame, +) -> Output[pd.DataFrame]: + """ + Dataframe of Hazards (Timeline) Label References + """ + return get_bss_label_dataframe(context, config, hazards_dataframe, "hazards_dataframe", len(HEADER_ROWS)) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def all_hazards_labels_dataframe( + config: BSSMetadataConfig, hazards_dataframe_label_dataframe: dict[str, pd.DataFrame] +) -> Output[pd.DataFrame]: + """ + Combined dataframe of the Hazards labels in use across all BSSs. + """ + return get_all_bss_labels_dataframe(config, hazards_dataframe_label_dataframe) + + +@asset(io_manager_key="dataframe_csv_io_manager") +def summary_hazards_labels_dataframe( + config: BSSMetadataConfig, all_hazards_labels_dataframe: pd.DataFrame +) -> Output[pd.DataFrame]: + """ + Summary of the Hazards labels in use across all BSSs. + """ + return get_summary_bss_label_dataframe(config, all_hazards_labels_dataframe) + + +@asset(partitions_def=bss_files_partitions_def) +def events_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.DataFrame]: + """ + DataFrame asset for events from a BSS in 'Timeline' sheet + """ + return get_bss_dataframe( + config, + corrected_files, + "Timeline", + start_strings=["Periodic hazards (not every year):", "Aléas periodiques"], + header_rows=HEADER_ROWS, + ) From ec5263719f133d55d7864e5ce086c8eb0e61b04a Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 4 Apr 2024 14:00:13 +0300 Subject: [PATCH 02/11] Rename some of the assets for naming consistency see HEA-169 --- pipelines/__init__.py | 12 ++++++------ pipelines/assets/seasonal_production_performance.py | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pipelines/__init__.py b/pipelines/__init__.py index f3d0a10e..fbd39664 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -32,13 +32,13 @@ summary_other_cash_income_labels_dataframe, ) from .assets.seasonal_production_performance import ( - all_hazards_labels_dataframe, + all_hazard_labels_dataframe, all_seasonal_production_performance_labels_dataframe, + hazard_labels_dataframe, hazards_dataframe, - hazards_dataframe_label_dataframe, seasonal_production_performance_dataframe, seasonal_production_performance_dataframe_label_dataframe, - summary_hazards_labels_dataframe, + summary_hazard_labels_dataframe, summary_seasonal_production_performance_labels_dataframe, ) from .assets.wealth_characteristic import ( @@ -106,9 +106,9 @@ seasonal_production_performance_dataframe_label_dataframe, all_seasonal_production_performance_labels_dataframe, summary_seasonal_production_performance_labels_dataframe, - hazards_dataframe_label_dataframe, - all_hazards_labels_dataframe, - summary_hazards_labels_dataframe, + hazard_labels_dataframe, + all_hazard_labels_dataframe, + summary_hazard_labels_dataframe, ], jobs=[update_metadata, import_baseline_from_fixture], resources={ diff --git a/pipelines/assets/seasonal_production_performance.py b/pipelines/assets/seasonal_production_performance.py index 84f2cd7a..7c45c1b5 100644 --- a/pipelines/assets/seasonal_production_performance.py +++ b/pipelines/assets/seasonal_production_performance.py @@ -153,7 +153,7 @@ def hazards_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.D @asset(partitions_def=bss_files_partitions_def) -def hazards_dataframe_label_dataframe( +def hazard_labels_dataframe( context: AssetExecutionContext, config: BSSMetadataConfig, hazards_dataframe: pd.DataFrame, @@ -165,7 +165,7 @@ def hazards_dataframe_label_dataframe( @asset(io_manager_key="dataframe_csv_io_manager") -def all_hazards_labels_dataframe( +def all_hazard_labels_dataframe( config: BSSMetadataConfig, hazards_dataframe_label_dataframe: dict[str, pd.DataFrame] ) -> Output[pd.DataFrame]: """ @@ -175,7 +175,7 @@ def all_hazards_labels_dataframe( @asset(io_manager_key="dataframe_csv_io_manager") -def summary_hazards_labels_dataframe( +def summary_hazard_labels_dataframe( config: BSSMetadataConfig, all_hazards_labels_dataframe: pd.DataFrame ) -> Output[pd.DataFrame]: """ From 87a0788c55780548cfc76ca25725c5263606cb64 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 4 Apr 2024 14:04:53 +0300 Subject: [PATCH 03/11] Rename some of the assets for naming consistency see HEA-169 --- pipelines/assets/seasonal_production_performance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/assets/seasonal_production_performance.py b/pipelines/assets/seasonal_production_performance.py index 7c45c1b5..2276bf68 100644 --- a/pipelines/assets/seasonal_production_performance.py +++ b/pipelines/assets/seasonal_production_performance.py @@ -51,7 +51,7 @@ def seasonal_production_performance_dataframe(config: BSSMetadataConfig, correct @asset(partitions_def=bss_files_partitions_def) -def seasonal_production_performance_dataframe_label_dataframe( +def seasonal_production_performance_label_dataframe( context: AssetExecutionContext, config: BSSMetadataConfig, seasonal_production_performance_dataframe: pd.DataFrame, From 05a4b8a0892afb3e6c96f1cedd29cba378508d7c Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 4 Apr 2024 14:35:10 +0300 Subject: [PATCH 04/11] Rename some of the assets for naming consistency see HEA-169 --- pipelines/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/__init__.py b/pipelines/__init__.py index fbd39664..4414c590 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -37,7 +37,7 @@ hazard_labels_dataframe, hazards_dataframe, seasonal_production_performance_dataframe, - seasonal_production_performance_dataframe_label_dataframe, + seasonal_production_performance_label_dataframe, summary_hazard_labels_dataframe, summary_seasonal_production_performance_labels_dataframe, ) @@ -103,7 +103,7 @@ imported_baseline, seasonal_production_performance_dataframe, hazards_dataframe, - seasonal_production_performance_dataframe_label_dataframe, + seasonal_production_performance_label_dataframe, all_seasonal_production_performance_labels_dataframe, summary_seasonal_production_performance_labels_dataframe, hazard_labels_dataframe, From 418164a78a8d83b5290c6d66f76a35820734d537 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 4 Apr 2024 15:04:20 +0300 Subject: [PATCH 05/11] Correct asset name see HEA-169 --- pipelines/assets/seasonal_production_performance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/assets/seasonal_production_performance.py b/pipelines/assets/seasonal_production_performance.py index 2276bf68..5ee5a125 100644 --- a/pipelines/assets/seasonal_production_performance.py +++ b/pipelines/assets/seasonal_production_performance.py @@ -70,12 +70,12 @@ def seasonal_production_performance_label_dataframe( @asset(io_manager_key="dataframe_csv_io_manager") def all_seasonal_production_performance_labels_dataframe( - config: BSSMetadataConfig, seasonal_production_performance_dataframe_label_dataframe: dict[str, pd.DataFrame] + config: BSSMetadataConfig, seasonal_production_performance_label_dataframe: dict[str, pd.DataFrame] ) -> Output[pd.DataFrame]: """ Combined dataframe of the seasonal production performance labels in use across all BSSs. """ - return get_all_bss_labels_dataframe(config, seasonal_production_performance_dataframe_label_dataframe) + return get_all_bss_labels_dataframe(config, seasonal_production_performance_label_dataframe) @asset(io_manager_key="dataframe_csv_io_manager") From cedd4b3691d662edbc5ac7ee052e768715b3ec8f Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 4 Apr 2024 15:22:28 +0300 Subject: [PATCH 06/11] Correct asset name references see HEA-169 --- pipelines/assets/seasonal_production_performance.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/assets/seasonal_production_performance.py b/pipelines/assets/seasonal_production_performance.py index 5ee5a125..912bf9ae 100644 --- a/pipelines/assets/seasonal_production_performance.py +++ b/pipelines/assets/seasonal_production_performance.py @@ -166,22 +166,22 @@ def hazard_labels_dataframe( @asset(io_manager_key="dataframe_csv_io_manager") def all_hazard_labels_dataframe( - config: BSSMetadataConfig, hazards_dataframe_label_dataframe: dict[str, pd.DataFrame] + config: BSSMetadataConfig, hazard_labels_dataframe: dict[str, pd.DataFrame] ) -> Output[pd.DataFrame]: """ Combined dataframe of the Hazards labels in use across all BSSs. """ - return get_all_bss_labels_dataframe(config, hazards_dataframe_label_dataframe) + return get_all_bss_labels_dataframe(config, hazard_labels_dataframe) @asset(io_manager_key="dataframe_csv_io_manager") def summary_hazard_labels_dataframe( - config: BSSMetadataConfig, all_hazards_labels_dataframe: pd.DataFrame + config: BSSMetadataConfig, all_hazard_labels_dataframe: pd.DataFrame ) -> Output[pd.DataFrame]: """ Summary of the Hazards labels in use across all BSSs. """ - return get_summary_bss_label_dataframe(config, all_hazards_labels_dataframe) + return get_summary_bss_label_dataframe(config, all_hazard_labels_dataframe) @asset(partitions_def=bss_files_partitions_def) From b23a730f8ea2ebab85bd93f8c721b55f69a80f77 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 25 Apr 2024 09:38:32 +0300 Subject: [PATCH 07/11] Add Manager classes and natural_key methods for Hazard and SeasonalProductionPerformance see HEA-169 --- apps/baseline/models.py | 51 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/apps/baseline/models.py b/apps/baseline/models.py index 7fee0845..ff446588 100644 --- a/apps/baseline/models.py +++ b/apps/baseline/models.py @@ -2100,6 +2100,24 @@ class Meta: verbose_name_plural = _("Market Prices") +class SeasonalProductionPerformanceManager(common_models.IdentifierManager): + def get_by_natural_key( + self, + code: str, + reference_year_end_date: str, + community_full_name: str, + season_name: str, + performance_year_end_date: str, + ): + return self.get( + community__livelihood_zone_baseline__livelihood_zone__code=code, + community__livelihood_zone_baseline__reference_year_end_date=reference_year_end_date, + community__full_name=community_full_name, + season__name_en=season_name, + performance_year_end_date=performance_year_end_date, + ) + + # @TODO Ask Save what to call this class SeasonalProductionPerformance(common_models.Model): """ @@ -2138,12 +2156,35 @@ class Performance(models.IntegerChoices): verbose_name=_("Seasonal Performance"), help_text=_("Rating of the seasonal production performance from Very Poor (1) to Very Good (5)"), ) + objects = SeasonalProductionPerformanceManager() + + def natural_key(self): + return ( + self.community.livelihood_zone_baseline.livelihood_zone.code, + self.community.livelihood_zone_baseline.reference_year_end_date.isoformat(), + self.community.full_name, + self.season.name_en, + self.performance_year_end_date.isoformat(), + ) class Meta: verbose_name = _("Seasonal Production Performance") verbose_name_plural = _("Seasonal Production Performance") +class HazardManager(common_models.IdentifierManager): + def get_by_natural_key( + self, code: str, reference_year_end_date: str, community_full_name: str, chronic_or_periodic: str, ranking: str + ): + return self.get( + community__livelihood_zone_baseline__livelihood_zone__code=code, + community__livelihood_zone_baseline__reference_year_end_date=reference_year_end_date, + community__full_name=community_full_name, + chronic_or_periodic=chronic_or_periodic, + ranking=int(ranking), + ) + + class Hazard(common_models.Model): """ A shock such as drought, flood, conflict or market disruption which is likely @@ -2184,11 +2225,21 @@ class HazardRanking(models.IntegerChoices): description = common_models.DescriptionField( max_length=255, verbose_name=_("Description of Event(s) and/or Response(s)") ) + objects = HazardManager() class Meta: verbose_name = _("Hazard") verbose_name_plural = _("Hazards") + def natural_key(self): + return ( + self.community.livelihood_zone_baseline.livelihood_zone.code, + self.community.livelihood_zone_baseline.reference_year_end_date.isoformat(), + self.community.full_name, + self.chronic_or_periodic, + str(self.ranking), + ) + class Event(common_models.Model): """ From 20a291dce49109eb6a91d1bbb8e5e20aa059230a Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 25 Apr 2024 09:39:39 +0300 Subject: [PATCH 08/11] Add Hazards related assets to the seasonal_production_performance.py see HEA-169 --- .../assets/seasonal_production_performance.py | 262 ++++++++++++++++-- 1 file changed, 242 insertions(+), 20 deletions(-) diff --git a/pipelines/assets/seasonal_production_performance.py b/pipelines/assets/seasonal_production_performance.py index 912bf9ae..4a3805f9 100644 --- a/pipelines/assets/seasonal_production_performance.py +++ b/pipelines/assets/seasonal_production_performance.py @@ -1,8 +1,12 @@ +import json import os import django import pandas as pd -from dagster import AssetExecutionContext, Output, asset +from dagster import AssetExecutionContext, MetadataValue, Output, asset + +from baseline.models import Hazard # NOQA: E402 +from metadata.lookups import HazardCategoryLookup, SeasonNameLookup # NOQA: E402 from ..configs import BSSMetadataConfig from ..partitions import bss_files_partitions_def, bss_instances_partitions_def @@ -19,10 +23,25 @@ # Configure Django with our custom settings before importing any Django classes django.setup() -# Indexes of header rows in the Data3 dataframe (wealth_group_category, district, village) +# Indexes of header rows in the Timeline dataframe (year, community) HEADER_ROWS = [ 4, ] +YEAR_COLUMNS = ["Year of harvest", "year", "Year", "Année", "Production year", "Production Year"] + +HAZARD_TYPE_COLUMNS = { + Hazard.ChronicOrPeriodic.CHRONIC: [ + "Chronic hazards:", + "Chronic hazards (every year):", + "Alás croniques", + "Alás chroniques", + "Aléas croniques", + "Aléas/ risques - ranking 1 à 8", + "Aleás croniques", + "Aléas chroniques", + ], + Hazard.ChronicOrPeriodic.PERIODIC: ["Periodic hazards (not every year):"], +} @asset(partitions_def=bss_files_partitions_def) @@ -34,7 +53,7 @@ def seasonal_production_performance_dataframe(config: BSSMetadataConfig, correct config, corrected_files, "Timeline", - start_strings=["Year of harvest", "year", "Année", "Production year"], + start_strings=YEAR_COLUMNS, end_strings=[ "Chronic hazards:", "Chronic hazards (every year):", @@ -92,40 +111,138 @@ def summary_seasonal_production_performance_labels_dataframe( def seasonal_production_performance_instances( context: AssetExecutionContext, config: BSSMetadataConfig, - completed_bss_metadata, + baseline_instances, seasonal_production_performance_dataframe, ) -> Output[dict]: """ Seasonal Production Performance instances extracted from the Timeline sheet of the BSS. """ + partition_key = context.asset_partition_key_for_output() + result = { + "SeasonalProductionPerformance": seasonal_production_performance_dataframe.to_dict(orient="records"), + } + metadata = { + "num_seasonal_production_performances": 0, + } + if seasonal_production_performance_dataframe.empty: + # We don't have a Timeline sheet with seasonal production performance for this BSS + # so prepare an empty dataframe with 0 counts + metadata["message"] = ( + f"Empty seasonal production encountered for partition {partition_key}, due to missing timeline sheet" + ) + return Output( + result, + metadata=metadata, + ) df = seasonal_production_performance_dataframe + baseline_df = baseline_instances + reference_year_end_date = baseline_df["LivelihoodZoneBaseline"][0]["reference_year_end_date"] + reference_year_start_date = baseline_df["LivelihoodZoneBaseline"][0]["reference_year_start_date"] + + df = df.reset_index(drop=True) # The last column is the average, let us drop it df = df.drop(df.columns[-1], axis=1) - df = df.melt(id_vars="Year of harvest", var_name="community", value_name="seasonal_performance") - df["Year"] = pd.to_datetime(df["Year"], errors="coerce") + # Make the column names the first row + df.columns = df.iloc[0] + df = df.drop([0, 1]) + df = df.reset_index(drop=True) + + def get_year_column_name(df): + for column_name in YEAR_COLUMNS: + if column_name in df.columns: + return column_name + raise ValueError( + "No candidate columns found for the year from %s, current columns %s" % (YEAR_COLUMNS, df.columns) + ) + + # Get the name of the first matching year column + year_column_name = get_year_column_name(df) + df = df.rename(columns={year_column_name: "year"}) + df["year"] = df["year"].astype(str) + + df = df.melt(id_vars="year", var_name="community", value_name="seasonal_performance") + + columns = ["year", "community", "seasonal_performance"] + df = df.dropna(subset=columns) + # Some BSSs may contain the Timeline sheet but the entries for seasonal performance could just be empty + if df.empty: + metadata["message"] = f"Empty seasonal production encountered for partition {partition_key}" + return Output( + result, + metadata=metadata, + ) + df = df[df[columns].apply(lambda x: x.str.strip()).all(axis=1)] # Function to extract start and end dates def extract_dates(year): if pd.notna(year): - if isinstance(year, int): - start_date = pd.Timestamp(year, 1, 1) - end_date = pd.Timestamp(year, 12, 31) - elif isinstance(year, pd.Timestamp): - start_date = year - end_date = year + pd.offsets.DateOffset(years=1) - pd.offsets.Day(1) - else: # Assume it's a string with a range - start_year, end_year = map(int, year.split(" - ")) - start_date = pd.Timestamp(start_year, 1, 1) - end_date = pd.Timestamp(end_year, 12, 31) - return start_date, end_date + if len(year) == 4: # Check if it's a single year value + start_date = f"{year}-{reference_year_start_date.split('-')[1]}-01" + end_date = f"{int(year) + 1}-{reference_year_end_date.split('-')[1]}-01" + else: + if "/" in year: + start_year, end_year = map(str, year.split("/")) + elif "-" in year: + start_year, end_year = map(str, year.split("-")) + elif ":" in year: + start_year, end_year = map(str, year.split(":")) + elif " " in year: + start_year, end_year = map(str, year.split()) + else: + raise ValueError(f"Invalid year format {str(year)}") + # check if the year format provided is with 2 or 4 digits + if end_year and len(end_year) == 2: + end_year = start_year[:2] + end_year + + start_date = f"({int(start_year)}-{int(reference_year_start_date.split(' - ')[1])}-01)" + end_date = f"({int(end_year)}-{int(reference_year_start_date.split(' - ')[1])}-01)" else: - return None, None + raise ValueError("Year is not recognized") + return start_date, end_date # Apply the function to create new 'Start Date' and 'End Date' columns - df[["performance_year_start_date", "performance_year_end_date"]] = df["Year"].apply( + df[["performance_year_start_date", "performance_year_end_date"]] = df["year"].apply( lambda x: pd.Series(extract_dates(x)) ) - # TODO: agree on season default value, cover other cases for the year and return the expected dict + # Add season for a season Lookup, a default season is Harvest + # We may need to update this for some BSS that my have season + seasonnamelookup = SeasonNameLookup() + + def get_season(): + return [seasonnamelookup.get("Harvest", country_id=baseline_df["LivelihoodZone"][0]["country_id"])] + + df["season"] = "Harvest" + df["season"] = df["season"].apply(lambda c: get_season()) + + # Create function for looking for the community + def get_community(community): + for community_dict in baseline_df["Community"]: + if community_dict["name"] == community: + return [ + community_dict["livelihood_zone_baseline"][0], + baseline_df["LivelihoodZoneBaseline"][0]["reference_year_end_date"], + community_dict["full_name"], + ] + return None + # raise ValueError( + # "Unable to get community %s, perhaps that is wrongly spelled:" % (community) + # ) + + df["community"] = df["community"].apply(lambda c: get_community(c)) + df = df.dropna(subset="community") + seasonal_performances = df.to_dict(orient="records") + result = { + "SeasonalProductionPerformance": seasonal_performances, + } + metadata = { + "num_seasonal_production_performances": len(seasonal_performances), + "preview": MetadataValue.md(f"```json\n{json.dumps(result, indent=4)}\n```"), + } + + return Output( + result, + metadata=metadata, + ) @asset(partitions_def=bss_files_partitions_def) @@ -152,6 +269,111 @@ def hazards_dataframe(config: BSSMetadataConfig, corrected_files) -> Output[pd.D ) +@asset(partitions_def=bss_instances_partitions_def, io_manager_key="json_io_manager") +def hazard_instances( + context: AssetExecutionContext, + config: BSSMetadataConfig, + baseline_instances, + hazards_dataframe, +) -> Output[dict]: + """ + Hazard instances extracted from the Timeline sheet of the BSS. + """ + partition_key = context.asset_partition_key_for_output() + result = { + "Hazard": hazards_dataframe.to_dict(orient="records"), + } + metadata = { + "hazards": 0, + } + if hazards_dataframe.empty: + # We don't have a Timeline sheet with Hazard for this BSS so prepare an empty dataframe with 0 counts + metadata["message"] = f"Empty Hazard encountered for partition {partition_key}, due to missing timeline sheet" + return Output( + result, + metadata=metadata, + ) + df = hazards_dataframe + baseline_df = baseline_instances + df = df.reset_index(drop=True) + # The last column is the average, let us drop it + df = df.drop(df.columns[-1], axis=1) + # Make the column names the first row + df.columns = df.iloc[0] + df = df.drop(0) + + def get_year_column_name(df): + for column_name in YEAR_COLUMNS: + if column_name in df.columns: + return column_name + raise ValueError( + "No candidate columns found for the year from %s, current columns %s" % (YEAR_COLUMNS, df.columns) + ) + + # Get the name of the first matching year column + year_column_name = get_year_column_name(df) + df = df.rename(columns={year_column_name: "ranking"}) + + # Determine is chronic vs periodic + df = df.melt(id_vars="ranking", var_name="community", value_name="hazard_category") + + df["chronic_or_periodic"] = Hazard.ChronicOrPeriodic.CHRONIC + periodic_encountered = False + for index, row in df.iterrows(): + if row["ranking"] in HAZARD_TYPE_COLUMNS[Hazard.ChronicOrPeriodic.PERIODIC]: + periodic_encountered = True + if row["ranking"] in HAZARD_TYPE_COLUMNS[Hazard.ChronicOrPeriodic.CHRONIC]: + periodic_encountered = False + if periodic_encountered: + df.loc[index, "chronic_or_periodic"] = Hazard.ChronicOrPeriodic.PERIODIC + + # Drop empty or invalid ranking and hazard values + df = df[df["ranking"].apply(lambda x: isinstance(x, int) and x != "")] + df = df[ + df["hazard_category"].apply( + lambda x: isinstance(x, str) and x.strip() != "" and not pd.isna(x) and not pd.isnull(x) + ) + ] + # Some BSSs may contain the Timeline sheet but the entries for Hazards could just be empty + if df.empty: + metadata["message"] = f"Empty Hazards encountered for partition {partition_key}" + return Output( + result, + metadata=metadata, + ) + df["hazard_category"] = df["hazard_category"].str.lower() + df = HazardCategoryLookup().do_lookup(df, "hazard_category", "hazard_category") + + def get_community(community): + for community_dict in baseline_df["Community"]: + if community_dict["name"] == community: + return [ + community_dict["livelihood_zone_baseline"][0], + baseline_df["LivelihoodZoneBaseline"][0]["reference_year_end_date"], + community_dict["full_name"], + ] + return None + # raise ValueError( + # "Unable to get community %s, perhaps that is wrongly spelled:" % (community) + # ) + + df["community"] = df["community"].apply(lambda c: get_community(c)) + df = df.dropna(subset="community") + hazards = df.to_dict(orient="records") + result = { + "Hazard": hazards, + } + metadata = { + "num_hazards": len(hazards), + "preview": MetadataValue.md(f"```json\n{json.dumps(result, indent=4)}\n```"), + } + + return Output( + result, + metadata=metadata, + ) + + @asset(partitions_def=bss_files_partitions_def) def hazard_labels_dataframe( context: AssetExecutionContext, From a8ba0a021763fc36eca94bc2230ba7127fdad76d Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 25 Apr 2024 09:40:04 +0300 Subject: [PATCH 09/11] Add HazardCategoryLookup see HEA-169 --- apps/metadata/lookups.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/metadata/lookups.py b/apps/metadata/lookups.py index aca8075c..8611c768 100644 --- a/apps/metadata/lookups.py +++ b/apps/metadata/lookups.py @@ -7,6 +7,7 @@ from common.lookups import Lookup from .models import ( + HazardCategory, LivelihoodCategory, ReferenceData, Season, @@ -66,3 +67,7 @@ class SeasonNameLookup(SeasonLookup): *translation_fields("description"), "aliases", ] + + +class HazardCategoryLookup(ReferenceDataLookup): + model = HazardCategory From 7633688a2e53e3cc99e82e7f8b85454cec2de15c Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 25 Apr 2024 09:42:56 +0300 Subject: [PATCH 10/11] Update fixtures and sensors classes to include seasonal production performance and Hazard related assets see HEA-169 --- pipelines/__init__.py | 4 ++++ pipelines/assets/base.py | 2 +- pipelines/assets/fixtures.py | 5 +++++ pipelines/jobs/fixtures.py | 6 ++++++ pipelines/sensors.py | 6 ++++++ 5 files changed, 22 insertions(+), 1 deletion(-) diff --git a/pipelines/__init__.py b/pipelines/__init__.py index 4414c590..a9a3b7e1 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -34,9 +34,11 @@ from .assets.seasonal_production_performance import ( all_hazard_labels_dataframe, all_seasonal_production_performance_labels_dataframe, + hazard_instances, hazard_labels_dataframe, hazards_dataframe, seasonal_production_performance_dataframe, + seasonal_production_performance_instances, seasonal_production_performance_label_dataframe, summary_hazard_labels_dataframe, summary_seasonal_production_performance_labels_dataframe, @@ -109,6 +111,8 @@ hazard_labels_dataframe, all_hazard_labels_dataframe, summary_hazard_labels_dataframe, + seasonal_production_performance_instances, + hazard_instances, ], jobs=[update_metadata, import_baseline_from_fixture], resources={ diff --git a/pipelines/assets/base.py b/pipelines/assets/base.py index bc65a07c..9e49044f 100644 --- a/pipelines/assets/base.py +++ b/pipelines/assets/base.py @@ -308,7 +308,7 @@ def validate_previous_value(cell, expected_prev_value, prev_value): def get_bss_dataframe( config: BSSMetadataConfig, - filepath_or_buffer, + filepath_or_buffer: object, bss_sheet: str, start_strings: list[str], end_strings: Optional[list[str]] = None, diff --git a/pipelines/assets/fixtures.py b/pipelines/assets/fixtures.py index feeaa7ae..1f24c6fc 100644 --- a/pipelines/assets/fixtures.py +++ b/pipelines/assets/fixtures.py @@ -99,6 +99,8 @@ def consolidated_instances( livelihood_activity_instances, other_cash_income_instances, wild_foods_instances, + seasonal_production_performance_instances, + hazard_instances, ) -> Output[dict]: """ Consolidated record instances from a BSS, ready to be validated. @@ -109,7 +111,10 @@ def consolidated_instances( # WealthGroup instances, which are needed as a foreign key from LivelihoodActivity, etc. **wealth_characteristic_instances, **livelihood_activity_instances, + **seasonal_production_performance_instances, + **hazard_instances, } + # Add the wild foods and other cash income instances, if they are present for model_name, instances in {**other_cash_income_instances, **wild_foods_instances}.items(): if instances: diff --git a/pipelines/jobs/fixtures.py b/pipelines/jobs/fixtures.py index 08534b69..2b937a4b 100644 --- a/pipelines/jobs/fixtures.py +++ b/pipelines/jobs/fixtures.py @@ -12,6 +12,10 @@ ) from ..assets.livelihood_activity import livelihood_activity_instances from ..assets.other_cash_income import other_cash_income_instances +from ..assets.seasonal_production_performance import ( + hazard_instances, + seasonal_production_performance_instances, +) from ..assets.wealth_characteristic import wealth_characteristic_instances from ..assets.wild_foods import wild_foods_instances from ..partitions import bss_instances_partitions_def @@ -27,6 +31,8 @@ validated_instances, consolidated_fixture, imported_baseline, + seasonal_production_performance_instances, + hazard_instances, ), partitions_def=bss_instances_partitions_def, ) diff --git a/pipelines/sensors.py b/pipelines/sensors.py index 0724a26b..1db6ef47 100644 --- a/pipelines/sensors.py +++ b/pipelines/sensors.py @@ -11,6 +11,10 @@ other_cash_income_dataframe, other_cash_income_instances, ) +from .assets.seasonal_production_performance import ( + hazards_dataframe, + seasonal_production_performance_dataframe, +) from .assets.wealth_characteristic import ( wealth_characteristic_dataframe, wealth_characteristic_instances, @@ -37,6 +41,8 @@ wild_foods_instances.key, wealth_characteristic_dataframe.key, wealth_characteristic_instances.key, + seasonal_production_performance_dataframe.key, + hazards_dataframe.key, ), minimum_interval_seconds=600, ) From 17ad8d15d8367b45fd005fa62cf6faade56b69d1 Mon Sep 17 00:00:00 2001 From: Girum Bizuayehu Date: Thu, 25 Apr 2024 10:11:44 +0300 Subject: [PATCH 11/11] Merge branch 'main' into HEA-169/ingestion_of_seasonal_production_performance see HEA-169 --- pipelines/jobs/fixtures.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/jobs/fixtures.py b/pipelines/jobs/fixtures.py index 67fb8ea2..7248524c 100644 --- a/pipelines/jobs/fixtures.py +++ b/pipelines/jobs/fixtures.py @@ -25,10 +25,6 @@ livelihood_activity_label_dataframe, summary_livelihood_activity_labels_dataframe, ) -from ..assets.seasonal_production_performance import ( - hazard_instances, - seasonal_production_performance_instances, -) from ..assets.other_cash_income import ( all_other_cash_income_labels_dataframe, other_cash_income_dataframe, @@ -36,6 +32,10 @@ other_cash_income_label_dataframe, summary_other_cash_income_labels_dataframe, ) +from ..assets.seasonal_production_performance import ( + hazard_instances, + seasonal_production_performance_instances, +) from ..assets.wealth_characteristic import ( all_wealth_characteristic_labels_dataframe, summary_wealth_characteristic_labels_dataframe,