diff --git a/fastapi_app/Dockerfile b/fastapi_app/Dockerfile
index 9019371..900aefe 100644
--- a/fastapi_app/Dockerfile
+++ b/fastapi_app/Dockerfile
@@ -13,6 +13,7 @@ RUN pip install -r /tmp/requirements.txt
# install multi-vector-simulator's version pinned in docker-compose.yml file
RUN pip install multi-vector-simulator==$mvs_version
RUN pip install gunicorn
+RUN pip install jinja2==3.0
COPY . /fastapi_app
diff --git a/fastapi_app/requirements.txt b/fastapi_app/requirements.txt
index 06877c4..f43a494 100644
--- a/fastapi_app/requirements.txt
+++ b/fastapi_app/requirements.txt
@@ -1,6 +1,5 @@
fastapi
aiofiles
-jinja2==3.0
uvicorn
celery
redis
diff --git a/fastapi_app/templates/index.html b/fastapi_app/templates/index.html
index 2341eb3..ed234e3 100644
--- a/fastapi_app/templates/index.html
+++ b/fastapi_app/templates/index.html
@@ -39,6 +39,15 @@
Actual multi-vector-simulator's version for open_plan: {{ mvs_open_plan_vers
+
+<p>
+ Upload a mvs input json file, formatted as EPA, and click on "Run sensitivity analysis"
+</p>
+<form action="/uploadjson-sensitivity-analysis/open_plan" enctype="multipart/form-data" method="post">
+ <input name="json_file" type="file">
+ <input type="submit" value="Run sensitivity analysis">
+</form>
+
diff --git a/fastapi_app/templates/submitted_sensitivity_analysis.html b/fastapi_app/templates/submitted_sensitivity_analysis.html
new file mode 100644
index 0000000..315f1e5
--- /dev/null
+++ b/fastapi_app/templates/submitted_sensitivity_analysis.html
@@ -0,0 +1,16 @@
+{% extends 'index.html' %}
+
+{% block body %}
+
+<div>
+ <h3>Sensitivity Analysis</h3>
+ {% if task_id %}
+ <p>
+ {{ task_id }}: <a href="/check-sensitivity-analysis/{{ task_id }}">results</a>
+ </p>
+ {% else %}
+ <p>The input parameters were not found in the input json file</p>
+ {% endif %}
+</div>
+
+{% endblock body %}
\ No newline at end of file
diff --git a/fastapi_app/webapp.py b/fastapi_app/webapp.py
index 8730ae7..5323283 100644
--- a/fastapi_app/webapp.py
+++ b/fastapi_app/webapp.py
@@ -1,3 +1,4 @@
+import copy
import os
import json
import io
@@ -57,10 +58,10 @@
@app.get("/")
-def index(request: Request) -> Response:
+def index(request: Request, template: str = "index.html") -> Response:
return templates.TemplateResponse(
- "index.html",
+ template,
{
"request": request,
"mvs_dev_version": MVS_DEV_VERSION,
@@ -92,6 +93,18 @@ async def simulate_json_variable_open_plan(request: Request):
return await simulate_json_variable(request, queue="open_plan")
+@app.post("/sendjson/openplan/sensitivity-analysis")
+async def sensitivity_analysis_json_variable_open_plan(request: Request):
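+ """Receive mvs sensitivity analysis parameters in a json post request, send them
+ to the task queue and return the current status of the analysis (the same payload
+ as the /check-sensitivity-analysis/{task_id} endpoint)"""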
+
+ input_dict = await request.json()
+
+ sensitivity_analysis_id = run_sensitivity_analysis(
+ input_json=json.dumps(input_dict)
+ )
+
+ return await check_sensitivity_analysis(sensitivity_analysis_id)
+
+
@app.post("/uploadjson/dev")
def simulate_uploaded_json_files_dev(
request: Request, json_file: UploadFile = File(...)
@@ -116,6 +129,38 @@ def simulate_uploaded_json_files_open_plan(
return run_simulation_open_plan(request, input_json=json_content)
+@app.post("/uploadjson-sensitivity-analysis/open_plan")
+def sensitivity_analysis_uploaded_json_files_open_plan(
+ request: Request, json_file: UploadFile = File(...)
+) -> Response:
+ """Receive mvs sensitivity analysis parameter in json post request and send it to simulator
+ the value of `name` property of the input html tag should be `json_file` as the second
+ argument of this function
+ """
+ json_content = jsonable_encoder(json_file.file.read())
+
+ sensitivity_analysis_id = run_sensitivity_analysis(input_json=json_content)
+
+ return templates.TemplateResponse(
+ "submitted_sensitivity_analysis.html",
+ {"request": request, "task_id": sensitivity_analysis_id},
+ )
+
+
+def run_sensitivity_analysis(input_json=None, queue="open_plan"):
+ """Send a sensitivity analysis task to a celery worker"""
+
+ """Receive mvs simulation parameter in json post request and send it to simulator"""
+ input_dict = json.loads(input_json)
+
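+ # the worker listening on `queue` registers this task under the name
+ # "<queue>.run_sensitivity_analysis" (see task_queue/tasks.py)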
+ sensitivity_analysis = celery_app.send_task(
+ f"{queue}.run_sensitivity_analysis", args=[input_dict], queue=queue, kwargs={}
+ )
+ answer = sensitivity_analysis.id
+
+ return answer
+
+
def run_simulation(request: Request, input_json=None, queue="dev") -> Response:
"""Send a simulation task to a celery worker"""
@@ -157,9 +202,7 @@ async def check_task(task_id: str) -> JSONResponse:
"status": res.state,
"results": None,
}
- if res.state == states.PENDING:
- task["status"] = res.state
- else:
+ if res.state != states.PENDING:
task["status"] = "DONE"
results_as_dict = json.loads(res.result)
server_info = results_as_dict.pop("SERVER")
@@ -173,6 +216,65 @@ async def check_task(task_id: str) -> JSONResponse:
return JSONResponse(content=jsonable_encoder(task))
+@app.get("/check-sensitivity-analysis/{task_id}")
+async def check_sensitivity_analysis(task_id: str) -> JSONResponse:
+ sensitivity_analysis = celery_app.AsyncResult(task_id)
+
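+ # the parent task returns the id of a reference simulation and the ids of the
+ # individual sensitivity analysis steps; the step results are collected below
+ # and aggregated once they are all ready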
+ task = {
+ "server_info": None,
+ "mvs_version": None,
+ "id": task_id,
+ "status": sensitivity_analysis.state,
+ "results": dict(reference_simulation_id=None, sensitivity_analysis_steps=None),
+ }
+ if sensitivity_analysis.state != states.PENDING:
+ sa_results = sensitivity_analysis.result
+ if "ERROR" in sa_results:
+ task["status"] = "ERROR"
+ task["results"] = sa_results
+ else:
+ # fetch the results of each sensitivity analysis step
+ sa_step_ids = sa_results["sensitivity_analysis_ids"]
+
+ if "ERROR" in sa_step_ids:
+ task["status"] = "ERROR"
+ task["results"]["sensitivity_analysis_ids"] = sa_step_ids
+ answers = None
+ else:
+ server_info = None
+ answers = []
+ for sa_step_id in sa_step_ids:
+ sa_step = celery_app.AsyncResult(sa_step_id)
+ if sa_step.ready():
+ results_as_dict = json.loads(sa_step.result)
+ server_info = results_as_dict.pop("SERVER")
+ if "ERROR" in results_as_dict:
+
+ temp = copy.deepcopy(results_as_dict)
+ temp.pop("step_idx")
+ results_as_dict["output_values"] = temp
+ answers.append(results_as_dict)
+ else:
+ task["status"] = states.PENDING
+ answers = None
+ break
+
+ if answers is not None:
+
+ task["status"] = "DONE"
+ task["server_info"] = server_info
+ task["mvs_version"] = MVS_SERVER_VERSIONS.get(server_info, "unknown")
+ task["results"]["sensitivity_analysis_steps"] = [
+ d["output_values"]
+ for d in sorted(answers, key=lambda item: item["step_idx"])
+ ]
+ # the "result" of the main simulation here is the mvs token leading to the simulations results
+ # that can be checked with url /check/{task_id}
+ task["results"]["reference_simulation_id"] = sa_results["ref_sim_id"]
+
+ return JSONResponse(content=jsonable_encoder(task))
+
+
@app.get("/get_lp_file/{task_id}")
async def get_lp_file(task_id: str) -> Response:
res = celery_app.AsyncResult(task_id)
@@ -212,4 +314,3 @@ async def get_lp_file(task_id: str) -> Response:
response = "There is no LP file output, did you check the LP file option when you started your simulation?"
return response
-
diff --git a/task_queue/requirements.txt b/task_queue/requirements.txt
index 9fc707f..77bfd0d 100644
--- a/task_queue/requirements.txt
+++ b/task_queue/requirements.txt
@@ -2,3 +2,4 @@ celery
flower
redis
python-multipart
+jsonschema
\ No newline at end of file
diff --git a/task_queue/sensitivity_analysis_utils.py b/task_queue/sensitivity_analysis_utils.py
new file mode 100644
index 0000000..4c88958
--- /dev/null
+++ b/task_queue/sensitivity_analysis_utils.py
@@ -0,0 +1,145 @@
+import jsonschema
+import traceback
+from multi_vector_simulator.utils.data_parser import MAP_EPA_MVS
+
+
+SA_SCHEMA = {
+ "type": "object",
+ "required": [
+ "variable_parameter_name",
+ "variable_parameter_range",
+ "variable_parameter_ref_val",
+ "output_parameter_names",
+ ],
+ "properties": {
+ "variable_parameter_name": {
+ "oneOf": [{"type": "array", "items": {"type": "string"}}]
+ },
+ "variable_parameter_range": {"type": "array", "items": {"type": "number"}},
+ "variable_parameter_ref_val": {"type": "number"},
+ "output_parameter_names": {"type": "array", "items": {"type": "string"}},
+ },
+ "additionalProperties": False,
+}
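+
+# Example of a valid "sensitivity_analysis_settings" value, for illustration
+# (parameter names are given in the epa format and mapped to mvs names via MAP_EPA_MVS):
+#
+# {
+#     "variable_parameter_name": ["energy_providers", "Grid_DSO", "energy_price", "value"],
+#     "variable_parameter_range": [1, 2, 3, 3.5],
+#     "variable_parameter_ref_val": 3,
+#     "output_parameter_names": ["specific_emissions_per_electricity_equivalent", "total_feedinElectricity"],
+# }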
+
+SENSITIVITY_ANALYSIS_SETTINGS = "sensitivity_analysis_settings"
+
+
+class SensitivityAnalysis:
+ __raw_input = None
+ variable_parameter_name = None
+ variable_parameter_range = None
+ variable_parameter_ref_val = None
+ output_parameter_names = None
+ validation_error = ""
+
+ def __init__(self, dict_settings):
+ sa_settings = dict_settings.get(SENSITIVITY_ANALYSIS_SETTINGS, None)
+ self.__raw_input = sa_settings
+ if sa_settings is not None:
+ try:
+ jsonschema.validate(sa_settings, SA_SCHEMA)
+ schema_is_valid = True
+ except jsonschema.exceptions.ValidationError as e:
+ schema_is_valid = False
+ self.validation_error = "{}".format(traceback.format_exc())
+
+ if schema_is_valid is True:
+ self.variable_parameter_name = sa_settings.get(
+ "variable_parameter_name", None
+ )
+ self.format_parameter_name()
+
+ self.variable_parameter_range = sa_settings.get(
+ "variable_parameter_range", None
+ )
+ self.output_parameter_names = sa_settings.get(
+ "output_parameter_names", None
+ )
+ self.variable_parameter_ref_val = sa_settings.get(
+ "variable_parameter_ref_val", None
+ )
+ else:
+ self.validation_error = (
+ f"The key {SENSITIVITY_ANALYSIS_SETTINGS} is missing in the input json"
+ )
+
+ def format_parameter_name(self):
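+ """Map epa parameter names onto mvs names and convert the parameter path to a tuple"""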
+ if self.variable_parameter_name is not None:
+ self.variable_parameter_name = tuple(
+ [MAP_EPA_MVS.get(key, key) for key in self.variable_parameter_name]
+ )
+
+ def is_valid(self):
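+ """Check that all required settings are provided and that the reference value
+ lies within the provided variable parameter range"""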
+ if (
+ self.variable_parameter_name is not None
+ and self.variable_parameter_range is not None
+ and self.output_parameter_names is not None
+ and self.variable_parameter_ref_val is not None
+ ):
+ answer = True
+ if self.variable_parameter_ref_val not in self.variable_parameter_range:
+ answer = False
+ self.validation_error = (
+ f"The value ({self.variable_parameter_ref_val}) of the variable parameter"
+ f" {'.'.join(self.variable_parameter_name)} for the reference scenario of"
+ " the sensitivity analysis is not within the variable range provided"
+ f": [{', '.join([str(p) for p in self.variable_parameter_range]) }]"
+ )
+ else:
+ answer = False
+ return answer
+
+ def __str__(self):
+ return str(self.__raw_input)
+
+
+if __name__ == "__main__":
+ import json
+
+ with open("test_sa.json", "r") as jf:
+ input_dict = json.load(jf)
+
+ # with open("AFG_epa_format.json", "w") as jf:
+ # json.dump(input_dict, jf, indent=4)
+
+ # input_dict[SENSITIVITY_ANALYSIS_SETTINGS] = {
+ # "variable_parameter_name": ["energy_busses"],
+ # "variable_parameter_range": [1, 2, 3.2, 3.5],
+ # "variable_parameter_ref_val": 3,
+ # "output_parameter_names": [
+ # "specific_emissions_per_electricity_equivalent",
+ # "total_feedinElectricity",
+ # "total_internal_generation",
+ # "peak_flow",
+ # ],
+ # }
+ sa = SensitivityAnalysis(input_dict)
+ print(sa.is_valid(), sa.validation_error)
+ # input_dict[SENSITIVITY_ANALYSIS_SETTINGS] = {
+ # "variable_parameter_name": [
+ # "energy_providers",
+ # "Grid_DSO",
+ # "energy_price",
+ # "value",
+ # ],
+ # "variable_parameter_range": [1, 2, 3, 3.5],
+ # "variable_parameter_ref_val": 3,
+ # "output_parameter_names": [
+ # "specific_emissions_per_electricity_equivalent",
+ # "total_feedinElectricity",
+ # "total_internal_generation",
+ # "peak_flow",
+ # ],
+ # # }
+ # sa = SensitivityAnalysis(input_dict)
+ # print(sa.is_valid(), sa.validation_error)
+ # sa = SensitivityAnalysis({SENSITIVITY_ANALYSIS_SETTINGS: {
+ # "variable_parameter_name": "",
+ # "variable_parameter_range": "",
+ # "variable_parameter_ref_val": "",
+ # "output_parameter_names": "",
+ # }})
diff --git a/task_queue/tasks.py b/task_queue/tasks.py
index fc2af7a..b479184 100644
--- a/task_queue/tasks.py
+++ b/task_queue/tasks.py
@@ -1,13 +1,20 @@
import os
import time
+import logging
import traceback
import json
from copy import deepcopy
from celery import Celery
-from multi_vector_simulator.server import run_simulation as mvs_simulation
+from multi_vector_simulator.server import (
+ run_simulation as mvs_simulation,
+ run_sensitivity_analysis_step as mvs_sensitivity_analysis_step,
+)
+from multi_vector_simulator.utils import set_nested_value, nested_dict_crawler
from multi_vector_simulator.utils.data_parser import convert_epa_params_to_mvs
+from sensitivity_analysis_utils import SensitivityAnalysis
+
CELERY_BROKER_URL = (os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379"),)
CELERY_RESULT_BACKEND = os.environ.get(
"CELERY_RESULT_BACKEND", "redis://localhost:6379"
@@ -34,3 +41,77 @@ def run_simulation(simulation_input: dict,) -> dict:
INPUT_JSON_MVS=dict_values,
)
return json.dumps(simulation_output)
+
+
+@app.task(name=f"{CELERY_TASK_NAME}.run_sensitivity_analysis")
+def run_sensitivity_analysis(simulation_input: dict,):
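+ """Validate the sensitivity analysis settings, start a reference simulation and
+ one sensitivity analysis step per value of the variable parameter"""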
+ epa_json = deepcopy(simulation_input)
+
+ # parse the sensitivity analysis settings from input json
+ sa_settings = SensitivityAnalysis(epa_json)
+
+ if sa_settings.is_valid() is False:
+ answer = dict(
+ SERVER=CELERY_TASK_NAME,
+ ERROR="{}".format(sa_settings.validation_error),
+ INPUT_JSON_EPA=epa_json,
+ SENSITIVITY_ANALYSIS_SETTINGS=str(sa_settings),
+ )
+ else:
+ mvs_dict = None
+ param_val = None
+
+ # start a mvs simulation for the reference scenario
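+ # its results can later be fetched via the web app's /check/{task_id} endpoint
+ # using the returned ref_sim_id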
+ reference_simulation = run_simulation.apply_async(
+ args=[epa_json], queue=CELERY_TASK_NAME
+ )
+ answer = dict(ref_sim_id=reference_simulation.id, sensitivity_analysis_ids=[])
+
+ try:
+ mvs_dict = convert_epa_params_to_mvs(epa_json)
+ task_ids = []
+ # perform one mvs sensitivity analysis step per value of the variable parameter
+ # this is similar to running a simulation, only part of the output is returned
+ for i, param_val in enumerate(sa_settings.variable_parameter_range):
+
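+ # variable_parameter_name is a tuple of nested keys (already mapped from epa to
+ # mvs format); set_nested_value replaces the value found at that path in mvs_dict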
+ modified_mvs_dict = set_nested_value(
+ mvs_dict, param_val, sa_settings.variable_parameter_name
+ )
+ result = run_sensitivity_analysis_step.apply_async(
+ args=[modified_mvs_dict, i, sa_settings.output_parameter_names],
+ queue=CELERY_TASK_NAME,
+ )
+ task_ids.append(result.id)
+ answer["sensitivity_analysis_ids"] = task_ids
+
+ except Exception:
+ answer["sensitivity_analysis_ids"] = dict(
+ SERVER=CELERY_TASK_NAME,
+ ERROR="{}".format(traceback.format_exc()),
+ INPUT_PARAM_PATH=sa_settings.variable_parameter_name,
+ INPUT_PARAM_VAL=param_val,
+ PARAM_PATHES=nested_dict_crawler(mvs_dict),
+ )
+
+ return answer
+
+
+@app.task(name=f"{CELERY_TASK_NAME}.run_sensitivity_analysis_step")
+def run_sensitivity_analysis_step(
+ mvs_dict: dict, step_idx: int, output_variables: list
+) -> str:
+ """Run a single simulation step and return only the requested output variables as json"""
+ try:
+ simulation_output = mvs_sensitivity_analysis_step(
+ mvs_dict, step_idx, output_variables
+ )
+ simulation_output["SERVER"] = CELERY_TASK_NAME
+ except Exception as e:
+ simulation_output = dict(
+ SERVER=CELERY_TASK_NAME,
+ ERROR="{}".format(traceback.format_exc()),
+ step_idx=step_idx,
+ INPUT_JSON_MVS=mvs_dict,
+ OUTPUT_VARIABLES=output_variables,
+ )
+ return json.dumps(simulation_output)