diff --git a/fastapi_app/Dockerfile b/fastapi_app/Dockerfile index 9019371..900aefe 100644 --- a/fastapi_app/Dockerfile +++ b/fastapi_app/Dockerfile @@ -13,6 +13,7 @@ RUN pip install -r /tmp/requirements.txt # install multi-vector-simulator's version pinned in docker-compose.yml file RUN pip install multi-vector-simulator==$mvs_version RUN pip install gunicorn +RUN pip install jinja2==3.0 COPY . /fastapi_app diff --git a/fastapi_app/requirements.txt b/fastapi_app/requirements.txt index 06877c4..f43a494 100644 --- a/fastapi_app/requirements.txt +++ b/fastapi_app/requirements.txt @@ -1,6 +1,5 @@ fastapi aiofiles -jinja2==3.0 uvicorn celery redis diff --git a/fastapi_app/templates/index.html b/fastapi_app/templates/index.html index 2341eb3..ed234e3 100644 --- a/fastapi_app/templates/index.html +++ b/fastapi_app/templates/index.html @@ -39,6 +39,15 @@

Actual multi-vector-simulator's version for open_plan: {{ mvs_open_plan_vers + +

Upload a mvs input json file, formated as epa and click on "Run sensitivity analysis"

+
+ + + +
+ + diff --git a/fastapi_app/templates/submitted_sensitivity_analysis.html b/fastapi_app/templates/submitted_sensitivity_analysis.html new file mode 100644 index 0000000..315f1e5 --- /dev/null +++ b/fastapi_app/templates/submitted_sensitivity_analysis.html @@ -0,0 +1,16 @@ +{% extends 'index.html' %} + +{% block body %} + +
+

Sensitivity Analysis

+ {% if task_id %} +

+ {{ task_id }}: results +

+ {% else %} +

The input parameters were not found in the input json file

+ {% endif %} +
+ +{% endblock body %} \ No newline at end of file diff --git a/fastapi_app/webapp.py b/fastapi_app/webapp.py index 8730ae7..5323283 100644 --- a/fastapi_app/webapp.py +++ b/fastapi_app/webapp.py @@ -1,3 +1,4 @@ +import copy import os import json import io @@ -57,10 +58,10 @@ @app.get("/") -def index(request: Request) -> Response: +def index(request: Request, template: str = "index.html") -> Response: return templates.TemplateResponse( - "index.html", + template, { "request": request, "mvs_dev_version": MVS_DEV_VERSION, @@ -92,6 +93,18 @@ async def simulate_json_variable_open_plan(request: Request): return await simulate_json_variable(request, queue="open_plan") +@app.post("/sendjson/openplan/sensitivity-analysis") +async def sensitivity_analysis_json_variable_open_plan(request: Request): + + input_dict = await request.json() + + sensitivity_analysis_id = run_sensitivity_analysis( + input_json=json.dumps(input_dict) + ) + + return await check_sensitivity_analysis(sensitivity_analysis_id) + + @app.post("/uploadjson/dev") def simulate_uploaded_json_files_dev( request: Request, json_file: UploadFile = File(...) @@ -116,6 +129,38 @@ def simulate_uploaded_json_files_open_plan( return run_simulation_open_plan(request, input_json=json_content) +@app.post("/uploadjson-sensitivity-analysis/open_plan") +def sensitivity_analysis_uploaded_json_files_open_plan( + request: Request, json_file: UploadFile = File(...) +) -> Response: + """Receive mvs sensitivity analysis parameter in json post request and send it to simulator + the value of `name` property of the input html tag should be `json_file` as the second + argument of this function + """ + json_content = jsonable_encoder(json_file.file.read()) + + sensitivity_analysis_id = run_sensitivity_analysis(input_json=json_content) + + return templates.TemplateResponse( + "submitted_sensitivity_analysis.html", + {"request": request, "task_id": sensitivity_analysis_id}, + ) + + +def run_sensitivity_analysis(input_json=None, queue="open_plan"): + """Send a sensitivity analysis task to a celery worker""" + + """Receive mvs simulation parameter in json post request and send it to simulator""" + input_dict = json.loads(input_json) + + sensitivity_analysis = celery_app.send_task( + f"{queue}.run_sensitivity_analysis", args=[input_dict], queue=queue, kwargs={} + ) + answer = sensitivity_analysis.id + + return answer + + def run_simulation(request: Request, input_json=None, queue="dev") -> Response: """Send a simulation task to a celery worker""" @@ -157,9 +202,7 @@ async def check_task(task_id: str) -> JSONResponse: "status": res.state, "results": None, } - if res.state == states.PENDING: - task["status"] = res.state - else: + if res.state != states.PENDING: task["status"] = "DONE" results_as_dict = json.loads(res.result) server_info = results_as_dict.pop("SERVER") @@ -173,6 +216,65 @@ async def check_task(task_id: str) -> JSONResponse: return JSONResponse(content=jsonable_encoder(task)) +@app.get("/check-sensitivity-analysis/{task_id}") +async def check_sensitivity_analysis(task_id: str) -> JSONResponse: + sensitivity_analysis = celery_app.AsyncResult(task_id) + + task = { + "server_info": None, + "mvs_version": None, + "id": task_id, + "status": sensitivity_analysis.state, + "results": dict(reference_simulation_id=None, sensitivity_analysis_steps=None), + } + if sensitivity_analysis.state != states.PENDING: + sa_results = sensitivity_analysis.result + if "ERROR" in sa_results: + task["status"] = "ERROR" + task["results"] = sa_results + else: + # fetch results of each sensitivity analysis steps + sa_step_ids = sa_results["sensitivity_analysis_ids"] + + if "ERROR" in sa_step_ids: + task["status"] = "ERROR" + task["results"]["sensitivity_analysis_ids"] = sa_step_ids + answers = None + else: + server_info = None + answers = [] + for sa_step_id in sa_step_ids: + sa_step = celery_app.AsyncResult(sa_step_id) + if sa_step.ready(): + results_as_dict = json.loads(sa_step.result) + server_info = results_as_dict.pop("SERVER") + if "ERROR" in results_as_dict: + + temp = copy.deepcopy(results_as_dict) + temp.pop("step_idx") + results_as_dict["output_values"] = temp + answers.append(results_as_dict) + else: + task["status"] = states.PENDING + answers = None + break + + if answers is not None: + + task["status"] = "DONE" + task["server_info"] = server_info + task["mvs_version"] = MVS_SERVER_VERSIONS.get(server_info, "unknown") + task["results"]["sensitivity_analysis_steps"] = [ + d["output_values"] + for d in sorted(answers, key=lambda item: item["step_idx"]) + ] + # the "result" of the main simulation here is the mvs token leading to the simulations results + # that can be checked with url /check/{task_id} + task["results"]["reference_simulation_id"] = sa_results["ref_sim_id"] + + return JSONResponse(content=jsonable_encoder(task)) + + @app.get("/get_lp_file/{task_id}") async def get_lp_file(task_id: str) -> Response: res = celery_app.AsyncResult(task_id) @@ -212,4 +314,3 @@ async def get_lp_file(task_id: str) -> Response: response = "There is no LP file output, did you check the LP file option when you started your simulation?" return response - diff --git a/task_queue/requirements.txt b/task_queue/requirements.txt index 9fc707f..77bfd0d 100644 --- a/task_queue/requirements.txt +++ b/task_queue/requirements.txt @@ -2,3 +2,4 @@ celery flower redis python-multipart +jsonschema \ No newline at end of file diff --git a/task_queue/sensitivity_analysis_utils.py b/task_queue/sensitivity_analysis_utils.py new file mode 100644 index 0000000..4c88958 --- /dev/null +++ b/task_queue/sensitivity_analysis_utils.py @@ -0,0 +1,145 @@ +import jsonschema +import traceback +from multi_vector_simulator.utils.data_parser import MAP_EPA_MVS + + +SA_SCHEMA = { + "type": "object", + "required": [ + "variable_parameter_name", + "variable_parameter_range", + "variable_parameter_ref_val", + "output_parameter_names", + ], + "properties": { + "variable_parameter_name": { + "oneOf": [{"type": "array", "items": {"type": "string"}}] + }, + "variable_parameter_range": {"type": "array", "items": {"type": "number"}}, + "variable_parameter_ref_val": {"type": "number"}, + "output_parameter_names": {"type": "array", "items": {"type": "string"}}, + }, + "additionalProperties": False, +} + +SENSITIVITY_ANALYSIS_SETTINGS = "sensitivity_analysis_settings" + + +class SensitivityAnalysis: + __raw_input = None + variable_parameter_name = None + variable_parameter_range = None + variable_parameter_ref_val = None + output_parameter_names = None + validation_error = "" + + def __init__(self, dict_settings): + sa_settings = dict_settings.get(SENSITIVITY_ANALYSIS_SETTINGS, None) + self.__raw_input = sa_settings + if sa_settings is not None: + try: + jsonschema.validate(sa_settings, SA_SCHEMA) + schema_is_valid = True + except jsonschema.exceptions.ValidationError as e: + schema_is_valid = False + self.validation_error = "{}".format(traceback.format_exc()) + + if schema_is_valid is True: + self.variable_parameter_name = sa_settings.get( + "variable_parameter_name", None + ) + self.format_parameter_name() + + self.variable_parameter_range = sa_settings.get( + "variable_parameter_range", None + ) + self.output_parameter_names = sa_settings.get( + "output_parameter_names", None + ) + self.variable_parameter_ref_val = sa_settings.get( + "variable_parameter_ref_val", None + ) + else: + self.validation_error = ( + f"The key {SENSITIVITY_ANALYSIS_SETTINGS} is missing in the input json" + ) + + def format_parameter_name(self): + if self.variable_parameter_name is not None: + self.variable_parameter_name = tuple( + [MAP_EPA_MVS.get(key, key) for key in self.variable_parameter_name] + ) + + def is_valid(self): + if ( + self.variable_parameter_name is not None + and self.variable_parameter_range is not None + and self.output_parameter_names is not None + and self.variable_parameter_ref_val + ): + answer = True + if self.variable_parameter_ref_val not in self.variable_parameter_range: + answer = False + self.validation_error = ( + f"The value ({self.variable_parameter_ref_val}) of the variable parameter" + f" {'.'.join(self.variable_parameter_name)} for the reference scenario of" + " the sensitivity analysis is not within the variable range provided" + f": [{', '.join([str(p) for p in self.variable_parameter_range]) }]" + ) + else: + answer = False + return answer + + def __str__(self): + return str(self.__raw_input) + + +if __name__ == "__main__": + import json + + with open("test_sa.json", "r") as jf: + input_dict = json.load(jf) + + # with open("AFG_epa_format.json", "w") as jf: + # json.dump(input_dict, jf, indent=4) + + # input_dict[SENSITIVITY_ANALYSIS_SETTINGS] = { + # "variable_parameter_name": ["energy_busses"], + # "variable_parameter_range": [1, 2, 3.2, 3.5], + # "variable_parameter_ref_val": 3, + # "output_parameter_names": [ + # "specific_emissions_per_electricity_equivalent", + # "total_feedinElectricity", + # "total_internal_generation", + # "peak_flow", + # ], + # } + sa = SensitivityAnalysis(input_dict) + print(sa.is_valid(), sa.validation_error) + import ipdb + + ipdb.set_trace() + # input_dict[SENSITIVITY_ANALYSIS_SETTINGS] = { + # "variable_parameter_name": [ + # "energy_providers", + # "Grid_DSO", + # "energy_price", + # "value", + # ], + # "variable_parameter_range": [1, 2, 3, 3.5], + # "variable_parameter_ref_val": 3, + # "output_parameter_names": [ + # "specific_emissions_per_electricity_equivalent", + # "total_feedinElectricity", + # "total_internal_generation", + # "peak_flow", + # ], + # # } + # sa = SensitivityAnalysis(input_dict) + # print(sa.is_valid(), sa.validation_error) + # sa = SensitivityAnalysis({SENSITIVITY_ANALYSIS_SETTINGS: { + # "variable_parameter_name": "", + # "variable_parameter_range": "", + # "variable_parameter_ref_val": "", + # "output_parameter_names": "", + # }}) diff --git a/task_queue/tasks.py b/task_queue/tasks.py index fc2af7a..b479184 100644 --- a/task_queue/tasks.py +++ b/task_queue/tasks.py @@ -1,13 +1,20 @@ import os import time +import logging import traceback import json from copy import deepcopy from celery import Celery -from multi_vector_simulator.server import run_simulation as mvs_simulation +from multi_vector_simulator.server import ( + run_simulation as mvs_simulation, + run_sensitivity_analysis_step as mvs_sensitivity_analysis_step, +) +from multi_vector_simulator.utils import set_nested_value, nested_dict_crawler from multi_vector_simulator.utils.data_parser import convert_epa_params_to_mvs +from sensitivity_analysis_utils import SensitivityAnalysis + CELERY_BROKER_URL = (os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379"),) CELERY_RESULT_BACKEND = os.environ.get( "CELERY_RESULT_BACKEND", "redis://localhost:6379" @@ -34,3 +41,77 @@ def run_simulation(simulation_input: dict,) -> dict: INPUT_JSON_MVS=dict_values, ) return json.dumps(simulation_output) + + +@app.task(name=f"{CELERY_TASK_NAME}.run_sensitivity_analysis") +def run_sensitivity_analysis(simulation_input: dict,): + epa_json = deepcopy(simulation_input) + + # parse the sensitivity analysis settings from input json + sa_settings = SensitivityAnalysis(epa_json) + + if sa_settings.is_valid() is False: + answer = dict( + SERVER=CELERY_TASK_NAME, + ERROR="{}".format(sa_settings.validation_error), + INPUT_JSON_EPA=epa_json, + SENSITIVITY_ANALYSIS_SETTINGS=str(sa_settings), + ) + else: + mvs_dict = None + param_val = None + + # start a mvs simulation for the reference scenario + reference_simulation = run_simulation.apply_async( + args=[epa_json], queue=CELERY_TASK_NAME + ) + answer = dict(ref_sim_id=reference_simulation.id, sensitivity_analysis_ids=[]) + + try: + mvs_dict = convert_epa_params_to_mvs(epa_json) + task_ids = [] + # perform one mvs sensitivity analysis step per value of the variable parameter + # this is similar to running a simulation, only part of the output is returned + for i, param_val in enumerate(sa_settings.variable_parameter_range): + + modified_mvs_dict = set_nested_value( + mvs_dict, param_val, sa_settings.variable_parameter_name + ) + result = run_sensitivity_analysis_step.apply_async( + args=[modified_mvs_dict, i, sa_settings.output_parameter_names], + queue=CELERY_TASK_NAME, + ) + task_ids.append(result.id) + answer["sensitivity_analysis_ids"] = task_ids + + except Exception: + answer["sensitivity_analysis_ids"] = dict( + SERVER=CELERY_TASK_NAME, + ERROR="{}".format(traceback.format_exc()), + INPUT_PARAM_PATH=sa_settings.variable_parameter_name, + INPUT_PARAM_VAL=param_val, + PARAM_PATHES=nested_dict_crawler(mvs_dict), + ) + + return answer + + +@app.task(name=f"{CELERY_TASK_NAME}.run_sensitivity_analysis_step") +def run_sensitivity_analysis_step( + mvs_dict: dict, step_idx: int, output_variables: list +) -> dict: + mvs_dict + try: + simulation_output = mvs_sensitivity_analysis_step( + mvs_dict, step_idx, output_variables + ) + simulation_output["SERVER"] = CELERY_TASK_NAME + except Exception as e: + simulation_output = dict( + SERVER=CELERY_TASK_NAME, + ERROR="{}".format(traceback.format_exc()), + step_idx=step_idx, + INPUT_JSON_MVS=mvs_dict, + OUTPUT_VARIABLES=output_variables, + ) + return json.dumps(simulation_output)