|
| 1 | +from configparser import ConfigParser |
| 2 | +from configparser import ExtendedInterpolation |
| 3 | +from pathlib import Path |
| 4 | +from urllib import parse |
| 5 | +from urllib import request |
| 6 | + |
| 7 | +import os |
| 8 | +import tempfile |
| 9 | +import typing |
| 10 | + |
| 11 | + |
| 12 | +def resolve_dependencies( |
| 13 | + file_or_url: typing.Union[str, Path], |
| 14 | + tmpdir: str, |
| 15 | + http_parent=None, |
| 16 | +) -> typing.List[Path]: |
| 17 | + """Resolve dependencies of a file or url |
| 18 | +
|
| 19 | + The result is a list of Path objects, starting with the |
| 20 | + given file_or_url and followed by all file_or_urls referenced from it. |
| 21 | +
|
| 22 | + The file_or_url is assumed to be a ini file or url to such, with an option key "include" |
| 23 | + under the "[settings]" section. |
| 24 | + """ |
| 25 | + if isinstance(file_or_url, str): |
| 26 | + if http_parent: |
| 27 | + file_or_url = parse.urljoin(http_parent, file_or_url) |
| 28 | + parsed = parse.urlparse(str(file_or_url)) |
| 29 | + if parsed.scheme: |
| 30 | + with request.urlopen(str(file_or_url)) as fio: |
| 31 | + tf = tempfile.NamedTemporaryFile( |
| 32 | + suffix=".ini", |
| 33 | + dir=str(tmpdir), |
| 34 | + delete=False, |
| 35 | + ) |
| 36 | + tf.write(fio.read()) |
| 37 | + tf.flush() |
| 38 | + file = Path(tf.name) |
| 39 | + parts = list(parsed) |
| 40 | + parts[2] = str(Path(parts[2]).parent) |
| 41 | + http_parent = parse.urlunparse(parts) |
| 42 | + else: |
| 43 | + file = Path(file_or_url) |
| 44 | + else: |
| 45 | + file = file_or_url |
| 46 | + if not file.exists(): |
| 47 | + raise FileNotFoundError(file) |
| 48 | + cfg = ConfigParser() |
| 49 | + cfg.read(file) |
| 50 | + if not ("settings" in cfg and "include" in cfg["settings"]): |
| 51 | + return [file] |
| 52 | + file_list = [] |
| 53 | + for include in cfg["settings"]["include"].split("\n"): |
| 54 | + include = include.strip() |
| 55 | + if not include: |
| 56 | + continue |
| 57 | + if http_parent or parse.urlparse(include).scheme: |
| 58 | + file_list += resolve_dependencies(include, tmpdir, http_parent) |
| 59 | + else: |
| 60 | + file_list += resolve_dependencies(file.parent / include, tmpdir) |
| 61 | + |
| 62 | + file_list.append(file) |
| 63 | + return file_list |
| 64 | + |
| 65 | + |
| 66 | +def read_with_included(file_or_url: typing.Union[str, Path]) -> ConfigParser: |
| 67 | + """Read a file or url and include all referenced files, |
| 68 | +
|
| 69 | + Parse the result as a ConfigParser and return it. |
| 70 | + """ |
| 71 | + cfg = ConfigParser( |
| 72 | + default_section="settings", |
| 73 | + interpolation=ExtendedInterpolation(), |
| 74 | + ) |
| 75 | + cfg.optionxform = str # type: ignore |
| 76 | + cfg["settings"]["directory"] = os.getcwd() |
| 77 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 78 | + resolved = resolve_dependencies(file_or_url, tmpdir) |
| 79 | + cfg.read(resolved) |
| 80 | + return cfg |
0 commit comments