1+ #!/usr/bin/env python3
2+ """Bootstrap curated Finnish FMI air-quality CSAPI resources."""
3+
4+ import argparse
5+ import json
6+ import os
7+ import re
8+ import sys
9+
10+ sys .path .insert (0 , os .path .join (os .path .dirname (__file__ ), ".." , ".." ))
11+ from publishers .bootstrap_helpers import (
12+ get_config , _auth_header , api_put , find_by_uid ,
13+ ensure_procedure , ensure_system , ensure_datastream , ensure_deployment ,
14+ clean_resource , add_bootstrap_args , print_summary ,
15+ )
16+
17+
# Start of the open-ended validTime interval stamped on every resource.
VALID_TIME_START: str = "2026-01-01T00:00:00Z"
# Publish cadence advertised in each system's SensorML capabilities.
PUBLISH_INTERVAL_SECONDS: int = 3600

# Fixed UIDs of the shared procedure and the two grouping deployments.
PROC_UID: str = "urn:os4csapi:procedure:fmi-air-quality:v1"
DEPLOY_ROOT_UID: str = "urn:os4csapi:deployment:fmi-air-quality-demo:v1"
DEPLOY_GROUP_UID: str = "urn:os4csapi:deployment:fmi-air-quality-stations:v1"

# Output name shared by every per-station datastream.
DS_OUTPUT_NAME: str = "fmiAirQualityObs"

# Upstream FMI links referenced from documents and contacts.
FMI_HOME: str = "https://en.ilmatieteenlaitos.fi/open-data"
FMI_WFS: str = "https://opendata.fmi.fi/wfs"
26+
27+
def _load_stations() -> list[dict]:
    """Return the ``stations`` list from the ``stations.json`` beside this script.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if the document has no top-level ``"stations"`` key.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    stations_path = os.path.join(script_dir, "stations.json")
    with open(stations_path, encoding="utf-8") as handle:
        document = json.load(handle)
    return document["stations"]
32+
33+
def _uid_token(value: str) -> str:
    """Normalise *value* into a lowercase token suitable for embedding in a UID.

    Each run of characters other than ASCII letters, digits and ``-`` is
    replaced by a single ``-``; hyphens already present are kept as they are.
    Leading and trailing hyphens are then removed and the result lowercased.
    """
    hyphenated = re.sub(r"[^A-Za-z0-9-]+", "-", value)
    trimmed = hyphenated.strip("-")
    return trimmed.lower()
36+
37+
def _system_uid(station_id: str) -> str:
    """Return the system UID (a URN) for the station identified by *station_id*.

    The id is normalised with ``_uid_token`` before being embedded. The
    version suffix follows the token directly: a URN must not contain
    whitespace, so no space may appear before ``:v1``.
    """
    return f"urn:os4csapi:system:fmi-air-quality:{_uid_token(station_id)}:v1"
40+
41+
def _deploy_uid(station_id: str) -> str:
    """Return the per-station deployment UID (a URN) for *station_id*.

    The id is normalised with ``_uid_token`` and appended to the
    ``fmi-air-quality-`` prefix. The ``:v1`` suffix follows immediately so
    the resulting URN contains no whitespace.
    """
    return f"urn:os4csapi:deployment:fmi-air-quality-{_uid_token(station_id)}:v1"
44+
45+
def _datastream_uid(station: dict) -> str:
    """Return the datastream UID (a URN) for *station*.

    Combines the normalised ``station["stationId"]`` with ``DS_OUTPUT_NAME``.
    The segments are joined by ``:`` only, so the URN contains no whitespace.

    Raises:
        KeyError: if *station* has no ``"stationId"`` entry.
    """
    token = _uid_token(station["stationId"])
    return f"urn:os4csapi:datastream:fmi-air-quality:{token}:{DS_OUTPUT_NAME}:v1"
48+
49+
# Feature-style stub describing the shared observing procedure.
PROCEDURE_STUB: dict = {
    "type": "Feature",
    "geometry": None,
    "properties": {
        "uid": PROC_UID,
        "featureType": "sosa:ObservingProcedure",
        "name": "FMI Air Quality Observation v1",
        "description": "Publishes curated Finnish Meteorological Institute air-quality observations from FMI Open Data WFS.",
        "validTime": [VALID_TIME_START, ".."],
    },
}

# SensorML-style description of the same procedure (keywords, documents, contacts).
PROCEDURE_SML: dict = {
    "type": "SimpleProcess",
    "id": PROC_UID,
    "uniqueId": PROC_UID,
    "definition": "sosa:ObservingProcedure",
    "label": "FMI Air Quality Observation v1",
    "description": "Fetches recent FMI Open Data hourly simple air-quality observations and publishes one combined observation per curated Finnish monitoring location.",
    "keywords": [
        "FMI",
        "Finnish Meteorological Institute",
        "Finland",
        "air quality",
        "NO2",
        "O3",
        "PM10",
        "PM2.5",
        "WFS",
    ],
    "documents": [
        {
            "role": "http://dbpedia.org/resource/Web_page",
            "name": "FMI Open Data",
            "link": {"href": FMI_HOME, "type": "text/html"},
        },
        {
            "role": "http://dbpedia.org/resource/Web_page",
            "name": "FMI WFS",
            "link": {"href": FMI_WFS, "type": "text/xml"},
        },
    ],
    "contacts": [
        {
            "role": "operator",
            "organisationName": "Finnish Meteorological Institute",
            "contactInfo": {"onlineResource": {"linkage": FMI_HOME}},
        },
        {
            "role": "publisher",
            "organisationName": "OS4CSAPI",
            "contactInfo": {"onlineResource": {"linkage": "https://github.com/OS4CSAPI/OSHConnect-Python"}},
        },
    ],
}
52+
53+
def _system_stub(station: dict) -> dict:
    """Build the Feature-style stub for one station's system resource.

    Args:
        station: Station record; ``lon``, ``lat``, ``stationId`` and ``name``
            are read and must be present.

    Returns:
        A dict with a Point geometry at ``[lon, lat]`` and properties carrying
        the system UID, name, description and an open-ended ``validTime``.

    Raises:
        KeyError: if any of the required station keys is missing.
    """
    return {
        "type": "Feature",
        "geometry": {
            "type": "Point",
            "coordinates": [station["lon"], station["lat"]],
        },
        "properties": {
            "uid": _system_uid(station["stationId"]),
            "featureType": "sosa:Sensor",
            # No padding around the interpolated name: the resource name must
            # not end in whitespace and the sentence ends directly with ".".
            "name": f"FMI Air Quality {station['name']}",
            "description": f"Curated FMI air-quality monitoring location for {station['name']}.",
            "validTime": [VALID_TIME_START, ".."],
        },
    }
56+
57+
def _system_sml(station: dict) -> dict:
    """Build the SensorML-style ``PhysicalSystem`` description for one station.

    Args:
        station: Station record. ``stationId``, ``name``, ``lon`` and ``lat``
            are required; ``region``, ``bbox`` and ``selectionReason`` are
            optional and fall back to the defaults used below.

    Returns:
        A dict with identifiers, classifiers, contacts, documents, station
        characteristics, the publish-interval capability and the position.

    Raises:
        KeyError: if a required station key is missing.
    """
    # Computed once; the same UID is used for id, uniqueId and an identifier.
    uid = _system_uid(station["stationId"])
    name = station["name"]
    return {
        "type": "PhysicalSystem",
        "id": uid,
        "uniqueId": uid,
        "definition": "sosa:System",
        # Interpolated values are embedded without padding so labels carry no
        # trailing whitespace and the region reads "(Region)." not "(Region )."
        "label": f"FMI Air Quality {name}",
        "description": f"Finnish Meteorological Institute Open Data air-quality monitoring location for {name} ({station.get('region', 'Finland')}).",
        "keywords": ["FMI", "Finland", "air quality", "air pollution", name, station["stationId"]],
        "identifiers": [
            {
                "definition": "http://sensorml.com/ont/swe/property/ShortName",
                "label": "Short Name",
                "value": f"FMI AQ {name}",
            },
            {
                "definition": "http://sensorml.com/ont/swe/property/StationID",
                "label": "Curated Station ID",
                "value": station["stationId"],
            },
            {
                "definition": "http://sensorml.com/ont/swe/property/UniqueID",
                "label": "OS4CSAPI UID",
                "value": uid,
            },
        ],
        "classifiers": [
            {
                "definition": "http://sensorml.com/ont/swe/property/SensorType",
                "label": "Source Type",
                "value": "FMI air-quality monitoring station",
            },
            {
                "definition": "http://sensorml.com/ont/swe/property/IntendedApplication",
                "label": "Intended Application",
                "value": "Air-quality monitoring and environmental situational awareness",
            },
        ],
        "contacts": [
            {
                "role": "operator",
                "organisationName": "Finnish Meteorological Institute",
                "contactInfo": {"onlineResource": {"linkage": FMI_HOME}},
            },
        ],
        "documents": [
            {
                "role": "http://dbpedia.org/resource/Web_page",
                "name": "FMI Open Data",
                "link": {"href": FMI_HOME, "type": "text/html"},
            },
            {
                "role": "http://dbpedia.org/resource/Web_page",
                "name": "FMI WFS",
                "link": {"href": FMI_WFS, "type": "text/xml"},
            },
        ],
        "characteristics": [
            {
                "label": "Station Properties",
                "characteristics": [
                    {"type": "Text", "name": "bbox", "label": "FMI BBox Query", "value": station.get("bbox", "")},
                    {"type": "Text", "name": "region", "label": "Region", "value": station.get("region", "")},
                    {
                        "type": "Text",
                        "name": "selection_reason",
                        "label": "Selection Reason",
                        "value": station.get("selectionReason", "Curated FMI air-quality station"),
                    },
                    {"type": "Text", "name": "license", "label": "License", "value": "FMI Open Data terms and attribution"},
                ],
            },
        ],
        "capabilities": [
            {
                "definition": "http://www.w3.org/ns/ssn/systems/SystemCapability",
                "label": "Publisher Capabilities",
                "capabilities": [
                    {
                        "type": "Quantity",
                        "name": "publish_interval",
                        "definition": "http://qudt.org/vocab/quantitykind/Period",
                        "label": "Publish Interval",
                        "uom": {"code": "s"},
                        "value": PUBLISH_INTERVAL_SECONDS,
                    },
                ],
            },
        ],
        # NOTE(review): coordinates are emitted lon/lat while srsName names
        # EPSG:4326 -- confirm the server expects this axis order.
        "position": {
            "type": "Point",
            "coordinates": [station["lon"], station["lat"]],
            "srsName": "http://www.opengis.net/def/crs/EPSG/0/4326",
        },
    }
60+
61+
def _datastream_schema(station: dict) -> dict:
    """Build the datastream definition (metadata plus result schema) for *station*.

    Only the datastream UID depends on the station; the record layout is the
    same for every station: two text identifiers, four pollutant quantities
    in ``ug/m3``, an air-quality index without a unit, and two text fields
    for source parameters and source URL.
    """
    uid = _datastream_uid(station)

    fields = [
        {
            "type": "Text",
            "name": "stationId",
            "label": "Station ID",
            "definition": "http://sensorml.com/ont/swe/property/StationID",
        },
        {
            "type": "Text",
            "name": "stationName",
            "label": "Station Name",
            "definition": "http://sensorml.com/ont/swe/property/Name",
        },
        {
            "type": "Quantity",
            "name": "no2_ugm3",
            "label": "Nitrogen Dioxide",
            "definition": "http://sensorml.com/ont/swe/property/NO2",
            "uom": {"code": "ug/m3"},
        },
        {
            "type": "Quantity",
            "name": "o3_ugm3",
            "label": "Ozone",
            "definition": "http://sensorml.com/ont/swe/property/O3",
            "uom": {"code": "ug/m3"},
        },
        {
            "type": "Quantity",
            "name": "pm10_ugm3",
            "label": "PM10",
            "definition": "http://sensorml.com/ont/swe/property/PM10",
            "uom": {"code": "ug/m3"},
        },
        {
            "type": "Quantity",
            "name": "pm25_ugm3",
            "label": "PM2.5",
            "definition": "http://sensorml.com/ont/swe/property/PM2.5",
            "uom": {"code": "ug/m3"},
        },
        {
            "type": "Quantity",
            "name": "airQualityIndex",
            "label": "Air Quality Index",
            "definition": "http://sensorml.com/ont/swe/property/AirQualityIndex",
        },
        {
            "type": "Text",
            "name": "sourceParametersJson",
            "label": "Source Parameters JSON",
            "definition": "http://sensorml.com/ont/swe/property/RawData",
        },
        {
            "type": "Text",
            "name": "sourceUrl",
            "label": "Source URL",
            "definition": "http://sensorml.com/ont/swe/property/ReferenceURL",
        },
    ]

    result_schema = {
        "type": "DataRecord",
        "label": "FMI Air Quality Reading",
        "fields": fields,
    }

    return {
        "uid": uid,
        "outputName": DS_OUTPUT_NAME,
        "name": "FMI Air Quality Observation",
        "description": "Recent FMI Open Data air-quality parameters for one curated Finnish monitoring location.",
        "documentation": [
            {"title": "FMI Open Data", "href": FMI_HOME, "rel": "describedby"},
        ],
        "schema": {
            "obsFormat": "application/om+json",
            "resultSchema": result_schema,
        },
    }
64+
65+
def _deploy_root() -> dict:
    """Build the Feature-style dict for the top-level demo deployment.

    The geometry is a fixed Point at ``[25.0, 63.5]``; the properties carry
    ``DEPLOY_ROOT_UID`` and an open-ended ``validTime``.
    """
    location = {"type": "Point", "coordinates": [25.0, 63.5]}
    properties = {
        "uid": DEPLOY_ROOT_UID,
        "featureType": "sosa:Deployment",
        "name": "FMI Air Quality Demo",
        "description": "Top-level grouping for curated FMI air-quality resources.",
        "validTime": [VALID_TIME_START, ".."],
    }
    return {"type": "Feature", "geometry": location, "properties": properties}
68+
69+
def _deploy_group() -> dict:
    """Build the Feature-style dict for the deployment that groups all stations.

    The geometry is a fixed Point at ``[25.0, 63.5]``; the properties carry
    ``DEPLOY_GROUP_UID`` and an open-ended ``validTime``.
    """
    location = {"type": "Point", "coordinates": [25.0, 63.5]}
    properties = {
        "uid": DEPLOY_GROUP_UID,
        "featureType": "sosa:Deployment",
        "name": "FMI Air Quality Stations",
        "description": "Grouping deployment for curated FMI air-quality monitoring locations.",
        "validTime": [VALID_TIME_START, ".."],
    }
    return {"type": "Feature", "geometry": location, "properties": properties}
72+
73+
def _deploy_station(station: dict, system_server_id: str, base_url: str) -> dict:
    """Build the Feature-style deployment dict that links a station to its system.

    Args:
        station: Station record; ``lon``, ``lat``, ``stationId`` and ``name``
            are required.
        system_server_id: Server-side id of the station's system, used as the
            last path segment of the platform link.
        base_url: API base URL; trailing slashes are stripped before the
            ``/systems/<id>`` path is appended.

    Returns:
        A dict with a Point geometry at ``[lon, lat]`` and properties that
        include a ``platform@link`` pointing at the system.

    Raises:
        KeyError: if a required station key is missing.
    """
    return {
        "type": "Feature",
        "geometry": {
            "type": "Point",
            "coordinates": [station["lon"], station["lat"]],
        },
        "properties": {
            "uid": _deploy_uid(station["stationId"]),
            "featureType": "sosa:Deployment",
            "name": f"FMI Air Quality {station['name']}",
            "description": f"Deployment linking FMI air-quality location {station['name']} to its CSAPI system.",
            "validTime": [VALID_TIME_START, ".."],
            "platform@link": {
                # The URL parts are joined without spaces; whitespace inside
                # or after the href would make the link unresolvable.
                "href": f"{base_url.rstrip('/')}/systems/{system_server_id}",
                "uid": _system_uid(station["stationId"]),
                "title": f"FMI Air Quality {station['name']}",
            },
        },
    }
76+
77+
def _ensure_system_resilient(base_url: str, auth: str, station: dict, *, dry_run: bool, stats: dict, force_sml: bool) -> str | None:
    """Ensure the station's system exists, recovering from an HTTP 500 on create.

    Delegates to ``ensure_system``. If that raises a ``RuntimeError`` whose
    message mentions both ``"HTTP 500 POST"`` and ``"/systems"``, the system
    is looked up by UID with caching disabled; when found it is treated as
    created, its SensorML is PUT (unless *dry_run*) and ``stats["recovered"]``
    is incremented.

    Args:
        base_url: API base URL.
        auth: Authorization header value passed through to the helpers.
        station: Station record used to build the UID, stub and SensorML.
        dry_run: When true, the SensorML PUT of the recovery path is skipped.
        stats: Counter dict, updated in place.
        force_sml: Forwarded to ``ensure_system``.

    Returns:
        Whatever ``ensure_system`` returns, or the recovered server id.

    Raises:
        RuntimeError: re-raised when the failure is not the HTTP 500 case or
            the system cannot be found afterwards.
    """
    uid = _system_uid(station["stationId"])
    try:
        return ensure_system(
            base_url,
            auth,
            uid,
            _system_stub(station),
            _system_sml(station),
            dry_run=dry_run,
            stats=stats,
            force_sml=force_sml,
        )
    except RuntimeError as exc:
        message = str(exc)
        if "HTTP 500 POST" not in message or "/systems" not in message:
            raise
        recovered = find_by_uid(base_url, auth, "systems", uid, no_cache=True)
        if not recovered:
            raise
        print(f" [WARN] Server returned HTTP 500 after creating system {uid}; recovered id={recovered}")
        if not dry_run:
            try:
                # The path must be exactly "systems/<id>"; trailing whitespace
                # would address a different (non-existent) resource.
                api_put(base_url, f"systems/{recovered}", _system_sml(station), auth, content_type="application/sml+json")
            except Exception as sml_exc:
                # Deliberately best-effort: the system exists, so a failed
                # SensorML upload is reported but does not abort the run.
                print(f" [WARN] SML PUT skipped for recovered system {uid}: {sml_exc}")
        stats["recovered"] = stats.get("recovered", 0) + 1
        return recovered
92+
93+
def clean_all(base_url: str, auth: str, *, dry_run: bool, stats: dict):
    """Remove every resource this script manages, via ``clean_resource``.

    Order: per-station deployments, the group deployment, the root
    deployment, per-station systems, then the procedure. All but the
    procedure are cleaned with ``cascade=True``.

    Args:
        base_url: API base URL.
        auth: Authorization header value passed through to ``clean_resource``.
        dry_run: Forwarded to ``clean_resource``.
        stats: Counter dict forwarded to ``clean_resource``.
    """
    # Read stations.json once so both passes iterate the same station list.
    stations = _load_stations()

    for station in stations:
        clean_resource(base_url, auth, "deployments", _deploy_uid(station["stationId"]), dry_run=dry_run, stats=stats, cascade=True)

    clean_resource(base_url, auth, "deployments", DEPLOY_GROUP_UID, dry_run=dry_run, stats=stats, cascade=True)
    clean_resource(base_url, auth, "deployments", DEPLOY_ROOT_UID, dry_run=dry_run, stats=stats, cascade=True)

    for station in stations:
        clean_resource(base_url, auth, "systems", _system_uid(station["stationId"]), dry_run=dry_run, stats=stats, cascade=True)

    clean_resource(base_url, auth, "procedures", PROC_UID, dry_run=dry_run, stats=stats)
99+
100+
def bootstrap(*, clean: bool = False, clean_only: bool = False, dry_run: bool = False, force_sml: bool = False):
    """Create (and optionally first remove) all FMI air-quality resources.

    Steps: optional clean, procedure, one system plus datastream per station,
    root and group deployments, then one deployment per station under the
    group. A summary is printed at the end.

    Args:
        clean: Remove existing resources before creating them.
        clean_only: Remove existing resources, print the summary and return.
        dry_run: Forwarded to every helper.
        force_sml: Forwarded to the procedure and system helpers.
    """
    cfg = get_config()
    base_url = cfg["base_url"]
    auth = _auth_header(cfg["user"], cfg["password"])
    stations = _load_stations()
    stats: dict[str, int] = {}

    banner = "=" * 70
    print("\n " + banner)
    print(" FMI Air Quality -- Bootstrap")
    print(banner)
    print(f" Server: {base_url} ")
    print(f" Stations: {len(stations)} ")
    print(f" Clean: {clean} Clean-only: {clean_only} Dry-run: {dry_run} Force-SML: {force_sml} \n ")

    if clean or clean_only:
        clean_all(base_url, auth, dry_run=dry_run, stats=stats)
    if clean_only:
        print_summary(stats, dry_run)
        return

    print(" -- Procedure --")
    ensure_procedure(base_url, auth, PROC_UID, PROCEDURE_STUB, PROCEDURE_SML, dry_run=dry_run, stats=stats, force_sml=force_sml)

    print(" -- Systems and Datastreams --")
    server_ids = {}
    for station in stations:
        created_id = _ensure_system_resilient(base_url, auth, station, dry_run=dry_run, stats=stats, force_sml=force_sml)
        # "pending" stands in when no server id came back for the system.
        server_ids[station["stationId"]] = created_id or "pending"
        ensure_datastream(base_url, auth, created_id or "pending", DS_OUTPUT_NAME, _datastream_schema(station), dry_run=dry_run, stats=stats)

    print(" -- Deployments --")
    root_id = ensure_deployment(base_url, auth, DEPLOY_ROOT_UID, _deploy_root(), dry_run=dry_run, stats=stats)
    group_id = ensure_deployment(base_url, auth, DEPLOY_GROUP_UID, _deploy_group(), parent_id=root_id, dry_run=dry_run, stats=stats)
    for station in stations:
        linked_system = server_ids.get(station["stationId"], "pending")
        ensure_deployment(
            base_url,
            auth,
            _deploy_uid(station["stationId"]),
            _deploy_station(station, linked_system, base_url),
            parent_id=group_id,
            dry_run=dry_run,
            stats=stats,
        )

    print_summary(stats, dry_run)
114+
115+
def main():
    """Parse the shared bootstrap command-line flags and run ``bootstrap``."""
    parser = argparse.ArgumentParser(description="Bootstrap FMI air-quality resources on the CSAPI server.")
    add_bootstrap_args(parser)
    options = parser.parse_args()
    bootstrap(
        clean=options.clean,
        clean_only=options.clean_only,
        dry_run=options.dry_run,
        force_sml=options.force_sml,
    )
118+
119+
# Run the bootstrap only when this file is executed as a script.
if __name__ == "__main__":
    main()
0 commit comments