diff --git a/code/requirements.txt b/code/requirements.txt
index ebeae2c..d74c1b9 100644
--- a/code/requirements.txt
+++ b/code/requirements.txt
@@ -1,7 +1,7 @@
 cosmotech-run-orchestrator~=2.0.2
 azure-storage-blob
-CosmoTech-Acceleration-Library @ git+https://github.com/Cosmo-Tech/CosmoTech-Acceleration-Library@feature/cosmotech_api_5.0
-cosmotech-api~=5.0.0b3
+cosmotech-acceleration-library @ git+https://github.com/Cosmo-Tech/CosmoTech-Acceleration-Library@2.1.0-rc1
+cosmotech-api~=5.0.0rc5
 faker~=30.6.0
 psycopg2-binary==2.9.*
 numpy~=2.3.2
diff --git a/code/run_templates/brewery/parameters_handler.py b/code/run_templates/brewery/parameters_handler.py
new file mode 100644
index 0000000..6c6923c
--- /dev/null
+++ b/code/run_templates/brewery/parameters_handler.py
@@ -0,0 +1,85 @@
+import json
+import os
+import shutil
+import csv
+from pathlib import Path
+
+from cosmotech.coal.utils.configuration import ENVIRONMENT_CONFIGURATION as EC
+from cosmotech.orchestrator.utils.logger import get_logger
+
+LOGGER = get_logger('parameters_handler')
+
+
+def update_first_row_in_csv(csv_path, updated_values):
+    """
+    Read a CSV file and change the first data row with new values.
+
+    Args:
+        csv_path: Path to the CSV file
+        updated_values: Dictionary with column names as keys and new values
+    """
+    # Read the CSV file
+    with open(csv_path, 'r', newline='') as f:
+        reader = csv.DictReader(f)
+        rows = list(reader)
+        fieldnames = reader.fieldnames
+
+    # Update the first row with new values
+    if rows:
+        for key, value in updated_values.items():
+            if key in rows[0]:
+                rows[0][key] = value
+
+    # Write back to the CSV file
+    with open(csv_path, 'w', newline='') as f:
+        writer = csv.DictWriter(f, fieldnames=fieldnames)
+        writer.writeheader()
+        writer.writerows(rows)
+
+
+def read_parameters():
+    with open(
+        Path(EC.cosmotech.parameters_absolute_path) / "parameters.json"
+    ) as f:
+        parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())}
+
+    LOGGER.info("Parameters loaded from JSON")
+    return parameters
+
+
+def fetch_parameter_file_path(param_name: str) -> Path:
+    for r, d, f in os.walk(EC.cosmotech.parameters_absolute_path):
+        if param_name in r:
+            return Path(r) / f[0]
+    raise FileNotFoundError(f"Parameter file for {param_name} not found.")
+
+
+def main():
+    LOGGER.info("Starting parameter handler")
+
+    # get_parameter from json
+    parameters = read_parameters()
+    # update dataset Bar.csv with param
+    updated_values = {}
+    if "NbWaiters" in parameters:
+        updated_values["NbWaiters"] = parameters["NbWaiters"]
+    if "RestockQty" in parameters:
+        updated_values["RestockQty"] = parameters["RestockQty"]
+    if "Stock" in parameters:
+        updated_values["Stock"] = parameters["Stock"]
+
+    bar_data_path = Path(EC.cosmotech.dataset_absolute_path) / "Bar.csv"
+    update_first_row_in_csv(bar_data_path, updated_values)
+    LOGGER.info("Updated Bar.csv with parameters")
+
+    # replace dataset Bar.csv if an initial stock file is provided as parameter
+    try:
+        bar_param_path = fetch_parameter_file_path("initial_stock_dataset")
+        shutil.copy(bar_param_path, bar_data_path)
+        LOGGER.info("Bar.csv replaced by the provided initial stock file")
+    except FileNotFoundError:
+        LOGGER.info("No initial_stock_dataset file provided, keeping Bar.csv")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/code/run_templates/brewery/run.json b/code/run_templates/brewery/run.json
new file mode 100644
index 0000000..e6d1174
--- /dev/null
+++ b/code/run_templates/brewery/run.json
@@ -0,0 +1,53 @@
+{
+  "steps": [
+    {
+      "id": "fetch_parameters",
+      "command": "csm-data",
+      "arguments": ["api",
"run-load-data"], + "useSystemEnvironment": true + }, + { + "id": "parameters_handler", + "command": "python3", + "arguments": [ + "code/run_templates/brewery/parameters_handler.py" + ], + "precedents": [ + "fetch_parameters" + ], + "useSystemEnvironment": true + }, + { + "id": "engine", + "command": "csm-simulator", + "arguments": [ + "-i", + "BreweryDemoSimulationWithConnector" + ], + "precedents": [ + "parameters_handler" + ], + "useSystemEnvironment": true + }, + { + "id": "load_results_to_store", + "command": "csm-data", + "arguments": [ + "store", + "load-csv-folder" + ], + "precedents": ["engine"], + "useSystemEnvironment": true + }, + { + "id": "send_results_to_psql", + "command": "csm-data", + "arguments": [ + "store", + "output" + ], + "precedents": ["load_results_to_store"], + "useSystemEnvironment": true + } + ] +} diff --git a/code/run_templates/common/common.py b/code/run_templates/common/common.py index 5b16d9b..ca5bbe4 100644 --- a/code/run_templates/common/common.py +++ b/code/run_templates/common/common.py @@ -9,7 +9,6 @@ from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.coal.utils.logger import get_logger as _get_logger - LOGGER = _get_logger("my_etl_logger") diff --git a/code/run_templates/common/download_dataset.py b/code/run_templates/common/download_dataset.py index 7e06ab0..dc78e58 100644 --- a/code/run_templates/common/download_dataset.py +++ b/code/run_templates/common/download_dataset.py @@ -19,9 +19,9 @@ _d_data = download_dataset(ORG_ID, WS_ID, dataset_id, False) -tmp_path = pathlib.Path(_d_data['folder_path']) +tmp_path = pathlib.Path(_d_data["folder_path"]) for _p in tmp_path.glob("*"): shutil.copy(_p, SIM_DATA_PATH) -LOGGER.info(f"Downloaded dataset {dataset_id}") \ No newline at end of file +LOGGER.info(f"Downloaded dataset {dataset_id}") diff --git a/code/run_templates/common/generate_dataset.py b/code/run_templates/common/generate_dataset.py index 3cfb6ba..eab83ec 100644 --- a/code/run_templates/common/generate_dataset.py +++ b/code/run_templates/common/generate_dataset.py @@ -6,121 +6,134 @@ from csv import DictWriter import numpy as np -from cosmotech.coal.cosmotech_api.connection import get_api_client from cosmotech.orchestrator.utils.logger import get_logger from cosmotech.orchestrator.utils.logger import log_data from cosmotech_api import DatasetPartTypeEnum -from cosmotech_api.api.dataset_api import DatasetApi -from cosmotech_api.api.dataset_api import DatasetCreateRequest -from cosmotech_api.api.dataset_api import DatasetPartCreateRequest +from cosmotech.coal.cosmotech_api.apis.dataset import DatasetApi +from cosmotech_api.models.dataset_create_request import DatasetCreateRequest +from cosmotech_api.models.dataset_part_create_request import DatasetPartCreateRequest from faker import Faker LOGGER = get_logger("Brewery/DatasetV5") -LOGGER.info("Generating dataset content") - -ORG_ID = os.environ.get("CSM_ORGANIZATION_ID") -WS_ID = os.environ.get("CSM_WORKSPACE_ID") -RUNNER_ID = os.environ.get("CSM_RUNNER_ID") -RUN_ID = os.environ.get("CSM_RUN_ID") - -fake_generator = Faker() -rng = np.random.default_rng(random.randint(500, 50000)) - -headers_customer = ["id", "Thirsty", "Satisfaction", "SurroundingSatisfaction"] -headers_bar = ["id", "Stock", "RestockQty", "NbWaiters"] -headers_bar_vertex = ["source", "target"] -headers_arc_satisfaction = ["source", "target"] - -customer_content = [] -bar_content = [] -bar_vertex_content = [] -arc_satisfaction_content = [] - -bars_name = ["MyBar"] -existing_names = set() -dup_counter = 
0 - -for bar in bars_name: - customers = list(fake_generator.name() for _ in range(random.randint(30, 150))) - n_customers = len(customers) - _stock = random.randint(30, 500) - bar_content.append({ - "id": bar, - "Stock": _stock, - "RestockQty": random.randint(10, _stock), - "NbWaiters": random.randint(1, n_customers // 2), - }) - mask = np.random.randint(2, size=(n_customers, n_customers), dtype=bool) - r_customer = [] - for i in range(n_customers): - _name = customers[i] - if _name in existing_names: - _name = f"{_name} {dup_counter}" - dup_counter += 1 - r_customer.append(_name) - customer_content.append({ - "id": _name, - "Thirsty": "false", - "Satisfaction": 100, - "SurroundingSatisfaction": 100 - }) - bar_vertex_content.append({ - "source": bar, - "target": _name - }) - - for i, j in itertools.combinations(range(n_customers), 2): - if mask[i][j] or mask[j][i]: - arc_satisfaction_content.append({ - "source": r_customer[i], - "target": r_customer[j], - }) - arc_satisfaction_content.append({ - "source": r_customer[j], - "target": r_customer[i], - }) - -with tempfile.TemporaryDirectory(suffix="dataset") as temp_dir: - temp_dir_path = pathlib.Path(temp_dir) - for _name, _headers, _content in [ - ("Customer.csv", headers_customer, customer_content), - ("Bar.csv", headers_bar, bar_content), - ("Bar_vertex.csv", headers_bar_vertex, bar_vertex_content), - ("arc_Satisfaction.csv", headers_arc_satisfaction, arc_satisfaction_content), - ]: - _path = temp_dir_path / _name - with _path.open("w") as _file: - _dw = DictWriter(_file, fieldnames=_headers) - _dw.writeheader() - _dw.writerows(_content) - - with get_api_client()[0] as client: - d_api = DatasetApi(client) - d_request = DatasetCreateRequest( - name=f"{RUNNER_ID} - {RUN_ID}", - runnerId=RUNNER_ID, - parts=list( - DatasetPartCreateRequest( - name=f"{RUN_ID} - {_p.name}", - sourceName=_p.name, - type=DatasetPartTypeEnum.FILE - ) - for _p in temp_dir_path.glob("*.csv") - ) + +def generate_data(data_dir): + fake_generator = Faker() + + headers_customer = ["id", "Thirsty", "Satisfaction", "SurroundingSatisfaction"] + headers_bar = ["id", "Stock", "RestockQty", "NbWaiters"] + headers_bar_vertex = ["source", "target"] + headers_arc_satisfaction = ["source", "target"] + + customer_content = [] + bar_content = [] + bar_vertex_content = [] + arc_satisfaction_content = [] + + bars_name = ["MyBar"] + existing_names = set() + dup_counter = 0 + + LOGGER.info("Generating dataset content") + for bar in bars_name: + customers = list(fake_generator.name() for _ in range(random.randint(30, 150))) + n_customers = len(customers) + _stock = random.randint(30, 500) + bar_content.append( + { + "id": bar, + "Stock": _stock, + "RestockQty": random.randint(10, _stock), + "NbWaiters": random.randint(1, n_customers // 2), + } ) - d_ret = d_api.create_dataset( - ORG_ID, - WS_ID, - d_request, - files=list( - (_p.name, _p.open("rb").read()) - for _p in temp_dir_path.glob("*.csv") + mask = np.random.randint(2, size=(n_customers, n_customers), dtype=bool) + r_customer = [] + for i in range(n_customers): + _name = customers[i] + if _name in existing_names: + _name = f"{_name} {dup_counter}" + dup_counter += 1 + r_customer.append(_name) + customer_content.append( + { + "id": _name, + "Thirsty": "false", + "Satisfaction": 100, + "SurroundingSatisfaction": 100, + } ) - ) + bar_vertex_content.append({"source": bar, "target": _name}) + + for i, j in itertools.combinations(range(n_customers), 2): + if mask[i][j] or mask[j][i]: + arc_satisfaction_content.append( + { + "source": 
r_customer[i],
+                        "target": r_customer[j],
+                    }
+                )
+                arc_satisfaction_content.append(
+                    {
+                        "source": r_customer[j],
+                        "target": r_customer[i],
+                    }
+                )
+
+    data_dir_path = pathlib.Path(data_dir)
+    for _name, _headers, _content in [
+        ("Customer.csv", headers_customer, customer_content),
+        ("Bar.csv", headers_bar, bar_content),
+        ("Bar_vertex.csv", headers_bar_vertex, bar_vertex_content),
+        (
+            "arc_Satisfaction.csv",
+            headers_arc_satisfaction,
+            arc_satisfaction_content,
+        ),
+    ]:
+        _path = data_dir_path / _name
+        with _path.open("w") as _file:
+            _dw = DictWriter(_file, fieldnames=_headers)
+            _dw.writeheader()
+            _dw.writerows(_content)
+
+
+def create_dataset_from(data_dir):
+    ORG_ID = os.environ.get("CSM_ORGANIZATION_ID")
+    WS_ID = os.environ.get("CSM_WORKSPACE_ID")
+    RUNNER_ID = os.environ.get("CSM_RUNNER_ID")
+    RUN_ID = os.environ.get("CSM_RUN_ID")
+
+    data_dir_path = pathlib.Path(data_dir)
+
+    d_api = DatasetApi()
+    d_request = DatasetCreateRequest(
+        name=f"{RUNNER_ID} - {RUN_ID}",
+        runnerId=RUNNER_ID,
+        parts=list(
+            DatasetPartCreateRequest(
+                name=f"{RUN_ID} - {_p.name}",
+                sourceName=_p.name,
+                type=DatasetPartTypeEnum.FILE,
+            )
+            for _p in data_dir_path.glob("*.csv")
+        ),
+    )
+    return d_api.create_dataset(
+        ORG_ID,
+        WS_ID,
+        d_request,
+        files=list(
+            (_p.name, _p.open("rb").read()) for _p in data_dir_path.glob("*.csv")
+        ),
+    )
-    dataset_id = d_ret.id
-log_data("dataset_id", dataset_id)
+if __name__ == "__main__":
+    with tempfile.TemporaryDirectory(suffix="dataset") as temp_dir:
+        temp_dir_path = pathlib.Path(temp_dir)
+        generate_data(temp_dir_path)
+        dataset_id = create_dataset_from(temp_dir_path)
-LOGGER.info(f"Generated dataset {dataset_id}")
+    log_data("dataset_id", dataset_id)
+    LOGGER.info(f"Generated dataset {dataset_id}")
diff --git a/code/run_templates/common/print_inputs.py b/code/run_templates/common/print_inputs.py
new file mode 100644
index 0000000..4d13445
--- /dev/null
+++ b/code/run_templates/common/print_inputs.py
@@ -0,0 +1,61 @@
+from cosmotech.coal.utils.configuration import Configuration
+from pathlib import Path
+
+# Function 'tree' retrieved from StackOverflow
+# modified to print file size
+# Source - https://stackoverflow.com/a/59109706
+# Posted by Aaron Hall, modified by community. See post 'Timeline' for change history
+# Retrieved 2026-01-15, License - CC BY-SA 4.0
+
+
+def tree(dir_path: Path, prefix: str = ''):
+    """A recursive generator, given a directory Path object
+    will yield a visual tree structure line by line
+    with each line prefixed by the same characters
+    """
+    # prefix components:
+    space = '    '
+    branch = '│   '
+    # pointers:
+    tee = '├── '
+    last = '└── '
+
+    contents = list(dir_path.iterdir())
+    # contents each get pointers that are ├── with a final └── :
+    pointers = [tee] * (len(contents) - 1) + [last]
+    for pointer, path in zip(pointers, contents):
+        size = ""
+        if path.is_file():
+            size = f": {path.stat().st_size}o"
+        yield prefix + pointer + path.name + size
+        if path.is_dir():  # extend the prefix and recurse:
+            extension = branch if pointer == tee else space
+            # i.e.
space because last, └── , above so no more | + yield from tree(path, prefix=prefix+extension) + + +def dprint(line): + pre = '[debug]' + print(f"{pre} {line}") + + +def main(): + _conf = Configuration() + + param_path = Path(_conf.cosmotech.parameters_absolute_path) + dprint("parameter.json:") + with open(param_path / "parameters.json") as f: + print(f.read()) + + dprint(f"Printing {param_path.resolve()} content:") + for line in tree(param_path): + dprint(line) + + data_path = Path(_conf.cosmotech.dataset_absolute_path) + dprint(f"Printng {data_path.resolve()} content:") + for line in tree(data_path): + dprint(line) + + +if __name__ == "__main__": + main() diff --git a/code/run_templates/common/twingraph.py b/code/run_templates/common/twingraph.py index 65494cc..0eee85f 100644 --- a/code/run_templates/common/twingraph.py +++ b/code/run_templates/common/twingraph.py @@ -21,8 +21,12 @@ def parse_item(content, data, item_key): item_label = item["label"] item_type = item["type"] content_by_type = content[item_type] = content.get(item_type, {}) - content_by_label = content_by_type[item_label] = content_by_type.get(item_label, {}) - item_properties = content_by_label[item_id] = content_by_label.get(item_id, item["properties"]) + content_by_label = content_by_type[item_label] = content_by_type.get( + item_label, {} + ) + item_properties = content_by_label[item_id] = content_by_label.get( + item_id, item["properties"] + ) if item_type == "RELATION": item_properties["source"] = data[src_key]["properties"]["id"] @@ -39,8 +43,10 @@ def parse_edge(content, data): graph_content = reduce(parse_node, nodes, {}) graph_content = reduce(parse_edge, edges, graph_content) except Exception as e: - LOGGER.error(f"An error occurred while parsing the twingraph JSON representation: {e}") - raise(e) + LOGGER.error( + f"An error occurred while parsing the twingraph JSON representation: {e}" + ) + raise (e) return graph_content @@ -114,16 +120,26 @@ def dump_twingraph_dataset_to_zip_archive( edge_query_str = "OPTIONAL MATCH(src)-[edge]->(dst) RETURN src, edge, dst" fetch_start_time = time.time() - res_nodes = api.dataset.twingraph_query(organization_id, dataset_id, {"query": node_query_str}) - res_edges = api.dataset.twingraph_query(organization_id, dataset_id, {"query": edge_query_str}) + res_nodes = api.dataset.twingraph_query( + organization_id, dataset_id, {"query": node_query_str} + ) + res_edges = api.dataset.twingraph_query( + organization_id, dataset_id, {"query": edge_query_str} + ) fetch_duration_in_seconds = time.time() - fetch_start_time - LOGGER.info(f"Results received, queries took {fetch_duration_in_seconds} seconds") + LOGGER.info( + f"Results received, queries took {fetch_duration_in_seconds} seconds" + ) except cosmotech_api.ApiException as e: - LOGGER.error(f"Failed to retrieve content of target dataset '{dataset_id}': %s\n" % e) + LOGGER.error( + f"Failed to retrieve content of target dataset '{dataset_id}': %s\n" % e + ) raise e # Note: the keys are defined in the cypher queries above - graph_content = parse_twingraph_json(res_nodes, res_edges, "n", "edge", "src", "dst") + graph_content = parse_twingraph_json( + res_nodes, res_edges, "n", "edge", "src", "dst" + ) create_csv_files_from_graph_content(graph_content, folder_path) output_file_path = os.path.join(folder_path, "twingraph_dump") @@ -134,7 +150,11 @@ def dump_twingraph_dataset_to_zip_archive( def upload_twingraph_zip_archive(organization_id, dataset_id, zip_archive_path): try: - api.dataset.update_dataset(organization_id, dataset_id, 
{"ingestionStatus": "NONE", "sourceType": "File"}) + api.dataset.update_dataset( + organization_id, + dataset_id, + {"ingestionStatus": "NONE", "sourceType": "File"}, + ) except cosmotech_api.ApiException as e: LOGGER.error("Exception when changing twingraph type & status: %s\n" % e) raise e @@ -159,14 +179,20 @@ def upload_twingraph_zip_archive(organization_id, dataset_id, zip_archive_path): try: # Required delay to prevent some race condition, leading sometimes to the sourceType update being ignored time.sleep(2) - api.dataset.update_dataset(organization_id, dataset_id, {"ingestionStatus": "SUCCESS", "sourceType": "ETL"}) + api.dataset.update_dataset( + organization_id, + dataset_id, + {"ingestionStatus": "SUCCESS", "sourceType": "ETL"}, + ) except cosmotech_api.ApiException as e: LOGGER.error("Exception when changing twingraph type & status: %s\n" % e) raise e -def copy_dataset_twingraph(org_id, src_dataset_id, dst_dataset_id, node_query=None, edge_query=None): - ''' +def copy_dataset_twingraph( + org_id, src_dataset_id, dst_dataset_id, node_query=None, edge_query=None +): + """ Parameters: - org_id is the id of the organization containing the source and target datasets - src_dataset_id is the id of the dataset to copy @@ -176,11 +202,12 @@ def copy_dataset_twingraph(org_id, src_dataset_id, dst_dataset_id, node_query=No - edge_query (optional) is a string containing the cypher query to select the edges to copy; edge data must be returned with the aliasas 'src, edge, dst' (default query: "OPTIONAL MATCH(src)-[edge]->(dst) RETURN src, edge, dst") - ''' + """ with tempfile.TemporaryDirectory() as tmp_dir: twingraph_dump_dir_path = os.path.join(tmp_dir, "twingraph_dump") zip_archive_path = dump_twingraph_dataset_to_zip_archive( - org_id, src_dataset_id, twingraph_dump_dir_path, node_query, edge_query) + org_id, src_dataset_id, twingraph_dump_dir_path, node_query, edge_query + ) LOGGER.info(f"Twingraph extracted to {zip_archive_path}") LOGGER.info(f"Starting zip upload to existing dataset '{dst_dataset_id}'...") upload_twingraph_zip_archive(org_id, dst_dataset_id, zip_archive_path) diff --git a/code/run_templates/common/update_runner_metadata.py b/code/run_templates/common/update_runner_metadata.py index b4ac20c..f8c00fc 100644 --- a/code/run_templates/common/update_runner_metadata.py +++ b/code/run_templates/common/update_runner_metadata.py @@ -11,15 +11,26 @@ def get_arguments(): parser = argparse.ArgumentParser(description="Update runner metadata") - parser.add_argument("--table-prefix", help="RunnerMetadata table prefix", required=False, default="") parser.add_argument( - "--postgres-host", help="Postgresql host name", required=False, default=os.environ.get("POSTGRES_HOST_URI") + "--table-prefix", help="RunnerMetadata table prefix", required=False, default="" ) parser.add_argument( - "--postgres-port", help="Postgresql port", required=False, default=os.environ.get("POSTGRES_HOST_PORT") + "--postgres-host", + help="Postgresql host name", + required=False, + default=os.environ.get("POSTGRES_HOST_URI"), ) parser.add_argument( - "--postgres-db", help="Postgresql database name", required=False, default=os.environ.get("POSTGRES_DB_NAME") + "--postgres-port", + help="Postgresql port", + required=False, + default=os.environ.get("POSTGRES_HOST_PORT"), + ) + parser.add_argument( + "--postgres-db", + help="Postgresql database name", + required=False, + default=os.environ.get("POSTGRES_DB_NAME"), ) parser.add_argument( "--postgres-schema", @@ -53,7 +64,10 @@ def get_arguments(): 
default=os.environ.get("CSM_WORKSPACE_ID"), ) parser.add_argument( - "--csm-runner-id", help="Cosmo Tech Runner ID", required=False, default=os.environ.get("CSM_RUNNER_ID") + "--csm-runner-id", + help="Cosmo Tech Runner ID", + required=False, + default=os.environ.get("CSM_RUNNER_ID"), ) return parser.parse_args() @@ -61,7 +75,9 @@ def get_arguments(): def main(): args = get_arguments() - runner = api.runner.get_runner(args.csm_organization_id, args.csm_workspace_id, args.csm_runner_id) + runner = api.runner.get_runner( + args.csm_organization_id, args.csm_workspace_id, args.csm_runner_id + ) schema_table = f"{args.postgres_schema}.{args.table_prefix}RunnerMetadata" sql_create_table = f""" diff --git a/code/run_templates/debug/print_inputs.py b/code/run_templates/debug/print_inputs.py new file mode 120000 index 0000000..331fa12 --- /dev/null +++ b/code/run_templates/debug/print_inputs.py @@ -0,0 +1 @@ +../common/print_inputs.py \ No newline at end of file diff --git a/code/run_templates/debug/run.json b/code/run_templates/debug/run.json new file mode 100644 index 0000000..e2e2831 --- /dev/null +++ b/code/run_templates/debug/run.json @@ -0,0 +1,42 @@ +{ + "steps": [ + { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, + { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, + { + "id": "fetch_parameters", + "command": "csm-data", + "arguments": ["api", "run-load-data"], + "precedents": ["csm_data_version", "csm_orc_version"], + "useSystemEnvironment": true + }, + { + "id": "print_out", + "command": "python3", + "arguments": ["code/run_templates/debug/print_inputs.py"], + "precedents": ["fetch_parameters"], + "useSystemEnvironment": true + }, + { + "id": "load_results_to_store", + "command": "csm-data", + "arguments": [ + "store", + "load-csv-folder", + "--csv-folder", + "$CSM_DATASET_ABSOLUTE_PATH" + ], + "precedents": ["print_out"], + "useSystemEnvironment": true + }, + { + "id": "send_results_to_psql", + "command": "csm-data", + "arguments": [ + "store", + "output" + ], + "precedents": ["load_results_to_store"], + "useSystemEnvironment": true + } + ] +} diff --git a/code/run_templates/dynamic_values_customers/run.json b/code/run_templates/dynamic_values_customers/run.json deleted file mode 100644 index 0525839..0000000 --- a/code/run_templates/dynamic_values_customers/run.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "steps": [ - { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, - { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, - { - "id": "fetch_parameters", - "command": "csm-data", - "arguments": ["api", "run-load-data"], - "precedents": ["csm_data_version", "csm_orc_version"], - "useSystemEnvironment": true - }, - { - "id": "engine", - "command": "csm-orc", - "arguments": ["run-step"], - "precedents": ["fetch_parameters"], - "useSystemEnvironment": true - } - ] -} diff --git a/code/run_templates/etl_instance_generator/etl.py b/code/run_templates/etl_instance_generator/etl.py index 34d6930..93c7f42 100644 --- a/code/run_templates/etl_instance_generator/etl.py +++ b/code/run_templates/etl_instance_generator/etl.py @@ -1,17 +1,16 @@ -import csv import json import os -import sys -import shutil import tempfile from dataclasses import dataclass +from pathlib import Path -from cosmotech_api.model.dataset import Dataset -from common.common import get_logger, get_api +from cosmotech.coal.utils.logger import get_logger +from cosmotech.coal.utils.configuration import Configuration +from 
cosmotech.coal.cosmotech_api.apis.runner import RunnerApi +from cosmotech.coal.cosmotech_api.apis.dataset import DatasetApi from generate_brewery_dataset import generate_brewery_dataset - -LOGGER = get_logger() +LOGGER = get_logger('etl') @dataclass @@ -37,22 +36,12 @@ def get_zip_file_name(dir): def main(): LOGGER.info("Starting the ETL Run") - organization_id = os.environ.get("CSM_ORGANIZATION_ID") - api = get_api() - - runner_data = api.runner.get_runner( - organization_id=organization_id, - workspace_id=os.environ.get("CSM_WORKSPACE_ID"), - runner_id=os.environ.get("CSM_RUNNER_ID"), - ) - # When a runner is an ETL created by the webapp, the 1st item in datasetList is is a dataset created by the webapp - # before starting the runner - target_dataset_id = runner_data.dataset_list[0] + coal_config = Configuration() - with open(os.path.join(os.environ.get("CSM_PARAMETERS_ABSOLUTE_PATH"), "parameters.json")) as f: + with open( + Path(coal_config.cosmotech.parameters_absolute_path) / "parameters.json" + ) as f: parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())} - LOGGER.info("All parameters are loaded") - generator_parameters = GeneratorParameters( int(parameters["etl_param_restock_quantity"]), int(parameters["etl_param_stock"]), @@ -62,38 +51,32 @@ def main(): parameters["etl_param_thirsty"] == "THIRSTY", parameters["etl_param_locale"], int(parameters["etl_param_customers_count"]), - int(parameters["etl_param_tables_count"],) + int( + parameters["etl_param_tables_count"], + ), ) + LOGGER.info("All parameters are loaded") + # When a runner is an ETL created by the webapp, the 1st item in datasetList is is a dataset created by the webapp + # before starting the runner and is used as the target for the ETL + runner_data = RunnerApi().get_runner_metadata( + runner_id=coal_config.cosmotech.runner_id + ) + target_dataset_id = runner_data['datasets']['bases'][0] with tempfile.TemporaryDirectory() as tmp_dir: generate_brewery_dataset(generator_parameters, tmp_dir) - output_file_path = os.path.join(tmp_dir, "generated_brewery_instance") - output_file_path_with_format = os.path.join(tmp_dir, "generated_brewery_instance.zip") - shutil.make_archive(output_file_path, "zip", tmp_dir) - LOGGER.info(f"Instance archive gerenated: {output_file_path_with_format}") - - ws_file_path = f'datasets/{target_dataset_id}/generated_brewery_instance.zip' - LOGGER.info(f"Starting workspace file upload to {ws_file_path}...") - ws_file = api.workspace.upload_workspace_file(organization_id, - os.environ.get("CSM_WORKSPACE_ID"), - output_file_path_with_format, - overwrite=True, - destination=ws_file_path) - LOGGER.info("Workspace file has been uploaded") - - api.dataset.update_dataset( - organization_id, target_dataset_id, {"ingestionStatus": "SUCCESS", "twincacheStatus":"FULL"} - ) + path_tmp_dir = Path(tmp_dir) + path_list_file = [ + path_tmp_dir / "arc_Satisfaction.csv", + path_tmp_dir / "Bar_vertex.csv", + path_tmp_dir / "Customer.csv", + ] + path_list_db = [path_tmp_dir / "Bar.csv"] + DatasetApi().update_dataset_from_files( + target_dataset_id, path_list_file, path_list_db + ) LOGGER.info("ETL Run finished") if __name__ == "__main__": - try: - main() - except Exception as e: - exception_type = type(e).__name__ - file_name = os.path.split(e.__traceback__.tb_frame.f_code.co_filename)[1] - line_number = e.__traceback__.tb_lineno - LOGGER.error(f"An error occured during the dataset generation: {exception_type}") - LOGGER.error('%s' % e) - sys.exit(-1) + main() diff --git 
a/code/run_templates/etl_instance_generator/generate_brewery_dataset.py b/code/run_templates/etl_instance_generator/generate_brewery_dataset.py index edd8762..f184070 100644 --- a/code/run_templates/etl_instance_generator/generate_brewery_dataset.py +++ b/code/run_templates/etl_instance_generator/generate_brewery_dataset.py @@ -13,37 +13,72 @@ import itertools from faker import Faker -from common.common import get_logger +from cosmotech.coal.utils.logger import get_logger - -LOGGER = get_logger() +LOGGER = get_logger('generate') def parse_arguments(): parser = argparse.ArgumentParser( - description='Generate a brewery instance with Bar and Customer entities, and relationships between them.' + description="Generate a brewery instance with Bar and Customer entities, and relationships between them." + ) + parser.add_argument( + "-t", "--tables", help="Number of tables in the bar", default="10", type=int + ) + parser.add_argument( + "-c", + "--customers", + help="Number of customers in the instance", + default="50", + type=int, + ) + parser.add_argument( + "-n", + "--name", + help="Name of the generated instance", + default="brewery_instance", + ) + parser.add_argument( + "-l", + "--locale", + help="Localization of generated names (leave empty to use English)", ) - parser.add_argument("-t", "--tables", help="Number of tables in the bar", default="10", type=int) - parser.add_argument("-c", "--customers", help="Number of customers in the instance", default="50", type=int) - parser.add_argument("-n", "--name", help="Name of the generated instance", default="brewery_instance") - parser.add_argument("-l", "--locale", help="Localization of generated names (leave empty to use English)") - parser.add_argument("--restock", help="Restock quantity for generated bar", default="30", type=int) - parser.add_argument("--stock", help="Stock quantity for generated bar", default="60", type=int) - parser.add_argument("--waiters", help="Number of waiters for generated bar", default="10", type=int) + parser.add_argument( + "--restock", help="Restock quantity for generated bar", default="30", type=int + ) + parser.add_argument( + "--stock", help="Stock quantity for generated bar", default="60", type=int + ) + parser.add_argument( + "--waiters", help="Number of waiters for generated bar", default="10", type=int + ) - parser.add_argument("--satisfaction", help="Initial statisfaction of generated customers", default="0", type=int) - parser.add_argument("--surrounding_satisfaction", help="Initial surrounding statisfaction of generated customers", - default="0", type=int) - parser.add_argument("--thirsty", help="Value for the Thirsty attribute of generated customers", default=False, - type=bool) + parser.add_argument( + "--satisfaction", + help="Initial statisfaction of generated customers", + default="0", + type=int, + ) + parser.add_argument( + "--surrounding_satisfaction", + help="Initial surrounding statisfaction of generated customers", + default="0", + type=int, + ) + parser.add_argument( + "--thirsty", + help="Value for the Thirsty attribute of generated customers", + default=False, + type=bool, + ) args = parser.parse_args() return args def get_faker_instance(locale=None): - locale = locale.split(',') if locale else None + locale = locale.split(",") if locale else None return Faker(locale) @@ -59,26 +94,26 @@ def generate_people(faker_instance, count): def get_id_from_name(name): - return re.sub(r'[\. \'-\,]', '_', name) + return re.sub(r"[\. 
\'-\,]", "_", name) def generate_bar(options): return { - 'id': 'MyBar', - 'name': 'MyBar', - 'restock': options['restock'], - 'stock': options['stock'], - 'waiters': options['waiters'], + "id": "MyBar", + "name": "MyBar", + "restock": options["restock"], + "stock": options["stock"], + "waiters": options["waiters"], } def generate_customer(person, options): return { - 'id': get_id_from_name(person), - 'name': person, - 'satisfaction': options['satisfaction'], - 'surrounding_satisfaction': options['surrounding_satisfaction'], - 'thirsty': options['thirsty'], + "id": get_id_from_name(person), + "name": person, + "satisfaction": options["satisfaction"], + "surrounding_satisfaction": options["surrounding_satisfaction"], + "thirsty": options["thirsty"], } @@ -87,7 +122,7 @@ def generate_customers(people, options): def generate_customers_groups(customers, groups_count): - ''' + """ Generate an array of customer groups: [ [customer_id1, customer_id3, customer_id6, customer_id9], @@ -95,26 +130,26 @@ def generate_customers_groups(customers, groups_count): [customer_id2, customer_id5, customer_id7] ] Distribution is not perfect on purpose, to have groups of different sizes - ''' + """ customer_groups = [] for i in range(0, groups_count): customer_groups.append([]) for customer in customers: group_index = random.randint(0, groups_count - 1) - customer_groups[group_index].append(customer['id']) + customer_groups[group_index].append(customer["id"]) return customer_groups def generate_bar_to_customers_mapping(bar, customers): - ''' + """ Generate a dict similar to: { bar_id1: [customer_id1, customer_id3, customer_id6, customer_id9], bar_id2: [customer_id4, customer_id8], bar_id3: [customer_id2, customer_id5, customer_id7] } - ''' - return {bar['id']: [customer['id'] for customer in customers]} + """ + return {bar["id"]: [customer["id"] for customer in customers]} def generate_customers_to_customers_links(customer_groups): @@ -128,67 +163,54 @@ def generate_customers_to_customers_links(customer_groups): def generate_bar_csv_file_content(bar): - file_content = 'id,NbWaiters,RestockQty,Stock\n' - cells = [ - bar['id'], - str(bar['waiters']), - str(bar['restock']), - str(bar['stock']) - ] - file_content += ','.join(cells) + '\n' + file_content = "id,NbWaiters,RestockQty,Stock\n" + cells = [bar["id"], str(bar["waiters"]), str(bar["restock"]), str(bar["stock"])] + file_content += ",".join(cells) + "\n" return file_content def generate_customers_csv_file_content(customers): - file_content = 'id,Satisfaction,SurroundingSatisfaction,Thirsty\n' + file_content = "id,Satisfaction,SurroundingSatisfaction,Thirsty\n" for customer in customers: cells = [ - customer['id'], - str(customer['satisfaction']), - str(customer['surrounding_satisfaction']), - str(customer['thirsty']).lower() + customer["id"], + str(customer["satisfaction"]), + str(customer["surrounding_satisfaction"]), + str(customer["thirsty"]).lower(), ] - file_content += ','.join(cells) + '\n' + file_content += ",".join(cells) + "\n" return file_content def generate_bar_to_customers_csv_file_content(bar_to_customers): - file_content = 'source,target,name\n' + file_content = "source,target,name\n" for bar, customers in bar_to_customers.items(): for customer in customers: - cells = [ - bar, - customer, - bar + '_contains_' + customer - ] - file_content += ','.join(cells) + '\n' + cells = [bar, customer, bar + "_contains_" + customer] + file_content += ",".join(cells) + "\n" return file_content def 
generate_arc_to_customers_csv_file_content(customers_to_customers_links): - file_content = 'source,target,name\n' + file_content = "source,target,name\n" for pair in customers_to_customers_links: - cells = [ - pair[0], - pair[1], - 'arc_from_' + pair[0] + '_to_' + pair[1] - ] - file_content += ','.join(cells) + '\n' - cells = [ - pair[1], - pair[0], - 'arc_from_' + pair[1] + '_to_' + pair[0] - ] - file_content += ','.join(cells) + '\n' + cells = [pair[0], pair[1], "arc_from_" + pair[0] + "_to_" + pair[1]] + file_content += ",".join(cells) + "\n" + cells = [pair[1], pair[0], "arc_from_" + pair[1] + "_to_" + pair[0]] + file_content += ",".join(cells) + "\n" return file_content -def generate_csv_files_content(bar, customers, bar_to_customers, customers_to_customers_links): +def generate_csv_files_content( + bar, customers, bar_to_customers, customers_to_customers_links +): return { - 'Bar.csv': generate_bar_csv_file_content(bar), - 'Customer.csv': generate_customers_csv_file_content(customers), - 'Bar_vertex.csv': generate_bar_to_customers_csv_file_content(bar_to_customers), - 'arc_Satisfaction.csv': generate_arc_to_customers_csv_file_content(customers_to_customers_links), + "Bar.csv": generate_bar_csv_file_content(bar), + "Customer.csv": generate_customers_csv_file_content(customers), + "Bar_vertex.csv": generate_bar_to_customers_csv_file_content(bar_to_customers), + "arc_Satisfaction.csv": generate_arc_to_customers_csv_file_content( + customers_to_customers_links + ), } @@ -196,16 +218,20 @@ def export_csv_files(csv_files_content, dir_path): if not os.path.exists(dir_path): os.mkdir(dir_path) for file_name, file_content in csv_files_content.items(): - with open(os.path.join(dir_path, file_name), 'w') as writer: + with open(os.path.join(dir_path, file_name), "w") as writer: writer.write(file_content) def generate_brewery_dataset(args, working_dir="."): - bar_options = {'restock': args.restock, 'stock': args.stock, 'waiters': args.waiters} + bar_options = { + "restock": args.restock, + "stock": args.stock, + "waiters": args.waiters, + } customers_options = { - 'satisfaction': args.satisfaction, - 'surrounding_satisfaction': args.surrounding_satisfaction, - 'thirsty': args.thirsty, + "satisfaction": args.satisfaction, + "surrounding_satisfaction": args.surrounding_satisfaction, + "thirsty": args.thirsty, } faker_instance = get_faker_instance(args.locale) @@ -215,9 +241,13 @@ def generate_brewery_dataset(args, working_dir="."): bar_to_customers = generate_bar_to_customers_mapping(bar, customers) customer_groups = generate_customers_groups(customers, args.tables) - customers_to_customers_links = generate_customers_to_customers_links(customer_groups) + customers_to_customers_links = generate_customers_to_customers_links( + customer_groups + ) - csv_files_content = generate_csv_files_content(bar, customers, bar_to_customers, customers_to_customers_links) + csv_files_content = generate_csv_files_content( + bar, customers, bar_to_customers, customers_to_customers_links + ) export_csv_files(csv_files_content, working_dir) LOGGER.info(f"Instance CSV files generated in folder {working_dir}") diff --git a/code/run_templates/etl_with_local_file/common b/code/run_templates/etl_with_local_file/common deleted file mode 120000 index 8332399..0000000 --- a/code/run_templates/etl_with_local_file/common +++ /dev/null @@ -1 +0,0 @@ -../common/ \ No newline at end of file diff --git a/code/run_templates/etl_with_local_file/etl.py b/code/run_templates/etl_with_local_file/etl.py deleted file mode 100644 index 
26138bf..0000000 --- a/code/run_templates/etl_with_local_file/etl.py +++ /dev/null @@ -1,130 +0,0 @@ -import csv -import json -import os -from zipfile import ZipFile - -from cosmotech_api.model.dataset import Dataset -from common.common import get_logger, get_api - -LOGGER = get_logger() - - -def get_zip_file_name(dir): - for _, _, files in os.walk(dir): - for file in files: - if file.endswith(".zip"): - return file - - -def main(): - LOGGER.info("Starting the ETL Run") - api = get_api() - runner_data = api.runner.get_runner( - organization_id=os.environ.get("CSM_ORGANIZATION_ID"), - workspace_id=os.environ.get("CSM_WORKSPACE_ID"), - runner_id=os.environ.get("CSM_RUNNER_ID"), - ) - - with open(os.path.join(os.environ.get("CSM_PARAMETERS_ABSOLUTE_PATH"), "parameters.json")) as f: - parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())} - - LOGGER.info("All parameters are loaded") - - bars = list() - - bar = { - "type": "Bar", - "name": "MyBar", - "params": f"""NbWaiters: {int(parameters["etl_param_num_waiters"])},""" - + f"""RestockQty: {int(parameters["etl_param_restock_quantity"])},""" - + f"""Stock: {int(parameters["etl_param_stock"])}""", - } - bars.append(bar) - - base_path = parameters["etl_param_bar_instance"] - file_name = get_zip_file_name(base_path) - with ZipFile(base_path + "/" + file_name) as zip: - zip.extractall(base_path) - - base_path = base_path + "/reference" - customers = list() - with open(base_path + "/Nodes/Customer.csv") as _f: - LOGGER.info("Found 'Customer' list") - csv_r = csv.DictReader(_f) - for row in csv_r: - customer = { - "type": "Customer", - "name": row["id"], - "params": f"Name: '{row['id']}'," - f"Satisfaction: {row['Satisfaction']}," - f"SurroundingSatisfaction: {row['SurroundingSatisfaction']}," - f"Thirsty: {row['Thirsty']}", - } - customers.append(customer) - - satisfactions = list() - with open(base_path + "/Edges/arc_Satisfaction.csv") as _f: - LOGGER.info("Found 'Customer satisfaction' relation") - csv_r = csv.DictReader(_f) - for row in csv_r: - satisfaction = { - "type": "arc_Satisfaction", - "source": row["source"], - "target": row["target"], - "name": row["name"], - "params": "a: 'a'", - } - satisfactions.append(satisfaction) - - links = list() - with open(base_path + "/Edges/Bar_vertex.csv") as _f: - LOGGER.info("Found 'Bar vertex' relation") - csv_r = csv.DictReader(_f) - for row in csv_r: - link = { - "type": "bar_vertex", - "source": row["source"], - "target": row["target"], - "name": row["name"], - "params": "a: 'a'", - } - links.append(link) - - dataset = Dataset(ingestion_status="SUCCESS") - api.dataset.update_dataset(os.environ.get("CSM_ORGANIZATION_ID"), runner_data.dataset_list[0], dataset) - - try: - LOGGER.info("Erasing data from target Dataset") - api.dataset.twingraph_query( - organization_id=os.environ.get("CSM_ORGANIZATION_ID"), - dataset_id=runner_data.dataset_list[0], - dataset_twin_graph_query={"query": "MATCH (n) DETACH DELETE n"}, - ) - except Exception: - pass - - LOGGER.info("Writing entities into target Dataset") - api.dataset.create_twingraph_entities( - organization_id=os.environ.get("CSM_ORGANIZATION_ID"), - dataset_id=runner_data.dataset_list[0], - type="node", - graph_properties=bars + customers, - ) - - LOGGER.info("Writing relationshipss into target Dataset") - api.dataset.create_twingraph_entities( - organization_id=os.environ.get("CSM_ORGANIZATION_ID"), - dataset_id=runner_data.dataset_list[0], - type="relationship", - graph_properties=satisfactions + links, - ) - - 
api.dataset.update_dataset( - os.environ.get("CSM_ORGANIZATION_ID"), runner_data['dataset_list'][0], Dataset(twincacheStatus="FULL") - ) - - LOGGER.info("ETL Run finished") - - -if __name__ == "__main__": - main() diff --git a/code/run_templates/etl_zip2db/etl.py b/code/run_templates/etl_zip2db/etl.py new file mode 100644 index 0000000..cd49f9f --- /dev/null +++ b/code/run_templates/etl_zip2db/etl.py @@ -0,0 +1,46 @@ +import os +from zipfile import ZipFile +from pathlib import Path + +from cosmotech.coal.utils.logger import get_logger +from cosmotech.coal.utils.configuration import Configuration +from cosmotech.coal.cosmotech_api.apis.runner import RunnerApi +from cosmotech.coal.cosmotech_api.apis.dataset import DatasetApi + +LOGGER = get_logger('etl') + + +def get_zip_file_path(dir): + for root, _, files in os.walk(dir): + for file in files: + if file.endswith(".zip"): + return Path(root) / file + + +def list_files(dir): + for root, _, files in os.walk(dir): + for file in files: + yield Path(root) / file + + +def main(): + LOGGER.info("Starting the ETL Run") + coal_config = Configuration() + + parameter_dir_path = Path(coal_config.cosmotech.parameters_absolute_path) + extract_path = parameter_dir_path / "extract" + with ZipFile(get_zip_file_path(parameter_dir_path)) as zip: + zip.extractall(extract_path) + + runner_data = RunnerApi().get_runner_metadata() + target_dataset_id = runner_data['datasets']['bases'][0] + path_list_db = list_files(extract_path) + datasetApi = DatasetApi() + datasetApi.upload_dataset_parts( + target_dataset_id, [], path_list_db, replace_existing=True + ) + LOGGER.info("ETL Run finished") + + +if __name__ == "__main__": + main() diff --git a/code/run_templates/etl_with_local_file/run.json b/code/run_templates/etl_zip2db/run.json similarity index 89% rename from code/run_templates/etl_with_local_file/run.json rename to code/run_templates/etl_zip2db/run.json index 38bd578..6a6bd7d 100644 --- a/code/run_templates/etl_with_local_file/run.json +++ b/code/run_templates/etl_zip2db/run.json @@ -12,7 +12,7 @@ { "id": "ETL", "command": "python", - "arguments": ["run_templates/etl_with_local_file/etl.py"], + "arguments": ["code/run_templates/etl_zip2db/etl.py"], "precedents": ["fetch_parameters"], "useSystemEnvironment": true } diff --git a/code/run_templates/full_demo/parameters_handler.py b/code/run_templates/full_demo/parameters_handler.py new file mode 100644 index 0000000..9b946c0 --- /dev/null +++ b/code/run_templates/full_demo/parameters_handler.py @@ -0,0 +1,173 @@ +import csv +import json +import os +import itertools +import shutil +from pathlib import Path + +from cosmotech.coal.utils.configuration import ENVIRONMENT_CONFIGURATION as EC +from cosmotech.orchestrator.utils.logger import get_logger + +LOGGER = get_logger('parameter_handler') + + +def get_parameters(): + with open( + Path(EC.cosmotech.parameters_absolute_path) / "parameters.json" + ) as f: + parameters = {d["parameterId"]: d["value"] for d in json.loads(f.read())} + + LOGGER.info("Parameters loaded from JSON") + return parameters + + +def update_first_row_in_csv(csv_path, updated_values): + """ + Read a CSV file and change the first data row with new values. 
+ + Args: + csv_path: Path to the CSV file + updated_values: Dictionary with column names as keys and new values + """ + # Read the CSV file + with open(csv_path, 'r', newline='') as f: + reader = csv.DictReader(f) + rows = list(reader) + fieldnames = reader.fieldnames + + # Update the first row with new values + if rows: + for key, value in updated_values.items(): + if key in rows[0]: + rows[0][key] = value + + # Write back to the CSV file + with open(csv_path, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + return rows[0] + + +def fetch_dataset_file_path(dataset_name: str) -> Path: + for r, d, f in os.walk(EC.cosmotech.dataset_absolute_path): + if dataset_name in f: + return Path(r) / dataset_name + raise FileNotFoundError(f"File for {dataset_name} not found in {EC.cosmotech.dataset_absolute_path}.") + + +def fetch_parameter_file_path(param_name: str) -> Path: + for r, d, f in os.walk(EC.cosmotech.parameters_absolute_path): + if param_name in r: + return Path(r) / f[0] + raise FileNotFoundError(f"File for {param_name} not found in {EC.cosmotech.parameters_absolute_path}.") + + +def fetch_customers_list(): + customers = [] + + def extract_names_from_csv(csv_path: Path): + names = [] + with open(csv_path, 'r', newline='') as f: + reader = csv.DictReader(f) + for row in reader: + names.append(row['Name']) + return names + + try: + customers_csv_path = fetch_parameter_file_path("customers_dynamic_table") + except FileNotFoundError: + customers_csv_path = fetch_dataset_file_path("Customer.csv") + customers.extend(extract_names_from_csv(customers_csv_path)) + + try: + additionnal_customers_csv_path = fetch_parameter_file_path("additional_customers") + customers.extend(extract_names_from_csv(additionnal_customers_csv_path)) + except FileNotFoundError: + pass + + LOGGER.info(f"Fetched {len(customers)} customers from {customers_csv_path}") + return customers + + +def generate_customers(customers: list): + customer_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Customer.csv" + with open(customer_csv_path, 'w', newline='') as f: + fieldnames = ['id', 'Thirsty', 'Satisfaction', 'SurroundingSatisfaction'] + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for customer in customers: + writer.writerow({ + 'id': customer, + 'Thirsty': 'false', + 'Satisfaction': 100, + 'SurroundingSatisfaction': 100 + }) + LOGGER.info(f"Generated Customer.csv with {len(customers)} customers at {customer_csv_path}") + + +def generate_bar_to_customer_mapping(bar_name: str, customers: list): + bar_vertex_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Bar_vertex.csv" + with open(bar_vertex_csv_path, 'w', newline='') as f: + fieldnames = ['source', 'target'] + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for customer in customers: + writer.writerow({'source': bar_name, 'target': customer}) + LOGGER.info(f"Generated bar to customer mapping in {bar_vertex_csv_path}") + + +def generate_customers_to_customer_mapping(customers: list): + arc_satisfaction_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "arc_Satisfaction.csv" + with open(arc_satisfaction_csv_path, 'w', newline='') as f: + fieldnames = ['source', 'target'] + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for i, j in itertools.combinations(customers, 2): + writer.writerow({'source': i, 'target': j}) + writer.writerow({'source': j, 'target': i}) + LOGGER.info(f"Generated customers to 
customer mapping in {arc_satisfaction_csv_path}") + + +def main(): + LOGGER.info("Starting parameter handler") + + # get_parameter from json + parameters = get_parameters() + + # update dataset Bar.csv with param + updated_values = {} + if "nb_waiters" in parameters: + updated_values["NbWaiters"] = parameters["nb_waiters"] + if "restock_qty" in parameters: + updated_values["RestockQty"] = parameters["restock_qty"] + if "stock" in parameters: + updated_values["Stock"] = parameters["stock"] + updated_values["id"] = "MyBar" + # copy initial Bar.csv to datasets path + fetched_dataset_file_path = fetch_dataset_file_path("Bar.csv") + bar_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Bar.csv" + shutil.copy(fetched_dataset_file_path, bar_csv_path) + first_row = update_first_row_in_csv(bar_csv_path, updated_values) + LOGGER.info("Updated Bar.csv with parameters") + + # copy initial Customer.csv to datasets path + fetched_dataset_file_path = fetch_dataset_file_path("Customer.csv") + customer_csv_path = Path(EC.cosmotech.dataset_absolute_path) / "Customer.csv" + shutil.copy(fetched_dataset_file_path, customer_csv_path) + + LOGGER.info("Updated Customer.csv with parameters") + + # fetch customers + customers = fetch_customers_list() + # generate Customer.csv + generate_customers(customers) + # generate bar_vertex + generate_bar_to_customer_mapping(first_row["id"], customers) + # generate arc_satisfaction + generate_customers_to_customer_mapping(customers) + + +if __name__ == "__main__": + main() diff --git a/code/run_templates/full_demo/print_inputs.py b/code/run_templates/full_demo/print_inputs.py new file mode 120000 index 0000000..331fa12 --- /dev/null +++ b/code/run_templates/full_demo/print_inputs.py @@ -0,0 +1 @@ +../common/print_inputs.py \ No newline at end of file diff --git a/code/run_templates/full_demo/run.json b/code/run_templates/full_demo/run.json new file mode 100644 index 0000000..84b257c --- /dev/null +++ b/code/run_templates/full_demo/run.json @@ -0,0 +1,60 @@ +{ + "steps": [ + { + "id": "fetch_parameters", + "command": "csm-data", + "arguments": ["api", "run-load-data"], + "useSystemEnvironment": true + }, + { + "id": "print_out", + "command": "python3", + "arguments": ["code/run_templates/full_demo/print_inputs.py"], + "precedents": ["fetch_parameters"], + "useSystemEnvironment": true + }, + { + "id": "parameters_handler", + "command": "python3", + "arguments": [ + "code/run_templates/full_demo/parameters_handler.py" + ], + "precedents": [ + "print_out" + ], + "useSystemEnvironment": true + }, + { + "id": "engine", + "command": "csm-simulator", + "arguments": [ + "-i", + "BreweryDemoSimulationWithConnector" + ], + "precedents": [ + "parameters_handler" + ], + "useSystemEnvironment": true + }, + { + "id": "load_results_to_store", + "command": "csm-data", + "arguments": [ + "store", + "load-csv-folder" + ], + "precedents": ["engine"], + "useSystemEnvironment": true + }, + { + "id": "send_results_to_psql", + "command": "csm-data", + "arguments": [ + "store", + "output" + ], + "precedents": ["load_results_to_store"], + "useSystemEnvironment": true + } + ] +} diff --git a/code/run_templates/minimal/run.json b/code/run_templates/minimal/run.json deleted file mode 100644 index c119c8b..0000000 --- a/code/run_templates/minimal/run.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "steps": [ - { - "id": "generate", - "command": "python", - "arguments": [ - "code/run_templates/common/generate_dataset.py" - ], - "useSystemEnvironment": true, - "outputs": { - "dataset_id": { - "description": 
"Generated Dataset ID in the Cosmo Tech API" - } - } - }, - { - "id": "download", - "command": "python", - "arguments": [ - "code/run_templates/common/download_dataset.py" - ], - "useSystemEnvironment": true, - "inputs": { - "source_dataset": { - "stepId": "generate", - "output": "dataset_id", - "as": "CSM_DATASET_ID" - } - }, - "precedents": [ - "generate" - ] - }, - { - "id": "engine", - "command": "csm-simulator", - "arguments": [ - "-i", - "BreweryDemoSimulationWithConnector" - ], - "precedents": [ - "download" - ], - "useSystemEnvironment": true - } - ] -} diff --git a/code/run_templates/sim_brewery_parameters/common b/code/run_templates/sim_brewery_parameters/common deleted file mode 120000 index 8332399..0000000 --- a/code/run_templates/sim_brewery_parameters/common +++ /dev/null @@ -1 +0,0 @@ -../common/ \ No newline at end of file diff --git a/code/run_templates/sim_brewery_parameters/parameters_handler.py b/code/run_templates/sim_brewery_parameters/parameters_handler.py deleted file mode 100644 index 5f68a0f..0000000 --- a/code/run_templates/sim_brewery_parameters/parameters_handler.py +++ /dev/null @@ -1,134 +0,0 @@ -import os -import sys -from pathlib import Path -import csv -from tempfile import NamedTemporaryFile, TemporaryDirectory -import shutil -import glob -import json -from common.common import get_api, get_logger -from cosmotech.coal.cosmotech_api.connection import get_api_client -from cosmotech.coal.cosmotech_api.workspace import download_workspace_file - -LOGGER = get_logger() -api = get_api() - - -def main(): - organization_id = os.environ.get("CSM_ORGANIZATION_ID") - workspace_id = os.environ.get("CSM_WORKSPACE_ID") - data_folder = Path(os.environ["CSM_DATASET_ABSOLUTE_PATH"]) - - parameters_folder = Path(os.environ["CSM_PARAMETERS_ABSOLUTE_PATH"]) - json_parameters_file = parameters_folder / "parameters.json" - if not json_parameters_file.exists(): - raise Exception(f"No parameters file in {parameters_file}") - - values = { - "stock": 0, - "restock_qty": 0, - "nb_waiters": 0, - "initial_stock_dataset": "", - } - expected_parameters = list(values.keys()) - LOGGER.info("Start parsing JSON parameters file") - with open(json_parameters_file) as json_file: - parameters = json.load(json_file) - for parameter in parameters: - parameter_name = parameter["parameterId"] - parameter_value = parameter["value"] - if parameter_name not in expected_parameters: - LOGGER.warning(f"Unknown parameter {parameter_name}") - else: - LOGGER.info(f'Value for {parameter_name}: "{parameter_value}"') - values[parameter_name] = parameter_value - - if values["initial_stock_dataset"] == "": - LOGGER.info("\nInitial stock file not uploaded, skipping this part...") - else: - dataset_folder = parameters_folder / "initial_stock_dataset" - dataset_files = os.listdir(dataset_folder) - if not dataset_files: - LOGGER.info(f'\nNo files in folder: "{dataset_folder}"') - else: - LOGGER.info(f"\nParsing rows of {dataset_files[0]}") - with open(dataset_folder / dataset_files[0], "r") as initial_stock_file: - dataset_reader = csv.reader(initial_stock_file) - for row in dataset_reader: - values["stock"] = row[1] - break - - runner_data = api.runner.get_runner( - organization_id=organization_id, - workspace_id=workspace_id, - runner_id=os.environ.get("CSM_RUNNER_ID"), - ) - instance_dataset_id = runner_data.dataset_list[0] - dataset = api.dataset.find_dataset_by_id( - organization_id=organization_id, - dataset_id=instance_dataset_id, - ) - - # Default path for ETL-generated datasets - ws_file_path = 
f'datasets/{instance_dataset_id}/generated_brewery_instance.zip' - if dataset.source_type == "None": - try: - ws_file_path = dataset.source.location - except e: - LOGGER.warning(f"Failed to get source.location from dataset {instance_dataset_id}, using default file path") - - LOGGER.info("Downloading instance dataset...") - _file_content = api.workspace.download_workspace_file(organization_id, workspace_id, ws_file_path) - - with TemporaryDirectory() as tmp_dir: - archive_path = os.path.join(tmp_dir, 'dataset.zip') - with open(archive_path, "wb") as _file: - _file.write(_file_content) - - LOGGER.info("Extracting instance dataset...") - if not os.path.exists(data_folder): - os.mkdir(data_folder) - shutil.unpack_archive(archive_path, data_folder) - - files = "\n".join([f" - {path}" for path in glob.glob(str(data_folder / "**"), recursive=True)]) - LOGGER.info(f"\nData files:\n{files}") - - temp_file = NamedTemporaryFile("w+t", newline="", delete=False) - bar_file_path = data_folder / "Bar.csv" - LOGGER.info("\nPatching dataset file Bar.csv with parameters values...") - with open(bar_file_path, newline="") as bar_file: - bar_reader = csv.reader(bar_file) - bar_writer = csv.writer(temp_file) - - header = next(bar_reader) - bar_writer.writerow(header) - column_indices = {"stock": -1, "restock_qty": -1, "nb_waiters": -1} - csv_column_mapping = { - "Stock": "stock", - "RestockQty": "restock_qty", - "NbWaiters": "nb_waiters", - } - for index, column in enumerate(header): - if column in csv_column_mapping: - parameter_name = csv_column_mapping[column] - column_indices[parameter_name] = index - - for row in bar_reader: - for parameter_name, column_index in column_indices.items(): - if column_index == -1: - LOGGER.warning(f" - {parameter_name}: parameter never found in CSV header:") - LOGGER.warning(f' - CSV header: "{header}"') - LOGGER.warning(f' - column mapping: "{csv_column_mapping}"') - continue - previous_value = row[column_index] - new_value = values[parameter_name] - row[column_index] = new_value - LOGGER.info(f" - {parameter_name}: {previous_value} => {new_value}") - bar_writer.writerow(row) - - temp_file.close() - shutil.move(temp_file.name, bar_file_path) - - -if __name__ == "__main__": - main() diff --git a/code/run_templates/sim_brewery_parameters/run.json b/code/run_templates/sim_brewery_parameters/run.json deleted file mode 100644 index 5ba556f..0000000 --- a/code/run_templates/sim_brewery_parameters/run.json +++ /dev/null @@ -1,69 +0,0 @@ -{ - "steps": [ - { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, - { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, - { - "id": "reset_data_store", - "command": "csm-data", - "arguments": [ - "store", - "reset" - ], - "precedents": ["csm_data_version", "csm_orc_version"], - "useSystemEnvironment": true - }, - { - "id": "fetch_parameters", - "command": "csm-data", - "arguments": ["api", "run-load-data"], - "precedents": ["reset_data_store"], - "useSystemEnvironment": true - }, - { - "id": "parameters_handler", - "command": "python", - "arguments": ["code/run_templates/sim_brewery_parameters/parameters_handler.py"], - "precedents": ["fetch_parameters"], - "useSystemEnvironment": true - }, - { - "id":"engine", - "command":"main", - "arguments": ["-i","BreweryDemoSimulationWithConnector"], - "precedents": ["parameters_handler"], - "useSystemEnvironment": true - }, - { - "id": "load_results_to_store", - "command": "csm-data", - "arguments": [ - "store", - "load-csv-folder", - "--csv-folder", 
- "$CSM_OUTPUT_ABSOLUTE_PATH" - ], - "precedents": ["engine"], - "useSystemEnvironment": true - }, - { - "id": "send_results_to_psql", - "command": "csm-data", - "arguments": [ - "store", - "dump-to-postgresql", - "--table-prefix", - "brewery", - "--append" - ], - "precedents": ["load_results_to_store"], - "useSystemEnvironment": true - }, - { - "id":"update_runner_metadata", - "command":"python", - "arguments":["code/run_templates/common/update_runner_metadata.py"], - "useSystemEnvironment":true, - "precedents":["load_results_to_store"] - } - ] -} diff --git a/code/run_templates/sim_mock_parameters/run.json b/code/run_templates/sim_mock_parameters/run.json deleted file mode 100644 index 3b20d42..0000000 --- a/code/run_templates/sim_mock_parameters/run.json +++ /dev/null @@ -1,57 +0,0 @@ -{ - "steps": [ - { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, - { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, - { - "id":"engine", - "command":"main", - "arguments": ["-i","BreweryDemoSimulation"], - "useSystemEnvironment":false, - "precedents": ["csm_data_version", "csm_orc_version"] - }, - { - "id": "load_results_to_store", - "command": "csm-data", - "arguments": [ - "store", - "load-csv-folder", - "--csv-folder", - "$CSM_OUTPUT_ABSOLUTE_PATH" - ], - "precedents": ["engine"], - "useSystemEnvironment": true - }, - { - "id": "send_results_to_psql", - "command": "csm-data", - "arguments": [ - "store", - "dump-to-postgresql", - "--table-prefix", - "brewery", - "--append" - ], - "precedents": ["load_results_to_store"], - "useSystemEnvironment": true - }, - { - "id":"update_runner_metadata", - "command":"python", - "arguments":[ - "code/run_templates/common/update_runner_metadata.py", - "--postgres-host", "${POSTGRES_HOST_URI}", - "--postgres-port", "${POSTGRES_HOST_PORT}", - "--postgres-db", "${POSTGRES_DB_NAME}", - "--postgres-schema", "${POSTGRES_DB_SCHEMA}", - "--postgres-user", "${POSTGRES_USER_NAME}", - "--postgres-password", "${POSTGRES_USER_PASSWORD}", - "--csm-organization-id", "${CSM_ORGANIZATION_ID}", - "--csm-workspace-id", "${CSM_WORKSPACE_ID}", - "--csm-runner-id", "${CSM_RUNNER_ID}", - "--csm-run-id", "${CSM_RUN_ID}" - ], - "useSystemEnvironment":true, - "precedents":["load_results_to_store"] - } - ] -} diff --git a/code/run_templates/sim_no_parameters/run.json b/code/run_templates/sim_no_parameters/run.json deleted file mode 100644 index 3b20d42..0000000 --- a/code/run_templates/sim_no_parameters/run.json +++ /dev/null @@ -1,57 +0,0 @@ -{ - "steps": [ - { "id": "csm_data_version", "command": "csm-data", "arguments": ["--version"] }, - { "id": "csm_orc_version", "command": "csm-orc", "arguments": ["--version"] }, - { - "id":"engine", - "command":"main", - "arguments": ["-i","BreweryDemoSimulation"], - "useSystemEnvironment":false, - "precedents": ["csm_data_version", "csm_orc_version"] - }, - { - "id": "load_results_to_store", - "command": "csm-data", - "arguments": [ - "store", - "load-csv-folder", - "--csv-folder", - "$CSM_OUTPUT_ABSOLUTE_PATH" - ], - "precedents": ["engine"], - "useSystemEnvironment": true - }, - { - "id": "send_results_to_psql", - "command": "csm-data", - "arguments": [ - "store", - "dump-to-postgresql", - "--table-prefix", - "brewery", - "--append" - ], - "precedents": ["load_results_to_store"], - "useSystemEnvironment": true - }, - { - "id":"update_runner_metadata", - "command":"python", - "arguments":[ - "code/run_templates/common/update_runner_metadata.py", - "--postgres-host", "${POSTGRES_HOST_URI}", 
- "--postgres-port", "${POSTGRES_HOST_PORT}", - "--postgres-db", "${POSTGRES_DB_NAME}", - "--postgres-schema", "${POSTGRES_DB_SCHEMA}", - "--postgres-user", "${POSTGRES_USER_NAME}", - "--postgres-password", "${POSTGRES_USER_PASSWORD}", - "--csm-organization-id", "${CSM_ORGANIZATION_ID}", - "--csm-workspace-id", "${CSM_WORKSPACE_ID}", - "--csm-runner-id", "${CSM_RUNNER_ID}", - "--csm-run-id", "${CSM_RUN_ID}" - ], - "useSystemEnvironment":true, - "precedents":["load_results_to_store"] - } - ] -} diff --git a/code/run_templates/standalone/generate.py b/code/run_templates/standalone/generate.py new file mode 100644 index 0000000..d477b5a --- /dev/null +++ b/code/run_templates/standalone/generate.py @@ -0,0 +1,4 @@ +import os +import generate_dataset + +generate_dataset.generate_data(os.environ.get("CSM_DATASET_ABSOLUTE_PATH")) diff --git a/code/run_templates/standalone/generate_dataset.py b/code/run_templates/standalone/generate_dataset.py new file mode 120000 index 0000000..ad95c07 --- /dev/null +++ b/code/run_templates/standalone/generate_dataset.py @@ -0,0 +1 @@ +../common/generate_dataset.py \ No newline at end of file diff --git a/code/run_templates/standalone/run.json b/code/run_templates/standalone/run.json new file mode 100644 index 0000000..60e0f26 --- /dev/null +++ b/code/run_templates/standalone/run.json @@ -0,0 +1,23 @@ +{ + "steps": [ + { + "id": "generate_input_data", + "command": "python", + "arguments": [ + "code/run_templates/standalone/generate.py" + ] + }, + { + "id": "engine", + "command": "csm-simulator", + "arguments": [ + "-i", + "BreweryDemoSimulationWithConnector" + ], + "precedents": [ + "generate_input_data" + ], + "useSystemEnvironment": true + } + ] +}
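Note on the new standalone template: code/run_templates/standalone/generate.py imports the symlinked common/generate_dataset.py and calls generate_data(CSM_DATASET_ABSOLUTE_PATH), but this diff only shows the old module-level script code being removed from that shared module, not the refactored entry point itself. The sketch below is therefore a minimal, hypothetical illustration of what such a generate_data(target_dir) function could look like, written only so the wrapper above is easier to follow. The file names and column names mirror the Brewery demo data already used in this repository; the row counts, value ranges, and everything else are made up for the example and are not the actual implementation.

    # Hypothetical sketch of a generate_data() entry point matching the call
    # made by code/run_templates/standalone/generate.py. Illustrative only:
    # file names and columns follow the Brewery demo, the data is arbitrary.
    import csv
    import random
    from pathlib import Path


    def generate_data(target_dir: str, n_customers: int = 10) -> None:
        """Write a minimal Bar.csv / Customer.csv pair into target_dir."""
        out = Path(target_dir)
        out.mkdir(parents=True, exist_ok=True)

        # One bar entity with arbitrary demo values.
        with open(out / "Bar.csv", "w", newline="") as f:
            writer = csv.DictWriter(
                f, fieldnames=["id", "Stock", "RestockQty", "NbWaiters"]
            )
            writer.writeheader()
            writer.writerow(
                {"id": "MyBar", "Stock": 40, "RestockQty": 15, "NbWaiters": 5}
            )

        # A handful of customers with random starting states.
        with open(out / "Customer.csv", "w", newline="") as f:
            writer = csv.DictWriter(
                f,
                fieldnames=["id", "Thirsty", "Satisfaction", "SurroundingSatisfaction"],
            )
            writer.writeheader()
            for i in range(n_customers):
                writer.writerow(
                    {
                        "id": f"Customer_{i}",
                        "Thirsty": random.choice([True, False]),
                        "Satisfaction": random.randint(0, 10),
                        "SurroundingSatisfaction": random.randint(0, 10),
                    }
                )

With an entry point of this shape in place, the four-line standalone/generate.py wrapper only has to resolve CSM_DATASET_ABSOLUTE_PATH from the environment and delegate, and the "generate_input_data" step in standalone/run.json can then feed the generated folder straight into the csm-simulator step that follows it.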