From 462d2f552b3726caaff551d16186e340f5d12eb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Fri, 19 Dec 2025 11:26:43 +0100 Subject: [PATCH 01/21] add standalone run_template. no input or output. Generate input and run simulation. --- code/run_templates/common/generate_dataset.py | 207 +++++++++--------- code/run_templates/standalone/generate.py | 4 + .../standalone/generate_dataset.py | 1 + code/run_templates/standalone/run.json | 23 ++ 4 files changed, 134 insertions(+), 101 deletions(-) create mode 100644 code/run_templates/standalone/generate.py create mode 120000 code/run_templates/standalone/generate_dataset.py create mode 100644 code/run_templates/standalone/run.json diff --git a/code/run_templates/common/generate_dataset.py b/code/run_templates/common/generate_dataset.py index 3cfb6ba..1d7cc42 100644 --- a/code/run_templates/common/generate_dataset.py +++ b/code/run_templates/common/generate_dataset.py @@ -6,121 +6,126 @@ from csv import DictWriter import numpy as np -from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.orchestrator.utils.logger import get_logger from cosmotech.orchestrator.utils.logger import log_data from cosmotech_api import DatasetPartTypeEnum -from cosmotech_api.api.dataset_api import DatasetApi -from cosmotech_api.api.dataset_api import DatasetCreateRequest -from cosmotech_api.api.dataset_api import DatasetPartCreateRequest +from cosmotech.coal.cosmotech_api.apis.dataset import DatasetApi +from cosmotech_api.models.dataset_create_request import DatasetCreateRequest +from cosmotech_api.models.dataset_part_create_request import DatasetPartCreateRequest from faker import Faker LOGGER = get_logger("Brewery/DatasetV5") -LOGGER.info("Generating dataset content") - -ORG_ID = os.environ.get("CSM_ORGANIZATION_ID") -WS_ID = os.environ.get("CSM_WORKSPACE_ID") -RUNNER_ID = os.environ.get("CSM_RUNNER_ID") -RUN_ID = os.environ.get("CSM_RUN_ID") - -fake_generator = Faker() -rng = np.random.default_rng(random.randint(500, 50000)) - -headers_customer = ["id", "Thirsty", "Satisfaction", "SurroundingSatisfaction"] -headers_bar = ["id", "Stock", "RestockQty", "NbWaiters"] -headers_bar_vertex = ["source", "target"] -headers_arc_satisfaction = ["source", "target"] - -customer_content = [] -bar_content = [] -bar_vertex_content = [] -arc_satisfaction_content = [] - -bars_name = ["MyBar"] -existing_names = set() -dup_counter = 0 - -for bar in bars_name: - customers = list(fake_generator.name() for _ in range(random.randint(30, 150))) - n_customers = len(customers) - _stock = random.randint(30, 500) - bar_content.append({ - "id": bar, - "Stock": _stock, - "RestockQty": random.randint(10, _stock), - "NbWaiters": random.randint(1, n_customers // 2), - }) - mask = np.random.randint(2, size=(n_customers, n_customers), dtype=bool) - r_customer = [] - for i in range(n_customers): - _name = customers[i] - if _name in existing_names: - _name = f"{_name} {dup_counter}" - dup_counter += 1 - r_customer.append(_name) - customer_content.append({ - "id": _name, - "Thirsty": "false", - "Satisfaction": 100, - "SurroundingSatisfaction": 100 - }) - bar_vertex_content.append({ - "source": bar, - "target": _name - }) - for i, j in itertools.combinations(range(n_customers), 2): - if mask[i][j] or mask[j][i]: - arc_satisfaction_content.append({ - "source": r_customer[i], - "target": r_customer[j], +def generate_data(data_dir): + fake_generator = Faker() + + headers_customer = ["id", "Thirsty", "Satisfaction", "SurroundingSatisfaction"] + 
headers_bar = ["id", "Stock", "RestockQty", "NbWaiters"] + headers_bar_vertex = ["source", "target"] + headers_arc_satisfaction = ["source", "target"] + + customer_content = [] + bar_content = [] + bar_vertex_content = [] + arc_satisfaction_content = [] + + bars_name = ["MyBar"] + existing_names = set() + dup_counter = 0 + + LOGGER.info("Generating dataset content") + for bar in bars_name: + customers = list(fake_generator.name() for _ in range(random.randint(30, 150))) + n_customers = len(customers) + _stock = random.randint(30, 500) + bar_content.append({ + "id": bar, + "Stock": _stock, + "RestockQty": random.randint(10, _stock), + "NbWaiters": random.randint(1, n_customers // 2), + }) + mask = np.random.randint(2, size=(n_customers, n_customers), dtype=bool) + r_customer = [] + for i in range(n_customers): + _name = customers[i] + if _name in existing_names: + _name = f"{_name} {dup_counter}" + dup_counter += 1 + r_customer.append(_name) + customer_content.append({ + "id": _name, + "Thirsty": "false", + "Satisfaction": 100, + "SurroundingSatisfaction": 100 }) - arc_satisfaction_content.append({ - "source": r_customer[j], - "target": r_customer[i], + bar_vertex_content.append({ + "source": bar, + "target": _name }) -with tempfile.TemporaryDirectory(suffix="dataset") as temp_dir: - temp_dir_path = pathlib.Path(temp_dir) - for _name, _headers, _content in [ - ("Customer.csv", headers_customer, customer_content), - ("Bar.csv", headers_bar, bar_content), - ("Bar_vertex.csv", headers_bar_vertex, bar_vertex_content), - ("arc_Satisfaction.csv", headers_arc_satisfaction, arc_satisfaction_content), - ]: - _path = temp_dir_path / _name - with _path.open("w") as _file: - _dw = DictWriter(_file, fieldnames=_headers) - _dw.writeheader() - _dw.writerows(_content) - - with get_api_client()[0] as client: - d_api = DatasetApi(client) - d_request = DatasetCreateRequest( - name=f"{RUNNER_ID} - {RUN_ID}", - runnerId=RUNNER_ID, - parts=list( - DatasetPartCreateRequest( - name=f"{RUN_ID} - {_p.name}", - sourceName=_p.name, - type=DatasetPartTypeEnum.FILE - ) - for _p in temp_dir_path.glob("*.csv") + for i, j in itertools.combinations(range(n_customers), 2): + if mask[i][j] or mask[j][i]: + arc_satisfaction_content.append({ + "source": r_customer[i], + "target": r_customer[j], + }) + arc_satisfaction_content.append({ + "source": r_customer[j], + "target": r_customer[i], + }) + + data_dir_path = pathlib.Path(data_dir) + for _name, _headers, _content in [ + ("Customer.csv", headers_customer, customer_content), + ("Bar.csv", headers_bar, bar_content), + ("Bar_vertex.csv", headers_bar_vertex, bar_vertex_content), + ("arc_Satisfaction.csv", headers_arc_satisfaction, arc_satisfaction_content), + ]: + _path = data_dir_path / _name + with _path.open("w") as _file: + _dw = DictWriter(_file, fieldnames=_headers) + _dw.writeheader() + _dw.writerows(_content) + + +def create_dataset_from(data_dir): + ORG_ID = os.environ.get("CSM_ORGANIZATION_ID") + WS_ID = os.environ.get("CSM_WORKSPACE_ID") + RUNNER_ID = os.environ.get("CSM_RUNNER_ID") + RUN_ID = os.environ.get("CSM_RUN_ID") + + data_dir_path = pathlib.Path(data_dir) + + d_api = DatasetApi() + d_request = DatasetCreateRequest( + name=f"{RUNNER_ID} - {RUN_ID}", + runnerId=RUNNER_ID, + parts=list( + DatasetPartCreateRequest( + name=f"{RUN_ID} - {_p.name}", + sourceName=_p.name, + type=DatasetPartTypeEnum.FILE ) + for _p in data_dir_path.glob("*.csv") ) - d_ret = d_api.create_dataset( - ORG_ID, - WS_ID, - d_request, - files=list( - (_p.name, _p.open("rb").read()) - for _p in 
temp_dir_path.glob("*.csv")
-            )
+    )
+    return d_api.create_dataset(
+        ORG_ID,
+        WS_ID,
+        d_request,
+        files=list(
+            (_p.name, _p.open("rb").read())
+            for _p in data_dir_path.glob("*.csv")
         )
+    )
 
-    dataset_id = d_ret.id
 
-log_data("dataset_id", dataset_id)
+if __name__ == "__main__":
+    with tempfile.TemporaryDirectory(suffix='dataset') as temp_dir:
+        temp_dir_path = pathlib.Path(temp_dir)
+        generate_data(temp_dir_path)
+        dataset_id = create_dataset_from(temp_dir_path)
 
-LOGGER.info(f"Generated dataset {dataset_id}")
+    log_data("dataset_id", dataset_id)
+    LOGGER.info(f"Generated dataset {dataset_id}")
diff --git a/code/run_templates/standalone/generate.py b/code/run_templates/standalone/generate.py
new file mode 100644
index 0000000..d477b5a
--- /dev/null
+++ b/code/run_templates/standalone/generate.py
@@ -0,0 +1,4 @@
+import os
+import generate_dataset
+
+generate_dataset.generate_data(os.environ.get("CSM_DATASET_ABSOLUTE_PATH"))
diff --git a/code/run_templates/standalone/generate_dataset.py b/code/run_templates/standalone/generate_dataset.py
new file mode 120000
index 0000000..ad95c07
--- /dev/null
+++ b/code/run_templates/standalone/generate_dataset.py
@@ -0,0 +1 @@
+../common/generate_dataset.py
\ No newline at end of file
diff --git a/code/run_templates/standalone/run.json b/code/run_templates/standalone/run.json
new file mode 100644
index 0000000..60e0f26
--- /dev/null
+++ b/code/run_templates/standalone/run.json
@@ -0,0 +1,23 @@
+{
+  "steps": [
+    {
+      "id": "generate_input_data",
+      "command": "python",
+      "arguments": [
+        "code/run_templates/standalone/generate.py"
+      ]
+    },
+    {
+      "id": "engine",
+      "command": "csm-simulator",
+      "arguments": [
+        "-i",
+        "BreweryDemoSimulationWithConnector"
+      ],
+      "precedents": [
+        "generate_input_data"
+      ],
+      "useSystemEnvironment": true
+    }
+  ]
+}
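
Taken together, generate.py and run.json spell out the standalone contract: the generator step writes the four brewery CSV files into whatever folder the CSM_DATASET_ABSOLUTE_PATH environment variable points at, and the csm-simulator step then reads its input from that same location. A minimal local sketch of that contract, using only the repository's own generate_dataset module; the /tmp scratch folder is illustrative, not part of the template:

    import os
    import pathlib

    import generate_dataset  # symlink to code/run_templates/common/generate_dataset.py

    # Stand in for the orchestrator: point the dataset folder at a scratch directory.
    os.environ["CSM_DATASET_ABSOLUTE_PATH"] = "/tmp/brewery_input"
    data_dir = pathlib.Path(os.environ["CSM_DATASET_ABSOLUTE_PATH"])
    data_dir.mkdir(parents=True, exist_ok=True)

    # Same call generate.py makes; writes the four CSV files into data_dir.
    generate_dataset.generate_data(data_dir)

    for name in ("Customer.csv", "Bar.csv", "Bar_vertex.csv", "arc_Satisfaction.csv"):
        assert (data_dir / name).exists(), f"missing {name}"

From ccd2b167328f0e41a3cf2b76b2b91cc79a923eb0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Fri, 19 Dec 2025 11:33:46 +0100
Subject: [PATCH 02/21] add long standalone.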
standalone + sleep 5m

This run_template is used by the QA team to test the abort scenario
---
 .../run_templates/long_standalone/generate.py |  4 +++
 .../long_standalone/generate_dataset.py       |  1 +
 code/run_templates/long_standalone/run.json   | 30 +++++++++++++++++++
 3 files changed, 35 insertions(+)
 create mode 100644 code/run_templates/long_standalone/generate.py
 create mode 120000 code/run_templates/long_standalone/generate_dataset.py
 create mode 100644 code/run_templates/long_standalone/run.json

diff --git a/code/run_templates/long_standalone/generate.py b/code/run_templates/long_standalone/generate.py
new file mode 100644
index 0000000..d477b5a
--- /dev/null
+++ b/code/run_templates/long_standalone/generate.py
@@ -0,0 +1,4 @@
+import os
+import generate_dataset
+
+generate_dataset.generate_data(os.environ.get("CSM_DATASET_ABSOLUTE_PATH"))
diff --git a/code/run_templates/long_standalone/generate_dataset.py b/code/run_templates/long_standalone/generate_dataset.py
new file mode 120000
index 0000000..ad95c07
--- /dev/null
+++ b/code/run_templates/long_standalone/generate_dataset.py
@@ -0,0 +1 @@
+../common/generate_dataset.py
\ No newline at end of file
diff --git a/code/run_templates/long_standalone/run.json b/code/run_templates/long_standalone/run.json
new file mode 100644
index 0000000..3128ba4
--- /dev/null
+++ b/code/run_templates/long_standalone/run.json
@@ -0,0 +1,30 @@
+{
+  "steps": [
+    {
+      "id": "generate_input_data",
+      "command": "python",
+      "arguments": [
+        "code/run_templates/standalone/generate.py"
+      ]
+    },
+    {
+      "id": "engine",
+      "command": "csm-simulator",
+      "arguments": [
+        "-i",
+        "BreweryDemoSimulationWithConnector"
+      ],
+      "precedents": [
+        "generate_input_data"
+      ],
+      "useSystemEnvironment": true
+    },
+    {
+      "id": "nap",
+      "command": "sleep",
+      "arguments": [
+        "5m"
+      ]
+    }
+  ]
+}

From e279599c5a6a722f19bef10ecefba231fe130b6c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Wed, 14 Jan 2026 11:51:52 +0100
Subject: [PATCH 03/21] add brewery and debug run_template

remove old, unused run_templates
---
 code/requirements.txt                         |   2 +-
 code/run_templates/brewery/run.json           |  42 ++++++
 code/run_templates/debug/print_inputs.py      |  24 ++++
 .../run.json                                  |   6 +-
 .../run_templates/long_standalone/generate.py |   4 -
 .../long_standalone/generate_dataset.py       |   1 -
 code/run_templates/long_standalone/run.json   |  30 ----
 code/run_templates/minimal/run.json           |  47 ------
 .../sim_brewery_parameters/common             |   1 -
 .../parameters_handler.py                     | 134 ------------------
 .../sim_brewery_parameters/run.json           |  69 ---------
 .../sim_mock_parameters/run.json              |  57 --------
 code/run_templates/sim_no_parameters/run.json |  57 --------
 13 files changed, 70 insertions(+), 404 deletions(-)
 create mode 100644 code/run_templates/brewery/run.json
 create mode 100644 code/run_templates/debug/print_inputs.py
 rename code/run_templates/{dynamic_values_customers => debug}/run.json (80%)
 delete mode 100644 code/run_templates/long_standalone/generate.py
 delete mode 120000 code/run_templates/long_standalone/generate_dataset.py
 delete mode 100644 code/run_templates/long_standalone/run.json
 delete mode 100644 code/run_templates/minimal/run.json
 delete mode 120000 code/run_templates/sim_brewery_parameters/common
 delete mode 100644 code/run_templates/sim_brewery_parameters/parameters_handler.py
 delete mode 100644 code/run_templates/sim_brewery_parameters/run.json
 delete mode 100644 code/run_templates/sim_mock_parameters/run.json
 delete mode 100644 code/run_templates/sim_no_parameters/run.json

diff --git 
a/code/requirements.txt b/code/requirements.txt index ebeae2c..d8bc0bf 100644 --- a/code/requirements.txt +++ b/code/requirements.txt @@ -1,6 +1,6 @@ cosmotech-run-orchestrator~=2.0.2 azure-storage-blob -CosmoTech-Acceleration-Library @ git+https://github.com/Cosmo-Tech/CosmoTech-Acceleration-Library@feature/cosmotech_api_5.0 +cosmoTech-acceleration-library @ git+https://github.com/Cosmo-Tech/CosmoTech-Acceleration-Library@LAL/use_config_on_runner_api cosmotech-api~=5.0.0b3 faker~=30.6.0 psycopg2-binary==2.9.* diff --git a/code/run_templates/brewery/run.json b/code/run_templates/brewery/run.json new file mode 100644 index 0000000..3e285f3 --- /dev/null +++ b/code/run_templates/brewery/run.json @@ -0,0 +1,42 @@ +{ + "steps": [ + { + "id": "fetch_parameters", + "command": "csm-data", + "arguments": ["api", "run-load-data"], + "useSystemEnvironment": true + }, + { + "id": "engine", + "command": "csm-simulator", + "arguments": [ + "-i", + "BreweryDemoSimulationWithConnector" + ], + "precedents": [ + "fetch_parameters" + ], + "useSystemEnvironment": true + }, + { + "id": "load_results_to_store", + "command": "csm-data", + "arguments": [ + "store", + "load-csv-folder" + ], + "precedents": ["engine"], + "useSystemEnvironment": true + }, + { + "id": "send_results_to_psql", + "command": "csm-data", + "arguments": [ + "store", + "output" + ], + "precedents": ["load_results_to_store"], + "useSystemEnvironment": true + } + ] +} diff --git a/code/run_templates/debug/print_inputs.py b/code/run_templates/debug/print_inputs.py new file mode 100644 index 0000000..ba6a966 --- /dev/null +++ b/code/run_templates/debug/print_inputs.py @@ -0,0 +1,24 @@ +import os +from cosmotech.coal.utils.configuration import Configuration +from pathlib import Path + + +def main(): + _conf = Configuration() + + param_path = Path(_conf.cosmotech.parameters_absolute_path) + print(f"print {param_path.resolve()} content:") + pf = os.listdir(param_path) + print(pf) + + data_path = Path(_conf.cosmotech.dataset_absolute_path) + print(f"print {data_path.resolve()} content:") + df = os.listdir(data_path) + print(df) + for f in df: + print(f) + print((param_path / f).stat().st_size) + + +if __name__ == "__main__": + main() diff --git a/code/run_templates/dynamic_values_customers/run.json b/code/run_templates/debug/run.json similarity index 80% rename from code/run_templates/dynamic_values_customers/run.json rename to code/run_templates/debug/run.json index 0525839..2f3ce41 100644 --- a/code/run_templates/dynamic_values_customers/run.json +++ b/code/run_templates/debug/run.json @@ -10,9 +10,9 @@ "useSystemEnvironment": true }, { - "id": "engine", - "command": "csm-orc", - "arguments": ["run-step"], + "id": "print_out", + "command": "python3", + "arguments": ["code/run_templates/debug/print_inputs.py"], "precedents": ["fetch_parameters"], "useSystemEnvironment": true } diff --git a/code/run_templates/long_standalone/generate.py b/code/run_templates/long_standalone/generate.py deleted file mode 100644 index d477b5a..0000000 --- a/code/run_templates/long_standalone/generate.py +++ /dev/null @@ -1,4 +0,0 @@ -import os -import generate_dataset - -generate_dataset.generate_data(os.environ.get("CSM_DATASET_ABSOLUTE_PATH")) diff --git a/code/run_templates/long_standalone/generate_dataset.py b/code/run_templates/long_standalone/generate_dataset.py deleted file mode 120000 index ad95c07..0000000 --- a/code/run_templates/long_standalone/generate_dataset.py +++ /dev/null @@ -1 +0,0 @@ -../common/generate_dataset.py \ No newline at end of file diff 
--git a/code/run_templates/long_standalone/run.json b/code/run_templates/long_standalone/run.json deleted file mode 100644 index 3128ba4..0000000 --- a/code/run_templates/long_standalone/run.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "steps": [ - { - "id": "generate_input_data", - "command": "python", - "arguments": [ - "code/run_templates/standalone/generate.py" - ] - }, - { - "id": "engine", - "command": "csm-simulator", - "arguments": [ - "-i", - "BreweryDemoSimulationWithConnector" - ], - "precedents": [ - "generate_input_data" - ], - "useSystemEnvironment": true - }, - { - "id": "nap", - "command": "sleep", - "arguments": [ - "5m" - ] - } - ] -} diff --git a/code/run_templates/minimal/run.json b/code/run_templates/minimal/run.json deleted file mode 100644 index c119c8b..0000000 --- a/code/run_templates/minimal/run.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "steps": [ - { - "id": "generate", - "command": "python", - "arguments": [ - "code/run_templates/common/generate_dataset.py" - ], - "useSystemEnvironment": true, - "outputs": { - "dataset_id": { - "description": "Generated Dataset ID in the Cosmo Tech API" - } - } - }, - { - "id": "download", - "command": "python", - "arguments": [ - "code/run_templates/common/download_dataset.py" - ], - "useSystemEnvironment": true, - "inputs": { - "source_dataset": { - "stepId": "generate", - "output": "dataset_id", - "as": "CSM_DATASET_ID" - } - }, - "precedents": [ - "generate" - ] - }, - { - "id": "engine", - "command": "csm-simulator", - "arguments": [ - "-i", - "BreweryDemoSimulationWithConnector" - ], - "precedents": [ - "download" - ], - "useSystemEnvironment": true - } - ] -} diff --git a/code/run_templates/sim_brewery_parameters/common b/code/run_templates/sim_brewery_parameters/common deleted file mode 120000 index 8332399..0000000 --- a/code/run_templates/sim_brewery_parameters/common +++ /dev/null @@ -1 +0,0 @@ -../common/ \ No newline at end of file diff --git a/code/run_templates/sim_brewery_parameters/parameters_handler.py b/code/run_templates/sim_brewery_parameters/parameters_handler.py deleted file mode 100644 index 5f68a0f..0000000 --- a/code/run_templates/sim_brewery_parameters/parameters_handler.py +++ /dev/null @@ -1,134 +0,0 @@ -import os -import sys -from pathlib import Path -import csv -from tempfile import NamedTemporaryFile, TemporaryDirectory -import shutil -import glob -import json -from common.common import get_api, get_logger -from cosmotech.coal.cosmotech_api.connection import get_api_client -from cosmotech.coal.cosmotech_api.workspace import download_workspace_file - -LOGGER = get_logger() -api = get_api() - - -def main(): - organization_id = os.environ.get("CSM_ORGANIZATION_ID") - workspace_id = os.environ.get("CSM_WORKSPACE_ID") - data_folder = Path(os.environ["CSM_DATASET_ABSOLUTE_PATH"]) - - parameters_folder = Path(os.environ["CSM_PARAMETERS_ABSOLUTE_PATH"]) - json_parameters_file = parameters_folder / "parameters.json" - if not json_parameters_file.exists(): - raise Exception(f"No parameters file in {parameters_file}") - - values = { - "stock": 0, - "restock_qty": 0, - "nb_waiters": 0, - "initial_stock_dataset": "", - } - expected_parameters = list(values.keys()) - LOGGER.info("Start parsing JSON parameters file") - with open(json_parameters_file) as json_file: - parameters = json.load(json_file) - for parameter in parameters: - parameter_name = parameter["parameterId"] - parameter_value = parameter["value"] - if parameter_name not in expected_parameters: - LOGGER.warning(f"Unknown parameter {parameter_name}") - 
else: - LOGGER.info(f'Value for {parameter_name}: "{parameter_value}"') - values[parameter_name] = parameter_value - - if values["initial_stock_dataset"] == "": - LOGGER.info("\nInitial stock file not uploaded, skipping this part...") - else: - dataset_folder = parameters_folder / "initial_stock_dataset" - dataset_files = os.listdir(dataset_folder) - if not dataset_files: - LOGGER.info(f'\nNo files in folder: "{dataset_folder}"') - else: - LOGGER.info(f"\nParsing rows of {dataset_files[0]}") - with open(dataset_folder / dataset_files[0], "r") as initial_stock_file: - dataset_reader = csv.reader(initial_stock_file) - for row in dataset_reader: - values["stock"] = row[1] - break - - runner_data = api.runner.get_runner( - organization_id=organization_id, - workspace_id=workspace_id, - runner_id=os.environ.get("CSM_RUNNER_ID"), - ) - instance_dataset_id = runner_data.dataset_list[0] - dataset = api.dataset.find_dataset_by_id( - organization_id=organization_id, - dataset_id=instance_dataset_id, - ) - - # Default path for ETL-generated datasets - ws_file_path = f'datasets/{instance_dataset_id}/generated_brewery_instance.zip' - if dataset.source_type == "None": - try: - ws_file_path = dataset.source.location - except e: - LOGGER.warning(f"Failed to get source.location from dataset {instance_dataset_id}, using default file path") - - LOGGER.info("Downloading instance dataset...") - _file_content = api.workspace.download_workspace_file(organization_id, workspace_id, ws_file_path) - - with TemporaryDirectory() as tmp_dir: - archive_path = os.path.join(tmp_dir, 'dataset.zip') - with open(archive_path, "wb") as _file: - _file.write(_file_content) - - LOGGER.info("Extracting instance dataset...") - if not os.path.exists(data_folder): - os.mkdir(data_folder) - shutil.unpack_archive(archive_path, data_folder) - - files = "\n".join([f" - {path}" for path in glob.glob(str(data_folder / "**"), recursive=True)]) - LOGGER.info(f"\nData files:\n{files}") - - temp_file = NamedTemporaryFile("w+t", newline="", delete=False) - bar_file_path = data_folder / "Bar.csv" - LOGGER.info("\nPatching dataset file Bar.csv with parameters values...") - with open(bar_file_path, newline="") as bar_file: - bar_reader = csv.reader(bar_file) - bar_writer = csv.writer(temp_file) - - header = next(bar_reader) - bar_writer.writerow(header) - column_indices = {"stock": -1, "restock_qty": -1, "nb_waiters": -1} - csv_column_mapping = { - "Stock": "stock", - "RestockQty": "restock_qty", - "NbWaiters": "nb_waiters", - } - for index, column in enumerate(header): - if column in csv_column_mapping: - parameter_name = csv_column_mapping[column] - column_indices[parameter_name] = index - - for row in bar_reader: - for parameter_name, column_index in column_indices.items(): - if column_index == -1: - LOGGER.warning(f" - {parameter_name}: parameter never found in CSV header:") - LOGGER.warning(f' - CSV header: "{header}"') - LOGGER.warning(f' - column mapping: "{csv_column_mapping}"') - continue - previous_value = row[column_index] - new_value = values[parameter_name] - row[column_index] = new_value - LOGGER.info(f" - {parameter_name}: {previous_value} => {new_value}") - bar_writer.writerow(row) - - temp_file.close() - shutil.move(temp_file.name, bar_file_path) - - -if __name__ == "__main__": - main() diff --git a/code/run_templates/sim_brewery_parameters/run.json b/code/run_templates/sim_brewery_parameters/run.json deleted file mode 100644 index 5ba556f..0000000 --- a/code/run_templates/sim_brewery_parameters/run.json +++ /dev/null @@ -1,69 
+0,0 @@ -{ - "steps": [ - { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, - { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, - { - "id": "reset_data_store", - "command": "csm-data", - "arguments": [ - "store", - "reset" - ], - "precedents": ["csm_data_version", "csm_orc_version"], - "useSystemEnvironment": true - }, - { - "id": "fetch_parameters", - "command": "csm-data", - "arguments": ["api", "run-load-data"], - "precedents": ["reset_data_store"], - "useSystemEnvironment": true - }, - { - "id": "parameters_handler", - "command": "python", - "arguments": ["code/run_templates/sim_brewery_parameters/parameters_handler.py"], - "precedents": ["fetch_parameters"], - "useSystemEnvironment": true - }, - { - "id":"engine", - "command":"main", - "arguments": ["-i","BreweryDemoSimulationWithConnector"], - "precedents": ["parameters_handler"], - "useSystemEnvironment": true - }, - { - "id": "load_results_to_store", - "command": "csm-data", - "arguments": [ - "store", - "load-csv-folder", - "--csv-folder", - "$CSM_OUTPUT_ABSOLUTE_PATH" - ], - "precedents": ["engine"], - "useSystemEnvironment": true - }, - { - "id": "send_results_to_psql", - "command": "csm-data", - "arguments": [ - "store", - "dump-to-postgresql", - "--table-prefix", - "brewery", - "--append" - ], - "precedents": ["load_results_to_store"], - "useSystemEnvironment": true - }, - { - "id":"update_runner_metadata", - "command":"python", - "arguments":["code/run_templates/common/update_runner_metadata.py"], - "useSystemEnvironment":true, - "precedents":["load_results_to_store"] - } - ] -} diff --git a/code/run_templates/sim_mock_parameters/run.json b/code/run_templates/sim_mock_parameters/run.json deleted file mode 100644 index 3b20d42..0000000 --- a/code/run_templates/sim_mock_parameters/run.json +++ /dev/null @@ -1,57 +0,0 @@ -{ - "steps": [ - { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, - { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, - { - "id":"engine", - "command":"main", - "arguments": ["-i","BreweryDemoSimulation"], - "useSystemEnvironment":false, - "precedents": ["csm_data_version", "csm_orc_version"] - }, - { - "id": "load_results_to_store", - "command": "csm-data", - "arguments": [ - "store", - "load-csv-folder", - "--csv-folder", - "$CSM_OUTPUT_ABSOLUTE_PATH" - ], - "precedents": ["engine"], - "useSystemEnvironment": true - }, - { - "id": "send_results_to_psql", - "command": "csm-data", - "arguments": [ - "store", - "dump-to-postgresql", - "--table-prefix", - "brewery", - "--append" - ], - "precedents": ["load_results_to_store"], - "useSystemEnvironment": true - }, - { - "id":"update_runner_metadata", - "command":"python", - "arguments":[ - "code/run_templates/common/update_runner_metadata.py", - "--postgres-host", "${POSTGRES_HOST_URI}", - "--postgres-port", "${POSTGRES_HOST_PORT}", - "--postgres-db", "${POSTGRES_DB_NAME}", - "--postgres-schema", "${POSTGRES_DB_SCHEMA}", - "--postgres-user", "${POSTGRES_USER_NAME}", - "--postgres-password", "${POSTGRES_USER_PASSWORD}", - "--csm-organization-id", "${CSM_ORGANIZATION_ID}", - "--csm-workspace-id", "${CSM_WORKSPACE_ID}", - "--csm-runner-id", "${CSM_RUNNER_ID}", - "--csm-run-id", "${CSM_RUN_ID}" - ], - "useSystemEnvironment":true, - "precedents":["load_results_to_store"] - } - ] -} diff --git a/code/run_templates/sim_no_parameters/run.json b/code/run_templates/sim_no_parameters/run.json deleted file mode 100644 index 3b20d42..0000000 --- 
a/code/run_templates/sim_no_parameters/run.json +++ /dev/null @@ -1,57 +0,0 @@ -{ - "steps": [ - { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, - { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, - { - "id":"engine", - "command":"main", - "arguments": ["-i","BreweryDemoSimulation"], - "useSystemEnvironment":false, - "precedents": ["csm_data_version", "csm_orc_version"] - }, - { - "id": "load_results_to_store", - "command": "csm-data", - "arguments": [ - "store", - "load-csv-folder", - "--csv-folder", - "$CSM_OUTPUT_ABSOLUTE_PATH" - ], - "precedents": ["engine"], - "useSystemEnvironment": true - }, - { - "id": "send_results_to_psql", - "command": "csm-data", - "arguments": [ - "store", - "dump-to-postgresql", - "--table-prefix", - "brewery", - "--append" - ], - "precedents": ["load_results_to_store"], - "useSystemEnvironment": true - }, - { - "id":"update_runner_metadata", - "command":"python", - "arguments":[ - "code/run_templates/common/update_runner_metadata.py", - "--postgres-host", "${POSTGRES_HOST_URI}", - "--postgres-port", "${POSTGRES_HOST_PORT}", - "--postgres-db", "${POSTGRES_DB_NAME}", - "--postgres-schema", "${POSTGRES_DB_SCHEMA}", - "--postgres-user", "${POSTGRES_USER_NAME}", - "--postgres-password", "${POSTGRES_USER_PASSWORD}", - "--csm-organization-id", "${CSM_ORGANIZATION_ID}", - "--csm-workspace-id", "${CSM_WORKSPACE_ID}", - "--csm-runner-id", "${CSM_RUNNER_ID}", - "--csm-run-id", "${CSM_RUN_ID}" - ], - "useSystemEnvironment":true, - "precedents":["load_results_to_store"] - } - ] -} From f19afbbd0b4aa0483a4cd0ad89e67ed466b9a135 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Fri, 16 Jan 2026 16:19:11 +0100 Subject: [PATCH 04/21] add better debug printing --- code/run_templates/debug/print_inputs.py | 52 +++++++++++++++++++----- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/code/run_templates/debug/print_inputs.py b/code/run_templates/debug/print_inputs.py index ba6a966..20f5519 100644 --- a/code/run_templates/debug/print_inputs.py +++ b/code/run_templates/debug/print_inputs.py @@ -1,23 +1,55 @@ -import os from cosmotech.coal.utils.configuration import Configuration from pathlib import Path +# Source - https://stackoverflow.com/a +# Posted by Aaron Hall, modified by community. See post 'Timeline' for change history +# Retrieved 2026-01-15, License - CC BY-SA 4.0 + + +def tree(dir_path: Path, prefix: str = ''): + """A recursive generator, given a directory Path object + will yield a visual tree structure line by line + with each line prefixed by the same characters + """ + # prefix components: + space = ' ' + branch = '│ ' + # pointers: + tee = '├── ' + last = '└── ' + + contents = list(dir_path.iterdir()) + # contents each get pointers that are ├── with a final └── : + pointers = [tee] * (len(contents) - 1) + [last] + for pointer, path in zip(pointers, contents): + size = "" + if path.is_file(): + size = f": {path.stat().st_size}o" + yield prefix + pointer + path.name + size + if path.is_dir(): # extend the prefix and recurse: + extension = branch if pointer == tee else space + # i.e. 
space because last, └── , above so no more |
+            yield from tree(path, prefix=prefix+extension)
+
+
+def dprint(line):
+    pre = '[debug]'
+    print(f"{pre} {line}")
+
+
 def main():
     _conf = Configuration()
 
     param_path = Path(_conf.cosmotech.parameters_absolute_path)
-    print(f"print {param_path.resolve()} content:")
-    pf = os.listdir(param_path)
-    print(pf)
+    dprint(f"Printing {param_path.resolve()} content:")
+    for line in tree(param_path):
+        dprint(line)
 
     data_path = Path(_conf.cosmotech.dataset_absolute_path)
-    print(f"print {data_path.resolve()} content:")
-    df = os.listdir(data_path)
-    print(df)
-    for f in df:
-        print(f)
-        print((param_path / f).stat().st_size)
+    dprint(f"Printing {data_path.resolve()} content:")
+    for line in tree(data_path):
+        dprint(line)
 
 
 if __name__ == "__main__":

From 7dd4beaaa6c024399874b0d18ade37c745fa593b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Thu, 22 Jan 2026 16:38:19 +0100
Subject: [PATCH 05/21] fix etl instance generator for v5

---
 .../etl_instance_generator/etl.py             |  79 +++-----
 .../generate_brewery_dataset.py               | 190 ++++++++++--------
 2 files changed, 141 insertions(+), 128 deletions(-)

diff --git a/code/run_templates/etl_instance_generator/etl.py b/code/run_templates/etl_instance_generator/etl.py
index 34d6930..93c7f42 100644
--- a/code/run_templates/etl_instance_generator/etl.py
+++ b/code/run_templates/etl_instance_generator/etl.py
@@ -1,17 +1,16 @@
-import csv
 import json
 import os
-import sys
-import shutil
 import tempfile
 from dataclasses import dataclass
+from pathlib import Path
 
-from cosmotech_api.model.dataset import Dataset
-from common.common import get_logger, get_api
+from cosmotech.coal.utils.logger import get_logger
+from cosmotech.coal.utils.configuration import Configuration
+from cosmotech.coal.cosmotech_api.apis.runner import RunnerApi
+from cosmotech.coal.cosmotech_api.apis.dataset import DatasetApi
 
 from generate_brewery_dataset import generate_brewery_dataset
 
-LOGGER = get_logger()
+LOGGER = get_logger('etl')
 
 
 @dataclass
@@ -37,22 +36,12 @@ def get_zip_file_name(dir):
 
 def main():
     LOGGER.info("Starting the ETL Run")
-    organization_id = os.environ.get("CSM_ORGANIZATION_ID")
-    api = get_api()
-
-    runner_data = api.runner.get_runner(
-        organization_id=organization_id,
-        workspace_id=os.environ.get("CSM_WORKSPACE_ID"),
-        runner_id=os.environ.get("CSM_RUNNER_ID"),
-    )
-    # When a runner is an ETL created by the webapp, the 1st item in datasetList is is a dataset created by the webapp
-    # before starting the runner
-    target_dataset_id = runner_data.dataset_list[0]
+    coal_config = Configuration()
 
-    with open(os.path.join(os.environ.get("CSM_PARAMETERS_ABSOLUTE_PATH"), "parameters.json")) as f:
+    with open(
+        Path(coal_config.cosmotech.parameters_absolute_path) / "parameters.json"
+    ) as f:
         parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())}
-    LOGGER.info("All parameters are loaded")
-
     generator_parameters = GeneratorParameters(
         int(parameters["etl_param_restock_quantity"]),
         int(parameters["etl_param_stock"]),
@@ -62,38 +51,32 @@ def main():
         parameters["etl_param_thirsty"] == "THIRSTY",
         parameters["etl_param_locale"],
         int(parameters["etl_param_customers_count"]),
-        int(parameters["etl_param_tables_count"],)
+        int(
+            parameters["etl_param_tables_count"],
+        ),
     )
+    LOGGER.info("All parameters are loaded")
 
+    # When a runner is an ETL created by the webapp, the 1st item in datasetList is a dataset created by the webapp
+    # before starting the runner and is used as the target for the ETL
+    
runner_data = RunnerApi().get_runner_metadata( + runner_id=coal_config.cosmotech.runner_id + ) + target_dataset_id = runner_data['datasets']['bases'][0] with tempfile.TemporaryDirectory() as tmp_dir: generate_brewery_dataset(generator_parameters, tmp_dir) - output_file_path = os.path.join(tmp_dir, "generated_brewery_instance") - output_file_path_with_format = os.path.join(tmp_dir, "generated_brewery_instance.zip") - shutil.make_archive(output_file_path, "zip", tmp_dir) - LOGGER.info(f"Instance archive gerenated: {output_file_path_with_format}") - - ws_file_path = f'datasets/{target_dataset_id}/generated_brewery_instance.zip' - LOGGER.info(f"Starting workspace file upload to {ws_file_path}...") - ws_file = api.workspace.upload_workspace_file(organization_id, - os.environ.get("CSM_WORKSPACE_ID"), - output_file_path_with_format, - overwrite=True, - destination=ws_file_path) - LOGGER.info("Workspace file has been uploaded") - - api.dataset.update_dataset( - organization_id, target_dataset_id, {"ingestionStatus": "SUCCESS", "twincacheStatus":"FULL"} - ) + path_tmp_dir = Path(tmp_dir) + path_list_file = [ + path_tmp_dir / "arc_Satisfaction.csv", + path_tmp_dir / "Bar_vertex.csv", + path_tmp_dir / "Customer.csv", + ] + path_list_db = [path_tmp_dir / "Bar.csv"] + DatasetApi().update_dataset_from_files( + target_dataset_id, path_list_file, path_list_db + ) LOGGER.info("ETL Run finished") if __name__ == "__main__": - try: - main() - except Exception as e: - exception_type = type(e).__name__ - file_name = os.path.split(e.__traceback__.tb_frame.f_code.co_filename)[1] - line_number = e.__traceback__.tb_lineno - LOGGER.error(f"An error occured during the dataset generation: {exception_type}") - LOGGER.error('%s' % e) - sys.exit(-1) + main() diff --git a/code/run_templates/etl_instance_generator/generate_brewery_dataset.py b/code/run_templates/etl_instance_generator/generate_brewery_dataset.py index edd8762..f184070 100644 --- a/code/run_templates/etl_instance_generator/generate_brewery_dataset.py +++ b/code/run_templates/etl_instance_generator/generate_brewery_dataset.py @@ -13,37 +13,72 @@ import itertools from faker import Faker -from common.common import get_logger +from cosmotech.coal.utils.logger import get_logger - -LOGGER = get_logger() +LOGGER = get_logger('generate') def parse_arguments(): parser = argparse.ArgumentParser( - description='Generate a brewery instance with Bar and Customer entities, and relationships between them.' + description="Generate a brewery instance with Bar and Customer entities, and relationships between them." 
+    )
+    parser.add_argument(
+        "-t", "--tables", help="Number of tables in the bar", default="10", type=int
+    )
+    parser.add_argument(
+        "-c",
+        "--customers",
+        help="Number of customers in the instance",
+        default="50",
+        type=int,
+    )
+    parser.add_argument(
+        "-n",
+        "--name",
+        help="Name of the generated instance",
+        default="brewery_instance",
+    )
+    parser.add_argument(
+        "-l",
+        "--locale",
+        help="Localization of generated names (leave empty to use English)",
     )
-    parser.add_argument("-t", "--tables", help="Number of tables in the bar", default="10", type=int)
-    parser.add_argument("-c", "--customers", help="Number of customers in the instance", default="50", type=int)
-    parser.add_argument("-n", "--name", help="Name of the generated instance", default="brewery_instance")
-    parser.add_argument("-l", "--locale", help="Localization of generated names (leave empty to use English)")
-    parser.add_argument("--restock", help="Restock quantity for generated bar", default="30", type=int)
-    parser.add_argument("--stock", help="Stock quantity for generated bar", default="60", type=int)
-    parser.add_argument("--waiters", help="Number of waiters for generated bar", default="10", type=int)
+    parser.add_argument(
+        "--restock", help="Restock quantity for generated bar", default="30", type=int
+    )
+    parser.add_argument(
+        "--stock", help="Stock quantity for generated bar", default="60", type=int
+    )
+    parser.add_argument(
+        "--waiters", help="Number of waiters for generated bar", default="10", type=int
+    )
 
-    parser.add_argument("--satisfaction", help="Initial statisfaction of generated customers", default="0", type=int)
-    parser.add_argument("--surrounding_satisfaction", help="Initial surrounding statisfaction of generated customers",
-                        default="0", type=int)
-    parser.add_argument("--thirsty", help="Value for the Thirsty attribute of generated customers", default=False,
-                        type=bool)
+    parser.add_argument(
+        "--satisfaction",
+        help="Initial satisfaction of generated customers",
+        default="0",
+        type=int,
+    )
+    parser.add_argument(
+        "--surrounding_satisfaction",
+        help="Initial surrounding satisfaction of generated customers",
+        default="0",
+        type=int,
+    )
+    parser.add_argument(
+        "--thirsty",
+        help="Value for the Thirsty attribute of generated customers",
+        default=False,
+        type=bool,
+    )
 
     args = parser.parse_args()
     return args
 
 
 def get_faker_instance(locale=None):
-    locale = locale.split(',') if locale else None
+    locale = locale.split(",") if locale else None
     return Faker(locale)
 
 
@@ -59,26 +94,26 @@ def generate_people(faker_instance, count):
 
 
 def get_id_from_name(name):
-    return re.sub(r'[\. \'-\,]', '_', name)
+    return re.sub(r"[\. 
\'-\,]", "_", name) def generate_bar(options): return { - 'id': 'MyBar', - 'name': 'MyBar', - 'restock': options['restock'], - 'stock': options['stock'], - 'waiters': options['waiters'], + "id": "MyBar", + "name": "MyBar", + "restock": options["restock"], + "stock": options["stock"], + "waiters": options["waiters"], } def generate_customer(person, options): return { - 'id': get_id_from_name(person), - 'name': person, - 'satisfaction': options['satisfaction'], - 'surrounding_satisfaction': options['surrounding_satisfaction'], - 'thirsty': options['thirsty'], + "id": get_id_from_name(person), + "name": person, + "satisfaction": options["satisfaction"], + "surrounding_satisfaction": options["surrounding_satisfaction"], + "thirsty": options["thirsty"], } @@ -87,7 +122,7 @@ def generate_customers(people, options): def generate_customers_groups(customers, groups_count): - ''' + """ Generate an array of customer groups: [ [customer_id1, customer_id3, customer_id6, customer_id9], @@ -95,26 +130,26 @@ def generate_customers_groups(customers, groups_count): [customer_id2, customer_id5, customer_id7] ] Distribution is not perfect on purpose, to have groups of different sizes - ''' + """ customer_groups = [] for i in range(0, groups_count): customer_groups.append([]) for customer in customers: group_index = random.randint(0, groups_count - 1) - customer_groups[group_index].append(customer['id']) + customer_groups[group_index].append(customer["id"]) return customer_groups def generate_bar_to_customers_mapping(bar, customers): - ''' + """ Generate a dict similar to: { bar_id1: [customer_id1, customer_id3, customer_id6, customer_id9], bar_id2: [customer_id4, customer_id8], bar_id3: [customer_id2, customer_id5, customer_id7] } - ''' - return {bar['id']: [customer['id'] for customer in customers]} + """ + return {bar["id"]: [customer["id"] for customer in customers]} def generate_customers_to_customers_links(customer_groups): @@ -128,67 +163,54 @@ def generate_customers_to_customers_links(customer_groups): def generate_bar_csv_file_content(bar): - file_content = 'id,NbWaiters,RestockQty,Stock\n' - cells = [ - bar['id'], - str(bar['waiters']), - str(bar['restock']), - str(bar['stock']) - ] - file_content += ','.join(cells) + '\n' + file_content = "id,NbWaiters,RestockQty,Stock\n" + cells = [bar["id"], str(bar["waiters"]), str(bar["restock"]), str(bar["stock"])] + file_content += ",".join(cells) + "\n" return file_content def generate_customers_csv_file_content(customers): - file_content = 'id,Satisfaction,SurroundingSatisfaction,Thirsty\n' + file_content = "id,Satisfaction,SurroundingSatisfaction,Thirsty\n" for customer in customers: cells = [ - customer['id'], - str(customer['satisfaction']), - str(customer['surrounding_satisfaction']), - str(customer['thirsty']).lower() + customer["id"], + str(customer["satisfaction"]), + str(customer["surrounding_satisfaction"]), + str(customer["thirsty"]).lower(), ] - file_content += ','.join(cells) + '\n' + file_content += ",".join(cells) + "\n" return file_content def generate_bar_to_customers_csv_file_content(bar_to_customers): - file_content = 'source,target,name\n' + file_content = "source,target,name\n" for bar, customers in bar_to_customers.items(): for customer in customers: - cells = [ - bar, - customer, - bar + '_contains_' + customer - ] - file_content += ','.join(cells) + '\n' + cells = [bar, customer, bar + "_contains_" + customer] + file_content += ",".join(cells) + "\n" return file_content def 
generate_arc_to_customers_csv_file_content(customers_to_customers_links):
-    file_content = 'source,target,name\n'
+    file_content = "source,target,name\n"
     for pair in customers_to_customers_links:
-        cells = [
-            pair[0],
-            pair[1],
-            'arc_from_' + pair[0] + '_to_' + pair[1]
-        ]
-        file_content += ','.join(cells) + '\n'
-        cells = [
-            pair[1],
-            pair[0],
-            'arc_from_' + pair[1] + '_to_' + pair[0]
-        ]
-        file_content += ','.join(cells) + '\n'
+        cells = [pair[0], pair[1], "arc_from_" + pair[0] + "_to_" + pair[1]]
+        file_content += ",".join(cells) + "\n"
+        cells = [pair[1], pair[0], "arc_from_" + pair[1] + "_to_" + pair[0]]
+        file_content += ",".join(cells) + "\n"
     return file_content
 
 
-def generate_csv_files_content(bar, customers, bar_to_customers, customers_to_customers_links):
+def generate_csv_files_content(
+    bar, customers, bar_to_customers, customers_to_customers_links
+):
     return {
-        'Bar.csv': generate_bar_csv_file_content(bar),
-        'Customer.csv': generate_customers_csv_file_content(customers),
-        'Bar_vertex.csv': generate_bar_to_customers_csv_file_content(bar_to_customers),
-        'arc_Satisfaction.csv': generate_arc_to_customers_csv_file_content(customers_to_customers_links),
+        "Bar.csv": generate_bar_csv_file_content(bar),
+        "Customer.csv": generate_customers_csv_file_content(customers),
+        "Bar_vertex.csv": generate_bar_to_customers_csv_file_content(bar_to_customers),
+        "arc_Satisfaction.csv": generate_arc_to_customers_csv_file_content(
+            customers_to_customers_links
+        ),
     }
 
 
@@ -196,16 +218,20 @@ def export_csv_files(csv_files_content, dir_path):
     if not os.path.exists(dir_path):
         os.mkdir(dir_path)
     for file_name, file_content in csv_files_content.items():
-        with open(os.path.join(dir_path, file_name), 'w') as writer:
+        with open(os.path.join(dir_path, file_name), "w") as writer:
             writer.write(file_content)
 
 
 def generate_brewery_dataset(args, working_dir="."):
-    bar_options = {'restock': args.restock, 'stock': args.stock, 'waiters': args.waiters}
+    bar_options = {
+        "restock": args.restock,
+        "stock": args.stock,
+        "waiters": args.waiters,
+    }
     customers_options = {
-        'satisfaction': args.satisfaction,
-        'surrounding_satisfaction': args.surrounding_satisfaction,
-        'thirsty': args.thirsty,
+        "satisfaction": args.satisfaction,
+        "surrounding_satisfaction": args.surrounding_satisfaction,
+        "thirsty": args.thirsty,
     }
 
     faker_instance = get_faker_instance(args.locale)
@@ -215,9 +241,13 @@ def generate_brewery_dataset(args, working_dir="."):
     bar_to_customers = generate_bar_to_customers_mapping(bar, customers)
 
     customer_groups = generate_customers_groups(customers, args.tables)
-    customers_to_customers_links = generate_customers_to_customers_links(customer_groups)
+    customers_to_customers_links = generate_customers_to_customers_links(
+        customer_groups
+    )
 
-    csv_files_content = generate_csv_files_content(bar, customers, bar_to_customers, customers_to_customers_links)
+    csv_files_content = generate_csv_files_content(
+        bar, customers, bar_to_customers, customers_to_customers_links
+    )
 
     export_csv_files(csv_files_content, working_dir)
     LOGGER.info(f"Instance CSV files generated in folder {working_dir}")

From 0f1fa71b93325d2e5423a67718e25a2f5c253de8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Thu, 22 Jan 2026 16:39:31 +0100
Subject: [PATCH 06/21] chore: black formatting

---
 code/run_templates/common/common.py           |  1 -
 code/run_templates/common/download_dataset.py |  4 +-
 code/run_templates/common/generate_dataset.py | 70 +++++++++--------
 code/run_templates/common/twingraph.py        | 57 
+++++++++++---- .../common/update_runner_metadata.py | 28 ++++++-- 5 files changed, 105 insertions(+), 55 deletions(-) diff --git a/code/run_templates/common/common.py b/code/run_templates/common/common.py index 5b16d9b..ca5bbe4 100644 --- a/code/run_templates/common/common.py +++ b/code/run_templates/common/common.py @@ -9,7 +9,6 @@ from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.coal.utils.logger import get_logger as _get_logger - LOGGER = _get_logger("my_etl_logger") diff --git a/code/run_templates/common/download_dataset.py b/code/run_templates/common/download_dataset.py index 7e06ab0..dc78e58 100644 --- a/code/run_templates/common/download_dataset.py +++ b/code/run_templates/common/download_dataset.py @@ -19,9 +19,9 @@ _d_data = download_dataset(ORG_ID, WS_ID, dataset_id, False) -tmp_path = pathlib.Path(_d_data['folder_path']) +tmp_path = pathlib.Path(_d_data["folder_path"]) for _p in tmp_path.glob("*"): shutil.copy(_p, SIM_DATA_PATH) -LOGGER.info(f"Downloaded dataset {dataset_id}") \ No newline at end of file +LOGGER.info(f"Downloaded dataset {dataset_id}") diff --git a/code/run_templates/common/generate_dataset.py b/code/run_templates/common/generate_dataset.py index 1d7cc42..eab83ec 100644 --- a/code/run_templates/common/generate_dataset.py +++ b/code/run_templates/common/generate_dataset.py @@ -39,12 +39,14 @@ def generate_data(data_dir): customers = list(fake_generator.name() for _ in range(random.randint(30, 150))) n_customers = len(customers) _stock = random.randint(30, 500) - bar_content.append({ - "id": bar, - "Stock": _stock, - "RestockQty": random.randint(10, _stock), - "NbWaiters": random.randint(1, n_customers // 2), - }) + bar_content.append( + { + "id": bar, + "Stock": _stock, + "RestockQty": random.randint(10, _stock), + "NbWaiters": random.randint(1, n_customers // 2), + } + ) mask = np.random.randint(2, size=(n_customers, n_customers), dtype=bool) r_customer = [] for i in range(n_customers): @@ -53,34 +55,41 @@ def generate_data(data_dir): _name = f"{_name} {dup_counter}" dup_counter += 1 r_customer.append(_name) - customer_content.append({ - "id": _name, - "Thirsty": "false", - "Satisfaction": 100, - "SurroundingSatisfaction": 100 - }) - bar_vertex_content.append({ - "source": bar, - "target": _name - }) + customer_content.append( + { + "id": _name, + "Thirsty": "false", + "Satisfaction": 100, + "SurroundingSatisfaction": 100, + } + ) + bar_vertex_content.append({"source": bar, "target": _name}) for i, j in itertools.combinations(range(n_customers), 2): if mask[i][j] or mask[j][i]: - arc_satisfaction_content.append({ - "source": r_customer[i], - "target": r_customer[j], - }) - arc_satisfaction_content.append({ - "source": r_customer[j], - "target": r_customer[i], - }) + arc_satisfaction_content.append( + { + "source": r_customer[i], + "target": r_customer[j], + } + ) + arc_satisfaction_content.append( + { + "source": r_customer[j], + "target": r_customer[i], + } + ) data_dir_path = pathlib.Path(data_dir) for _name, _headers, _content in [ ("Customer.csv", headers_customer, customer_content), ("Bar.csv", headers_bar, bar_content), ("Bar_vertex.csv", headers_bar_vertex, bar_vertex_content), - ("arc_Satisfaction.csv", headers_arc_satisfaction, arc_satisfaction_content), + ( + "arc_Satisfaction.csv", + headers_arc_satisfaction, + arc_satisfaction_content, + ), ]: _path = data_dir_path / _name with _path.open("w") as _file: @@ -105,24 +114,23 @@ def create_dataset_from(data_dir): DatasetPartCreateRequest( name=f"{RUN_ID} - {_p.name}", 
sourceName=_p.name,
-                type=DatasetPartTypeEnum.FILE
+                type=DatasetPartTypeEnum.FILE,
             )
             for _p in data_dir_path.glob("*.csv")
-        )
+        ),
     )
     return d_api.create_dataset(
         ORG_ID,
         WS_ID,
         d_request,
         files=list(
-            (_p.name, _p.open("rb").read())
-            for _p in data_dir_path.glob("*.csv")
-        )
+            (_p.name, _p.open("rb").read()) for _p in data_dir_path.glob("*.csv")
+        ),
     )
 
 
 if __name__ == "__main__":
-    with tempfile.TemporaryDirectory(suffix='dataset') as temp_dir:
+    with tempfile.TemporaryDirectory(suffix="dataset") as temp_dir:
         temp_dir_path = pathlib.Path(temp_dir)
         generate_data(temp_dir_path)
         dataset_id = create_dataset_from(temp_dir_path)
diff --git a/code/run_templates/common/twingraph.py b/code/run_templates/common/twingraph.py
index 65494cc..0eee85f 100644
--- a/code/run_templates/common/twingraph.py
+++ b/code/run_templates/common/twingraph.py
@@ -21,8 +21,12 @@ def parse_item(content, data, item_key):
         item_label = item["label"]
         item_type = item["type"]
         content_by_type = content[item_type] = content.get(item_type, {})
-        content_by_label = content_by_type[item_label] = content_by_type.get(item_label, {})
-        item_properties = content_by_label[item_id] = content_by_label.get(item_id, item["properties"])
+        content_by_label = content_by_type[item_label] = content_by_type.get(
+            item_label, {}
+        )
+        item_properties = content_by_label[item_id] = content_by_label.get(
+            item_id, item["properties"]
+        )
 
         if item_type == "RELATION":
             item_properties["source"] = data[src_key]["properties"]["id"]
@@ -39,8 +43,10 @@ def parse_edge(content, data):
         graph_content = reduce(parse_node, nodes, {})
         graph_content = reduce(parse_edge, edges, graph_content)
     except Exception as e:
-        LOGGER.error(f"An error occurred while parsing the twingraph JSON representation: {e}")
-        raise(e)
+        LOGGER.error(
+            f"An error occurred while parsing the twingraph JSON representation: {e}"
+        )
+        raise (e)
     return graph_content
 
 
@@ -114,16 +120,26 @@ def dump_twingraph_dataset_to_zip_archive(
         edge_query_str = "OPTIONAL MATCH(src)-[edge]->(dst) RETURN src, edge, dst"
 
         fetch_start_time = time.time()
-        res_nodes = api.dataset.twingraph_query(organization_id, dataset_id, {"query": node_query_str})
-        res_edges = api.dataset.twingraph_query(organization_id, dataset_id, {"query": edge_query_str})
+        res_nodes = api.dataset.twingraph_query(
+            organization_id, dataset_id, {"query": node_query_str}
+        )
+        res_edges = api.dataset.twingraph_query(
+            organization_id, dataset_id, {"query": edge_query_str}
+        )
         fetch_duration_in_seconds = time.time() - fetch_start_time
-        LOGGER.info(f"Results received, queries took {fetch_duration_in_seconds} seconds")
+        LOGGER.info(
+            f"Results received, queries took {fetch_duration_in_seconds} seconds"
+        )
     except cosmotech_api.ApiException as e:
-        LOGGER.error(f"Failed to retrieve content of target dataset '{dataset_id}': %s\n" % e)
+        LOGGER.error(
+            f"Failed to retrieve content of target dataset '{dataset_id}': %s\n" % e
+        )
         raise e
 
     # Note: the keys are defined in the cypher queries above
-    graph_content = parse_twingraph_json(res_nodes, res_edges, "n", "edge", "src", "dst")
+    graph_content = parse_twingraph_json(
+        res_nodes, res_edges, "n", "edge", "src", "dst"
+    )
     create_csv_files_from_graph_content(graph_content, folder_path)
 
     output_file_path = os.path.join(folder_path, "twingraph_dump")
@@ -134,7 +150,11 @@ def upload_twingraph_zip_archive(organization_id, dataset_id, zip_archive_path):
     try:
-        api.dataset.update_dataset(organization_id, dataset_id, {"ingestionStatus": "NONE", "sourceType": 
"File"}) + api.dataset.update_dataset( + organization_id, + dataset_id, + {"ingestionStatus": "NONE", "sourceType": "File"}, + ) except cosmotech_api.ApiException as e: LOGGER.error("Exception when changing twingraph type & status: %s\n" % e) raise e @@ -159,14 +179,20 @@ def upload_twingraph_zip_archive(organization_id, dataset_id, zip_archive_path): try: # Required delay to prevent some race condition, leading sometimes to the sourceType update being ignored time.sleep(2) - api.dataset.update_dataset(organization_id, dataset_id, {"ingestionStatus": "SUCCESS", "sourceType": "ETL"}) + api.dataset.update_dataset( + organization_id, + dataset_id, + {"ingestionStatus": "SUCCESS", "sourceType": "ETL"}, + ) except cosmotech_api.ApiException as e: LOGGER.error("Exception when changing twingraph type & status: %s\n" % e) raise e -def copy_dataset_twingraph(org_id, src_dataset_id, dst_dataset_id, node_query=None, edge_query=None): - ''' +def copy_dataset_twingraph( + org_id, src_dataset_id, dst_dataset_id, node_query=None, edge_query=None +): + """ Parameters: - org_id is the id of the organization containing the source and target datasets - src_dataset_id is the id of the dataset to copy @@ -176,11 +202,12 @@ def copy_dataset_twingraph(org_id, src_dataset_id, dst_dataset_id, node_query=No - edge_query (optional) is a string containing the cypher query to select the edges to copy; edge data must be returned with the aliasas 'src, edge, dst' (default query: "OPTIONAL MATCH(src)-[edge]->(dst) RETURN src, edge, dst") - ''' + """ with tempfile.TemporaryDirectory() as tmp_dir: twingraph_dump_dir_path = os.path.join(tmp_dir, "twingraph_dump") zip_archive_path = dump_twingraph_dataset_to_zip_archive( - org_id, src_dataset_id, twingraph_dump_dir_path, node_query, edge_query) + org_id, src_dataset_id, twingraph_dump_dir_path, node_query, edge_query + ) LOGGER.info(f"Twingraph extracted to {zip_archive_path}") LOGGER.info(f"Starting zip upload to existing dataset '{dst_dataset_id}'...") upload_twingraph_zip_archive(org_id, dst_dataset_id, zip_archive_path) diff --git a/code/run_templates/common/update_runner_metadata.py b/code/run_templates/common/update_runner_metadata.py index b4ac20c..f8c00fc 100644 --- a/code/run_templates/common/update_runner_metadata.py +++ b/code/run_templates/common/update_runner_metadata.py @@ -11,15 +11,26 @@ def get_arguments(): parser = argparse.ArgumentParser(description="Update runner metadata") - parser.add_argument("--table-prefix", help="RunnerMetadata table prefix", required=False, default="") parser.add_argument( - "--postgres-host", help="Postgresql host name", required=False, default=os.environ.get("POSTGRES_HOST_URI") + "--table-prefix", help="RunnerMetadata table prefix", required=False, default="" ) parser.add_argument( - "--postgres-port", help="Postgresql port", required=False, default=os.environ.get("POSTGRES_HOST_PORT") + "--postgres-host", + help="Postgresql host name", + required=False, + default=os.environ.get("POSTGRES_HOST_URI"), ) parser.add_argument( - "--postgres-db", help="Postgresql database name", required=False, default=os.environ.get("POSTGRES_DB_NAME") + "--postgres-port", + help="Postgresql port", + required=False, + default=os.environ.get("POSTGRES_HOST_PORT"), + ) + parser.add_argument( + "--postgres-db", + help="Postgresql database name", + required=False, + default=os.environ.get("POSTGRES_DB_NAME"), ) parser.add_argument( "--postgres-schema", @@ -53,7 +64,10 @@ def get_arguments(): default=os.environ.get("CSM_WORKSPACE_ID"), ) 
parser.add_argument( - "--csm-runner-id", help="Cosmo Tech Runner ID", required=False, default=os.environ.get("CSM_RUNNER_ID") + "--csm-runner-id", + help="Cosmo Tech Runner ID", + required=False, + default=os.environ.get("CSM_RUNNER_ID"), ) return parser.parse_args() @@ -61,7 +75,9 @@ def get_arguments(): def main(): args = get_arguments() - runner = api.runner.get_runner(args.csm_organization_id, args.csm_workspace_id, args.csm_runner_id) + runner = api.runner.get_runner( + args.csm_organization_id, args.csm_workspace_id, args.csm_runner_id + ) schema_table = f"{args.postgres_schema}.{args.table_prefix}RunnerMetadata" sql_create_table = f""" From 1ee027ad532e42cd780ad0d402ce0d0cc6ac0469 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Tue, 27 Jan 2026 10:37:25 +0100 Subject: [PATCH 07/21] add etl local file for v5 --- code/run_templates/etl_with_local_file/etl.py | 140 ++++-------------- 1 file changed, 28 insertions(+), 112 deletions(-) diff --git a/code/run_templates/etl_with_local_file/etl.py b/code/run_templates/etl_with_local_file/etl.py index 26138bf..aa269f7 100644 --- a/code/run_templates/etl_with_local_file/etl.py +++ b/code/run_templates/etl_with_local_file/etl.py @@ -1,128 +1,44 @@ -import csv -import json import os from zipfile import ZipFile +from pathlib import Path -from cosmotech_api.model.dataset import Dataset -from common.common import get_logger, get_api +from cosmotech.coal.utils.logger import get_logger +from cosmotech.coal.utils.configuration import Configuration +from cosmotech.coal.cosmotech_api.apis.runner import RunnerApi +from cosmotech.coal.cosmotech_api.apis.dataset import DatasetApi -LOGGER = get_logger() +LOGGER = get_logger('etl') -def get_zip_file_name(dir): - for _, _, files in os.walk(dir): +def get_zip_file_path(dir): + for root, _, files in os.walk(dir): for file in files: if file.endswith(".zip"): - return file + return Path(root) / file -def main(): - LOGGER.info("Starting the ETL Run") - api = get_api() - runner_data = api.runner.get_runner( - organization_id=os.environ.get("CSM_ORGANIZATION_ID"), - workspace_id=os.environ.get("CSM_WORKSPACE_ID"), - runner_id=os.environ.get("CSM_RUNNER_ID"), - ) - - with open(os.path.join(os.environ.get("CSM_PARAMETERS_ABSOLUTE_PATH"), "parameters.json")) as f: - parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())} - - LOGGER.info("All parameters are loaded") - - bars = list() - - bar = { - "type": "Bar", - "name": "MyBar", - "params": f"""NbWaiters: {int(parameters["etl_param_num_waiters"])},""" - + f"""RestockQty: {int(parameters["etl_param_restock_quantity"])},""" - + f"""Stock: {int(parameters["etl_param_stock"])}""", - } - bars.append(bar) - - base_path = parameters["etl_param_bar_instance"] - file_name = get_zip_file_name(base_path) - with ZipFile(base_path + "/" + file_name) as zip: - zip.extractall(base_path) - - base_path = base_path + "/reference" - customers = list() - with open(base_path + "/Nodes/Customer.csv") as _f: - LOGGER.info("Found 'Customer' list") - csv_r = csv.DictReader(_f) - for row in csv_r: - customer = { - "type": "Customer", - "name": row["id"], - "params": f"Name: '{row['id']}'," - f"Satisfaction: {row['Satisfaction']}," - f"SurroundingSatisfaction: {row['SurroundingSatisfaction']}," - f"Thirsty: {row['Thirsty']}", - } - customers.append(customer) - - satisfactions = list() - with open(base_path + "/Edges/arc_Satisfaction.csv") as _f: - LOGGER.info("Found 'Customer satisfaction' relation") - csv_r = csv.DictReader(_f) - for row in csv_r: 
-            satisfaction = {
-                "type": "arc_Satisfaction",
-                "source": row["source"],
-                "target": row["target"],
-                "name": row["name"],
-                "params": "a: 'a'",
-            }
-            satisfactions.append(satisfaction)
-
-    links = list()
-    with open(base_path + "/Edges/Bar_vertex.csv") as _f:
-        LOGGER.info("Found 'Bar vertex' relation")
-        csv_r = csv.DictReader(_f)
-        for row in csv_r:
-            link = {
-                "type": "bar_vertex",
-                "source": row["source"],
-                "target": row["target"],
-                "name": row["name"],
-                "params": "a: 'a'",
-            }
-            links.append(link)
-
-    dataset = Dataset(ingestion_status="SUCCESS")
-    api.dataset.update_dataset(os.environ.get("CSM_ORGANIZATION_ID"), runner_data.dataset_list[0], dataset)
-
-    try:
-        LOGGER.info("Erasing data from target Dataset")
-        api.dataset.twingraph_query(
-            organization_id=os.environ.get("CSM_ORGANIZATION_ID"),
-            dataset_id=runner_data.dataset_list[0],
-            dataset_twin_graph_query={"query": "MATCH (n) DETACH DELETE n"},
-        )
-    except Exception:
-        pass
-
-    LOGGER.info("Writing entities into target Dataset")
-    api.dataset.create_twingraph_entities(
-        organization_id=os.environ.get("CSM_ORGANIZATION_ID"),
-        dataset_id=runner_data.dataset_list[0],
-        type="node",
-        graph_properties=bars + customers,
-    )
+def list_files(dir):
+    for root, _, files in os.walk(dir):
+        for file in files:
+            return Path(root) / file
 
-    LOGGER.info("Writing relationshipss into target Dataset")
-    api.dataset.create_twingraph_entities(
-        organization_id=os.environ.get("CSM_ORGANIZATION_ID"),
-        dataset_id=runner_data.dataset_list[0],
-        type="relationship",
-        graph_properties=satisfactions + links,
-    )
-    api.dataset.update_dataset(
-        os.environ.get("CSM_ORGANIZATION_ID"), runner_data['dataset_list'][0], Dataset(twincacheStatus="FULL")
+def main():
+    LOGGER.info("Starting the ETL Run")
+    coal_config = Configuration()
+
+    parameter_dir_path = Path(coal_config.cosmotech.parameters_absolute_path)
+    extract_path = parameter_dir_path / "extract"
+    with ZipFile(get_zip_file_path(parameter_dir_path)) as zip:
+        zip.extractall(extract_path)
+
+    runner_data = RunnerApi().get_runner_metadata()
+    target_dataset_id = runner_data['datasets']['bases'][0]
+    path_list_db = list_files(extract_path)
+    datasetApi = DatasetApi()
+    datasetApi.update_dataset_from_files(
+        target_dataset_id, [], path_list_db
     )
-
     LOGGER.info("ETL Run finished")

From 73b7bae56a1f4f67a6e45dddb4e4612ae53fed8c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Tue, 27 Jan 2026 10:38:39 +0100
Subject: [PATCH 08/21] add printing of parameters.json content

---
 code/run_templates/debug/print_inputs.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/code/run_templates/debug/print_inputs.py b/code/run_templates/debug/print_inputs.py
index 20f5519..90ba368 100644
--- a/code/run_templates/debug/print_inputs.py
+++ b/code/run_templates/debug/print_inputs.py
@@ -42,6 +42,10 @@ def main():
     _conf = Configuration()
 
     param_path = Path(_conf.cosmotech.parameters_absolute_path)
+    dprint("parameters.json:")
+    with open(param_path / "parameters.json") as f:
+        print(f.read())
+
     dprint(f"Printing {param_path.resolve()} content:")
     for line in tree(param_path):
         dprint(line)

From 32342702cbfafaa8f58bea38eadb3238035d37fa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Tue, 27 Jan 2026 11:38:12 +0100
Subject: [PATCH 09/21] fix rename etl

---
 code/run_templates/etl_with_local_file/common                   | 1 -
 code/run_templates/{etl_with_local_file => etl_zip2db}/etl.py   | 0
 code/run_templates/{etl_with_local_file => etl_zip2db}/run.json | 0
 3 files changed, 1 deletion(-)
 delete mode 120000 code/run_templates/etl_with_local_file/common
 rename code/run_templates/{etl_with_local_file => etl_zip2db}/etl.py (100%)
 rename code/run_templates/{etl_with_local_file => etl_zip2db}/run.json (100%)

diff --git a/code/run_templates/etl_with_local_file/common b/code/run_templates/etl_with_local_file/common
deleted file mode 120000
index 8332399..0000000
--- a/code/run_templates/etl_with_local_file/common
+++ /dev/null
@@ -1 +0,0 @@
-../common/
\ No newline at end of file
diff --git a/code/run_templates/etl_with_local_file/etl.py b/code/run_templates/etl_zip2db/etl.py
similarity index 100%
rename from code/run_templates/etl_with_local_file/etl.py
rename to code/run_templates/etl_zip2db/etl.py
diff --git a/code/run_templates/etl_with_local_file/run.json b/code/run_templates/etl_zip2db/run.json
similarity index 100%
rename from code/run_templates/etl_with_local_file/run.json
rename to code/run_templates/etl_zip2db/run.json

From 391616113ea730334dc0d843c75c28baaa86bf92 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Wed, 28 Jan 2026 14:04:37 +0100
Subject: [PATCH 10/21] add parameter handler for brewery

use parameters Stock, RestockQty, and NbWaiters, or an uploaded file, to
set Bar.csv source data
---
 .../brewery/parameters_handler.py   | 78 +++++++++++++++++++
 code/run_templates/brewery/run.json | 13 +++-
 2 files changed, 90 insertions(+), 1 deletion(-)
 create mode 100644 code/run_templates/brewery/parameters_handler.py

diff --git a/code/run_templates/brewery/parameters_handler.py b/code/run_templates/brewery/parameters_handler.py
new file mode 100644
index 0000000..64682c6
--- /dev/null
+++ b/code/run_templates/brewery/parameters_handler.py
@@ -0,0 +1,78 @@
+import json
+import os
+import shutil
+import csv
+from pathlib import Path
+
+from cosmotech.coal.utils.configuration.Configuration import ENVIRONMENT_CONFIGURATION as EC
+from cosmotech.orchestrator.utils.logger import get_logger
+
+LOGGER = get_logger('parameters_handler')
+
+
+def update_first_row_in_csv(csv_path, updated_values):
+    """
+    Read a CSV file and change the first data row with new values.
+ + Args: + csv_path: Path to the CSV file + updated_values: Dictionary with column names as keys and new values + """ + # Read the CSV file + with open(csv_path, 'r', newline='') as f: + reader = csv.DictReader(f) + rows = list(reader) + fieldnames = reader.fieldnames + + # Update the first row with new values + if rows: + for key, value in updated_values.items(): + if key in rows[0]: + rows[0][key] = value + + # Write back to the CSV file + with open(csv_path, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + +def read_parameters(): + with open( + Path(EC.cosmotech.parameters_absolute_path) / "parameters.json" + ) as f: + parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())} + + LOGGER.info("Parameters loaded from JSON") + return parameters + + +def main(): + LOGGER.info("Starting parameter handler") + + # get_parameter from json + parameters = read_parameters() + # update dataset Bar.csv with param + updated_values = {} + if "NbWaiters" in parameters: + updated_values["NbWaiters"] = parameters["NbWaiters"] + if "RestockQty" in parameters: + updated_values["RestockQty"] = parameters["RestockQty"] + if "Stock" in parameters: + updated_values["Stock"] = parameters["Stock"] + + bar_data_path = Path(EC.cosmotech.dataset_absolute_path) / "Bar.csv" + update_first_row_in_csv(bar_data_path, updated_values) + LOGGER.info("Updated Bar.csv with parameters") + + # replace bar.csv if file is provided as parameter + param_path = Path(EC.cosmotech.parameters_absolute_path) + bar_param_path = param_path / "bar.csv" + if bar_data_path.exists(): + # replace dataset Bar.csv with parameter Bar.csv + shutil.copy(bar_param_path, bar_data_path) + LOGGER.info("Bar.csv replaced by given file") + + +if __name__ == "__main__": + main() diff --git a/code/run_templates/brewery/run.json b/code/run_templates/brewery/run.json index 3e285f3..e6d1174 100644 --- a/code/run_templates/brewery/run.json +++ b/code/run_templates/brewery/run.json @@ -6,6 +6,17 @@ "arguments": ["api", "run-load-data"], "useSystemEnvironment": true }, + { + "id": "parameters_handler", + "command": "python3", + "arguments": [ + "code/run_templates/brewery/parameters_handler.py" + ], + "precedents": [ + "fetch_parameters" + ], + "useSystemEnvironment": true + }, { "id": "engine", "command": "csm-simulator", @@ -14,7 +25,7 @@ "BreweryDemoSimulationWithConnector" ], "precedents": [ - "fetch_parameters" + "parameters_handler" ], "useSystemEnvironment": true }, From 800352f773cb829d133cb5717ddba1994b2f92b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Fri, 30 Jan 2026 17:46:15 +0100 Subject: [PATCH 11/21] fix run template zip2db --- code/run_templates/etl_zip2db/run.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/run_templates/etl_zip2db/run.json b/code/run_templates/etl_zip2db/run.json index 38bd578..9d61b60 100644 --- a/code/run_templates/etl_zip2db/run.json +++ b/code/run_templates/etl_zip2db/run.json @@ -12,7 +12,7 @@ { "id": "ETL", "command": "python", - "arguments": ["run_templates/etl_with_local_file/etl.py"], + "arguments": ["run_templates/etl_zip2db/etl.py"], "precedents": ["fetch_parameters"], "useSystemEnvironment": true } From ce37a95904e936401a29d86ae1f5087f71d45703 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Fri, 30 Jan 2026 17:47:02 +0100 Subject: [PATCH 12/21] add output step to debug run template --- code/run_templates/debug/run.json | 22 
++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/code/run_templates/debug/run.json b/code/run_templates/debug/run.json index 2f3ce41..e2e2831 100644 --- a/code/run_templates/debug/run.json +++ b/code/run_templates/debug/run.json @@ -15,6 +15,28 @@ "arguments": ["code/run_templates/debug/print_inputs.py"], "precedents": ["fetch_parameters"], "useSystemEnvironment": true + }, + { + "id": "load_results_to_store", + "command": "csm-data", + "arguments": [ + "store", + "load-csv-folder", + "--csv-folder", + "$CSM_DATASET_ABSOLUTE_PATH" + ], + "precedents": ["print_out"], + "useSystemEnvironment": true + }, + { + "id": "send_results_to_psql", + "command": "csm-data", + "arguments": [ + "store", + "output" + ], + "precedents": ["load_results_to_store"], + "useSystemEnvironment": true } ] } From 2a35e6135ea20419b36e9039cc7f903a47ae396a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Tue, 3 Feb 2026 14:31:52 +0100 Subject: [PATCH 13/21] add full demo run_template --- .../brewery/parameters_handler.py | 10 +- .../full_demo/parameters_handler.py | 128 ++++++++++++++++++ code/run_templates/full_demo/print_inputs.py | 60 ++++++++ code/run_templates/full_demo/run.json | 60 ++++++++ 4 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 code/run_templates/full_demo/parameters_handler.py create mode 100644 code/run_templates/full_demo/print_inputs.py create mode 100644 code/run_templates/full_demo/run.json diff --git a/code/run_templates/brewery/parameters_handler.py b/code/run_templates/brewery/parameters_handler.py index 64682c6..f68e206 100644 --- a/code/run_templates/brewery/parameters_handler.py +++ b/code/run_templates/brewery/parameters_handler.py @@ -46,6 +46,13 @@ def read_parameters(): return parameters +def fetch_parameter_file_path(param_name: str) -> Path: + for r, d, f in os.walk(EC.cosmotech.parameters_absolute_path): + if param_name in r: + return Path(r) / f[0] + raise FileNotFoundError(f"Parameter file for {param_name} not found.") + + def main(): LOGGER.info("Starting parameter handler") @@ -65,8 +72,7 @@ def main(): LOGGER.info("Updated Bar.csv with parameters") # replace bar.csv if file is provided as parameter - param_path = Path(EC.cosmotech.parameters_absolute_path) - bar_param_path = param_path / "bar.csv" + bar_param_path = fetch_parameter_file_path("initial_stock_dataset") if bar_data_path.exists(): # replace dataset Bar.csv with parameter Bar.csv shutil.copy(bar_param_path, bar_data_path) diff --git a/code/run_templates/full_demo/parameters_handler.py b/code/run_templates/full_demo/parameters_handler.py new file mode 100644 index 0000000..f0adf27 --- /dev/null +++ b/code/run_templates/full_demo/parameters_handler.py @@ -0,0 +1,128 @@ +import csv +import json +import os +import itertools +from pathlib import Path + +from cosmotech.coal.utils.configuration.Configuration import ENVIRONMENT_CONFIGURATION as EC +from cosmotech.orchestrator.utils.logger import get_logger + +LOGGER = get_logger('parameter_handler') + + +def get_parameters(): + with open( + Path(EC.cosmotech.parameters_absolute_path) / "parameters.json" + ) as f: + parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())} + + LOGGER.info("Parameters loaded from JSON") + return parameters + + +def update_first_row_in_csv(csv_path, updated_values): + """ + Read a CSV file and change the first data row with new values. 
+
+    Args:
+        csv_path: Path to the CSV file
+        updated_values: Dictionary with column names as keys and new values
+    """
+    # Read the CSV file
+    with open(csv_path, 'r', newline='') as f:
+        reader = csv.DictReader(f)
+        rows = list(reader)
+        fieldnames = reader.fieldnames
+
+    # Update the first row with new values
+    if rows:
+        for key, value in updated_values.items():
+            if key in rows[0]:
+                rows[0][key] = value
+
+    # Write back to the CSV file
+    with open(csv_path, 'w', newline='') as f:
+        writer = csv.DictWriter(f, fieldnames=fieldnames)
+        writer.writeheader()
+        writer.writerows(rows)
+
+    return rows[0]
+
+
+def fetch_parameter_file_path(param_name: str) -> Path:
+    for r, d, f in os.walk(EC.cosmotech.parameters_absolute_path):
+        if param_name in r:
+            return Path(r) / f[0]
+    raise FileNotFoundError(f"Parameter file for {param_name} not found.")
+
+
+def fetch_customers_list():
+    customers = []
+
+    def extract_names_from_csv(csv_path: Path):
+        names = []
+        with open(csv_path, 'r', newline='') as f:
+            reader = csv.DictReader(f)
+            for row in reader:
+                names.append(row['Name'])
+        return names
+
+    customers_csv_path = fetch_parameter_file_path("customers")
+    customers.extend(extract_names_from_csv(customers_csv_path))
+
+    additional_customers_csv_path = fetch_parameter_file_path("additional_customers")
+    customers.extend(extract_names_from_csv(additional_customers_csv_path))
+
+    LOGGER.info(f"Fetched {len(customers)} customers from {customers_csv_path}")
+    return customers
+
+
+def generate_bar_to_customer_mapping(bar_name: str, customers: list):
+    bar_vertex_csv_path = Path(EC.cosmotech.datasets_absolute_path) / "Bar_vertex.csv"
+    with open(bar_vertex_csv_path, 'w', newline='') as f:
+        fieldnames = ['source', 'target']
+        writer = csv.DictWriter(f, fieldnames=fieldnames)
+        writer.writeheader()
+        for customer in customers:
+            writer.writerow({'source': bar_name, 'target': customer})
+    LOGGER.info(f"Generated bar to customer mapping in {bar_vertex_csv_path}")
+
+
+def generate_customers_to_customer_mapping(customers: list):
+    arc_satisfaction_csv_path = Path(EC.cosmotech.datasets_absolute_path) / "arc_Satisfaction.csv"
+    with open(arc_satisfaction_csv_path, 'w', newline='') as f:
+        fieldnames = ['source', 'target']
+        writer = csv.DictWriter(f, fieldnames=fieldnames)
+        writer.writeheader()
+        for i, j in itertools.combinations(customers, 2):
+            writer.writerow({'source': customers[i], 'target': customers[j]})
+            writer.writerow({'source': customers[j], 'target': customers[i]})
+    LOGGER.info(f"Generated customers to customer mapping in {arc_satisfaction_csv_path}")
+
+
+def main():
+    LOGGER.info("Starting parameter handler")
+
+    # get_parameter from json
+    parameters = get_parameters()
+
+    # update dataset Bar.csv with param
+    updated_values = {}
+    if "NbWaiters" in parameters:
+        updated_values["NbWaiters"] = parameters["NbWaiters"]
+    if "RestockQty" in parameters:
+        updated_values["RestockQty"] = parameters["RestockQty"]
+    if "Stock" in parameters:
+        updated_values["Stock"] = parameters["Stock"]
+    if "bar_name" in parameters:
+        updated_values["bar_name"] = parameters["bar_name"]
+    bar_csv_path = Path(EC.cosmotech.datasets_absolute_path) / "Bar.csv"
+    first_row = update_first_row_in_csv(bar_csv_path, updated_values)
+    LOGGER.info("Updated Bar.csv with parameters")
+
+    # generate_Customers
+    customers = fetch_customers_list()
+    # generate bar_vertex
+    generate_bar_to_customer_mapping(first_row["bar_name"], customers)
+    # generate arc_satisfaction
+    generate_customers_to_customer_mapping(customers)
diff --git a/code/run_templates/full_demo/print_inputs.py b/code/run_templates/full_demo/print_inputs.py
new file mode 100644
index 0000000..90ba368
--- /dev/null
+++ b/code/run_templates/full_demo/print_inputs.py
@@ -0,0 +1,60 @@
+from cosmotech.coal.utils.configuration import Configuration
+from pathlib import Path
+
+
+# Source - https://stackoverflow.com/a
+# Posted by Aaron Hall, modified by community. See post 'Timeline' for change history
+# Retrieved 2026-01-15, License - CC BY-SA 4.0
+
+
+def tree(dir_path: Path, prefix: str = ''):
+    """A recursive generator, given a directory Path object
+    will yield a visual tree structure line by line
+    with each line prefixed by the same characters
+    """
+    # prefix components:
+    space = '    '
+    branch = '│   '
+    # pointers:
+    tee = '├── '
+    last = '└── '
+
+    contents = list(dir_path.iterdir())
+    # contents each get pointers that are ├── with a final └── :
+    pointers = [tee] * (len(contents) - 1) + [last]
+    for pointer, path in zip(pointers, contents):
+        size = ""
+        if path.is_file():
+            size = f": {path.stat().st_size}o"
+        yield prefix + pointer + path.name + size
+        if path.is_dir():  # extend the prefix and recurse:
+            extension = branch if pointer == tee else space
+            # i.e. space because last, └── , above so no more |
+            yield from tree(path, prefix=prefix+extension)
+
+
+def dprint(line):
+    pre = '[debug]'
+    print(f"{pre} {line}")
+
+
+def main():
+    _conf = Configuration()
+
+    param_path = Path(_conf.cosmotech.parameters_absolute_path)
+    dprint("parameters.json:")
+    with open(param_path / "parameters.json") as f:
+        print(f.read())
+
+    dprint(f"Printing {param_path.resolve()} content:")
+    for line in tree(param_path):
+        dprint(line)
+
+    data_path = Path(_conf.cosmotech.dataset_absolute_path)
+    dprint(f"Printng {data_path.resolve()} content:")
+    for line in tree(data_path):
+        dprint(line)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/code/run_templates/full_demo/run.json b/code/run_templates/full_demo/run.json
new file mode 100644
index 0000000..741d83c
--- /dev/null
+++ b/code/run_templates/full_demo/run.json
@@ -0,0 +1,60 @@
+{
+  "steps": [
+    {
+      "id": "fetch_parameters",
+      "command": "csm-data",
+      "arguments": ["api", "run-load-data"],
+      "useSystemEnvironment": true
+    },
+    {
+      "id": "print_out",
+      "command": "python3",
+      "arguments": ["code/run_templates/full_demo/print_inputs.py"],
+      "precedents": ["fetch_parameters"],
+      "useSystemEnvironment": true
+    },
+    {
+      "id": "parameters_handler",
+      "command": "python3",
+      "arguments": [
+        "code/run_templates/full_demo/parameters_handler.py"
+      ],
+      "precedents": [
+        "fetch_parameters"
+      ],
+      "useSystemEnvironment": true
+    },
+    {
+      "id": "engine",
+      "command": "csm-simulator",
+      "arguments": [
+        "-i",
+        "BreweryDemoSimulationWithConnector"
+      ],
+      "precedents": [
+        "parameters_handler"
+      ],
+      "useSystemEnvironment": true
+    },
+    {
+      "id": "load_results_to_store",
+      "command": "csm-data",
+      "arguments": [
+        "store",
+        "load-csv-folder"
+      ],
+      "precedents": ["engine"],
+      "useSystemEnvironment": true
+    },
+    {
+      "id": "send_results_to_psql",
+      "command": "csm-data",
+      "arguments": [
+        "store",
+        "output"
+      ],
+      "precedents": ["load_results_to_store"],
+      "useSystemEnvironment": true
+    }
+  ]
+}

From 591df76d10497d2d5138a2518ac0fe70831c38ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Tue, 3 Feb 2026 17:03:55 +0100
Subject: [PATCH 14/21] fix Coal Environment configuration import

---
 code/run_templates/brewery/parameters_handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/code/run_templates/brewery/parameters_handler.py b/code/run_templates/brewery/parameters_handler.py
index f68e206..4c00781 100644
--- a/code/run_templates/brewery/parameters_handler.py
+++ b/code/run_templates/brewery/parameters_handler.py
@@ -3,7 +3,7 @@ import csv
 from pathlib import Path
 
-from cosmotech.coal.utils.configuration.Configuration import ENVIRONMENT_CONFIGURATION as EC
+from cosmotech.coal.utils.configuration import ENVIRONMENT_CONFIGURATION as EC
 from cosmotech.orchestrator.utils.logger import get_logger
 
 LOGGER = get_logger('parameters_handler')

From 4b4982907f266f05d05375e5e0ab49c1daeba065 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Wed, 4 Feb 2026 17:29:32 +0100
Subject: [PATCH 15/21] fix brewery with new coal fetch dataset/parameter

---
 code/run_templates/brewery/parameters_handler.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/code/run_templates/brewery/parameters_handler.py b/code/run_templates/brewery/parameters_handler.py
index 4c00781..6c6923c 100644
--- a/code/run_templates/brewery/parameters_handler.py
+++ b/code/run_templates/brewery/parameters_handler.py
@@ -72,7 +72,10 @@ def main():
     LOGGER.info("Updated Bar.csv with parameters")
 
     # replace bar.csv if file is provided as parameter
-    bar_param_path = fetch_parameter_file_path("initial_stock_dataset")
-    if bar_data_path.exists():
+    try:
+        bar_param_path = fetch_parameter_file_path("initial_stock_dataset") / "initial_stock_dataset"
+    except FileNotFoundError:
+        bar_param_path = None
+    if bar_param_path is not None and bar_param_path.exists():
         # replace dataset Bar.csv with parameter Bar.csv
         shutil.copy(bar_param_path, bar_data_path)

From 762d4769e919651750f335124aa5f6c70e13b3ac Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Wed, 4 Feb 2026 17:30:22 +0100
Subject: [PATCH 16/21] fix etl upload zip

---
 code/run_templates/etl_zip2db/etl.py   | 2 +-
 code/run_templates/etl_zip2db/run.json | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/code/run_templates/etl_zip2db/etl.py b/code/run_templates/etl_zip2db/etl.py
index aa269f7..920b57f 100644
--- a/code/run_templates/etl_zip2db/etl.py
+++ b/code/run_templates/etl_zip2db/etl.py
@@ -20,7 +20,7 @@ def get_zip_file_path(dir):
 def list_files(dir):
     for root, _, files in os.walk(dir):
         for file in files:
-            return Path(root) / file
+            yield Path(root) / file
diff --git a/code/run_templates/etl_zip2db/run.json b/code/run_templates/etl_zip2db/run.json
index 9d61b60..6a6bd7d 100644
--- a/code/run_templates/etl_zip2db/run.json
+++ b/code/run_templates/etl_zip2db/run.json
@@ -12,7 +12,7 @@
     {
       "id": "ETL",
       "command": "python",
-      "arguments": ["run_templates/etl_zip2db/etl.py"],
+      "arguments": ["code/run_templates/etl_zip2db/etl.py"],
       "precedents": ["fetch_parameters"],
       "useSystemEnvironment": true
     }

From 2df0bb06b0bdbcfa1a075d50c9a9a339d2611ee1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Wed, 4 Feb 2026 17:30:57 +0100
Subject: [PATCH 17/21] fix full demo run template

---
 .../full_demo/parameters_handler.py          | 76 +++++++++++++++----
 code/run_templates/full_demo/print_inputs.py |  2 +-
 code/run_templates/full_demo/run.json        |  2 +-
 3 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/code/run_templates/full_demo/parameters_handler.py b/code/run_templates/full_demo/parameters_handler.py
index f0adf27..810ea01 100644
--- a/code/run_templates/full_demo/parameters_handler.py
+++ b/code/run_templates/full_demo/parameters_handler.py
@@ -2,9 +2,10 @@
 import json
 import os
 import itertools
+import shutil
 from pathlib import Path
 
-from cosmotech.coal.utils.configuration.Configuration import ENVIRONMENT_CONFIGURATION as EC
+from cosmotech.coal.utils.configuration import ENVIRONMENT_CONFIGURATION as EC
 from cosmotech.orchestrator.utils.logger import get_logger
 
 LOGGER = get_logger('parameter_handler')
@@ -49,11 +50,18 @@ def update_first_row_in_csv(csv_path, updated_values):
     return rows[0]
 
 
+def fetch_dataset_file_path(dataset_name: str) -> Path:
+    for r, d, f in os.walk(EC.cosmotech.dataset_absolute_path):
+        if dataset_name in f:
+            return Path(r) / dataset_name
+    raise FileNotFoundError(f"File for {dataset_name} not found in {EC.cosmotech.dataset_absolute_path}.")
+
+
 def fetch_parameter_file_path(param_name: str) -> Path:
     for r, d, f in os.walk(EC.cosmotech.parameters_absolute_path):
         if param_name in r:
             return Path(r) / f[0]
-    raise FileNotFoundError(f"Parameter file for {param_name} not found.")
+    raise FileNotFoundError(f"File for {param_name} not found in {EC.cosmotech.parameters_absolute_path}.")
 
 
 def fetch_customers_list():
@@ -67,18 +75,39 @@ def extract_names_from_csv(csv_path: Path):
             names.append(row['Name'])
         return names
 
-    customers_csv_path = fetch_parameter_file_path("customers")
+    try:
+        customers_csv_path = fetch_parameter_file_path("customers_dynamic_table")
+    except FileNotFoundError:
+        customers_csv_path = fetch_dataset_file_path("Customer.csv")
     customers.extend(extract_names_from_csv(customers_csv_path))
 
-    additional_customers_csv_path = fetch_parameter_file_path("additional_customers")
-    customers.extend(extract_names_from_csv(additional_customers_csv_path))
+    try:
+        additional_customers_csv_path = fetch_parameter_file_path("additional_customers")
+        customers.extend(extract_names_from_csv(additional_customers_csv_path))
+    except FileNotFoundError:
+        pass
 
     LOGGER.info(f"Fetched {len(customers)} customers from {customers_csv_path}")
     return customers
 
+
+def generate_customers(customers: list):
+    customer_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Customer.csv"
+    with open(customer_csv_path, 'w', newline='') as f:
+        fieldnames = ['id', 'Thirsty', 'Satisfaction', 'SurroundingSatisfaction']
+        writer = csv.DictWriter(f, fieldnames=fieldnames)
+        writer.writeheader()
+        for customer in customers:
+            writer.writerow({
+                'id': customer,
+                'Thirsty': 'false',
+                'Satisfaction': 100,
+                'SurroundingSatisfaction': 100
+            })
+    LOGGER.info(f"Generated Customer.csv with {len(customers)} customers at {customer_csv_path}")
+
 def generate_bar_to_customer_mapping(bar_name: str, customers: list):
-    bar_vertex_csv_path = Path(EC.cosmotech.datasets_absolute_path) / "Bar_vertex.csv"
+    bar_vertex_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Bar_vertex.csv"
     with open(bar_vertex_csv_path, 'w', newline='') as f:
         fieldnames = ['source', 'target']
         writer = csv.DictWriter(f, fieldnames=fieldnames)
@@ -89,14 +118,14 @@ def generate_customers_to_customer_mapping(customers: list):
-    arc_satisfaction_csv_path = Path(EC.cosmotech.datasets_absolute_path) / "arc_Satisfaction.csv"
+    arc_satisfaction_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "arc_Satisfaction.csv"
     with open(arc_satisfaction_csv_path, 'w', newline='') as f:
         fieldnames = ['source', 'target']
         writer = csv.DictWriter(f, fieldnames=fieldnames)
         writer.writeheader()
         for i, j in itertools.combinations(customers, 2):
-            writer.writerow({'source': customers[i], 'target': customers[j]})
-            writer.writerow({'source': customers[j], 'target': customers[i]})
+            writer.writerow({'source': i,
'target': j}) + writer.writerow({'source': j, 'target': i}) LOGGER.info(f"Generated customers to customer mapping in {arc_satisfaction_csv_path}") @@ -114,15 +143,34 @@ def main(): updated_values["RestockQty"] = parameters["RestockQty"] if "Stock" in parameters: updated_values["Stock"] = parameters["Stock"] - if "bar_name" in parameters: - updated_values["bar_name"] = parameters["bar_name"] - bar_csv_path = Path(EC.cosmotech.datasets_absolute_path) / "Bar.csv" + updated_values["id"] = "MyBar" + # copy initial Bar.csv to datasets path + fetched_dataset_file_path = fetch_dataset_file_path("Bar.csv") + bar_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Bar.csv" + shutil.copy(fetched_dataset_file_path, bar_csv_path) first_row = update_first_row_in_csv(bar_csv_path, updated_values) LOGGER.info("Updated Bar.csv with parameters") - # generate_Customers + # copy initial Customer.csv to datasets path + fetched_dataset_file_path = fetch_dataset_file_path("Customer.csv") + customer_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Customer.csv" + shutil.copy(fetched_dataset_file_path, customer_csv_path) + + LOGGER.info("Updated Customer.csv with parameters") + + # fetch customers customers = fetch_customers_list() + # generate Customer.csv + generate_customers(customers) # generate bar_vertex - generate_bar_to_customer_mapping(first_row["bar_name"], customers) + generate_bar_to_customer_mapping(first_row["id"], customers) # generate arc_satisfaction generate_customers_to_customer_mapping(customers) + + print(f"oskour: {os.listdir('/mnt/coal/')}") + with open('/mnt/coal/coal-config.toml') as f: + print(f.read()) + + +if __name__ == "__main__": + main() diff --git a/code/run_templates/full_demo/print_inputs.py b/code/run_templates/full_demo/print_inputs.py index 90ba368..acb7373 100644 --- a/code/run_templates/full_demo/print_inputs.py +++ b/code/run_templates/full_demo/print_inputs.py @@ -51,7 +51,7 @@ def main(): dprint(line) data_path = Path(_conf.cosmotech.dataset_absolute_path) - dprint(f"Printng {data_path.resolve()} content:") + dprint(f"Printing {data_path.resolve()} content:") for line in tree(data_path): dprint(line) diff --git a/code/run_templates/full_demo/run.json b/code/run_templates/full_demo/run.json index 741d83c..84b257c 100644 --- a/code/run_templates/full_demo/run.json +++ b/code/run_templates/full_demo/run.json @@ -20,7 +20,7 @@ "code/run_templates/full_demo/parameters_handler.py" ], "precedents": [ - "fetch_parameters" + "print_out" ], "useSystemEnvironment": true }, From fdcf03c24ed59466bfdabcb8467b3162d1bd5a59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Thu, 5 Feb 2026 16:04:44 +0100 Subject: [PATCH 18/21] fix: coal function rename --- code/run_templates/etl_zip2db/etl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code/run_templates/etl_zip2db/etl.py b/code/run_templates/etl_zip2db/etl.py index 920b57f..c653182 100644 --- a/code/run_templates/etl_zip2db/etl.py +++ b/code/run_templates/etl_zip2db/etl.py @@ -36,7 +36,7 @@ def main(): target_dataset_id = runner_data['datasets']['bases'][0] path_list_db = list_files(extract_path) datasetApi = DatasetApi() - datasetApi.update_dataset_from_files( + datasetApi.upload_dataset_parts( target_dataset_id, [], path_list_db ) LOGGER.info("ETL Run finished") From 4c959a88b9e3df941c6cde439d68b3c4a7281bb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?= Date: Thu, 5 Feb 2026 16:05:56 +0100 Subject: [PATCH 19/21] update coal version to fixed tag, and 
cosmotech-api to rc5
---
 code/requirements.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/code/requirements.txt b/code/requirements.txt
index d8bc0bf..d74c1b9 100644
--- a/code/requirements.txt
+++ b/code/requirements.txt
@@ -1,7 +1,7 @@
 cosmotech-run-orchestrator~=2.0.2
 azure-storage-blob
-cosmoTech-acceleration-library @ git+https://github.com/Cosmo-Tech/CosmoTech-Acceleration-Library@LAL/use_config_on_runner_api
-cosmotech-api~=5.0.0b3
+cosmoTech-acceleration-library @ git+https://github.com/Cosmo-Tech/CosmoTech-Acceleration-Library@2.1.0-rc1
+cosmotech-api~=5.0.0rc5
 faker~=30.6.0
 psycopg2-binary==2.9.*
 numpy~=2.3.2

From e2a0386db89cdfbb81b512cc23ec79860985885f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Thu, 5 Feb 2026 16:41:17 +0100
Subject: [PATCH 20/21] fix parameter name reading

remove the "oskour" log
---
 .../full_demo/parameters_handler.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/code/run_templates/full_demo/parameters_handler.py b/code/run_templates/full_demo/parameters_handler.py
index 810ea01..9b946c0 100644
--- a/code/run_templates/full_demo/parameters_handler.py
+++ b/code/run_templates/full_demo/parameters_handler.py
@@ -90,6 +90,7 @@ def extract_names_from_csv(csv_path: Path):
     LOGGER.info(f"Fetched {len(customers)} customers from {customers_csv_path}")
     return customers
 
+
 def generate_customers(customers: list):
     customer_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Customer.csv"
@@ -137,12 +138,12 @@ def main():
     # update dataset Bar.csv with param
     updated_values = {}
-    if "NbWaiters" in parameters:
-        updated_values["NbWaiters"] = parameters["NbWaiters"]
-    if "RestockQty" in parameters:
-        updated_values["RestockQty"] = parameters["RestockQty"]
-    if "Stock" in parameters:
-        updated_values["Stock"] = parameters["Stock"]
+    if "nb_waiters" in parameters:
+        updated_values["NbWaiters"] = parameters["nb_waiters"]
+    if "restock_qty" in parameters:
+        updated_values["RestockQty"] = parameters["restock_qty"]
+    if "stock" in parameters:
+        updated_values["Stock"] = parameters["stock"]
     updated_values["id"] = "MyBar"
     # copy initial Bar.csv to datasets path
     fetched_dataset_file_path = fetch_dataset_file_path("Bar.csv")
@@ -167,10 +168,6 @@ def main():
     # generate arc_satisfaction
     generate_customers_to_customer_mapping(customers)
 
-    print(f"oskour: {os.listdir('/mnt/coal/')}")
-    with open('/mnt/coal/coal-config.toml') as f:
-        print(f.read())
-
 
 if __name__ == "__main__":
     main()

From 4330c48ba941a8b2ab40dbcbe5c7d424afc84fdf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Laurent=20Al=C3=A9p=C3=A9e?=
Date: Fri, 6 Feb 2026 10:55:42 +0100
Subject: [PATCH 21/21] fix etl update dataset part

upload_dataset_parts doesn't update an existing file unless you set
replace_existing to True.
---
 code/run_templates/etl_zip2db/etl.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/code/run_templates/etl_zip2db/etl.py b/code/run_templates/etl_zip2db/etl.py
index c653182..cd49f9f 100644
--- a/code/run_templates/etl_zip2db/etl.py
+++ b/code/run_templates/etl_zip2db/etl.py
@@ -37,7 +37,7 @@ def main():
     path_list_db = list_files(extract_path)
     datasetApi = DatasetApi()
     datasetApi.upload_dataset_parts(
-        target_dataset_id, [], path_list_db
+        target_dataset_id, [], path_list_db, replace_existing=True
     )
     LOGGER.info("ETL Run finished")
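
A note on the list_files fix in PATCH 16: with return inside the os.walk loop,
the function exits as soon as it sees the first file, so only a single extracted
CSV was ever handed to the dataset upload. Rewriting it with yield turns the
function into a generator that lazily produces every file under the directory.
A minimal standalone sketch of the two behaviors (the "extract" directory name
is illustrative, not part of the patches):

    import os
    from pathlib import Path


    def first_file_only(directory):
        # `return` leaves the function on the very first file found,
        # so the caller gets at most one Path (or None for an empty tree).
        for root, _, files in os.walk(directory):
            for file in files:
                return Path(root) / file


    def list_files(directory):
        # `yield` makes this a generator: it produces every file under
        # `directory`, including files in nested sub-directories.
        for root, _, files in os.walk(directory):
            for file in files:
                yield Path(root) / file


    if __name__ == "__main__":
        extract_path = Path("extract")  # hypothetical unzipped dataset folder
        print(first_file_only(extract_path))   # a single Path, or None
        print(list(list_files(extract_path)))  # all extracted files

Combined with the replace_existing=True flag from PATCH 21, this is what lets
the ETL push every CSV from the zip into the target dataset, overwriting parts
that already exist.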