From 295e4c52208a54899f316363e3a66aa9cdd5a516 Mon Sep 17 00:00:00 2001 From: zazmuz Date: Fri, 4 Apr 2025 15:54:16 +0200 Subject: [PATCH 1/6] Created more checks for legacy and 2023-07 formats to validate problem.yaml. Co-authored-by: square-cylinder Cleaned up and added more checks for problem.yaml Co-authored-by: square-cylinder detect unknown fields f-strings compatible with older python versions fix bug --- problemtools/verifyproblem.py | 716 ++++++++++++++++++++++++++-------- 1 file changed, 554 insertions(+), 162 deletions(-) diff --git a/problemtools/verifyproblem.py b/problemtools/verifyproblem.py index 3aa566d5..205fd53f 100644 --- a/problemtools/verifyproblem.py +++ b/problemtools/verifyproblem.py @@ -4,6 +4,7 @@ import concurrent.futures from concurrent.futures import ThreadPoolExecutor +import datetime import threading import queue import glob @@ -141,6 +142,12 @@ def __append_additional_info(msg: str, additional_info: str|None) -> str: def __init__(self, name: str) -> None: self.log = log.getChild(name) + def fatal(self, msg: str, additional_info: str|None=None, *args) -> None: + self._check_res = False + ProblemAspect.errors += 1 + self.log.error(ProblemAspect.__append_additional_info(msg, additional_info), *args) + raise VerifyError(msg) + def error(self, msg: str, additional_info: str|None=None, *args) -> None: self._check_res = False ProblemAspect.errors += 1 @@ -262,7 +269,7 @@ def check(self, context: Context) -> bool: self.check_size_limits(self.ansfile) self._problem.getProblemPart(InputValidators).validate(self) anssize = os.path.getsize(self.ansfile) / 1024.0 / 1024.0 - outputlim = self._problem.get(ProblemConfig)['limits']['output'] + outputlim = self._problem.get(Config_Legacy)['limits']['output'] if anssize > outputlim: self.error(f'Answer file ({anssize:.1f} Mb) is larger than output limit ({outputlim} Mb), you need to increase output limit') elif 2 * anssize > outputlim: @@ -331,7 +338,7 @@ def run_submission_real(self, sub, context: Context, timelim: int, timelim_low: errfile = os.path.join(self._problem.tmpdir, f'error-{self.counter}') status, runtime = sub.run(infile=self.infile, outfile=outfile, errfile=errfile, timelim=timelim_high+1, - memlim=self._problem.get(ProblemConfig)['limits']['memory'], work_dir=sub.path) + memlim=self._problem.get(Config_Legacy)['limits']['memory'], work_dir=sub.path) if is_TLE(status) or runtime > timelim_high: res_high = SubmissionResult('TLE') elif is_RTE(status): @@ -420,11 +427,10 @@ def __init__(self, problem: Problem, aspect_name: str, datadir: str|None=None, p if not field in self.config: self.config[field] = parent_value - # TODO: Decide if these should stay # Some deprecated properties are inherited from problem config during a transition period - problem_grading = problem.get(ProblemConfig)['grading'] + problem_grading = problem.get(Config_Legacy)['grading'] for key in ['accept_score', 'reject_score', 'range']: - if key in problem.get(ProblemConfig)['grading']: + if key in problem.get(Config_Legacy)['grading']: self.config[key] = problem_grading[key] problem_on_reject = problem_grading.get('on_reject') @@ -433,7 +439,7 @@ def __init__(self, problem: Problem, aspect_name: str, datadir: str|None=None, p if problem_on_reject == 'grade': self.config['on_reject'] = 'continue' - if self._problem.get(ProblemConfig)['type'] == 'pass-fail': + if self._problem.get(Config_Legacy)['type'] == 'pass-fail': for key in TestCaseGroup._SCORING_ONLY_KEYS: if key not in self.config: self.config[key] = None @@ -709,10 +715,8 @@ def 
all_datasets(self) -> list: res += child.all_datasets() return res - class ProblemStatement(ProblemPart): PART_NAME = 'statement' - def setup(self): self.format_data = formatversion.get_format_data(self.problem.probdir) if not self.format_data: @@ -725,7 +729,6 @@ def setup(self): else: self.error(f"No directory named {self.format_data.statement_directory} found") self.statements = [] - return self.get_config() def check(self, context: Context) -> bool: @@ -741,7 +744,6 @@ def check(self, context: Context) -> bool: for lang, count in collections.Counter(langs).items(): if count > 1: self.error(f"Can't supply multiple statements of the same language ({lang}).") - for _, lang in self.statements: try: options = problem2pdf.get_parser().parse_args([""]) @@ -770,7 +772,7 @@ def __str__(self) -> str: return 'problem statement' def get_config(self) -> dict[str, dict[str, str]]: - ret: dict[str, dict[str, str]] = {'name':{}} + ret: dict[str, dict[str, str]] = {} for filename, lang in self.statements: dir = os.path.join(self.problem.probdir, self.format_data.statement_directory) with open(os.path.join(dir, filename)) as f: @@ -778,19 +780,17 @@ def get_config(self) -> dict[str, dict[str, str]]: hit = re.search(r'\\problemname{(.*)}', stmt, re.MULTILINE) if hit: problem_name = hit.group(1).strip() - ret['name'][lang] = problem_name - return ret if ret['name'] else {} + ret[lang] = problem_name + return ret -class ProblemConfig(ProblemPart): - PART_NAME = 'config' +class ProblemConfigBase(ProblemPart): @staticmethod def setup_dependencies(): return {ProblemStatement} - _MANDATORY_CONFIG = ['name'] - _OPTIONAL_CONFIG = config.load_config('problem.yaml') + PART_NAME = 'config' _VALID_LICENSES = ['unknown', 'public domain', 'cc0', 'cc by', 'cc by-sa', 'educational', 'permission'] def setup(self): @@ -801,41 +801,156 @@ def setup(self): if os.path.isfile(self.configfile): try: with open(self.configfile) as f: - self._data = yaml.safe_load(f) - # Loading empty yaml yields None, for no apparent reason... - if self._data is None: - self._data = {} + self._data = yaml.safe_load(f) or {} except Exception as e: - self.error(str(e)) + self.fatal(str(e)) + + self.common_fix_config_structure() + self.fix_config_structure() + + return self._data + + def common_fix_config_structure(self): + for field, value in self._data.items(): + if value is None: + self.error(f"Field '{field}' provided in problem.yaml but is empty") + + self._data.setdefault('license', 'unknown') + if type(self._data['license']) is str: + self._data['license'] = self._data['license'].lower() + if self._data['license'] not in self._VALID_LICENSES: + self.error(f'License needs to be one of: {self._VALID_LICENSES}') + if self._data['license'] == 'unknown': + self.warning("License is 'unknown'") + def fix_config_structure(self): + pass + + def check_uuid(self, uuid: str) -> None: + if not re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}+$', uuid): + self.error(f'UUID "{uuid}" is not a valid UUID') - # Add config items from problem statement e.g. 
name - self._data.update(self.problem.get(ProblemStatement)) - # Populate rights_owner unless license is public domain - if 'rights_owner' not in self._data and self._data.get('license') != 'public domain': - if 'author' in self._data: + def check(self, context: Context) -> bool: + if self._check_res is not None: + return self._check_res + self._check_res = True + to_check = [prop for prop in dir(self) if prop.startswith('_check_') and callable(getattr(self, prop))] + for prop in to_check: + self.info(f'Checking "{prop}"') + fun = getattr(self, prop) + fun() + # TODO: if fatal has happened: abort + return self._check_res + +class Config_Legacy(ProblemConfigBase): + DEFAULT_LIMITS = { + "time_multiplier": 5, + "time_safety_margin": 2, + "memory": 1024, + "output": 8, + "code": 128, + "compilation_time": 60, + "validation_time": 60, + "validation_memory": 1024, + "validation_output": 8, + } + + def fix_config_structure(self): + self._data.setdefault('problem_format_version', formatversion.VERSION_LEGACY) + if self._data.get('problem_format_version') != formatversion.VERSION_LEGACY: + val = self._data['problem_format_version'] + self.error(f'problem_format_version needs to be the string {formatversion.VERSION_LEGACY} for the legacy format (got: "{val}")') + + self._data.setdefault('type', 'pass-fail') + if self._data['type'] not in ('pass-fail', 'scoring'): + val = self._data['type'] + self.fatal(f'Type should be one of ["pass-fail", "scoring"] (got: "{val}")') + + if 'name' in self._data: + val = self._data['name'] + if type(val) is not str: + self.warning(f'name should be of type string (got: "{val}")') + val = str(val) + self._data['name'] = {'': val} + self._data.setdefault('name', {}) + self._data['name'].update(self.problem.get(ProblemStatement)) + + for prop in ('uuid', 'author', 'source', 'source_url'): + if prop in self._data: + if type(self._data[prop]) is not str: + val = self._data[prop] + self.warning(f'"{prop}" should be of type string (got "{val}")') + self._data[prop] = str(val) + if prop == 'uuid': + self.check_uuid(self._data[prop]) + + if self._data['license'] != 'public domain': + if 'rights_owner' in self._data: + self._data['rights_owner'] = self._data['rights_owner'].strip() + elif 'author' in self._data: self._data['rights_owner'] = self._data['author'] elif 'source' in self._data: self._data['rights_owner'] = self._data['source'] + else: + self.error('No author, source or rights_owner provided') + else: + if 'rights_owner' in self._data: + self.error('Can not have a rights_owner for a problem in public domain') + + if not self._data['license'] in ('unknown', 'public domain') and 'rights_owner' not in self._data: + self.error('rights_owner needs to be defined when license is not "public domain" or "unknown"') + + self._data.setdefault('limits', {}) + for key, default in self.DEFAULT_LIMITS.items(): + self._data['limits'].setdefault(key, default) + val = self._data['limits'][key] + if key not in ('time_multiplier', 'time_safety_margin') and type(val) is float: + self._data['limits'][key] = int(val) + self.warning(f'Property limits.{key} of type int was given as a float. Using {self._data["limits"][val]}') + elif type(val) not in (float, int): + self.fatal(f'Property limts.{key} was of incorrect type. Acceptable types are int and float. 
(got: "{val}")') + + self._data.setdefault('grading', {}) + grading = self._data['grading'] + if 'objective' in grading: + if grading['objective'] not in ('min', 'max'): + self.fatal(f'grading.objective shmust be either "min" or "max"] (got: "{grading["objective"]}")') + else: + self._data['grading']['objective'] = 'max' - if 'license' in self._data: - self._data['license'] = self._data['license'].lower() + if 'show_test_data_groups' in grading: + if type(grading['show_test_data_groups']) is not bool: + self.fatal(f'grading.show_test_data_groups should be of type bool (got: "{grading["show_test_data_groups"]}")') + elif self._data['grading']['show_test_data_groups'] and self._data['type'] == 'pass-fail': + self.error("Showing test data groups is only supported for scoring problems, this is a pass-fail problem") + else: + self._data['grading']['show_test_data_groups'] = False - # Ugly backwards compatibility hack - if 'name' in self._data and not isinstance(self._data['name'], dict): - self._data['name'] = {'': self._data['name']} + if 'on_reject' in self._data['grading']: + if self._data['type'] == 'pass-fail' and self._data['grading']['on_reject'] == 'grade': + self.error(f"Invalid on_reject policy '{self._data['grading']['on_reject']}' for problem type '{self._data['type']}'") + if not self._data['grading']['on_reject'] in ['first_error', 'worst_error', 'grade']: + self.error(f"Invalid value '{self._data['grading']['on_reject']}' for on_reject policy") - self._origdata = copy.deepcopy(self._data) + for deprecated_grading_key in ['accept_score', 'reject_score', 'range', 'on_reject']: + if deprecated_grading_key in self._data['grading']: + self.warning(f"Grading key '{deprecated_grading_key}' is deprecated in problem.yaml, use '{deprecated_grading_key}' in testdata.yaml instead") - for field, default in copy.deepcopy(ProblemConfig._OPTIONAL_CONFIG).items(): - if not field in self._data: - self._data[field] = default - elif isinstance(default, dict) and isinstance(self._data[field], dict): - self._data[field] = dict(list(default.items()) + list(self._data[field].items())) + if 'validation' in self._data: + val = self._data['validation'] + if type(val) is not str: + self.fatal(f'validation expected to be of type string (got: "{val}")') + else: + args = val.split() + if len(args) > 0: + self._data['validation-type'] = args[0] + self._data['validation-params'] = args[1:] + else: + self.fatal(f'validation expected to contain at least one argument (got: {val})') + else: + self._data['validation-type'] = 'default' + self._data['validation-params'] = [] - val = self._data['validation'].split() - self._data['validation-type'] = val[0] - self._data['validation-params'] = val[1:] self._data['grading']['custom_scoring'] = False for param in self._data['validation-params']: @@ -844,112 +959,391 @@ def setup(self): elif param == 'interactive': pass - return self._data + self._data.setdefault('validator_flags', '') + if type(self._data['validator_flags']) is not str: + self.error(f'validator_flags should be of type string (got: "{self._data["validator_flags"]}")') + self._data.setdefault('keywords', '') + if type(self._data['keywords']) is not str: + self.error(f'keywords should be a string with space-separated words (got: "{self._data["keywords"]}")') + else: + self._data['keywords'] = self._data["keywords"].split() + + def _check_validation(self): + if self._data['validation-type'] == 'default': + if len(self._data['validation-params']) > 0: + self.error('If validation is set to "default", no other 
arguments are permitted') + elif self._data['validation-type'] == 'custom': + params = self._data['validation-params'] + for p, cnt in collections.Counter(params).items(): + if p not in ('score', 'interactive'): + self.error(f'validation params should be one of ["score", "interactive"] (got: {p})') + elif cnt > 1: + self.error(f'validation params should only be given once ({p} was given {cnt} times)') + + def _check_source_url(self): + if 'source' not in self._data: + if 'source_url' in self._data: + self.error('source needs to be defined when source_url is defined') + + def _check_custom_groups(self): + # TODO: implement the check + pass + #if self._data['type'] != 'pass-fail' and self.problem.get(ProblemTestCases)['root_group'].has_custom_groups() and 'show_test_data_groups' not in self._origdata.get('grading', {}): + # self.warning("Problem has custom testcase groups, but does not specify a value for grading.show_test_data_groups; defaulting to false") + + def _check_unknown_fields(self): + known = {'problem_format_version', 'type', 'name', 'uuid', 'author', 'source', 'source_url', 'license', 'rights_owner', 'limits', 'validation', 'validation-type', 'validation-params', 'validator_flags', 'grading', 'keywords'} + for field in self._data.keys(): + if field not in known: + self.warning(f'unknown field "{field}"') + +class Config_2023_07(ProblemConfigBase): + + DEFAULT_LIMITS = { + "time_multipliers": { + "ac_to_timmelimit": 2.0, + "time_limit_to_tle": 1.5 + }, + # "time_limit": left empty if it isn't specified because it needs to be calculated based on slowest submission + "time_resolution": 1.0, + "memory": 2048, + "output": 8, + "code": 128, + "compilation_time": 60, + "compilation_memory": 2048, + "validation_time": 60, + "validation_memory": 2048, + "validation_output": 8, + "validation_passes": 2, + } - def __str__(self) -> str: - return 'problem configuration' + _REQUIRED_FIELDS = ['problem_format_version', 'name', 'uuid'] + _PROBLEM_TYPES = {'pass-fail', 'scoring', 'interactive', 'multi-pass', 'submit-answer'} + _statement_languages = set() + def fix_config_structure(self): + self._statement_languages = set(self.problem.get(ProblemStatement).keys()) + for REQUIRED_FIELD in self._REQUIRED_FIELDS: + if REQUIRED_FIELD not in self._data: + self.error(f'Missing required field "{REQUIRED_FIELD}"') + continue - def check(self, context: Context) -> bool: - if self._check_res is not None: - return self._check_res - self._check_res = True + to_check = self._data[REQUIRED_FIELD] + if REQUIRED_FIELD == 'problem_format_version': + if to_check != formatversion.VERSION_2023_07: + self.error(f'problem_format_version needs to be the string {formatversion.VERSION_2023_07} for the 2023_07 format (got: "{to_check}")') - if not os.path.isfile(self.configfile): - self.error(f"No config file {self.configfile} found") + elif REQUIRED_FIELD == 'name': + if type(to_check) is str: + self._data[REQUIRED_FIELD] = {'en': to_check} - for field in ProblemConfig._MANDATORY_CONFIG: - if not field in self._data: - self.error(f"Mandatory field '{field}' not provided") + if type(self._data['name']) is dict: + for statement_lang in self._statement_languages: + if statement_lang not in self._data['name']: + self.error(f'No name set for language: "{statement_lang}"') - for field, value in self._origdata.items(): - if field not in ProblemConfig._OPTIONAL_CONFIG.keys() and field not in ProblemConfig._MANDATORY_CONFIG: - self.warning(f"Unknown field '{field}' provided in problem.yaml") + elif type(self._data['name']) is 
not dict: + self.error(f'Name expected to be of type string or dict (got: {to_check})') - for field, value in self._data.items(): - if value is None: - self.error(f"Field '{field}' provided in problem.yaml but is empty") - self._data[field] = ProblemConfig._OPTIONAL_CONFIG.get(field, '') + elif REQUIRED_FIELD == 'uuid': + if type(to_check) is not str: + self.error(f'UUID expected to be of type string (got: {to_check}) of type {type(to_check)}') + else: + self.check_uuid(self._data['uuid']) - # Check type - if not self._data['type'] in ['pass-fail', 'scoring']: - self.error(f"Invalid value '{self._data['type']}' for type") + self._data.setdefault('type', ['pass-fail']) + if type(self._data['type']) is str: + if self._data['type'] not in self._PROBLEM_TYPES: + val = self._data['type'] + self.error(f'Type expected to be one of {self._PROBLEM_TYPES} (got: {val})') + else: + self._data['type'] = [self._data['type']] - # Check rights_owner - if self._data['license'] == 'public domain': - if self._data['rights_owner'].strip() != '': - self.error('Can not have a rights_owner for a problem in public domain') - elif self._data['license'] != 'unknown': - if self._data['rights_owner'].strip() == '': - self.error('No author, source or rights_owner provided') + elif type(self._data['type']) is not list: + self.error(f'Type expected to be of type string or list (got: {self._data["type"]})') - # Check source_url - if (self._data['source_url'].strip() != '' and - self._data['source'].strip() == ''): - self.error('Can not provide source_url without also providing source') + types_chosen = set(self._data['type']) + if len(types_chosen) == 0: + self.error('Type must contain at least one type') - # Check license - if not self._data['license'] in ProblemConfig._VALID_LICENSES: - self.error(f"Invalid value for license: {self._data['license']}.\n Valid licenses are {ProblemConfig._VALID_LICENSES}") - elif self._data['license'] == 'unknown': - self.warning("License is 'unknown'") + elif len(types_chosen) != len(self._data['type']): + self.error('Duplicates for field "type" is not allowed') - if self._data['grading']['show_test_data_groups'] not in [True, False]: - self.error(f"Invalid value for grading.show_test_data_groups: {self._data['grading']['show_test_data_groups']}") - elif self._data['grading']['show_test_data_groups'] and self._data['type'] == 'pass-fail': - self.error("Showing test data groups is only supported for scoring problems, this is a pass-fail problem") - if self._data['type'] != 'pass-fail' and self.problem.get(ProblemTestCases)['root_group'].has_custom_groups() and 'show_test_data_groups' not in self._origdata.get('grading', {}): - self.warning("Problem has custom testcase groups, but does not specify a value for grading.show_test_data_groups; defaulting to false") + else: + if 'pass-fail' in types_chosen and 'scoring' in types_chosen: + self.error('Type "pass-fail" and "scoring" are not allowed together') + + if 'submit-answer' in types_chosen: + for banned in ['multi-pass', 'interactive']: + if banned in types_chosen: + self.error(f'Type "{banned}" and "submit-answer" are not allowed together') + + def person_check(persons, persontype) -> list[dict[str, str]]: + if type(persons) not in (str, list): + self.error(f'Field "{persontype}" must be of type str, list (got: {type(persons)})') + return [] + + if type(persons) is str: + persons = [persons] + + def extract_email(person): + name_email = re.match(r'(.+)<(.+@.+)>', person) + if name_email: + return { + "name": name_email.group(1).strip(), 
+ "email": name_email.group(2) + } + return {"name": person.strip()} + + new_persons = [] + for person in persons: + if type(person) is str: + if person.strip() == '': + self.error(f'"{persontype}" may not be an empty string, remove or set a {persontype[:-1]} / list of {persontype}') + new_persons.append(extract_email(person)) + elif type(person) is dict: + if 'name' not in person: + self.error(f'{person} dict must have a field "name"') + continue - if 'on_reject' in self._data['grading']: - if self._data['type'] == 'pass-fail' and self._data['grading']['on_reject'] == 'grade': - self.error(f"Invalid on_reject policy '{self._data['grading']['on_reject']}' for problem type '{self._data['type']}'") - if not self._data['grading']['on_reject'] in ['first_error', 'worst_error', 'grade']: - self.error(f"Invalid value '{self._data['grading']['on_reject']}' for on_reject policy") + if person['name'].strip() == '': + self.error(f'name field for "{person}" may not be an empty string') - if self._data['grading']['objective'] not in ['min', 'max']: - self.error(f"Invalid value '{self._data['grading']['objective']}' for objective") + name_email = extract_email(person['name']) + if 'email' in name_email and 'email' in person: + self.error(f'Both email in name and email field in "{person}" cannot be set, choose one') - for deprecated_grading_key in ['accept_score', 'reject_score', 'range', 'on_reject']: - if deprecated_grading_key in self._data['grading']: - self.warning(f"Grading key '{deprecated_grading_key}' is deprecated in problem.yaml, use '{deprecated_grading_key}' in testdata.yaml instead") + elif 'email' in name_email: + person.update(name_email) + + elif 'email' in person: + email = person['email'] + if type(email) is str: + if email.strip() == '': + self.error('Email may not be empty string') + if not re.fullmatch(r'.+@.+', email): + self.error(f'Email "{email}" is not a valid email address') + person['email'] = email.strip() + else: + self.error(f'Email must be of type str (got: "{email}" of type: {type(email)})') + + if 'orcid' in person: + if type(person['orcid']) is str: + if person['orcid'].strip() == '': + self.error('ORCID may not be empty string') + if not re.search(r'^[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{4}$', person['orcid']): + self.error(f'ORCID "{person["orcid"]}" is not a valid ORCID address') + person['orcid'] = person['orcid'].strip() + else: + self.error(f'ORCID must be of type str (got: {type(person["orcid"])})') + + if 'kattis' in person: + if type(person['kattis']) is str: + if person['kattis'].strip() == '': + self.error('Kattis username may not be empty string') + if ' ' in person['kattis']: + self.error(f'Kattis username "{person["kattis"]}" may not contain spaces') + person['kattis'] = person['kattis'].strip() + else: + self.error(f'Kattis username must be of type str (got: {type(person["kattis"])})') - if not self._data['validation-type'] in ['default', 'custom']: - self.error(f"Invalid value '{self._data['validation']}' for validation, first word must be 'default' or 'custom'") + new_persons.append(person) + return new_persons - if self._data['validation-type'] == 'default' and len(self._data['validation-params']) > 0: - self.error(f"Invalid value '{self._data['validation']}' for validation") + if 'rights_owner' in self._data: + if type(self._data['rights_owner']) is str: + self._data['rights_owner'] = person_check([self._data['rights_owner'].strip()], 'rights_owners') + else: + self.error(f'rights_owner must be of type str (got: "{self._data["rights_owner"]}" of type: 
{type(self._data["rights_owner"])})') + else: + self._data['rights_owner'] = [] - if self._data['validation-type'] == 'custom': - for param in self._data['validation-params']: - if param not in['score', 'interactive']: - self.error(f"Invalid parameter '{param}' for custom validation") + if 'credits' in self._data: + if "traslators" in self._data['credits']: + translators = self._data['credits']['traslators'] + if type(translators) is str: + if translators.strip() == '': + self.error('"traslators" may not be an empty string, remove or set a translator / list of translators') + self._data['credits']['traslators'] = {'en': person_check(translators, 'EN_traslators')} - # Check limits - if not isinstance(self._data['limits'], dict): - self.error('Limits key in problem.yaml must specify a dict') - self._data['limits'] = ProblemConfig._OPTIONAL_CONFIG['limits'] + elif type(translators) is dict: + for lang, translator in translators.items(): + if type(lang) is not str: + self.error(f'Traslator language "{lang}" must be of type str (got: {type(lang)})') - # Some things not yet implemented - if self._data['libraries'] != '': - self.error("Libraries not yet supported") + if lang not in self._statement_languages: + self.error(f'Traslator language "{lang}" does not have a corresponding statement') - return self._check_res + self._data['credits']['traslators'][lang] = person_check(translator, f'traslators.{lang}') + else: + self.error(f'Traslator must be of type str or dict (got: {type(translators)})') + + similar_credit_fields = ['authors', 'contributors', 'testers', 'packagers', 'acknowledgements'] + + for field in similar_credit_fields: + if field in self._data['credits']: + persons = person_check(self._data['credits'][field], field) + self._data['credits'][field] = persons + if field == 'authors' and len(self._data['rights_owner']) == 0: + self._data['rights_owner'] = persons + + if 'source' in self._data: + ALLOWED_SOURCE_KEYS = {'name', 'url'} + if type(self._data['source']) in (str, dict): + self._data['source'] = [self._data['source']] + + if type(self._data['source']) is list: + for i, val in enumerate(self._data['source']): + if type(val) is str: + self._data['source'][i] = {'name': val} + elif type(val) is dict: + if 'name' not in val: + self.error(f'source needs to have key name (got: {val})') + + for k in val.keys(): + if k not in ALLOWED_SOURCE_KEYS: + self.warning(f'source has unknown key "{k}" (allowed keys: {ALLOWED_SOURCE_KEYS})') + elif type(val[k]) is not str: + self.error(f'source.{k} needs to be of type string (got: "{val[k]}")') + else: + self.error(f'each source needs to be a string or a map (got: "{val}")') + + if len(self._data['rights_owner']) == 0: + self._data['rights_owner'] = self._data['source'] + else: + self.error(f'source needs to be of a string, list or a map (got: "{self._data["source"]}")') + + if self._data['license'] != 'public domain': + if len(self._data['rights_owner']) == 0: + self.error('No author, source or rights_owner provided') + else: + if len(self._data['rights_owner']) != 0: + self.error('Can not have a rights_owner for a problem in public domain') + + if not self._data['license'] in ('unknown', 'public domain') and len(self._data['rights_owner']) == 0: + self.error('rights_owner needs to be defined when license is not "public domain" or "unknown"') + + if 'embargo_until' in self._data: + val = self._data['embargo_until'] + try: + if type(val) != str: + self.error(f'embargo_until needs to be of type string (got: "{val}")') + + if 
re.fullmatch(r'\d{4}-\d\d-\d\d', val): + self._data['embargo_until'] = datetime.datetime.strptime(val, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc) + elif re.fullmatch(r'\d{4}-\d\d-\d\dT\d\d:\d\d:\d\dZ?', val): + val = val.replace('Z', '') + self._data['embargo_until'] = datetime.datetime.strptime(val, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=datetime.timezone.utc) + else: + self.error(f'embargo_until needs to be in the form "YYYY-MM-DD" or "YYYY-MM-DDThh:mm:ss" and will be treated as UTC0, (got: "{val}")') + except ValueError as e: + self.error(f'embargo_until contained invalid date: {e}') + + limits = self._data.get('limits', {}) + self._data['limits'] = copy.deepcopy(self.DEFAULT_LIMITS) + ints = {'memory', 'output', 'code', 'compilation_time', 'compilation_memory', 'validation_time', 'validation_memory', 'validation_output', 'validation_passes'} + allowed_keys_limits = ints | {'time_limit', 'time_resolution', 'time_multipliers'} + allowed_in_time_multipliers = {'ac_to_time_limit', 'time_limit_to_tle'} + for k in limits: + if k not in allowed_keys_limits: + self.warning(f"Unknown property limits.{k} was given") + if k == "time_multipliers": + for k2 in limits[k]: + if k2 not in allowed_in_time_multipliers: + self.warning(f"Unknown property limits.time_multipliers.{k2} was given") + + for i in ints: + if i not in limits: + continue + val = limits[i] + if type(val) is float: + val = int(val) + self.warning(f'limits.{i} was given as a float ({limits[i]}), interpreting as {val}') + elif type(val) is not int: + self.fatal(f'limits.{i} was of incorrect type, expected int (got: "{val}")') + + if i == 'validation_passes' and val < 2: + self.fatal(f'limits.validation_passes should be >= 2 (got: "{val}")') + elif val <= 0: + self.fatal(f'limits.{i} needs to be > 0 (got: {val})') + self._data['limits'][i] = limits[i] + + for f in ('time_resolution', 'time_limit'): + if f in limits: + if type(limits[f] not in (int, float)) or limits[f] <= 0: + self.fatal(f'limits.{f} needs to be a float greater than 0 (got: "{limits[f]}")') + self._data[f] = float(limits[f]) + + if 'time_multipliers' in limits: + if type(limits['time_multipliers']) is dict: + for k in ('ac_to_time_limit', 'time_limit_to_tle'): + if k in limits['time_multipliers']: + val = limits['time_multipliers'][k] + if type(val) not in (int, float) or val < 1: + self.fatal(f'limits.time_multipliers.{k} needs to be a float >= 1 (got: "{val}")') + self._data['limits']['ac_to_time_limit'][k] = float(val) + + self._data.setdefault('keywords', []) + for kw in self._data['keywords']: + if type(kw) is not str: + self.error(f'keywords contains non-string item ("{kw}")') + + + self._data.setdefault('languages', "all") + langs = self._data['languages'] + + # There are way more languages that should be here that are not in the languages.yaml ... 
+ valid_languages = set(self.problem.language_config.languages.keys()) if hasattr(self.problem, 'language_config') else set() + + if type(langs) is str: + if langs.lower() != 'all': + self.error(f'Field "languages" should be "all" or a list of languages (got: "{langs}")') + self._data['languages'] = valid_languages + elif type(langs) is list: + for lang in langs: + if type(lang) is not str: + self.error(f'Each language in "languages" list must be of type "str" (got: "{langs}")') + elif lang not in valid_languages: + self.fatal(f'Field "languages" contains unknown language "{lang}"') + self._data['languages'] = langs + else: + self.fatal(f'Field "languages" should be "all" or a list of languages (got: "{langs}")') + + self._data.setdefault('allow_file_writing', False) + if type(self._data['allow_file_writing']) is not bool: + self.fatal(f'allow_file_writing should be of type bool (got: "{self._data["allow_file_writing"]}")') + + self._data.setdefault('constants', {}) + if type(self._data['constants']) is not dict: + self.fatal(f'constants must be maps (got: "{self._data["constants"]}")') + else: + for k, v in self._data['constants'].items(): + constant_regex = r'[a-zA-Z_][a-zA-Z0-9_]*' + if not re.fullmatch(constant_regex, k): + self.error(f'Invalid name for constant "{k}". Name must match "{constant_regex}"') + else: + if type(v) not in (int, float, string): + self.error(f'Invalid type for constant "{k}". Needs to be of type int, float or string. (got: "{v}")') + + def _check_unknown_fields(self): + known = {'problem_format_version', 'type', 'name', 'uuid', 'version', 'credits', 'source', 'license', 'rights_owner', 'embargo_until', 'limits', 'keywords', 'languages', 'allow_file_writing', 'constants'} + for field in self._data.keys(): + if field not in known: + self.warning(f'unknown field "{field}"') class ProblemTestCases(ProblemPart): - PART_NAME = 'testdata' - + @staticmethod def setup_dependencies(): - return {ProblemConfig} + return {ProblemConfigBase} def setup(self): self.testcase_by_infile = {} return { 'root_group': TestCaseGroup(self.problem, self.PART_NAME), - 'is_interactive': 'interactive' in self.problem.get(ProblemConfig)['validation-params'], - 'is_scoring': self.problem.get(ProblemConfig)['type'] == 'scoring' + 'is_interactive': 'interactive' in self.problem.get(Config_Legacy)['validation-params'], + 'is_scoring': self.problem.get(Config_Legacy)['type'] == 'scoring' } def check(self, context: Context) -> bool: @@ -1162,7 +1556,7 @@ def check(self, context: Context) -> bool: return self._check_res self._check_res = True - if self.problem.get(ProblemConfig)['type'] == 'pass-fail' and len(self._graders) > 0: + if self.problem.get(Config_Legacy)['type'] == 'pass-fail' and len(self._graders) > 0: self.error('There are grader programs but the problem is pass-fail') for grader in self._graders: @@ -1269,12 +1663,12 @@ def check(self, context: Context) -> bool: if isinstance(v, run.SourceCode) and v.language.lang_id not in recommended_output_validator_languages: self.warning('output validator language %s is not recommended' % v.language.name) - if self.problem.get(ProblemConfig)['validation'] == 'default' and self._validators: + if self.problem.get(Config_Legacy)['validation-type'] == 'default' and self._validators: self.error('There are validator programs but problem.yaml has validation = "default"') - elif self.problem.get(ProblemConfig)['validation'] != 'default' and not self._validators: + elif self.problem.get(Config_Legacy)['validation-type'] != 'default' and not 
self._validators: self.error('problem.yaml specifies custom validator but no validator programs found') - if self.problem.get(ProblemConfig)['validation'] == 'default' and self._default_validator is None: + if self.problem.get(Config_Legacy)['validation-type'] == 'default' and self._default_validator is None: self.error('Unable to locate default validator') for val in self._validators[:]: @@ -1287,7 +1681,7 @@ def check(self, context: Context) -> bool: # Only sanity check output validators if they all actually compiled if self._check_res: - flags = self.problem.get(ProblemConfig)['validator_flags'] + flags = self.problem.get(Config_Legacy)['validator_flags'] fd, file_name = tempfile.mkstemp() os.close(fd) @@ -1329,7 +1723,7 @@ def _get_feedback(feedback_dir: str) -> str|None: def _parse_validator_results(self, val, status: int, feedbackdir, testcase: TestCase) -> SubmissionResult: - custom_score = self.problem.get(ProblemConfig)['grading']['custom_scoring'] + custom_score = self.problem.get(Config_Legacy)['grading']['custom_scoring'] score = None # TODO: would be good to have some way of displaying the feedback for debugging uses score_file = os.path.join(feedbackdir, 'score.txt') @@ -1364,7 +1758,7 @@ def _parse_validator_results(self, val, status: int, feedbackdir, testcase: Test def _actual_validators(self) -> list: vals = self._validators - if self.problem.get(ProblemConfig)['validation'] == 'default': + if self.problem.get(Config_Legacy)['validation-type'] == 'default': vals = [self._default_validator] return [val for val in vals if val is not None] @@ -1380,10 +1774,10 @@ def validate_interactive(self, testcase: TestCase, submission, timelim: int, err # file descriptor, wall time lim initargs = ['1', str(2 * timelim)] validator_args = [testcase.infile, testcase.ansfile, ''] - submission_args = submission.get_runcmd(memlim=self.problem.get(ProblemConfig)['limits']['memory']) + submission_args = submission.get_runcmd(memlim=self.problem.get(Config_Legacy)['limits']['memory']) - val_timelim = self.problem.get(ProblemConfig)['limits']['validation_time'] - val_memlim = self.problem.get(ProblemConfig)['limits']['validation_memory'] + val_timelim = self.problem.get(Config_Legacy)['limits']['validation_time'] + val_memlim = self.problem.get(Config_Legacy)['limits']['validation_memory'] for val in self._actual_validators(): if val.compile()[0]: feedbackdir = tempfile.mkdtemp(prefix='feedback', dir=self.problem.tmpdir) @@ -1435,9 +1829,9 @@ def validate_interactive(self, testcase: TestCase, submission, timelim: int, err def validate(self, testcase: TestCase, submission_output: str) -> SubmissionResult: res = SubmissionResult('JE') - val_timelim = self.problem.get(ProblemConfig)['limits']['validation_time'] - val_memlim = self.problem.get(ProblemConfig)['limits']['validation_memory'] - flags = self.problem.get(ProblemConfig)['validator_flags'].split() + testcase.testcasegroup.config['output_validator_flags'].split() + val_timelim = self.problem.get(Config_Legacy)['limits']['validation_time'] + val_memlim = self.problem.get(Config_Legacy)['limits']['validation_memory'] + flags = self.problem.get(Config_Legacy)['validator_flags'].split() + testcase.testcasegroup.config['output_validator_flags'].split() for val in self._actual_validators(): if val.compile()[0]: feedbackdir = tempfile.mkdtemp(prefix='feedback', dir=self.problem.tmpdir) @@ -1646,14 +2040,14 @@ def check_submission(self, sub, context: Context, expected_verdict: Verdict, tim def full_score_finite(self) -> bool: min_score, max_score = 
self.problem.get(ProblemTestCases)['root_group'].get_score_range() - if self.problem.get(ProblemConfig)['grading']['objective'] == 'min': + if self.problem.get(Config_Legacy)['grading']['objective'] == 'min': return min_score != float('-inf') else: return max_score != float('inf') def fully_accepted(self, result: SubmissionResult) -> bool: min_score, max_score = self.problem.get(ProblemTestCases)['root_group'].get_score_range() - best_score = min_score if self.problem.get(ProblemConfig)['grading']['objective'] == 'min' else max_score + best_score = min_score if self.problem.get(Config_Legacy)['grading']['objective'] == 'min' else max_score return result.verdict == 'AC' and (not self.problem.get(ProblemTestCases)['is_scoring'] or result.score == best_score) def start_background_work(self, context: Context) -> None: @@ -1669,7 +2063,7 @@ def check(self, context: Context) -> bool: return self._check_res self._check_res = True - limits = self.problem.get(ProblemConfig)['limits'] + limits = self.problem.get(Config_Legacy)['limits'] time_multiplier = limits['time_multiplier'] safety_margin = limits['time_safety_margin'] @@ -1729,15 +2123,17 @@ def check(self, context: Context) -> bool: return self._check_res PROBLEM_FORMATS: dict[str, dict[str, list[Type[ProblemPart]]]] = { - 'legacy': { - 'config': [ProblemConfig], + formatversion.VERSION_LEGACY: { + #'config': [ProblemConfig], + 'config': [Config_Legacy], 'statement': [ProblemStatement, Attachments], 'validators': [InputValidators, OutputValidators], 'graders': [Graders], 'data': [ProblemTestCases], 'submissions': [Submissions], + #'fornow_remove': [Config_Legacy], }, - '2023-07': { # TODO: Add all the parts + formatversion.VERSION_2023_07: { # TODO: Add all the parts 'statement': [ProblemStatement, Attachments], } } @@ -1754,7 +2150,7 @@ class Problem(ProblemAspect): problem are listed. These should all be a subclass of ProblemPart. The dictionary is in the form of category -> part-types. You could for example have 'validators' -> [InputValidators, OutputValidators]. 
""" - def __init__(self, probdir: str, parts: dict[str, list[type]] = PROBLEM_FORMATS['legacy']): + def __init__(self, probdir: str, args: argparse.Namespace, parts: dict[str, list[type]] = PROBLEM_FORMATS[formatversion.VERSION_LEGACY]) -> None: self.part_mapping: dict[str, list[Type[ProblemPart]]] = parts self.aspects: set[type] = {v for s in parts.values() for v in s} self.probdir = os.path.realpath(probdir) @@ -1763,6 +2159,12 @@ def __init__(self, probdir: str, parts: dict[str, list[type]] = PROBLEM_FORMATS[ self.language_config = languages.load_language_config() self._data: dict[str, dict] = {} self.debug(f'Problem-format: {parts}') + self.problem_can_be_checked = True + self.args = args + ProblemAspect.errors = 0 + ProblemAspect.warnings = 0 + ProblemAspect.bail_on_error = self.args.bail_on_error + ProblemAspect.consider_warnings_errors = self.args.werror def get(self, part) -> dict: if isinstance(part, type) and issubclass(part, ProblemPart): @@ -1803,8 +2205,12 @@ def init(_class): self._data[_class.PART_NAME] = self._classes[_class.PART_NAME].setup() initialized.add(_class.PART_NAME) - for c in self.aspects: - init(c) + try: + for c in self.aspects: + init(c) + except VerifyError: + self.problem_can_be_checked = False + return self return self @@ -1814,17 +2220,12 @@ def __exit__(self, exc_type, exc_value, exc_traceback) -> None: def __str__(self) -> str: return str(self.shortname) - def check(self, args: argparse.Namespace) -> tuple[int, int]: - if self.shortname is None: - return 1, 0 + def check(self) -> tuple[int, int]: + if self.shortname is None or not self.problem_can_be_checked: + return ProblemAspect.errors, ProblemAspect.warnings - ProblemAspect.errors = 0 - ProblemAspect.warnings = 0 - ProblemAspect.bail_on_error = args.bail_on_error - ProblemAspect.consider_warnings_errors = args.werror - - executor = ThreadPoolExecutor(args.threads) if args.threads > 1 else None - context = Context(args, executor) + executor = ThreadPoolExecutor(self.args.threads) if self.args.threads > 1 else None + context = Context(self.args, executor) try: if not re.match('^[a-z0-9]+$', self.shortname): @@ -1835,7 +2236,7 @@ def check(self, args: argparse.Namespace) -> tuple[int, int]: run.limit.check_limit_capabilities(self) # Skip any parts that do not belong to the format - parts = [part for part in args.parts if part in self.part_mapping] + parts = [part for part in self.args.parts if part in self.part_mapping] if executor: for part in parts: @@ -1942,15 +2343,6 @@ def initialize_logging(args: argparse.Namespace) -> None: format=fmt, level=getattr(logging, args.log_level.upper())) -def detect_problem_version(path) -> str: - config_path = os.path.join(path, 'problem.yaml') - try: - with open(config_path) as f: - config: dict = yaml.safe_load(f) or {} - except Exception as e: - raise VerifyError(str(e)) - return config.get('problem_format_version', 'legacy') - def main() -> None: args = argparser().parse_args() @@ -1962,16 +2354,16 @@ def main() -> None: problem_version = args.problem_format if problem_version == 'automatic': try: - problem_version = detect_problem_version(problemdir) - except VerifyError as e: + problem_version = formatversion.detect_problem_version(problemdir) + except formatversion.VersionError as e: total_errors += 1 print(f'ERROR: problem version could not be decided for {os.path.basename(os.path.realpath(problemdir))}: {e}') continue print(f'Loading problem {os.path.basename(os.path.realpath(problemdir))} with format version {problem_version}') format = 
PROBLEM_FORMATS[problem_version] - with Problem(problemdir, format) as prob: - errors, warnings = prob.check(args) + with Problem(problemdir, args, format) as prob: + errors, warnings = prob.check() p = lambda x: '' if x == 1 else 's' print(f'{prob.shortname} tested: {errors} error{p(errors)}, {warnings} warning{p(warnings)}') total_errors += errors From ae32854b400e7b35f7487a2f18d8399d225e2504 Mon Sep 17 00:00:00 2001 From: zazmuz Date: Sun, 6 Apr 2025 11:14:25 +0200 Subject: [PATCH 2/6] make statement dir not found error more obvious made it more obvious if old folder name is used, or folder is misspelled made check error messages clearer for people newer to problemtools closest word functionality bugfix index error --- problemtools/verifyproblem.py | 56 ++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/problemtools/verifyproblem.py b/problemtools/verifyproblem.py index 205fd53f..93051e99 100644 --- a/problemtools/verifyproblem.py +++ b/problemtools/verifyproblem.py @@ -727,7 +727,9 @@ def setup(self): if os.path.isdir(dir): self.statements = [(m.group(0), m.group(2) or '') for file in os.listdir(dir) if (m := re.search(self.statement_regex, file))] else: - self.error(f"No directory named {self.format_data.statement_directory} found") + folders_found = [os.path.basename(f) for f in glob.glob(os.path.join(self.problem.probdir)+'/*') if os.path.isdir(f)] + folders_found = "'" + "', '".join(folders_found) + "'" + self.error(f'No directory named "{self.format_data.statement_directory}" found. Found folders: {folders_found}') self.statements = [] return self.get_config() @@ -842,6 +844,34 @@ def check(self, context: Context) -> bool: # TODO: if fatal has happened: abort return self._check_res + @staticmethod + def smallest_edit_dist(a: str, b: list[str] | set[str]) -> str: + def edit_dist(a: str, b: str) -> int: + n = len(a) + m = len(b) + dp = [[0] * (m + 1) for _ in range(n + 1)] + for i in range(n + 1): + dp[i][0] = i + for j in range(m + 1): + dp[0][j] = j + for i in range(1, n + 1): + for j in range(1, m + 1): + if a[i - 1] == b[j - 1]: + dp[i][j] = dp[i - 1][j - 1] + else: + dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1]) + return dp[n][m] + if type(b) is set: + b = list(b) + best = b[0] + best_dist = edit_dist(a, best) + for s in b[1:]: + dist = edit_dist(a, s) + if dist < best_dist: + best = s + best_dist = dist + return best + class Config_Legacy(ProblemConfigBase): DEFAULT_LIMITS = { "time_multiplier": 5, @@ -1055,16 +1085,20 @@ def fix_config_structure(self): if type(self._data['type']) is str: if self._data['type'] not in self._PROBLEM_TYPES: val = self._data['type'] - self.error(f'Type expected to be one of {self._PROBLEM_TYPES} (got: {val})') + err = f'Type expected to be one of: {self._PROBLEM_TYPES}, got: "{val}".' + if ' ' in val: + err += f'\nIf you meant having multiple types: {val.split()}, use a YAML list instead of a string.' 
+ self.fatal(err) + self._data['type'] = [] else: self._data['type'] = [self._data['type']] elif type(self._data['type']) is not list: - self.error(f'Type expected to be of type string or list (got: {self._data["type"]})') + self.fatal(f'Type expected to be of type string or list (got: {self._data["type"]})') types_chosen = set(self._data['type']) if len(types_chosen) == 0: - self.error('Type must contain at least one type') + self.fatal('Type must contain at least one valid type') elif len(types_chosen) != len(self._data['type']): self.error('Duplicates for field "type" is not allowed') @@ -1215,13 +1249,13 @@ def extract_email(person): if self._data['license'] != 'public domain': if len(self._data['rights_owner']) == 0: - self.error('No author, source or rights_owner provided') + self.error('No authors in "credits" and no "source" or "rights_owner" provided') else: if len(self._data['rights_owner']) != 0: - self.error('Can not have a rights_owner for a problem in public domain') + self.error('"rights_owner" is not allowed for a problem in with license "public domain"') if not self._data['license'] in ('unknown', 'public domain') and len(self._data['rights_owner']) == 0: - self.error('rights_owner needs to be defined when license is not "public domain" or "unknown"') + self.error('"rights_owner" needs to be defined when license is not "public domain" or "unknown"') if 'embargo_until' in self._data: val = self._data['embargo_until'] @@ -1246,11 +1280,12 @@ def extract_email(person): allowed_in_time_multipliers = {'ac_to_time_limit', 'time_limit_to_tle'} for k in limits: if k not in allowed_keys_limits: - self.warning(f"Unknown property limits.{k} was given") + closest = self.smallest_edit_dist(k, allowed_keys_limits) + self.warning(f'Unknown property "limits.{k}" was given. Did you mean "limits.{closest}"?') if k == "time_multipliers": for k2 in limits[k]: if k2 not in allowed_in_time_multipliers: - self.warning(f"Unknown property limits.time_multipliers.{k2} was given") + self.warning(f'Unknown property "limits.time_multipliers.{k2}" was given. 
Allowed keys: {allowed_in_time_multipliers}') for i in ints: if i not in limits: @@ -1281,7 +1316,7 @@ def extract_email(person): val = limits['time_multipliers'][k] if type(val) not in (int, float) or val < 1: self.fatal(f'limits.time_multipliers.{k} needs to be a float >= 1 (got: "{val}")') - self._data['limits']['ac_to_time_limit'][k] = float(val) + self._data['limits']['time_multipliers'][k] = float(val) self._data.setdefault('keywords', []) for kw in self._data['keywords']: @@ -2135,6 +2170,7 @@ def check(self, context: Context) -> bool: }, formatversion.VERSION_2023_07: { # TODO: Add all the parts 'statement': [ProblemStatement, Attachments], + 'config': [Config_2023_07], } } From 75707adcf995c590bde7be596a5b200f4e1aeb20 Mon Sep 17 00:00:00 2001 From: zazmuz Date: Sun, 6 Apr 2025 13:41:51 +0200 Subject: [PATCH 3/6] Field hints, updated usage of config and readded comment readded comment stating weird it is Name change and pytest fix --- problemtools/tests/test_verify_hello.py | 4 +- problemtools/verifyproblem.py | 82 +++++++++++++++---------- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/problemtools/tests/test_verify_hello.py b/problemtools/tests/test_verify_hello.py index 49ff53aa..090e2d24 100644 --- a/problemtools/tests/test_verify_hello.py +++ b/problemtools/tests/test_verify_hello.py @@ -9,8 +9,8 @@ def test_load_hello(): args = verify.argparser().parse_args([string]) verify.initialize_logging(args) - with verify.Problem(string) as p: + with verify.Problem(string, args) as p: assert p.shortname == "hello" # pytest and fork don't go along very well, so just run aspects that work without run - assert p.getProblemPart(verify.ProblemConfig).check(args) + assert p.getProblemPart(verify.ProblemConfigLegacy).check(args) assert p.getProblemPart(verify.Attachments).check(args) diff --git a/problemtools/verifyproblem.py b/problemtools/verifyproblem.py index 93051e99..8585a6d3 100644 --- a/problemtools/verifyproblem.py +++ b/problemtools/verifyproblem.py @@ -269,7 +269,7 @@ def check(self, context: Context) -> bool: self.check_size_limits(self.ansfile) self._problem.getProblemPart(InputValidators).validate(self) anssize = os.path.getsize(self.ansfile) / 1024.0 / 1024.0 - outputlim = self._problem.get(Config_Legacy)['limits']['output'] + outputlim = self._problem.get(ProblemConfigLegacy)['limits']['output'] if anssize > outputlim: self.error(f'Answer file ({anssize:.1f} Mb) is larger than output limit ({outputlim} Mb), you need to increase output limit') elif 2 * anssize > outputlim: @@ -338,7 +338,7 @@ def run_submission_real(self, sub, context: Context, timelim: int, timelim_low: errfile = os.path.join(self._problem.tmpdir, f'error-{self.counter}') status, runtime = sub.run(infile=self.infile, outfile=outfile, errfile=errfile, timelim=timelim_high+1, - memlim=self._problem.get(Config_Legacy)['limits']['memory'], work_dir=sub.path) + memlim=self._problem.get(ProblemConfigLegacy)['limits']['memory'], work_dir=sub.path) if is_TLE(status) or runtime > timelim_high: res_high = SubmissionResult('TLE') elif is_RTE(status): @@ -428,9 +428,9 @@ def __init__(self, problem: Problem, aspect_name: str, datadir: str|None=None, p self.config[field] = parent_value # Some deprecated properties are inherited from problem config during a transition period - problem_grading = problem.get(Config_Legacy)['grading'] + problem_grading = problem.get(ProblemConfigLegacy)['grading'] for key in ['accept_score', 'reject_score', 'range']: - if key in problem.get(Config_Legacy)['grading']: + if key 
in problem.get(ProblemConfigLegacy)['grading']: self.config[key] = problem_grading[key] problem_on_reject = problem_grading.get('on_reject') @@ -439,7 +439,7 @@ def __init__(self, problem: Problem, aspect_name: str, datadir: str|None=None, p if problem_on_reject == 'grade': self.config['on_reject'] = 'continue' - if self._problem.get(Config_Legacy)['type'] == 'pass-fail': + if self._problem.get(ProblemConfigLegacy)['type'] == 'pass-fail': for key in TestCaseGroup._SCORING_ONLY_KEYS: if key not in self.config: self.config[key] = None @@ -833,15 +833,15 @@ def check_uuid(self, uuid: str) -> None: def check(self, context: Context) -> bool: - if self._check_res is not None: + if self._check_res is True: return self._check_res - self._check_res = True + elif self._check_res is not False: + self._check_res = True to_check = [prop for prop in dir(self) if prop.startswith('_check_') and callable(getattr(self, prop))] for prop in to_check: self.info(f'Checking "{prop}"') fun = getattr(self, prop) fun() - # TODO: if fatal has happened: abort return self._check_res @staticmethod @@ -872,7 +872,7 @@ def edit_dist(a: str, b: str) -> int: best_dist = dist return best -class Config_Legacy(ProblemConfigBase): +class ProblemConfigLegacy(ProblemConfigBase): DEFAULT_LIMITS = { "time_multiplier": 5, "time_safety_margin": 2, @@ -901,6 +901,7 @@ def fix_config_structure(self): if type(val) is not str: self.warning(f'name should be of type string (got: "{val}")') val = str(val) + # Ugly backwards compatibility hack? self._data['name'] = {'': val} self._data.setdefault('name', {}) self._data['name'].update(self.problem.get(ProblemStatement)) @@ -1026,9 +1027,13 @@ def _check_unknown_fields(self): known = {'problem_format_version', 'type', 'name', 'uuid', 'author', 'source', 'source_url', 'license', 'rights_owner', 'limits', 'validation', 'validation-type', 'validation-params', 'validator_flags', 'grading', 'keywords'} for field in self._data.keys(): if field not in known: - self.warning(f'unknown field "{field}"') + if field == 'scoring': + self.warning('Field "scoring" is deprecated, use "grading" instead') + continue + closest = self.smallest_edit_dist(field, known) + self.warning(f'Unknown field "{field}", did you mean "{closest}"?') -class Config_2023_07(ProblemConfigBase): +class ProblemConfig2023_07(ProblemConfigBase): DEFAULT_LIMITS = { "time_multipliers": { @@ -1362,9 +1367,22 @@ def extract_email(person): def _check_unknown_fields(self): known = {'problem_format_version', 'type', 'name', 'uuid', 'version', 'credits', 'source', 'license', 'rights_owner', 'embargo_until', 'limits', 'keywords', 'languages', 'allow_file_writing', 'constants'} + old_warning_help = { + 'grading': ' and testdata.yaml is updated instead.', + 'author': ', use "credits.authors" instead.', + 'source_url': ', use "source.url" instead.', + 'validation': ', look at Result Aggregation in the documentation.', + 'validator_flags': '. 
Instead use either "output_validator_args" or "input_validator_args" in testdata.yaml', + } for field in self._data.keys(): + field = field.lower().strip() if field not in known: - self.warning(f'unknown field "{field}"') + if field in old_warning_help: + self.warning(f'Field "{field}" is removed in 2023-07'+old_warning_help[field]) + continue + else: + closest = self.smallest_edit_dist(field, known) + self.warning(f'Unknown field "{field}", did you mean "{closest}"?') class ProblemTestCases(ProblemPart): PART_NAME = 'testdata' @@ -1377,8 +1395,8 @@ def setup(self): self.testcase_by_infile = {} return { 'root_group': TestCaseGroup(self.problem, self.PART_NAME), - 'is_interactive': 'interactive' in self.problem.get(Config_Legacy)['validation-params'], - 'is_scoring': self.problem.get(Config_Legacy)['type'] == 'scoring' + 'is_interactive': 'interactive' in self.problem.get(ProblemConfigLegacy)['validation-params'], + 'is_scoring': self.problem.get(ProblemConfigLegacy)['type'] == 'scoring' } def check(self, context: Context) -> bool: @@ -1591,7 +1609,7 @@ def check(self, context: Context) -> bool: return self._check_res self._check_res = True - if self.problem.get(Config_Legacy)['type'] == 'pass-fail' and len(self._graders) > 0: + if self.problem.get(ProblemConfigLegacy)['type'] == 'pass-fail' and len(self._graders) > 0: self.error('There are grader programs but the problem is pass-fail') for grader in self._graders: @@ -1698,12 +1716,12 @@ def check(self, context: Context) -> bool: if isinstance(v, run.SourceCode) and v.language.lang_id not in recommended_output_validator_languages: self.warning('output validator language %s is not recommended' % v.language.name) - if self.problem.get(Config_Legacy)['validation-type'] == 'default' and self._validators: + if self.problem.get(ProblemConfigLegacy)['validation-type'] == 'default' and self._validators: self.error('There are validator programs but problem.yaml has validation = "default"') - elif self.problem.get(Config_Legacy)['validation-type'] != 'default' and not self._validators: + elif self.problem.get(ProblemConfigLegacy)['validation-type'] != 'default' and not self._validators: self.error('problem.yaml specifies custom validator but no validator programs found') - if self.problem.get(Config_Legacy)['validation-type'] == 'default' and self._default_validator is None: + if self.problem.get(ProblemConfigLegacy)['validation-type'] == 'default' and self._default_validator is None: self.error('Unable to locate default validator') for val in self._validators[:]: @@ -1716,7 +1734,7 @@ def check(self, context: Context) -> bool: # Only sanity check output validators if they all actually compiled if self._check_res: - flags = self.problem.get(Config_Legacy)['validator_flags'] + flags = self.problem.get(ProblemConfigLegacy)['validator_flags'] fd, file_name = tempfile.mkstemp() os.close(fd) @@ -1758,7 +1776,7 @@ def _get_feedback(feedback_dir: str) -> str|None: def _parse_validator_results(self, val, status: int, feedbackdir, testcase: TestCase) -> SubmissionResult: - custom_score = self.problem.get(Config_Legacy)['grading']['custom_scoring'] + custom_score = self.problem.get(ProblemConfigLegacy)['grading']['custom_scoring'] score = None # TODO: would be good to have some way of displaying the feedback for debugging uses score_file = os.path.join(feedbackdir, 'score.txt') @@ -1793,7 +1811,7 @@ def _parse_validator_results(self, val, status: int, feedbackdir, testcase: Test def _actual_validators(self) -> list: vals = self._validators - if 
self.problem.get(Config_Legacy)['validation-type'] == 'default': + if self.problem.get(ProblemConfigLegacy)['validation-type'] == 'default': vals = [self._default_validator] return [val for val in vals if val is not None] @@ -1809,10 +1827,10 @@ def validate_interactive(self, testcase: TestCase, submission, timelim: int, err # file descriptor, wall time lim initargs = ['1', str(2 * timelim)] validator_args = [testcase.infile, testcase.ansfile, ''] - submission_args = submission.get_runcmd(memlim=self.problem.get(Config_Legacy)['limits']['memory']) + submission_args = submission.get_runcmd(memlim=self.problem.get(ProblemConfigLegacy)['limits']['memory']) - val_timelim = self.problem.get(Config_Legacy)['limits']['validation_time'] - val_memlim = self.problem.get(Config_Legacy)['limits']['validation_memory'] + val_timelim = self.problem.get(ProblemConfigLegacy)['limits']['validation_time'] + val_memlim = self.problem.get(ProblemConfigLegacy)['limits']['validation_memory'] for val in self._actual_validators(): if val.compile()[0]: feedbackdir = tempfile.mkdtemp(prefix='feedback', dir=self.problem.tmpdir) @@ -1864,9 +1882,9 @@ def validate_interactive(self, testcase: TestCase, submission, timelim: int, err def validate(self, testcase: TestCase, submission_output: str) -> SubmissionResult: res = SubmissionResult('JE') - val_timelim = self.problem.get(Config_Legacy)['limits']['validation_time'] - val_memlim = self.problem.get(Config_Legacy)['limits']['validation_memory'] - flags = self.problem.get(Config_Legacy)['validator_flags'].split() + testcase.testcasegroup.config['output_validator_flags'].split() + val_timelim = self.problem.get(ProblemConfigLegacy)['limits']['validation_time'] + val_memlim = self.problem.get(ProblemConfigLegacy)['limits']['validation_memory'] + flags = self.problem.get(ProblemConfigLegacy)['validator_flags'].split() + testcase.testcasegroup.config['output_validator_flags'].split() for val in self._actual_validators(): if val.compile()[0]: feedbackdir = tempfile.mkdtemp(prefix='feedback', dir=self.problem.tmpdir) @@ -2075,14 +2093,14 @@ def check_submission(self, sub, context: Context, expected_verdict: Verdict, tim def full_score_finite(self) -> bool: min_score, max_score = self.problem.get(ProblemTestCases)['root_group'].get_score_range() - if self.problem.get(Config_Legacy)['grading']['objective'] == 'min': + if self.problem.get(ProblemConfigLegacy)['grading']['objective'] == 'min': return min_score != float('-inf') else: return max_score != float('inf') def fully_accepted(self, result: SubmissionResult) -> bool: min_score, max_score = self.problem.get(ProblemTestCases)['root_group'].get_score_range() - best_score = min_score if self.problem.get(Config_Legacy)['grading']['objective'] == 'min' else max_score + best_score = min_score if self.problem.get(ProblemConfigLegacy)['grading']['objective'] == 'min' else max_score return result.verdict == 'AC' and (not self.problem.get(ProblemTestCases)['is_scoring'] or result.score == best_score) def start_background_work(self, context: Context) -> None: @@ -2098,7 +2116,7 @@ def check(self, context: Context) -> bool: return self._check_res self._check_res = True - limits = self.problem.get(Config_Legacy)['limits'] + limits = self.problem.get(ProblemConfigLegacy)['limits'] time_multiplier = limits['time_multiplier'] safety_margin = limits['time_safety_margin'] @@ -2160,7 +2178,7 @@ def check(self, context: Context) -> bool: PROBLEM_FORMATS: dict[str, dict[str, list[Type[ProblemPart]]]] = { formatversion.VERSION_LEGACY: { 
#'config': [ProblemConfig], - 'config': [Config_Legacy], + 'config': [ProblemConfigLegacy], 'statement': [ProblemStatement, Attachments], 'validators': [InputValidators, OutputValidators], 'graders': [Graders], @@ -2170,7 +2188,7 @@ def check(self, context: Context) -> bool: }, formatversion.VERSION_2023_07: { # TODO: Add all the parts 'statement': [ProblemStatement, Attachments], - 'config': [Config_2023_07], + 'config': [ProblemConfig2023_07], } } From cf4eb8d2c89e78c24a324aa4584524cb292cefa3 Mon Sep 17 00:00:00 2001 From: zazmuz Date: Mon, 7 Apr 2025 12:45:03 +0200 Subject: [PATCH 4/6] Removed possessive quantifiers from the UUID regex, since they are only supported from Python 3.11 onwards --- problemtools/verifyproblem.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/problemtools/verifyproblem.py b/problemtools/verifyproblem.py index 8585a6d3..9c146ec9 100644 --- a/problemtools/verifyproblem.py +++ b/problemtools/verifyproblem.py @@ -828,8 +828,8 @@ def fix_config_structure(self): pass def check_uuid(self, uuid: str) -> None: - if not re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}+$', uuid): - self.error(f'UUID "{uuid}" is not a valid UUID') + if not re.match(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', uuid): + self.error(f'"{uuid}" is not a valid UUID') def check(self, context: Context) -> bool: From 928cfc1a2c3573d3cf32308e9e8db30c45b5f654 Mon Sep 17 00:00:00 2001 From: zazmuz Date: Thu, 10 Apr 2025 09:21:03 +0200 Subject: [PATCH 5/6] mypy error fixes and merge. Resolved a merge conflict in the hello test --- .gitignore | 5 ++ mypy.ini | 15 ------ problemtools/languages.py | 15 ++---- problemtools/run/checktestdata.py | 8 +-- problemtools/run/program.py | 2 +- problemtools/run/viva.py | 8 +-- problemtools/template.py | 10 ++-- problemtools/tests/test_languages.py | 6 +-- problemtools/tests/test_output_validator.py | 2 +- problemtools/tests/test_verify_hello.py | 5 +- problemtools/verifyproblem.py | 59 ++++++++++++--------- 11 files changed, 66 insertions(+), 69 deletions(-) diff --git a/.gitignore b/.gitignore index 2f372208..0a24bfa5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,12 @@ *.pyc *~ +*.swp /.cache/ /problemtools.egg-info/ /support/default_validator/default_validator /support/interactive/interactive build/ + +venv/ +.pytest_cache/ +.mypy_cache/ diff --git a/mypy.ini b/mypy.ini index ff182a3e..7575ab90 100644 --- a/mypy.ini +++ b/mypy.ini @@ -4,20 +4,5 @@ install_types = True check_untyped_defs = True ignore_errors = False -[mypy-problemtools.tests.*] -ignore_errors = True - [mypy-problemtools.generatedata] ignore_errors = True - -[mypy-problemtools.languages] -ignore_errors = True - -[mypy-problemtools.template] -ignore_errors = True - -[mypy-problemtools.run.checktestdata] -ignore_errors = True - -[mypy-problemtools.run.viva] -ignore_errors = True diff --git a/problemtools/languages.py b/problemtools/languages.py index 0da66722..a0395405 100644 --- a/problemtools/languages.py +++ b/problemtools/languages.py @@ -52,7 +52,7 @@ def get_source_files(self, file_list): """ return [file_name for file_name in file_list if (any(fnmatch.fnmatch(file_name, glob) - for glob in self.files) + for glob in self.files) # type: ignore[union-attr] and self.__matches_shebang(file_name))] @@ -105,18 +105,13 @@ def __check(self): """ # Check that all mandatory fields are provided if self.name is None: - raise LanguageConfigError( - 'Language %s has no name' % self.lang_id) + raise LanguageConfigError(f'Language {self.lang_id} has no name') if
self.priority is None: - raise LanguageConfigError( - 'Language %s has no priority' % self.lang_id) + raise LanguageConfigError(f'Language {self.lang_id} has no priority') if self.files is None: - raise LanguageConfigError( - - 'Language %s has no files glob' % self.lang_id) + raise LanguageConfigError(f'Language {self.lang_id} has no files glob') if self.run is None: - raise LanguageConfigError( - 'Language %s has no run command' % self.lang_id) + raise LanguageConfigError(f'Language {self.lang_id} has no run command') # Check that all variables appearing are valid variables = Language.__variables_in_command(self.run) diff --git a/problemtools/run/checktestdata.py b/problemtools/run/checktestdata.py index 28abe779..1bad387b 100644 --- a/problemtools/run/checktestdata.py +++ b/problemtools/run/checktestdata.py @@ -41,8 +41,8 @@ def do_compile(self) -> tuple[bool, str|None]: return ((os.WIFEXITED(status) and os.WEXITSTATUS(status) in [0, 1]), None) - def run(self, infile='/dev/null', outfile='/dev/null', - errfile='/dev/null', args=None, timelim=1000): + def run(self, infile='/dev/null', outfile='/dev/null', errfile='/dev/null', + args=None, timelim=1000, memlim=1024, work_dir=None): """Run the Checktestdata script to validate an input file. Args: @@ -66,7 +66,9 @@ def run(self, infile='/dev/null', outfile='/dev/null', outfile=outfile, errfile=errfile, args=args, - timelim=timelim) + timelim=timelim, + memlim=memlim, + work_dir=work_dir) # This is ugly, switches the accept exit status and our accept # exit status 42. if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0: diff --git a/problemtools/run/program.py b/problemtools/run/program.py index 49cca332..4bc62719 100644 --- a/problemtools/run/program.py +++ b/problemtools/run/program.py @@ -34,7 +34,7 @@ def run(self, infile='/dev/null', outfile='/dev/null', errfile='/dev/null', Args: infile (str): name of file to pass on stdin outfile (str): name of file to send stdout to - errfile (str): name of file to send stderr ro + errfile (str): name of file to send stderr to args (list of str): additional command-line arguments to pass to the program timelim (int): CPU time limit in seconds diff --git a/problemtools/run/viva.py b/problemtools/run/viva.py index cddc0f3b..06beeba6 100644 --- a/problemtools/run/viva.py +++ b/problemtools/run/viva.py @@ -40,8 +40,8 @@ def do_compile(self) -> tuple[bool, str|None]: return ((os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0), None) - def run(self, infile='/dev/null', outfile='/dev/null', - errfile='/dev/null', args=None, timelim=1000): + def run(self, infile='/dev/null', outfile='/dev/null', errfile='/dev/null', + args=None, timelim=1000, memlim=1024, work_dir=None): """Run the VIVA script to validate an input file. Args: @@ -68,7 +68,9 @@ def run(self, infile='/dev/null', outfile='/dev/null', (status, runtime) = super(Viva, self).run(outfile=outfile, errfile=errfile, args=args, - timelim=timelim) + timelim=timelim, + memlim=memlim, + work_dir=work_dir) # This is ugly, switches the accept exit status and our accept # exit status 42. 
if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0: diff --git a/problemtools/template.py b/problemtools/template.py index f0c7bc4b..d1796001 100644 --- a/problemtools/template.py +++ b/problemtools/template.py @@ -34,7 +34,7 @@ def __init__(self, problemdir, language=None, force_copy_cls=False, version="aut if glob.glob(os.path.join(stmtdir, 'problem.tex')): langs.append('') for f in glob.glob(os.path.join(stmtdir, 'problem.[a-z][a-z].tex')): - langs.append(re.search("problem.([a-z][a-z]).tex$", f).group(1)) + langs.append(re.search("problem.([a-z][a-z]).tex$", f).group(1)) # type: ignore[union-attr] if len(langs) == 0: raise Exception('No problem statements available') @@ -69,10 +69,10 @@ def __init__(self, problemdir, language=None, force_copy_cls=False, version="aut templatepaths = [os.path.join(os.path.dirname(__file__), 'templates/latex'), os.path.join(os.path.dirname(__file__), '../templates/latex'), '/usr/lib/problemtools/templates/latex'] - self.templatepath = next((p for p in templatepaths - if os.path.isdir(p) and os.path.isfile(os.path.join(p, self.templatefile))), - None) - if self.templatepath is None: + try: + self.templatepath = next((p for p in templatepaths + if os.path.isdir(p) and os.path.isfile(os.path.join(p, self.templatefile)))) + except StopIteration: raise Exception('Could not find directory with latex template "%s"' % self.templatefile) self.basedir = os.path.dirname(problemdir) diff --git a/problemtools/tests/test_languages.py b/problemtools/tests/test_languages.py index 01057a24..e6d70829 100644 --- a/problemtools/tests/test_languages.py +++ b/problemtools/tests/test_languages.py @@ -34,7 +34,7 @@ def test_update(self): assert lang.files == ['*'] lang.update({'shebang': 'new.*end'}) - assert lang.shebang.match('newfilend') + assert lang.shebang is not None and lang.shebang.match('newfilend') with pytest.raises(languages.LanguageConfigError): # ambiguous entry point @@ -52,9 +52,9 @@ def test_update(self): def test_invalid_id(self): vals = self.__language_dict() with pytest.raises(TypeError): - languages.Language(None, vals) + languages.Language(None, vals) # type: ignore with pytest.raises(TypeError): - languages.Language(42, vals) + languages.Language(42, vals) # type: ignore with pytest.raises(languages.LanguageConfigError): languages.Language('åäö', vals) with pytest.raises(languages.LanguageConfigError): diff --git a/problemtools/tests/test_output_validator.py b/problemtools/tests/test_output_validator.py index afd6f2d4..e92f3317 100644 --- a/problemtools/tests/test_output_validator.py +++ b/problemtools/tests/test_output_validator.py @@ -13,7 +13,7 @@ def test_output_validator_feedback(): text = "".join(r.choices(string.printable)) feedback.write_text(text) data = OutputValidators._get_feedback(directory) - assert text in data + assert data is not None and text in data def test_output_validator_feedback_non_unicode(): diff --git a/problemtools/tests/test_verify_hello.py b/problemtools/tests/test_verify_hello.py index 090e2d24..ee58cce7 100644 --- a/problemtools/tests/test_verify_hello.py +++ b/problemtools/tests/test_verify_hello.py @@ -8,9 +8,10 @@ def test_load_hello(): args = verify.argparser().parse_args([string]) verify.initialize_logging(args) + context = verify.Context(args, None) with verify.Problem(string, args) as p: assert p.shortname == "hello" # pytest and fork don't go along very well, so just run aspects that work without run - assert p.getProblemPart(verify.ProblemConfigLegacy).check(args) - assert 
p.getProblemPart(verify.Attachments).check(args) + assert p.getProblemPart(verify.ProblemConfigLegacy).check(context) + assert p.getProblemPart(verify.Attachments).check(context) diff --git a/problemtools/verifyproblem.py b/problemtools/verifyproblem.py index 9c146ec9..b55bf4ad 100644 --- a/problemtools/verifyproblem.py +++ b/problemtools/verifyproblem.py @@ -717,7 +717,7 @@ def all_datasets(self) -> list: class ProblemStatement(ProblemPart): PART_NAME = 'statement' - def setup(self): + def setup(self) -> dict[str, str]: self.format_data = formatversion.get_format_data(self.problem.probdir) if not self.format_data: raise NotImplementedError('No version selected.') @@ -728,8 +728,8 @@ def setup(self): self.statements = [(m.group(0), m.group(2) or '') for file in os.listdir(dir) if (m := re.search(self.statement_regex, file))] else: folders_found = [os.path.basename(f) for f in glob.glob(os.path.join(self.problem.probdir)+'/*') if os.path.isdir(f)] - folders_found = "'" + "', '".join(folders_found) + "'" - self.error(f'No directory named "{self.format_data.statement_directory}" found. Found folders: {folders_found}') + folders_found_str = "'" + "', '".join(folders_found) + "'" + self.error(f'No directory named "{self.format_data.statement_directory}" found. Found folders: {folders_found_str}') self.statements = [] return self.get_config() @@ -773,8 +773,8 @@ def check(self, context: Context) -> bool: def __str__(self) -> str: return 'problem statement' - def get_config(self) -> dict[str, dict[str, str]]: - ret: dict[str, dict[str, str]] = {} + def get_config(self) -> dict[str, str]: + ret: dict[str, str] = {} for filename, lang in self.statements: dir = os.path.join(self.problem.probdir, self.format_data.statement_directory) with open(os.path.join(dir, filename)) as f: @@ -782,7 +782,10 @@ def get_config(self) -> dict[str, dict[str, str]]: hit = re.search(r'\\problemname{(.*)}', stmt, re.MULTILINE) if hit: problem_name = hit.group(1).strip() + assert type(problem_name) is str ret[lang] = problem_name + else: + self.error(f'Problem name not found in {filename}') return ret @@ -798,7 +801,7 @@ def setup_dependencies(): def setup(self): self.debug(' Loading problem config') self.configfile = os.path.join(self.problem.probdir, 'problem.yaml') - self._data = {} + self._data : dict[str, Any] = {} if os.path.isfile(self.configfile): try: @@ -816,6 +819,8 @@ def common_fix_config_structure(self): for field, value in self._data.items(): if value is None: self.error(f"Field '{field}' provided in problem.yaml but is empty") + elif type(value) is str: + self._data[field] = value.strip() self._data.setdefault('license', 'unknown') if type(self._data['license']) is str: @@ -863,6 +868,7 @@ def edit_dist(a: str, b: str) -> int: return dp[n][m] if type(b) is set: b = list(b) + assert type(b) is list best = b[0] best_dist = edit_dist(a, best) for s in b[1:]: @@ -1055,7 +1061,7 @@ class ProblemConfig2023_07(ProblemConfigBase): _REQUIRED_FIELDS = ['problem_format_version', 'name', 'uuid'] _PROBLEM_TYPES = {'pass-fail', 'scoring', 'interactive', 'multi-pass', 'submit-answer'} - _statement_languages = set() + _statement_languages : set[str] = set() def fix_config_structure(self): self._statement_languages = set(self.problem.get(ProblemStatement).keys()) for REQUIRED_FIELD in self._REQUIRED_FIELDS: @@ -1235,7 +1241,7 @@ def extract_email(person): for i, val in enumerate(self._data['source']): if type(val) is str: self._data['source'][i] = {'name': val} - elif type(val) is dict: + elif isinstance(val, dict): if 
'name' not in val: self.error(f'source needs to have key name (got: {val})') @@ -1292,21 +1298,22 @@ def extract_email(person): if k2 not in allowed_in_time_multipliers: self.warning(f'Unknown property "limits.time_multipliers.{k2}" was given. Allowed keys: {allowed_in_time_multipliers}') - for i in ints: - if i not in limits: + for int_field in ints: + if int_field not in limits: continue - val = limits[i] - if type(val) is float: - val = int(val) - self.warning(f'limits.{i} was given as a float ({limits[i]}), interpreting as {val}') - elif type(val) is not int: - self.fatal(f'limits.{i} was of incorrect type, expected int (got: "{val}")') - - if i == 'validation_passes' and val < 2: - self.fatal(f'limits.validation_passes should be >= 2 (got: "{val}")') - elif val <= 0: - self.fatal(f'limits.{i} needs to be > 0 (got: {val})') - self._data['limits'][i] = limits[i] + intfield_val = limits[int_field] + if type(intfield_val) is float: + intfield_val = int(intfield_val) + + self.warning(f'limits.{int_field} was given as a float ({limits[int_field]}), interpreting as {intfield_val}') + elif type(intfield_val) is not int: + self.fatal(f'limits.{int_field} was of incorrect type, expected int (got: "{intfield_val}")') + + if int_field == 'validation_passes' and intfield_val < 2: + self.fatal(f'limits.validation_passes should be >= 2 (got: "{intfield_val}")') + elif intfield_val <= 0: + self.fatal(f'limits.{int_field} needs to be > 0 (got: {intfield_val})') + self._data['limits'][int_field] = intfield_val for f in ('time_resolution', 'time_limit'): if f in limits: @@ -1318,10 +1325,10 @@ def extract_email(person): if type(limits['time_multipliers']) is dict: for k in ('ac_to_time_limit', 'time_limit_to_tle'): if k in limits['time_multipliers']: - val = limits['time_multipliers'][k] - if type(val) not in (int, float) or val < 1: - self.fatal(f'limits.time_multipliers.{k} needs to be a float >= 1 (got: "{val}")') - self._data['limits']['time_multipliers'][k] = float(val) + mult_val = limits['time_multipliers'][k] + if type(mult_val) not in (int, float) or mult_val < 1: + self.fatal(f'limits.time_multipliers.{k} needs to be a float >= 1 (got: "{mult_val}")') + self._data['limits']['time_multipliers'][k] = float(mult_val) self._data.setdefault('keywords', []) for kw in self._data['keywords']: From 9aee251fe8e99b3c6afe12693275e799f2e50862 Mon Sep 17 00:00:00 2001 From: zazmuz Date: Thu, 10 Apr 2025 13:20:34 +0200 Subject: [PATCH 6/6] fixed some CodeFactor issues --- problemtools/verifyproblem.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/problemtools/verifyproblem.py b/problemtools/verifyproblem.py index b55bf4ad..a8500a58 100644 --- a/problemtools/verifyproblem.py +++ b/problemtools/verifyproblem.py @@ -577,7 +577,7 @@ def check(self, context: Context) -> bool: for filename in files: filepath = os.path.join(root, filename) if filepath.endswith('.in') and not os.path.islink(filepath): - md5 = hashlib.md5() + md5 = hashlib.md5(usedforsecurity=False) with open(filepath, 'rb') as f: for buf in iter(lambda: f.read(1024), b''): md5.update(buf) @@ -961,6 +961,8 @@ def fix_config_structure(self): elif self._data['grading']['show_test_data_groups'] and self._data['type'] == 'pass-fail': self.error("Showing test data groups is only supported for scoring problems, this is a pass-fail problem") else: + if self._data['type'] != 'pass-fail' and self.problem.get(ProblemTestCases)['root_group'].has_custom_groups() and 'show_test_data_groups' not in self._data.get('grading', {}): + 
self.warning("Problem has custom testcase groups, but does not specify a value for grading.show_test_data_groups; defaulting to false") self._data['grading']['show_test_data_groups'] = False if 'on_reject' in self._data['grading']: @@ -1023,11 +1025,6 @@ def _check_source_url(self): if 'source_url' in self._data: self.error('source needs to be defined when source_url is defined') - def _check_custom_groups(self): - # TODO: implement the check - pass - #if self._data['type'] != 'pass-fail' and self.problem.get(ProblemTestCases)['root_group'].has_custom_groups() and 'show_test_data_groups' not in self._origdata.get('grading', {}): - # self.warning("Problem has custom testcase groups, but does not specify a value for grading.show_test_data_groups; defaulting to false") def _check_unknown_fields(self): known = {'problem_format_version', 'type', 'name', 'uuid', 'author', 'source', 'source_url', 'license', 'rights_owner', 'limits', 'validation', 'validation-type', 'validation-params', 'validator_flags', 'grading', 'keywords'} @@ -2211,7 +2208,7 @@ class Problem(ProblemAspect): problem are listed. These should all be a subclass of ProblemPart. The dictionary is in the form of category -> part-types. You could for example have 'validators' -> [InputValidators, OutputValidators]. """ - def __init__(self, probdir: str, args: argparse.Namespace, parts: dict[str, list[type]] = PROBLEM_FORMATS[formatversion.VERSION_LEGACY]) -> None: + def __init__(self, probdir: str, args: argparse.Namespace, parts: dict[str, list[type]] = PROBLEM_FORMATS[formatversion.VERSION_LEGACY]) -> None: self.part_mapping: dict[str, list[Type[ProblemPart]]] = parts self.aspects: set[type] = {v for s in parts.values() for v in s} self.probdir = os.path.realpath(probdir)