From ef7e72025c3c66ed6ae38eca964a5a3f59f607d8 Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Mon, 19 May 2025 09:53:41 -0400 Subject: [PATCH 1/4] update repository abstraction --- docker/entrypoint.py | 2 +- docs/repofilters.rst | 4 +- pycsw/core/admin.py | 41 +- pycsw/core/config.py | 2 +- ...ofilter_evaluate.py => pygeofilter_ext.py} | 18 +- pycsw/core/repository.py | 104 +++- pycsw/core/util.py | 1 + pycsw/oaipmh.py | 2 +- pycsw/ogc/api/records.py | 182 +++--- pycsw/ogc/csw/csw2.py | 120 ++-- pycsw/ogc/csw/csw3.py | 91 ++- pycsw/ogc/gml/gml3.py | 2 +- pycsw/ogc/gml/gml32.py | 2 +- pycsw/opensearch.py | 2 +- pycsw/plugins/profiles/profile.py | 2 +- pycsw/plugins/repository/odc/odc.py | 2 +- pycsw/plugins/repository/solr_metno.py | 577 ++++++++++++++++++ pycsw/server.py | 10 +- pycsw/sru.py | 2 +- pycsw/stac/api.py | 9 +- requirements-standalone.txt | 2 +- setup.py | 2 - tests/functionaltests/conftest.py | 21 +- ...t_92d4844d-57d5-4cf3-8f47-ba50e369dc04.xml | 2 +- ...t_d94c801a-1207-4897-b84a-53f3a192515b.xml | 2 +- tests/unittests/test_util.py | 2 +- 26 files changed, 956 insertions(+), 250 deletions(-) rename pycsw/core/{pygeofilter_evaluate.py => pygeofilter_ext.py} (83%) create mode 100644 pycsw/plugins/repository/solr_metno.py diff --git a/docker/entrypoint.py b/docker/entrypoint.py index 86a75beb7..6edbbeb61 100644 --- a/docker/entrypoint.py +++ b/docker/entrypoint.py @@ -80,7 +80,7 @@ def launch_pycsw(pycsw_config, workers=2, reload=False): except Exception as err: LOGGER.debug(err) - repo = Repository(database, StaticContext(), table=table) + repo = Repository(database, StaticContext()) repo.ping() diff --git a/docs/repofilters.rst b/docs/repofilters.rst index 75991816e..de3274aa8 100644 --- a/docs/repofilters.rst +++ b/docs/repofilters.rst @@ -58,9 +58,9 @@ The same CSW `GetRecords` filter as per above then yields the following results: Another example: -.. code-block:: text +.. 
code-block:: yaml - repository:0 + repository: database: sqlite:///records.db filter: "pycsw:ParentIdentifier != '33'" diff --git a/pycsw/core/admin.py b/pycsw/core/admin.py index 6d19ab9d4..dbe96e445 100644 --- a/pycsw/core/admin.py +++ b/pycsw/core/admin.py @@ -52,7 +52,11 @@ def load_records(context, database, table, xml_dirpath, recursive=False, force_update=False): """Load metadata records from directory of files to database""" - repo = repository.Repository(database, context, table=table) + repo_config = { + 'database': database, + 'table': table + } + repo = repository.Repository(repo_config, context) file_list = [] @@ -121,7 +125,13 @@ def load_records(context, database, table, xml_dirpath, recursive=False, force_u def export_records(context, database, table, xml_dirpath): """Export metadata records from database to directory of files""" - repo = repository.Repository(database, context, table=table) + + repo_config = { + 'database': database, + 'table': table + } + + repo = repository.Repository(repo_config, context) LOGGER.info('Querying database %s, table %s ....', database, table) records = repo.session.query(repo.dataset) @@ -190,8 +200,13 @@ def refresh_harvested_records(context, database, table, url): """refresh / harvest all non-local records in repository""" from owslib.csw import CatalogueServiceWeb + repo_config = { + 'database': database, + 'table': table + } + # get configuration and init repo connection - repos = repository.Repository(database, context, table=table) + repos = repository.Repository(repo_config, context) # get all harvested records count, records = repos.query(constraint={'where': "mdsource != 'local'", 'values': []}) @@ -228,8 +243,13 @@ def refresh_harvested_records(context, database, table, url): def gen_sitemap(context, database, table, url, output_file): """generate an XML sitemap from all records in repository""" + repo_config = { + 'database': database, + 'table': table + } + # get configuration and init repo connection - 
repos = repository.Repository(database, context, table=table)
+    repos = repository.Repository(repo_config, context)

     # write out sitemap document
     urlset = etree.Element(util.nspath_eval('sitemap:urlset',
@@ -274,7 +294,7 @@ def post_xml(url, xml, timeout=30):
     from owslib.util import http_post
     try:
         with open(xml) as f:
-            return http_post(url=url, request=f.read(), timeout=timeout)
+            return http_post(url=url, request=f.read(), timeout=timeout).text
     except Exception as err:
         LOGGER.exception('HTTP XML POST error')
         raise RuntimeError(err) from err
@@ -351,7 +371,12 @@ def delete_records(context, database, table):

     LOGGER.info('Deleting all records')

-    repo = repository.Repository(database, context, table=table)
+    repo_config = {
+        'database': database,
+        'table': table
+    }
+
+    repo = repository.Repository(repo_config, context)

     repo.delete(constraint={'where': '', 'values': []})

@@ -493,7 +518,7 @@ def cli_rebuild_db_indexes(ctx, config, verbosity):

     context = pconfig.StaticContext()

-    repo = repository.Repository(cfg['repository']['database'], context, table=cfg['repository'].get('table'))
+    repo = repository.Repository(cfg['repository'], context)

     repo.rebuild_db_indexes()

@@ -509,7 +534,7 @@ def cli_optimize_db(ctx, config, verbosity):

     context = pconfig.StaticContext()

-    repo = repository.Repository(cfg['repository']['database'], context, table=cfg['repository'].get('table'))
+    repo = repository.Repository(cfg['repository'], context)

     repo.optimize_db()

diff --git a/pycsw/core/config.py b/pycsw/core/config.py
index d9e562fa8..acd38c6a8 100644
--- a/pycsw/core/config.py
+++ b/pycsw/core/config.py
@@ -35,7 +35,7 @@

 LOGGER = logging.getLogger(__name__)


-class StaticContext(object):
+class StaticContext:
     """core configuration"""
     def __init__(self, prefix='csw30'):
         """initializer"""
diff --git a/pycsw/core/pygeofilter_evaluate.py b/pycsw/core/pygeofilter_ext.py
similarity index 83%
rename from pycsw/core/pygeofilter_evaluate.py
rename to pycsw/core/pygeofilter_ext.py
index 
ef77b61a8..c83db8eeb 100644 --- a/pycsw/core/pygeofilter_evaluate.py +++ b/pycsw/core/pygeofilter_ext.py @@ -2,7 +2,7 @@ # # Authors: Tom Kralidis # -# Copyright (c) 2021 Tom Kralidis +# Copyright (c) 2025 Tom Kralidis # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -35,6 +35,9 @@ from pygeofilter.backends.evaluator import handle from pygeofilter.backends.sqlalchemy import filters from pygeofilter.backends.sqlalchemy.evaluate import SQLAlchemyFilterEvaluator +from pygeofilter.parsers.fes.util import Element +from pygeofilter.parsers.fes.util import handle as fhandle +from pygeofilter.parsers.fes.v11 import FES11Parser from pycsw.core.util import bbox2wktpolygon @@ -81,3 +84,16 @@ def ilike(self, node, lhs): def to_filter(ast, dbtype, field_mapping=None): return PycswFilterEvaluator(field_mapping, dbtype).evaluate(ast) + +class PycswCSWFES11Parser(FES11Parser): +# def __init__(self): +# super().__init__() + + @fhandle('BBOX') + def geometry_bbox(self, node: Element, lhs, rhs, crs=None): + minx = rhs.geometry['coordinates'][0][0][1] + miny = rhs.geometry['coordinates'][0][0][0] + maxx = rhs.geometry['coordinates'][0][2][1] + maxy = rhs.geometry['coordinates'][0][2][0] + + return ast.BBox(lhs, minx, miny, maxx, maxy, crs) diff --git a/pycsw/core/repository.py b/pycsw/core/repository.py index 59093fb53..aaaf96ffe 100644 --- a/pycsw/core/repository.py +++ b/pycsw/core/repository.py @@ -5,7 +5,7 @@ # Angelos Tzotsos # Ricardo Garcia Silva # -# Copyright (c) 2024 Tom Kralidis +# Copyright (c) 2025 Tom Kralidis # Copyright (c) 2015 Angelos Tzotsos # Copyright (c) 2017 Ricardo Garcia Silva # @@ -34,6 +34,7 @@ import inspect import logging +from operator import itemgetter import os from time import sleep @@ -49,11 +50,12 @@ from pycsw.core import util from pycsw.core.etree import etree from pycsw.core.etree import PARSER +from pycsw.core.pygeofilter_ext import to_filter LOGGER = 
logging.getLogger(__name__) -class Repository(object): +class Repository: _engines = {} @classmethod @@ -87,14 +89,15 @@ def connect(dbapi_connection, connection_rec): return clazz._engines[url] ''' Class to interact with underlying repository ''' - def __init__(self, database, context, app_root=None, table='records', repo_filter=None): + def __init__(self, repo_object, context, app_root=None): ''' Initialize repository ''' self.context = context - self.filter = repo_filter + self.filter = repo_object.get('filter') self.fts = False - self.database = database - self.table = table + self.database = repo_object.get('database') + self.table = repo_object.get('table') + self.facets = repo_object.get('facets', []) # Don't use relative paths, this is hack to get around # most wsgi restriction... @@ -110,7 +113,7 @@ def __init__(self, database, context, app_root=None, table='records', repo_filte self.postgis_geometry_column = None - schema_name, table_name = table.rpartition(".")[::2] + schema_name, table_name = self.table.rpartition(".")[::2] default_table_args = { "autoload": True, @@ -145,6 +148,7 @@ def __init__(self, database, context, app_root=None, table='records', repo_filte temp_dbtype = None self.query_mappings = { + # OGC API - Records mappings 'identifier': self.dataset.identifier, 'type': self.dataset.type, 'typename': self.dataset.typename, @@ -167,6 +171,10 @@ def __init__(self, database, context, app_root=None, table='records', repo_filte 'off_nadir': self.dataset.illuminationelevationangle } + LOGGER.debug('adding OGC CSW mappings') + for key, value in self.context.models['csw']['typenames']['csw:Record']['queryables']['SupportedDublinCoreQueryables'].items(): + self.query_mappings[key] = util.getqattr(self.dataset, value['dbcol']) + if self.dbtype == 'postgresql': # check if PostgreSQL is enabled with PostGIS 1.x try: @@ -410,18 +418,34 @@ def query_source(self, source): query = self.session.query(self.dataset).filter(column == source) return 
self._get_repo_filter(query).all()

-    def query(self, constraint, sortby=None, typenames=None,
+    def query(self, constraint=None, sortby=None, typenames=None,
               maxrecords=10, startposition=0):
         ''' Query records from underlying repository '''

-        # run the raw query and get total
-        if 'where' in constraint: # GetRecords with constraint
-            LOGGER.debug('constraint detected')
-            query = self.session.query(self.dataset).filter(
-                text(constraint['where'])).params(self._create_values(constraint['values']))
-        else: # GetRecords sans constraint
-            LOGGER.debug('No constraint detected')
-            query = self.session.query(self.dataset)
+        if constraint.get('ast') is not None: # GetRecords with pygeofilter AST
+            LOGGER.debug('pygeofilter AST detected')
+            LOGGER.debug('Transforming AST into filters')
+            try:
+                filters = to_filter(constraint['ast'], self.dbtype, self.query_mappings)
+                LOGGER.debug(f'Filter: {filters}')
+            except Exception as err:
+                msg = f'AST evaluator error: {str(err)}'
+                LOGGER.exception(msg)
+                raise RuntimeError(msg) from err
+
+            query = self.session.query(self.dataset).filter(filters)
+
+        else: # GetRecords sans pygeofilter AST
+            LOGGER.debug('No pygeofilter AST detected')
+
+            # run the raw query and get total
+            if 'where' in constraint: # GetRecords with constraint
+                LOGGER.debug('constraint detected')
+                query = self.session.query(self.dataset).filter(
+                    text(constraint['where'])).params(self._create_values(constraint['values']))
+            else: # GetRecords sans constraint
+                LOGGER.debug('No constraint detected')
+                query = self.session.query(self.dataset)

         total = self._get_repo_filter(query).count()

@@ -438,7 +462,10 @@ def query(self, constraint, sortby=None, typenames=None,
         if sortby is not None: # apply sorting
             LOGGER.debug('sorting detected')
             # TODO: Check here for dbtype so to extract wkt from postgis native to wkt
-            sortby_column = getattr(self.dataset, sortby['propertyname'])
+            try:
+                sortby_column = getattr(self.dataset, sortby['propertyname'])
+            except AttributeError:
+                sortby_column = 
self.query_mappings.get(sortby['propertyname']) if sortby['order'] == 'DESC': # descending sort if 'spatial' in sortby and sortby['spatial']: # spatial sort @@ -452,9 +479,50 @@ def query(self, constraint, sortby=None, typenames=None, query = query.order_by(sortby_column) # always apply limit and offset - return [str(total), self._get_repo_filter(query).limit( + return [total, self._get_repo_filter(query).limit( maxrecords).offset(startposition).all()] + def get_facets(self, ast=None) -> dict: + """ + Gets all facets for a given query + + :returns: `dict` of facets + """ + + facets_results = {} + + for facet in self.facets: + LOGGER.debug(f'Running facet for {facet}') + facetq = self.session.query(self.query_mappings[facet], self.func.count(facet)).group_by(facet) + + if ast is not None: + try: + filters = to_filter(ast, self.dbtype, self.query_mappings) + LOGGER.debug(f'Filter: {filters}') + except Exception as err: + msg = f'AST evaluator error: {str(err)}' + LOGGER.exception(msg) + raise RuntimeError(msg) + + facetq = facetq.filter(filters) + + LOGGER.debug('Writing facet query results') + facets_results[facet] = { + 'type': 'terms', + 'property': facet, + 'buckets': [] + } + + for fq in facetq.all(): + facets_results[facet]['buckets'].append({ + 'value': fq[0], + 'count': fq[1] + }) + + facets_results[facet]['buckets'].sort(key=itemgetter('count'), reverse=True) + + return facets_results + def insert(self, record, source, insert_date): ''' Insert a record into the repository ''' diff --git a/pycsw/core/util.py b/pycsw/core/util.py index 80c2b56fa..4ec35eec7 100644 --- a/pycsw/core/util.py +++ b/pycsw/core/util.py @@ -269,6 +269,7 @@ def transform_mappings(queryables, typename): def getqattr(obj, name): """Get value of an object, safely""" result = None + item = None try: item = getattr(obj, name) value = item() diff --git a/pycsw/oaipmh.py b/pycsw/oaipmh.py index a3a665934..52ea9339c 100644 --- a/pycsw/oaipmh.py +++ b/pycsw/oaipmh.py @@ -35,7 +35,7 @@ LOGGER = 
logging.getLogger(__name__) -class OAIPMH(object): +class OAIPMH: """OAI-PMH wrapper class""" def __init__(self, context, config): LOGGER.debug('Initializing OAI-PMH constants') diff --git a/pycsw/ogc/api/records.py b/pycsw/ogc/api/records.py index c9eb4d22a..d76f6b359 100644 --- a/pycsw/ogc/api/records.py +++ b/pycsw/ogc/api/records.py @@ -29,10 +29,9 @@ # # ================================================================= -from datetime import datetime +import datetime import json import logging -from operator import itemgetter import os from urllib.parse import urlencode, quote @@ -44,7 +43,6 @@ from pycsw.core import log from pycsw.core.config import StaticContext from pycsw.core.metadata import parse_record -from pycsw.core.pygeofilter_evaluate import to_filter from pycsw.core.util import bind_url, get_today_and_now, jsonify_links, load_custom_repo_mappings, str2bool, wkt2geom from pycsw.ogc.api.oapi import gen_oapi from pycsw.ogc.api.util import match_env_var, render_j2_template, to_json, to_rfc3339 @@ -114,8 +112,6 @@ def __init__(self, config: dict): self.limit = 10 LOGGER.debug(f'limit: {self.limit}') - repo_filter = self.config['repository'].get('filter') - custom_mappings_path = self.config['repository'].get('mappings') if custom_mappings_path is not None: md_core_model = load_custom_repo_mappings(custom_mappings_path) @@ -127,19 +123,43 @@ def __init__(self, config: dict): self.orm = 'sqlalchemy' from pycsw.core import repository - try: - LOGGER.info('Loading default repository') - self.repository = repository.Repository( - self.config['repository']['database'], - self.context, - table=self.config['repository']['table'], - repo_filter=repo_filter - ) - LOGGER.debug(f'Repository loaded {self.repository.dbtype}') - except Exception as err: - msg = f'Could not load repository {err}' - LOGGER.exception(msg) - raise + + if 'source' in self.config['repository']: # load custom repository + rs = self.config['repository']['source'] + rs_modname, rs_clsname = 
rs.rsplit('.', 1) + + rs_mod = __import__(rs_modname, globals(), locals(), [rs_clsname]) + rs_cls = getattr(rs_mod, rs_clsname) + + try: + connection_done = False + max_attempts = 0 + max_retries = self.config['repository'].get('maxretries', 5) + while not connection_done and max_attempts <= max_retries: + try: + self.repository = rs_cls(self.config['repository'], self.context) + LOGGER.debug('Custom repository %s loaded' % self.config['repository']['source']) + connection_done = True + except Exception as err: + LOGGER.debug(f'Repository not loaded retry connection {max_attempts}: {err}') + max_attempts += 1 + except Exception as err: + msg = 'Could not load custom repository %s: %s' % (rs, err) + LOGGER.exception(msg) + error = 1 + code = 'NoApplicableCode' + locator = 'service' + text = 'Could not initialize repository. Check server logs' + + else: + try: + LOGGER.info('Loading default repository') + self.repository = repository.Repository(self.config['repository'], self.context) + LOGGER.debug(f'Repository loaded {self.repository.dbtype}') + except Exception as err: + msg = f'Could not load repository {err}' + LOGGER.exception(msg) + raise def get_content_type(self, headers, args): """ @@ -540,7 +560,6 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'): response = { 'type': 'FeatureCollection', - 'facets': [], 'features': [], 'links': [] } @@ -691,60 +710,24 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'): LOGGER.debug('Detected CQL JSON; ignoring all other query predicates') query_parser = parse_cql2_json - LOGGER.debug(f'query parser: {query_parser}') - - if query_parser is not None and cql_query != {}: - LOGGER.debug('Parsing CQL into AST') - LOGGER.debug(json_post_data) - LOGGER.debug(cql_query) - try: - ast = query_parser(cql_query) - LOGGER.debug(f'Abstract syntax tree: {ast}') - except Exception as err: - msg = f'CQL parsing error: {str(err)}' - LOGGER.exception(msg) - return 
self.get_exception(400, headers_, 'InvalidParameterValue', msg) - - LOGGER.debug('Transforming AST into filters') - try: - filters = to_filter(ast, self.repository.dbtype, self.repository.query_mappings) - LOGGER.debug(f'Filter: {filters}') - except Exception as err: - msg = f'CQL evaluator error: {str(err)}' - LOGGER.exception(msg) - return self.get_exception(400, headers_, 'InvalidParameterValue', msg) - - query = self.repository.session.query(self.repository.dataset).filter(filters) - if facets_requested: - LOGGER.debug('Running facet query') - facets_results = self.get_facets(filters) - else: - query = self.repository.session.query(self.repository.dataset) - facets_results = self.get_facets() - - if facets_requested: - response['facets'] = facets_results - else: - response.pop('facets') - - if 'sortby' in args: + if args.get('sortby') is not None: LOGGER.debug('sortby specified') - sortby = args['sortby'] + sortby = { + 'order': 'ASC', + 'propertyname': args['sortby'] + } - if sortby is not None: LOGGER.debug('processing sortby') - if sortby.startswith('-'): - sortby = sortby.lstrip('-') + if args['sortby'].startswith('-'): + sortby['order'] = 'DESC' + sortby['propertyname'] = args['sortby'].lstrip('-') - if sortby not in list(self.repository.query_mappings.keys()): + if sortby['propertyname'] not in list(self.repository.query_mappings.keys()): msg = 'Invalid sortby property' LOGGER.exception(msg) return self.get_exception(400, headers_, 'InvalidParameterValue', msg) - - if args['sortby'].startswith('-'): - query = query.order_by(self.repository.query_mappings[sortby].desc()) - else: - query = query.order_by(self.repository.query_mappings[sortby]) + else: + sortby = None if limit is None and 'limit' in args: limit = int(args['limit']) @@ -762,13 +745,28 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'): offset = int(args.get('offset', 0)) - LOGGER.debug(f'Query: {query}') - LOGGER.debug('Querying repository') - count = query.count() 
- LOGGER.debug(f'count: {count}') - LOGGER.debug(f'limit: {limit}') - LOGGER.debug(f'offset: {offset}') - records = query.limit(limit).offset(offset).all() + if query_parser is not None and cql_query != {}: + LOGGER.debug('Parsing CQL into AST') + LOGGER.debug(json_post_data) + LOGGER.debug(cql_query) + try: + ast = query_parser(cql_query) + LOGGER.debug(f'Abstract syntax tree: {ast}') + except Exception as err: + msg = f'CQL parsing error: {str(err)}' + LOGGER.exception(msg) + return self.get_exception(400, headers_, 'InvalidParameterValue', msg) + + else: + ast = None + + count, records = self.repository.query( + constraint={'ast': ast}, sortby=sortby, maxrecords=limit, + startposition=offset) + + if facets_requested: + LOGGER.debug('Running facet query') + response['facets'] = self.repository.get_facets(ast) returned = len(records) @@ -888,7 +886,8 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'): 'hreflang': self.config['server']['language'] }) - response['timeStamp'] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') + response['timeStamp'] = datetime.datetime.now( + datetime.UTC).strftime('%Y-%m-%dT%H:%M:%S.%fZ') if headers_['Content-Type'] == 'text/html': response['title'] = self.config['metadata']['identification']['title'] @@ -1133,39 +1132,6 @@ def get_all_collections(self) -> list: return [default_collection] + [vc.identifier for vc in virtual_collections] - def get_facets(self, filters=None) -> dict: - """ - Gets all facets for a given query - - :returns: `dict` of facets - """ - - facets_results = {} - - for facet in self.facets: - LOGGER.debug(f'Running facet for {facet}') - facetq = self.repository.session.query(self.repository.query_mappings[facet], self.repository.func.count(facet)).group_by(facet) - - if filters is not None: - facetq = facetq.filter(filters) - - LOGGER.debug('Writing facet query results') - facets_results[facet] = { - 'type': 'terms', - 'property': facet, - 'buckets': [] - } - - for fq in 
facetq.all(): - facets_results[facet]['buckets'].append({ - 'value': fq[0], - 'count': fq[1] - }) - - facets_results[facet]['buckets'].sort(key=itemgetter('count'), reverse=True) - - return facets_results - def record2json(record, url, collection, mode='ogcapi-records'): """ @@ -1181,7 +1147,7 @@ def record2json(record, url, collection, mode='ogcapi-records'): if record.metadata_type in ['application/json', 'application/geo+json']: rec = json.loads(record.metadata) - if rec.get('stac_version') is not None and rec['type'] == 'Feature' and mode == 'stac-api': + if rec.get('stac_version') is not None and rec['type'] in ['Collection', 'Feature'] and mode == 'stac-api': LOGGER.debug('Returning native STAC representation') rec['links'].extend([{ 'rel': 'self', diff --git a/pycsw/ogc/csw/csw2.py b/pycsw/ogc/csw/csw2.py index 43e2f5718..7ab3fdd2f 100644 --- a/pycsw/ogc/csw/csw2.py +++ b/pycsw/ogc/csw/csw2.py @@ -4,7 +4,7 @@ # Authors: Tom Kralidis # Angelos Tzotsos # -# Copyright (c) 2024 Tom Kralidis +# Copyright (c) 2025 Tom Kralidis # Copyright (c) 2015 Angelos Tzotsos # # Permission is hereby granted, free of charge, to any person @@ -31,6 +31,11 @@ # ================================================================= import os + +from pygeofilter.parsers.ecql.parser import parse as ecql_parse +from pygeofilter.parsers.fes.v11 import parse as fes1_parse +from pycsw.core.pygeofilter_ext import PycswCSWFES11Parser + from pycsw.core.etree import etree from pycsw import opensearch from pycsw.ogc.csw.cql import cql2fes @@ -42,7 +47,7 @@ LOGGER = logging.getLogger(__name__) -class Csw2(object): +class Csw2: ''' CSW 2.x server ''' def __init__(self, server_csw): ''' Initialize CSW2 ''' @@ -727,37 +732,48 @@ def getrecords(self): if self.parent.kvp['constraintlanguage'] == 'CQL_TEXT': tmp = self.parent.kvp['constraint'] try: - LOGGER.info('Transforming CQL into fes1') - LOGGER.debug('CQL: %s', tmp) - self.parent.kvp['constraint'] = {} - self.parent.kvp['constraint']['type'] = 
'filter' - cql = cql2fes(tmp, self.parent.context.namespaces, fes_version='1.0') - self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = fes1.parse(cql, - self.parent.repository.queryables['_all'], self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces) + if self.parent.repository.database is not None: + LOGGER.info('Transforming CQL into fes1') + LOGGER.debug('CQL: %s', tmp) + self.parent.kvp['constraint'] = {} + self.parent.kvp['constraint']['type'] = 'filter' + cql = cql2fes(tmp, self.parent.context.namespaces, fes_version='1.0') + self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = fes1.parse(cql, + self.parent.repository.queryables['_all'], self.parent.repository.dbtype, + self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) + self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces) + else: + self.parent.kvp['constraint'] = { + 'ast': ecql_parse(tmp) + } except Exception as err: LOGGER.exception('Invalid CQL query %s', tmp) return self.exceptionreport('InvalidParameterValue', 'constraint', 'Invalid Filter syntax') elif self.parent.kvp['constraintlanguage'] == 'FILTER': + tmp = self.parent.kvp['constraint'] # validate filter XML try: - schema = os.path.join(self.parent.config['server'].get('home'), - 'core', 'schemas', 'ogc', 'filter', '1.1.0', 'filter.xsd') - LOGGER.info('Validating Filter %s', self.parent.kvp['constraint']) - schema = etree.XMLSchema(file=schema) - parser = etree.XMLParser(schema=schema, resolve_entities=False) - doc = etree.fromstring(self.parent.kvp['constraint'], parser) - LOGGER.debug('Filter is valid XML') - self.parent.kvp['constraint'] = {} - self.parent.kvp['constraint']['type'] = 'filter' - 
self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = \ - fes1.parse(doc, - self.parent.repository.queryables['_all'], - self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(doc), self.parent.context.namespaces) + if self.parent.repository.database is not None: + schema = os.path.join(self.parent.config['server'].get('home'), + 'core', 'schemas', 'ogc', 'filter', '1.1.0', 'filter.xsd') + LOGGER.info('Validating Filter %s', self.parent.kvp['constraint']) + schema = etree.XMLSchema(file=schema) + parser = etree.XMLParser(schema=schema, resolve_entities=False) + doc = etree.fromstring(self.parent.kvp['constraint'], parser) + LOGGER.debug('Filter is valid XML') + self.parent.kvp['constraint'] = {} + self.parent.kvp['constraint']['type'] = 'filter' + self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = \ + fes1.parse(doc, + self.parent.repository.queryables['_all'], + self.parent.repository.dbtype, + self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) + self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(doc), self.parent.context.namespaces) + else: + self.parent.kvp['constraint'] = { + 'ast': fes1_parse(tmp) + } except Exception as err: errortext = \ 'Exception: document not valid.\nError: %s.' 
% str(err) @@ -765,7 +781,12 @@ def getrecords(self): return self.exceptionreport('InvalidParameterValue', 'constraint', 'Invalid Filter query: %s' % errortext) else: - self.parent.kvp['constraint'] = {} + if self.parent.repository.database is not None: + self.parent.kvp['constraint'] = {} + else: + self.parent.kvp['constraint'] = { + 'ast': None + } if 'sortby' not in self.parent.kvp: self.parent.kvp['sortby'] = None @@ -909,8 +930,8 @@ def getrecords(self): searchresults = etree.SubElement(node, util.nspath_eval('csw:SearchResults', self.parent.context.namespaces), - numberOfRecordsMatched=matched, numberOfRecordsReturned=returned, - nextRecord=nextrecord, recordSchema=self.parent.kvp['outputschema']) + numberOfRecordsMatched=str(matched), numberOfRecordsReturned=str(returned), + nextRecord=str(nextrecord), recordSchema=self.parent.kvp['outputschema']) if self.parent.kvp['elementsetname'] is not None: searchresults.attrib['elementSet'] = self.parent.kvp['elementsetname'] @@ -1386,7 +1407,8 @@ def harvest(self): inserted += 1 try: tmp = self.parent.repository.insert(record, source, insert_date) - if tmp is not None: ir = tmp + if tmp is not None: + ir = tmp except Exception as err: return self.exceptionreport('NoApplicableCode', 'source', 'Harvest (insert) failed: %s.' 
% str(err)) @@ -1573,11 +1595,15 @@ def _parse_constraint(self, element): if tmp is not None: LOGGER.debug('Filter constraint specified') try: - query['type'] = 'filter' - query['where'], query['values'] = fes1.parse(tmp, - self.parent.repository.queryables['_all'], self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - query['_dict'] = xml2dict(etree.tostring(tmp), self.parent.context.namespaces) + if self.parent.repository.database is not None: + query['type'] = 'filter' + query['where'], query['values'] = fes1.parse(tmp, + self.parent.repository.queryables['_all'], self.parent.repository.dbtype, + self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) + query['_dict'] = xml2dict(etree.tostring(tmp), self.parent.context.namespaces) + else: + #query['ast'] = PycswCSWFES11Parser().parse(etree.tostring(tmp)) + query['ast'] = fes1_parse(etree.tostring(tmp)) except Exception as err: return 'Invalid Filter request: %s' % err @@ -1585,13 +1611,16 @@ def _parse_constraint(self, element): if tmp is not None: LOGGER.debug('CQL specified: %s.', tmp.text) try: - LOGGER.info('Transforming CQL into OGC Filter') - query['type'] = 'filter' - cql = cql2fes(tmp.text, self.parent.context.namespaces, fes_version='1.0') - query['where'], query['values'] = fes1.parse(cql, - self.parent.repository.queryables['_all'], self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - query['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces) + if self.parent.repository.database is not None: + LOGGER.info('Transforming CQL into OGC Filter') + query['type'] = 'filter' + cql = cql2fes(tmp.text, self.parent.context.namespaces, fes_version='1.0') + query['where'], query['values'] = fes1.parse(cql, + self.parent.repository.queryables['_all'], 
self.parent.repository.dbtype,
+                    self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts)
+                query['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces)
+            else:
+                query['ast'] = ecql_parse(tmp.text)
         except Exception as err:
             LOGGER.exception('Invalid CQL request: %s', tmp.text)
             LOGGER.exception('Error message: %s', err)
@@ -1939,12 +1968,12 @@ def _write_acknowledgement(self, root=True):
         ''' Generate csw:Acknowledgement '''

         node = etree.Element(util.nspath_eval('csw:Acknowledgement',
         self.parent.context.namespaces),
-        nsmap = self.parent.context.namespaces, timeStamp=util.get_today_and_now())
+        nsmap=self.parent.context.namespaces, timeStamp=util.get_today_and_now())

         if root:
             node.attrib[util.nspath_eval('xsi:schemaLocation',
             self.parent.context.namespaces)] = \
-            '%s %s/csw/2.0.2/CSW-discovery.xsd' % (self.parent.context.namespaces['csw'], \
+            '%s %s/csw/2.0.2/CSW-discovery.xsd' % (self.parent.context.namespaces['csw'],
             self.parent.config['server'].get('ogc_schemas_base'))

         node1 = etree.SubElement(node, util.nspath_eval('csw:EchoedRequest',
@@ -2014,11 +2043,12 @@ def exceptionreport(self, code, locator, text):

         try:
             exception_text.text = text
-        except ValueError as err:
+        except ValueError:
             exception_text.text = repr(text)

     return node

+
 def write_boundingbox(bbox, nsmap):
     ''' Generate ows:BoundingBox '''

diff --git a/pycsw/ogc/csw/csw3.py b/pycsw/ogc/csw/csw3.py
index e62524acc..ebee1bb36 100644
--- a/pycsw/ogc/csw/csw3.py
+++ b/pycsw/ogc/csw/csw3.py
@@ -3,7 +3,7 @@
 #
 # Authors: Tom Kralidis
 #
-# Copyright (c) 2024 Tom Kralidis
+# Copyright (c) 2025 Tom Kralidis
 #
 # Permission is hereby granted, free of charge, to any person
 # obtaining a copy of this software and associated documentation
@@ -30,6 +30,10 @@

 import os

 from time import time
+
+from pygeofilter.parsers.ecql.parser import parse as ecql_parse
+from pygeofilter.parsers.fes.v11 import parse as fes1_parse
+
 from pycsw.core.etree 
import etree from pycsw.ogc.csw.cql import cql2fes from pycsw import opensearch @@ -41,7 +45,7 @@ LOGGER = logging.getLogger(__name__) -class Csw3(object): +class Csw3: ''' CSW 3.x server ''' def __init__(self, server_csw): ''' Initialize CSW3 ''' @@ -755,15 +759,20 @@ def getrecords(self): if self.parent.kvp['constraintlanguage'] == 'CQL_TEXT': tmp = self.parent.kvp['constraint'] try: - LOGGER.info('Transforming CQL into fes1') LOGGER.debug('CQL: %s', tmp) - self.parent.kvp['constraint'] = {} - self.parent.kvp['constraint']['type'] = 'filter' - cql = cql2fes(tmp, self.parent.context.namespaces, fes_version='1.0') - self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = fes1.parse(cql, - self.parent.repository.queryables['_all'], self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces) + LOGGER.info('Transforming CQL into fes1') + if self.parent.repository.database is not None: + self.parent.kvp['constraint'] = {} + self.parent.kvp['constraint']['type'] = 'filter' + cql = cql2fes(tmp, self.parent.context.namespaces, fes_version='1.0') + self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = fes1.parse(cql, + self.parent.repository.queryables['_all'], self.parent.repository.dbtype, + self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) + self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces) + else: + self.parent.kvp['constraint'] = { + 'ast': ecql_parse(etree.tostring(tmp)) + } except Exception as err: LOGGER.exception('Invalid CQL query %s', tmp) return self.exceptionreport('InvalidParameterValue', @@ -778,14 +787,19 @@ def getrecords(self): parser = etree.XMLParser(schema=schema, resolve_entities=False) doc = 
etree.fromstring(self.parent.kvp['constraint'], parser) LOGGER.debug('Filter is valid XML.') - self.parent.kvp['constraint'] = {} - self.parent.kvp['constraint']['type'] = 'filter' - self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = \ - fes2.parse(doc, - self.parent.repository.queryables['_all'], - self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(doc), self.parent.context.namespaces) + if self.parent.repository.database is not None: + self.parent.kvp['constraint'] = {} + self.parent.kvp['constraint']['type'] = 'filter' + self.parent.kvp['constraint']['where'], self.parent.kvp['constraint']['values'] = \ + fes2.parse(doc, + self.parent.repository.queryables['_all'], + self.parent.repository.dbtype, + self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) + self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(doc), self.parent.context.namespaces) + else: + self.parent.kvp['constraint'] = { + 'ast': fes1_parse(etree.tostring(self.parent.kvp['constraint'])) + } except Exception as err: errortext = \ 'Exception: document not valid.\nError: %s' % str(err) @@ -794,7 +808,12 @@ def getrecords(self): return self.exceptionreport('InvalidParameterValue', 'bbox', 'Invalid Filter query: %s' % errortext) else: - self.parent.kvp['constraint'] = {} + if self.parent.repository.database is not None: + self.parent.kvp['constraint'] = {} + else: + self.parent.kvp['constraint'] = { + 'ast': None + } if 'sortby' not in self.parent.kvp: self.parent.kvp['sortby'] = None @@ -897,8 +916,8 @@ def getrecords(self): searchresults = etree.SubElement(node, util.nspath_eval('csw30:SearchResults', self.parent.context.namespaces), - numberOfRecordsMatched=matched, numberOfRecordsReturned=returned, - nextRecord=nextrecord, 
recordSchema=self.parent.kvp['outputschema'], + numberOfRecordsMatched=str(matched), numberOfRecordsReturned=str(returned), + nextRecord=str(nextrecord), recordSchema=self.parent.kvp['outputschema'], expires=timestamp, status=get_resultset_status(matched, nextrecord)) if self.parent.kvp['elementsetname'] is not None: @@ -1646,11 +1665,14 @@ def _parse_constraint(self, element): if tmp is not None: LOGGER.debug('Filter constraint specified') try: - query['type'] = 'filter' - query['where'], query['values'] = fes2.parse(tmp, - self.parent.repository.queryables['_all'], self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - query['_dict'] = xml2dict(etree.tostring(tmp), self.parent.context.namespaces) + if self.parent.repository.database is not None: + query['type'] = 'filter' + query['where'], query['values'] = fes2.parse(tmp, + self.parent.repository.queryables['_all'], self.parent.repository.dbtype, + self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) + query['_dict'] = xml2dict(etree.tostring(tmp), self.parent.context.namespaces) + else: + query['ast'] = fes1_parse(etree.tostring(tmp)) except Exception as err: return 'Invalid Filter request: %s' % err @@ -1658,13 +1680,16 @@ def _parse_constraint(self, element): if tmp is not None: LOGGER.debug('CQL specified: %s.', tmp.text) try: - LOGGER.info('Transforming CQL into OGC Filter') - query['type'] = 'filter' - cql = cql2fes(tmp.text, self.parent.context.namespaces, fes_version='2.0') - query['where'], query['values'] = fes2.parse(cql, - self.parent.repository.queryables['_all'], self.parent.repository.dbtype, - self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) - query['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces) + if self.parent.repository.database is not None: + LOGGER.info('Transforming CQL 
into OGC Filter') + query['type'] = 'filter' + cql = cql2fes(tmp.text, self.parent.context.namespaces, fes_version='2.0') + query['where'], query['values'] = fes2.parse(cql, + self.parent.repository.queryables['_all'], self.parent.repository.dbtype, + self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) + query['_dict'] = xml2dict(etree.tostring(cql), self.parent.context.namespaces) + else: + query['ast'] = ecql_parse(etree.tostring(tmp)) except Exception as err: LOGGER.exception('Invalid CQL request: %s', tmp.text) LOGGER.exception('Error message: %s', err) diff --git a/pycsw/ogc/gml/gml3.py b/pycsw/ogc/gml/gml3.py index ff51c2b8c..80038577a 100644 --- a/pycsw/ogc/gml/gml3.py +++ b/pycsw/ogc/gml/gml3.py @@ -70,7 +70,7 @@ def _poslist2wkt(poslist, axisorder, geomtype): return ', '.join(poslist2) -class Geometry(object): +class Geometry: """base geometry class""" def __init__(self, element, nsmap): diff --git a/pycsw/ogc/gml/gml32.py b/pycsw/ogc/gml/gml32.py index 4fbb07011..aee90f8ac 100644 --- a/pycsw/ogc/gml/gml32.py +++ b/pycsw/ogc/gml/gml32.py @@ -70,7 +70,7 @@ def _poslist2wkt(poslist, axisorder, geomtype): return ', '.join(poslist2) -class Geometry(object): +class Geometry: """base geometry class""" def __init__(self, element, nsmap): diff --git a/pycsw/opensearch.py b/pycsw/opensearch.py index 3415bd24a..bd94fa308 100644 --- a/pycsw/opensearch.py +++ b/pycsw/opensearch.py @@ -59,7 +59,7 @@ ] -class OpenSearch(object): +class OpenSearch: """OpenSearch wrapper class""" def __init__(self, context): diff --git a/pycsw/plugins/profiles/profile.py b/pycsw/plugins/profiles/profile.py index f3046b90f..73474535b 100644 --- a/pycsw/plugins/profiles/profile.py +++ b/pycsw/plugins/profiles/profile.py @@ -33,7 +33,7 @@ import os import warnings -class Profile(object): +class Profile: ''' base Profile class ''' def __init__(self, name, version, title, url, namespace, typename, outputschema, prefixes, model, 
core_namespaces, diff --git a/pycsw/plugins/repository/odc/odc.py b/pycsw/plugins/repository/odc/odc.py index 7a7af2b3f..8a58b2839 100644 --- a/pycsw/plugins/repository/odc/odc.py +++ b/pycsw/plugins/repository/odc/odc.py @@ -35,7 +35,7 @@ from pycsw.core import repository, util from OpenDataCatalog.opendata.models import Resource -class OpenDataCatalogRepository(object): +class OpenDataCatalogRepository: ''' Class to interact with underlying repository ''' def __init__(self, context, repo_filter=None): ''' Initialize repository ''' diff --git a/pycsw/plugins/repository/solr_metno.py b/pycsw/plugins/repository/solr_metno.py new file mode 100644 index 000000000..b134fdd38 --- /dev/null +++ b/pycsw/plugins/repository/solr_metno.py @@ -0,0 +1,577 @@ +# ================================================================= +# +# Authors: Massimo Di Stefano +# Magnar Martinsen +# Tom Kralidis +# +# Copyright (c) 2025 Massimo Di Stefano +# Copyright (c) 2025 Magnar Martinsen +# Copyright (c) 2025 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import base64 +from datetime import datetime +import json +import logging +from urllib.parse import urlencode + +import dateutil.parser as dparser +from pycsw.core.etree import etree +from pygeofilter.backends.solr.evaluate import to_filter +import requests +from requests.auth import HTTPBasicAuth + +LOGGER = logging.getLogger(__name__) + + +class SolrMETNORepository: + """ + Class to interact with underlying METNO Solr backend repository + """ + + def __init__(self, repo_object: dict, context): + """ + Initialize repository + """ + + self.database = None + self.filter = repo_object.get('filter') + # + self.xslt_iso_transformer = repo_object.get('xslt_iso_transformer') + self.xslt = repo_object.get('xslt') + #self.mmd_to_iso_xslt = self.xslt[self.xslt_iso_transformer] + # + self.context = context + self.fts = False + self.label = 'MetNO/Solr' + self.local_ingest = True + self.solr_select_url = f'{self.filter}/select' + self.dbtype = 'Solr' + self.username = repo_object.get('username') + self.password = repo_object.get('password') + self.authentication = HTTPBasicAuth(self.username, self.password) + self.session = self + self.adc_collection = repo_object.get('adc_collection') + # get the Solr mappings for main queryables + self.query_mappings = repo_object.get('solr_mappings') + + # generate core queryables db and obj bindings + self.queryables = {} + + for tname in self.context.model['typenames']: + for qname in self.context.model['typenames'][tname]['queryables']: + self.queryables[qname] = {} + items = self.context.model['typenames'][tname]['queryables'][ + qname + ].items() + + for qkey, qvalue in items: + 
self.queryables[qname][qkey] = qvalue + + # flatten all queryables + self.queryables['_all'] = {} + for qbl in self.queryables: + self.queryables['_all'].update(self.queryables[qbl]) + self.queryables['_all'].update(self.context.md_core_model['mappings']) + + def describe(self) -> dict: + """Derive table columns and types""" + + type_mappings = { + 'TEXT': 'string', + 'VARCHAR': 'string', + 'text_en': 'string', + 'text_general': 'string', + 'pdate': 'string', + 'bbox': 'string', + 'string': 'string' + } + + try: + response = requests.get(f'{self.filter}/schema/fields', + auth=self.authentication) + response.raise_for_status() + response = response.json() + except requests.exceptions.HTTPError as err: + msg = f'Solr query error: {err.response.text}' + LOGGER.error(msg) + raise RuntimeError(msg) + + properties = { + 'geometry': { + '$ref': 'https://geojson.org/schema/Polygon.json', + 'x-ogc-role': 'primary-geometry', + } + } + + for field in response.get('fields', []): + if field['name'] in self.query_mappings.values(): + pname = dict((v,k) for k,v in self.query_mappings.items()).get(field['name']) + properties[pname] = { + 'title': pname + } + if field['type'] in type_mappings: + properties[pname]['type'] = type_mappings[field['type']] + if field['type'] == 'pdate': + properties[pname]['fomat'] = 'date-time' + + if pname == 'identifier': + properties[pname]['x-ogc-role'] = 'id' + + return properties + + def dataset(self, record): + """ + Stub to mock a pycsw dataset object for Transactions + """ + + return type('dataset', (object,), record) + + def query_ids(self, ids: list) -> list: + """ + Query by list of identifiers + """ + + results = [] + + all_ids = '" OR "'.join(ids) + params = { + 'fq': [ + f'metadata_identifier:("{all_ids}")' + ], + 'q.op': 'OR', + 'q': '*:*' + } + + if self.adc_collection not in ['', None]: + params['fq'].append(f'collection:({self.adc_collection})') + + try: + response = requests.get(self.solr_select_url, params=params, + 
auth=self.authentication) + response.raise_for_status() + response = response.json() + except requests.exceptions.HTTPError as err: + msg = f'Solr query error: {err.response.text}' + LOGGER.error(msg) + raise RuntimeError(msg) + + for doc in response['response']['docs']: + results.append(self._doc2record(doc)) + + return results + + def query_collections(self, filters=None, limit=10) -> list: + ''' Query for parent collections ''' + + results = [] + + params = { + 'fq': ['isChild:false'] + } + if self.adc_collection not in ['', None]: + params['fq'].append(f'collection:({self.adc_collection})') + + try: + response = requests.get(self.solr_select_url, params=params, + auth=self.authentication) + response.raise_for_status() + response = response.json() + except requests.exceptions.HTTPError as err: + msg = f'Solr query error: {err.response.text}' + LOGGER.error(msg) + raise RuntimeError(msg) + + for doc in response['response']['docs']: + results.append(self._doc2record(doc)) + + return results + + def query_domain(self, domain, typenames, domainquerytype='list', + count=False) -> list: + """ + Query by property domain values + """ + + results = [] + + params = { + 'q': '*:*', + 'rows': 0, + 'facet': 'true', + 'facet.query': 'distinct', + 'facet.type': 'terms', + 'facet.field': domain, + 'fq': ['metadata_status:Active'] + } + + if self.adc_collection not in ['', None]: + params['fq'].append('collection:({self.adc_collection})') + + try: + response = requests.get(f'{self.filter}/select', params=params, + auth=self.authentication) + response.raise_for_status() + response = response.json() + except requests.exceptions.HTTPError as err: + msg = f'Solr query error: {err.response.text}' + LOGGER.error(msg) + raise RuntimeError(msg) + + counts = response['facet_counts']['facet_fields'][domain] + + for term in zip(*([iter(counts)] * 2)): + LOGGER.debug(f'Term: {term}') + results.append(term) + + return results + + def query_insert(self, direction='max') -> str: + """ + Query 
to get latest (default) or earliest update to repository + """ + + if direction == 'min': + sort_order = 'asc' + else: + sort_order = 'desc' + + params = { + 'q': '*:*', + 'q.op': 'OR', + 'fl': 'timestamp', + 'sort': f'timestamp {sort_order}', + 'fq': ['metadata_status:Active'], + } + + if self.adc_collection not in ['', None]: + params['fq'].append('collection:({self.adc_collection})') + + try: + response = requests.get(f'{self.filter}/select', params=params, + auth=self.authentication) + response.raise_for_status() + response = response.json() + except requests.exceptions.HTTPError as err: + msg = f'Solr query error: {err.response.text}' + LOGGER.error(msg) + raise RuntimeError(msg) + + try: + timestamp = datetime.strptime( + response['response']['docs'][0]['timestamp'], + '%Y-%m-%dT%H:%M:%S.%fZ' + ) + except IndexError: + timestamp = datetime.now() + + return timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') + + def query_source(self, source): + """ + Query by source + """ + + return NotImplementedError() + + def query(self, constraint=None, sortby=None, typenames=None, maxrecords=10, + startposition=0) -> tuple: + """ + Query records from underlying repository + """ + + solr_query = {} + results = [] + + if constraint.get('ast') is not None: + # ask pygeofilter to convert AST to Solr query + solr_query = to_filter(constraint['ast']) + if 'csw:AnyText' in solr_query['query']: + solr_query['query'] = solr_query['query'].replace('csw:AnyText', 'full_text') + if 'ows:BoundingBox' in solr_query['query']: + solr_query['query'] = solr_query['query'].replace('ows:BoundingBox', 'bbox') + else: + # DO NOT ask pygeofilter to convert AST to Solr query + solr_query = {'query': '*:*'} + + # add handle sortby, maxrecords, startposition + solr_query['offset'] = startposition + solr_query['limit'] = maxrecords + + if sortby is not None: + solr_query['sort'] = f"{sortby['propertyname']} {sortby['order']}" + + LOGGER.info(f'Solr query: {solr_query}') + try: + response = 
requests.post(f'{self.filter}/select', json=solr_query, + auth=self.authentication) + response.raise_for_status() + response = response.json() + except requests.exceptions.HTTPError as err: + msg = f'Solr query error: {err.response.text}' + LOGGER.error(msg) + raise RuntimeError(msg) + + total = response['response']['numFound'] + LOGGER.debug(f'Found: {total}') + for doc in response['response']['docs']: + results.append(self._doc2record(doc)) + + return total, results + + def _doc2record(self, doc: dict): + """ + Transform a Solr doc into a pycsw dataset object + """ + + record = {} + + record['identifier'] = doc['metadata_identifier'] + record['metadata_type'] = 'application/xml' + record['typename'] = 'gmd:MD_Metadata' + record['schema'] = 'http://www.isotc211.org/2005/gmd' + + LOGGER.debug('Checking for parent-child relationship') +# if doc.get('isParent', False): +# record['type'] = 'series' +# else: +# record['type'] = 'dataset' + +# if doc.get('isChild', False): +# record['parentidentifier'] = doc['related_dataset'][0] + + if 'isParent' in doc and doc["isParent"]: + record['type'] = "series" + else: + record['type'] = "dataset" + # +# print('child', doc["isChild"]) +# print('parent', doc["isParent"]) + if 'isChild' in doc and doc["isChild"]: + record['parentidentifier'] = doc["related_dataset"][0] + print('parent', record['parentidentifier']) + else: + record['parentidentifier'] = None + + record['wkt_geometry'] = doc['bbox'] + record['title'] = doc['title'][0] + record['abstract'] = doc['abstract'][0] + + if 'iso_topic_category' in doc: + record['topicategory'] = ','.join(doc['iso_topic_category']) + +# if 'keywords_keyword' in doc: +# record['keywords'] = ','.join(doc['keywords_keyword']) + +# if 'related_url_landing_page' in doc: +# record['source'] = doc['related_url_landing_page'][0] + + record['source'] = None + + record['language'] = doc.get('dataset_language', 'en') + + # Transform the indexed time as insert_data + insert = 
dparser.parse(doc['timestamp'][0]) + record['insert_date'] = insert.isoformat() + + # Transform the last metadata update datetime as modified + if 'last_metadata_update_datetime' in doc: + modified = dparser.parse(doc['last_metadata_update_datetime'][0]) + record['date_modified'] = modified.isoformat() + + if 'Created' in doc['last_metadata_update_type']: + record['date_creation'] = modified.isoformat() + else: + record['date_creation'] = None + + # Transform temporal extendt start and end dates + if 'temporal_extent_start_date' in doc: + #time_begin = dparser.parse(doc['temporal_extent_start_date'][0]) + #record['time_begin'] = time_begin.isoformat() + record['time_begin'] = doc['temporal_extent_start_date'][0] + + if 'temporal_extent_end_date' in doc: + #time_end = dparser.parse(doc['temporal_extent_end_date'][0]) + #record['time_end'] = time_end.isoformat() + #record['time_end'] = doc['temporal_extent_end_date'][0] + record['time_end'] = None + else: + record['time_end'] = None + + links = [] + record['relation'] = None + + if 'data_access_url_opendap' in doc: + links.append( + { + 'name': 'OPeNDAP access', + 'description': 'OPeNDAP access', + 'protocol': 'OPeNDAP:OPeNDAP', + 'url': doc['data_access_url_opendap'][0], + } + ) + if 'data_access_url_ogc_wms' in doc: + links.append( + { + 'name': 'OGC-WMS Web Map Service', + 'description': 'OGC-WMS Web Map Service', + 'protocol': 'OGC:WMS', + 'url': doc['data_access_url_ogc_wms'][0], + } + ) + if 'data_access_url_http' in doc: + links.append( + { + 'name': 'File for download', + 'description': 'Direct HTTP download', + 'protocol': 'WWW:DOWNLOAD-1.0-http--download', + 'url': doc['data_access_url_http'][0], + } + ) + if 'data_access_url_ftp' in doc: + links.append( + { + 'name': 'File for download', + 'description': 'Direct FTP download', + 'protocol': 'ftp', + 'url': doc['data_access_url_ftp'][0], + } + ) + record['links'] = json.dumps(links) + + # Transform the first investigator as creator. 
+ if 'personnel_investigator_name' in doc: + record['creator'] = ','.join(doc['personnel_investigator_name']) + + if 'personnel_technical_name' in doc: + record['contributor'] = ','.join(doc['personnel_technical_name']) + + if 'personnel_metadata_author_name' in doc: + if 'contributor' in record: + record['contributor'] += ',' + ','.join( + doc['personnel_metadata_author_name'] + ) + else: + record['contributor'] = ','.join(doc['personnel_metadata_author_name']) # noqa + + contacts = [] + for ct in ['technical', 'investigator', 'metadata_author']: + ct2 = personnel2contact(doc, ct) + if ct2: + contacts.append(personnel2contact(doc, ct)) + + record['contacts'] = json.dumps(contacts) + print('met', record['contacts']) + #record['themes'] = keywords2themes(doc) + record['themes'], record['keywords'] = keywords2themes(doc) + print(record['themes'], record['keywords']) + + # TODO: rights is mapped to accessconstraint, although we provide this + # info in the use constraint. + # we should use dc:license instead, but it is not mapped in csw. + if 'use_constraint_license_text' in doc: + record['otherconstraints'] = doc.get('use_constraint_license_text') + else: + record['otherconstraints'] = doc.get('use_constraint_identifier') + + #this is mapped to rights. 
We do not have it + record['conditionapplyingtoaccessanduse'] = None + + if 'dataset_citation_publisher' in doc: + record['publisher'] = doc['dataset_citation_publisher'][0] + + if 'storage_information_file_format' in doc: + record['format'] = doc['storage_information_file_format'] + else: + record['format'] = 'Not provided' + +# transform = etree.XSLT(etree.parse(self.mmd_to_iso_xslt)) +# xml_ = base64.b64decode(doc['mmd_xml_file']) +# +# doc_ = etree.fromstring(xml_, self.context.parser) +# pl = '/usr/local/share/parent_list.xml' +# result_tree = transform( +# doc_, path_to_parent_list=etree.XSLT.strparam(pl)).getroot() +# record['xml'] = etree.tostring(result_tree) +# record['mmd_xml_file'] = doc['mmd_xml_file'] +# +# LOGGER.debug(record['xml']) + params = { + 'q.op': 'OR', + 'q': f"metadata_identifier:{doc['metadata_identifier']}" + } + + mdsource_url = self.solr_select_url + urlencode(params) + record['mdsource'] = mdsource_url + + return self.dataset(record) + + +def keywords2themes(doc: dict) -> list: + schemes = {} + themes = [] + kvoctothesaurus = {'GCMDSK' : 'https://gcmd.earthdata.nasa.gov/kms/concepts/concept_scheme/sciencekeywords', + 'CFSTDN' : 'https://vocab.nerc.ac.uk/standard_name/', + 'GEMET' : 'http://inspire.ec.europa.eu/theme', + 'NORTHEMES' : 'https://register.geonorge.no/metadata-kodelister/nasjonal-temainndeling', + 'GCMDPROV': 'https://gcmd.earthdata.nasa.gov/kms/concepts/concept_scheme/providers', + 'GCMDLOC' : 'https://gcmd.earthdata.nasa.gov/kms/concepts/concept_scheme/locations'} + keywords = "" + + for scheme in set(doc.get('keywords_vocabulary', [])): + schemes[scheme] = [] + for index, value in enumerate(doc['keywords_keyword']): + if doc['keywords_vocabulary'][index] == scheme: + schemes[doc['keywords_vocabulary'][index]].append(value) + + for key, value in schemes.items(): + if key != "None": + themes.append({ + 'keywords': [{'name': v} for v in value], + #'scheme': key, + 'thesaurus': {'title': key, 'url' : kvoctothesaurus[key]} + 
}) + else: + keywords = ",".join(value) + + return json.dumps(themes), keywords + + +def personnel2contact(doc: dict, ct: str) -> dict: + contact = {} + + mmdrole2roles = {'metadata_author': 'Metadata author', + 'technical' : 'Technical contact', + 'investigator': 'Investigator'} + + if f'personnel_{ct}_name' in doc: + contact = { + 'name': doc[f'personnel_{ct}_name'][0], + 'organization': doc[f'personnel_{ct}_organisation'][0], + 'role': mmdrole2roles[f'{ct}'], + 'email': doc[f'personnel_{ct}_email'][0], + } + + return contact diff --git a/pycsw/server.py b/pycsw/server.py index 67b705252..42d4755d2 100644 --- a/pycsw/server.py +++ b/pycsw/server.py @@ -51,7 +51,7 @@ LOGGER = logging.getLogger(__name__) -class Csw(object): +class Csw: """ Base CSW server """ def __init__(self, rtconfig=None, env=None, version='3.0.0'): """ Initialize CSW """ @@ -389,7 +389,7 @@ def dispatch(self, writer=sys.stdout, write_headers=True): max_attempts = 0 while not connection_done and max_attempts <= self.max_retries: try: - self.repository = rs_cls(self.context, repo_filter) + self.repository = rs_cls(self.config['repository'], self.context) LOGGER.debug('Custom repository %s loaded (%s)', rs, self.repository.dbtype) connection_done = True except Exception as err: @@ -413,11 +413,7 @@ def dispatch(self, writer=sys.stdout, write_headers=True): while not connection_done and max_attempts <= self.max_retries: try: self.repository = repository.Repository( - self.config['repository']['database'], - self.context, - self.environ.get('local.app_root', None), - self.config['repository'].get('table'), - repo_filter + self.config['repository'], self.context ) LOGGER.debug( 'Repository loaded (local): %s.' 
% self.repository.dbtype) diff --git a/pycsw/sru.py b/pycsw/sru.py index e881513e0..569995b1d 100644 --- a/pycsw/sru.py +++ b/pycsw/sru.py @@ -33,7 +33,7 @@ from pycsw.ogc.fes import fes1 -class Sru(object): +class Sru: """SRU wrapper class""" def __init__(self, context): self.sru_version = '1.1' diff --git a/pycsw/stac/api.py b/pycsw/stac/api.py index d4dbd61c9..67163840a 100644 --- a/pycsw/stac/api.py +++ b/pycsw/stac/api.py @@ -35,7 +35,7 @@ from pygeofilter.parsers.ecql import parse as parse_ecql from pycsw import __version__ -from pycsw.core.pygeofilter_evaluate import to_filter +from pycsw.core.pygeofilter_ext import to_filter from pycsw.ogc.api.oapi import gen_oapi from pycsw.ogc.api.records import API, build_anytext from pycsw.core.util import geojson_geometry2bbox, wkt2geom @@ -262,10 +262,9 @@ def collections(self, headers_, args): query_args.append("typename = 'stac:Collection'") ast = parse_ecql(' AND '.join(query_args)) LOGGER.debug(f'Abstract syntax tree: {ast}') - filters = to_filter(ast, self.repository.dbtype, self.repository.query_mappings) - LOGGER.debug(f'Filter: {filters}') - sc_query = self.repository.session.query( - self.repository.dataset).filter(filters).limit(limit).all() + + _, sc_query = self.repository.query(constraint={'ast': ast}, + maxrecords=limit) for sc in sc_query: id_found = False diff --git a/requirements-standalone.txt b/requirements-standalone.txt index c180df963..75ece761f 100644 --- a/requirements-standalone.txt +++ b/requirements-standalone.txt @@ -1,5 +1,5 @@ SQLAlchemy<2.0.0 Flask -pygeofilter +pygeofilter[fes] PyYAML pygeoif diff --git a/setup.py b/setup.py index 71ba6006b..310ba3b67 100644 --- a/setup.py +++ b/setup.py @@ -58,8 +58,6 @@ def get_package_version(): DESCRIPTION = ('pycsw is an OGC API - Records and OGC CSW server ' 'implementation written in Python') -print("JJJ", DESCRIPTION) - # ensure a fresh MANIFEST file is generated if (os.path.exists('MANIFEST')): os.unlink('MANIFEST') diff --git 
a/tests/functionaltests/conftest.py b/tests/functionaltests/conftest.py index 9dafd2a78..46a28f9fc 100644 --- a/tests/functionaltests/conftest.py +++ b/tests/functionaltests/conftest.py @@ -217,9 +217,13 @@ def configuration(request, tests_directory, log_level): table_name = _get_table_name(suite_name, config, repository_url) + repo_config = { + 'database': repository_url, + 'table': table_name + } + if not _repository_exists(repository_url, table_name): - _initialize_database(repository_url=repository_url, - table_name=table_name, + _initialize_database(repo_config, data_dir=data_dir, test_dir=tests_directory, export_dir=export_dir) @@ -404,7 +408,7 @@ def _get_table_name(suite, config, repository_url): return result -def _initialize_database(repository_url, table_name, data_dir, test_dir, export_dir): +def _initialize_database(repo_config, data_dir, test_dir, export_dir): """Initialize database for tests. This function will create the database and load any test data that @@ -412,10 +416,8 @@ def _initialize_database(repository_url, table_name, data_dir, test_dir, export_ Parameters ---------- - repository_url: str - URL for the repository, as used by SQLAlchemy engines - table_name: str - Name of the table that is to be used to store pycsw records + repo_config: dict + repository configuration data_dir: str Path to a directory that contains sample data records to be loaded into the database @@ -426,6 +428,9 @@ def _initialize_database(repository_url, table_name, data_dir, test_dir, export_ """ + repository_url = repo_config['database'] + table_name = repo_config['table'] + context = StaticContext() print(f"Setting up {repository_url} repository...") @@ -437,7 +442,7 @@ def _initialize_database(repository_url, table_name, data_dir, test_dir, export_ extra_kwargs = {} setup(repository_url, table_name, **extra_kwargs) - repo = Repository(repository_url, context, table=table_name) + repo = Repository(repo_config, context) if len(os.listdir(data_dir)) > 0: 
print("Loading database data...") diff --git a/tests/functionaltests/suites/csw30/expected/get_92d4844d-57d5-4cf3-8f47-ba50e369dc04.xml b/tests/functionaltests/suites/csw30/expected/get_92d4844d-57d5-4cf3-8f47-ba50e369dc04.xml index ee0f15f3c..af107e8b9 100644 --- a/tests/functionaltests/suites/csw30/expected/get_92d4844d-57d5-4cf3-8f47-ba50e369dc04.xml +++ b/tests/functionaltests/suites/csw30/expected/get_92d4844d-57d5-4cf3-8f47-ba50e369dc04.xml @@ -2,5 +2,5 @@ - + diff --git a/tests/functionaltests/suites/csw30/expected/get_d94c801a-1207-4897-b84a-53f3a192515b.xml b/tests/functionaltests/suites/csw30/expected/get_d94c801a-1207-4897-b84a-53f3a192515b.xml index ee0f15f3c..af107e8b9 100644 --- a/tests/functionaltests/suites/csw30/expected/get_d94c801a-1207-4897-b84a-53f3a192515b.xml +++ b/tests/functionaltests/suites/csw30/expected/get_d94c801a-1207-4897-b84a-53f3a192515b.xml @@ -2,5 +2,5 @@ - + diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 42b2fb421..3736e0820 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -178,7 +178,7 @@ def test_transform_mappings(): ("some_callable", os.getcwd, os.getcwd()), ]) def test_getqattr_no_link(name, value, expected): - class Phony(object): + class Phony: pass instance = Phony() From 2933bc809d103f88428df08b5eaeb4eeb84328cc Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Thu, 2 Oct 2025 10:22:16 -0400 Subject: [PATCH 2/4] fix --- docker/entrypoint.py | 37 +++++++++++++++++++++++++- pycsw/plugins/repository/solr_metno.py | 3 +++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/docker/entrypoint.py b/docker/entrypoint.py index 6edbbeb61..3fa7f1d6b 100644 --- a/docker/entrypoint.py +++ b/docker/entrypoint.py @@ -80,7 +80,42 @@ def launch_pycsw(pycsw_config, workers=2, reload=False): except Exception as err: LOGGER.debug(err) - repo = Repository(database, StaticContext()) + if 'source' in pycsw_config['repository']: # load custom repository + rs = 
pycsw_config['repository']['source'] + rs_modname, rs_clsname = rs.rsplit('.', 1) + + rs_mod = __import__(rs_modname, globals(), locals(), [rs_clsname]) + rs_cls = getattr(rs_mod, rs_clsname) + + try: + connection_done = False + max_attempts = 0 + max_retries = pycsw_config['repository'].get('maxretries', 5) + while not connection_done and max_attempts <= max_retries: + try: + repo = rs_cls(pycsw_config['repository'], StaticContext()) + LOGGER.debug('Custom repository %s loaded' % pycsw_config['repository']['source']) + connection_done = True + except Exception as err: + LOGGER.debug(f'Repository not loaded retry connection {max_attempts}: {err}') + max_attempts += 1 + except Exception as err: + msg = 'Could not load custom repository %s: %s' % (rs, err) + LOGGER.exception(msg) + error = 1 + code = 'NoApplicableCode' + locator = 'service' + text = 'Could not initialize repository. Check server logs' + + else: + try: + LOGGER.info('Loading default repository') + repo = repository.Repository(pycsw_config, StaticContext()) + LOGGER.debug(f'Repository loaded {repo.dbtype}') + except Exception as err: + msg = f'Could not load repository {err}' + LOGGER.exception(msg) + raise repo.ping() diff --git a/pycsw/plugins/repository/solr_metno.py b/pycsw/plugins/repository/solr_metno.py index b134fdd38..cda293e5d 100644 --- a/pycsw/plugins/repository/solr_metno.py +++ b/pycsw/plugins/repository/solr_metno.py @@ -528,6 +528,9 @@ def _doc2record(self, doc: dict): return self.dataset(record) + def ping(self): + pass + def keywords2themes(doc: dict) -> list: schemes = {} From 066d3b89e1d0fef6eea37209e1c97974649594be Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Fri, 3 Oct 2025 05:22:00 -0400 Subject: [PATCH 3/4] update --- pycsw/plugins/repository/solr_metno.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pycsw/plugins/repository/solr_metno.py b/pycsw/plugins/repository/solr_metno.py index cda293e5d..b5a0cdf14 100644 --- 
a/pycsw/plugins/repository/solr_metno.py +++ b/pycsw/plugins/repository/solr_metno.py @@ -366,12 +366,9 @@ def _doc2record(self, doc: dict): record['type'] = "series" else: record['type'] = "dataset" - # -# print('child', doc["isChild"]) -# print('parent', doc["isParent"]) + if 'isChild' in doc and doc["isChild"]: record['parentidentifier'] = doc["related_dataset"][0] - print('parent', record['parentidentifier']) else: record['parentidentifier'] = None @@ -381,6 +378,8 @@ def _doc2record(self, doc: dict): if 'iso_topic_category' in doc: record['topicategory'] = ','.join(doc['iso_topic_category']) + else: + record['topicategory'] = None # if 'keywords_keyword' in doc: # record['keywords'] = ','.join(doc['keywords_keyword']) @@ -483,10 +482,8 @@ def _doc2record(self, doc: dict): contacts.append(personnel2contact(doc, ct)) record['contacts'] = json.dumps(contacts) - print('met', record['contacts']) #record['themes'] = keywords2themes(doc) record['themes'], record['keywords'] = keywords2themes(doc) - print(record['themes'], record['keywords']) # TODO: rights is mapped to accessconstraint, although we provide this # info in the use constraint. 
From c98b5d7e7229179c836183a9573c008dbf030098 Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Fri, 3 Oct 2025 07:36:44 -0400 Subject: [PATCH 4/4] overriding FES parsing --- pycsw/core/pygeofilter_ext.py | 8 +++++--- pycsw/ogc/csw/csw2.py | 8 ++++---- pycsw/ogc/csw/csw3.py | 8 +++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pycsw/core/pygeofilter_ext.py b/pycsw/core/pygeofilter_ext.py index c83db8eeb..0ef5c44f4 100644 --- a/pycsw/core/pygeofilter_ext.py +++ b/pycsw/core/pygeofilter_ext.py @@ -45,7 +45,8 @@ class PycswFilterEvaluator(SQLAlchemyFilterEvaluator): - def __init__(self, field_mapping=None, dbtype='sqlite', undefined_as_null=None): + def __init__(self, field_mapping=None, dbtype='sqlite', + undefined_as_null=None): super().__init__(field_mapping, undefined_as_null=undefined_as_null) self._pycsw_dbtype = dbtype @@ -85,9 +86,10 @@ def ilike(self, node, lhs): def to_filter(ast, dbtype, field_mapping=None): return PycswFilterEvaluator(field_mapping, dbtype).evaluate(ast) + class PycswCSWFES11Parser(FES11Parser): -# def __init__(self): -# super().__init__() + def parse(self, input_): + return FES11Parser().parse(input_) @fhandle('BBOX') def geometry_bbox(self, node: Element, lhs, rhs, crs=None): diff --git a/pycsw/ogc/csw/csw2.py b/pycsw/ogc/csw/csw2.py index 7ab3fdd2f..f4eb67472 100644 --- a/pycsw/ogc/csw/csw2.py +++ b/pycsw/ogc/csw/csw2.py @@ -33,7 +33,6 @@ import os from pygeofilter.parsers.ecql.parser import parse as ecql_parse -from pygeofilter.parsers.fes.v11 import parse as fes1_parse from pycsw.core.pygeofilter_ext import PycswCSWFES11Parser from pycsw.core.etree import etree @@ -771,8 +770,9 @@ def getrecords(self): self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(doc), self.parent.context.namespaces) else: + pc = PycswCSWFES11Parser() self.parent.kvp['constraint'] = { - 'ast': fes1_parse(tmp) + 'ast': 
pc.parse(tmp) } except Exception as err: errortext = \ @@ -1602,8 +1602,8 @@ def _parse_constraint(self, element): self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) query['_dict'] = xml2dict(etree.tostring(tmp), self.parent.context.namespaces) else: - #query['ast'] = PycswCSWFES11Parser().parse(etree.tostring(tmp)) - query['ast'] = fes1_parse(etree.tostring(tmp)) + pc = PycswCSWFES11Parser() + query['ast'] = pc.parse(tmp) except Exception as err: return 'Invalid Filter request: %s' % err diff --git a/pycsw/ogc/csw/csw3.py b/pycsw/ogc/csw/csw3.py index ebee1bb36..9d105012e 100644 --- a/pycsw/ogc/csw/csw3.py +++ b/pycsw/ogc/csw/csw3.py @@ -32,13 +32,13 @@ from time import time from pygeofilter.parsers.ecql.parser import parse as ecql_parse -from pygeofilter.parsers.fes.v11 import parse as fes1_parse from pycsw.core.etree import etree from pycsw.ogc.csw.cql import cql2fes from pycsw import opensearch from pycsw.core import metadata, util from pycsw.core.formats.fmt_json import xml2dict +from pycsw.core.pygeofilter_ext import PycswCSWFES11Parser from pycsw.ogc.fes import fes1, fes2 import logging @@ -797,8 +797,9 @@ def getrecords(self): self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) self.parent.kvp['constraint']['_dict'] = xml2dict(etree.tostring(doc), self.parent.context.namespaces) else: + pc = PycswCSWFES11Parser() self.parent.kvp['constraint'] = { - 'ast': fes1_parse(etree.tostring(self.parent.kvp['constraint'])) + 'ast': pc.parse(self.parent.kvp['constraint']) } except Exception as err: errortext = \ @@ -1672,7 +1673,8 @@ def _parse_constraint(self, element): self.parent.context.namespaces, self.parent.orm, self.parent.language['text'], self.parent.repository.fts) query['_dict'] = xml2dict(etree.tostring(tmp), self.parent.context.namespaces) else: - query['ast'] = fes1_parse(etree.tostring(tmp)) + pc = PycswCSWFES11Parser() + query['ast'] = 
pc.parse(tmp) except Exception as err: return 'Invalid Filter request: %s' % err