diff --git a/.gitignore b/.gitignore index 41e29a2..8d8939a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# OpenSAMPL data paths +archive/ +ntp-snapshots/ # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/CHANGELOG.md b/CHANGELOG.md index dfb7252..1fafdba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,6 @@ All notable changes to this project will be documented in this file in [Keep a C This project adheres to [Semantic Versioning](https://semver.org/). --- - -## [Unreleased] - YYYY-MM-DD +## [1.2.0] - Unreleased ### Added - 🔥 Moved alembic migration code into openSAMPL along with Docker image information - 🔥 Moved backend api code into openSAMPL along with Docker image information - 🔥 Docker-compose for developers which installs openSAMPL as editable on backend image +- 🔥 NTP vendor probe family (`NtpProbe`) with JSON snapshot format, filename convention, and `ntp_metadata` ORM table +- 🔥 `opensampl-collect ntp` entry point: local chrony/ntpq/timedatectl-style collection and remote UDP queries via `ntplib` +- 🔥 NTP-focused metrics in `METRICS` (phase offset, delay, jitter, stratum, reachability, dispersion, root delay/dispersion, poll interval, sync health) +- 🔥 Idempotent database bootstrap after schema creation: seed `reference_type`, `metric_type`, default `reference` and `defaults` rows from `REF_TYPES` / `METRICS`; `public.get_default_uuid_for()` for `ProbeData` defaults; `castdb.campus_locations` view for geospatial dashboards backed by `locations.geom` +- 🔥 Grafana: NTP probes dashboard (`ntp-opensampl`), public geospatial timing dashboard updates, datasource/dashboard provisioning alignment +- 🔥 Grafana table panels joining stored `probe_metadata`, `ntp_metadata`, `locations`, and `reference` / `reference_type` for probe reference & source context (no runtime geolocation in panels) +- 🔥 Remote NTP snapshot identity overrides (`probe_id`, `probe_ip`, `probe_name`, optional lab `geolocation` hints) for stable 
ingest keys + +### Changed +- ⚡ Grafana timing panel titles and dashboard copy to **reference-safe** wording (NTP / configured reference vs implying GNSS truth where not applicable); extensible for future GNSS-backed probes +- ⚡ `METRICS.NTP_JITTER` description to distinguish measured jitter (local parsers) from conservative remote estimates +- ⚡ Remote `query_ntp_server`: emit `jitter_s` for time series using a documented delay/root-dispersion bound when RFC peer jitter is unavailable from a single packet +- ⚡ `load_probe_metadata`: NTP path attaches stored `locations` rows for dashboard geospatial joins (one-time at metadata load; not repeated in Grafana queries) ### Fixed +- 🩹 `opensampl init` / `create_new_tables` leaving lookup tables empty (load path now seeds baseline rows and defaults) +- 🩹 Grafana PostgreSQL variables and panel filters: text-safe UUID handling for `varchar` `probe_metadata.uuid` (avoid `varchar = uuid` / empty `IN ()` failures) +- 🩹 Public geospatial dashboard map layer using the provisioned `castdb-datasource` UID consistently - 🩹 Bug which caused random data duration to always be 1 hour ## [1.1.5] - 2025-09-22 diff --git a/README.md b/README.md index f67f9ae..837d37e 100644 --- a/README.md +++ b/README.md @@ -26,10 +26,15 @@ The name OpenSAMPL stands for **O**pen **S**ynchronization **A**nalytics and **M with the goal of this project being to provide a comprehensive and open-source solution for clock data management and analysis. Visualizations are provided via [grafana](https://grafana.com/), and the data is stored in a [TimescaleDB](https://www.timescale.com/) database, which is a time-series database built on PostgreSQL. +**NTP clock probes** are supported end-to-end: YAML-driven vendor scaffolding (`opensampl create`), JSON snapshots, `opensampl load ntp`, and `opensampl-collect ntp` (local chrony/ntpq-style chain or remote UDP via optional `ntplib`). 
NTP observations use the same metric/reference tables as other vendors; **“Reference”** in Grafana means the OpenSAMPL reference dimension for SQL joins, **not** GNSS ground truth for NTP-only demos. + +**Database bootstrap**: first-time setup should run **`opensampl init`**, which creates schema and seeds lookup tables (`reference_type`, `metric_type`, `reference`, `defaults`) plus PostgreSQL helpers expected by the load path (see `opensampl/db/bootstrap.py`). Skipping init leads to obscure failures on first load. + +**Documentation**: [published docs](https://ornl.github.io/OpenSAMPL/) — start with *Guides*. For NTP specifically, see *NTP vendor design* and *NTP extension (walkthrough)* (generator, geolocation at ingest, bootstrap, Grafana query notes). ### (**O**pen **S**ynchronization **A**nalytics and **M**onitoring **PL**atform) -python tools for adding clock data to a timescale db. +Python tools for adding clock data to a TimescaleDB-backed database. ## CLI TOOL @@ -39,6 +44,7 @@ python tools for adding clock data to a timescale db. 2. Pip install the latest version of opensampl: ```bash pip install opensampl +# Remote NTP collection also needs extras, e.g. pip install 'opensampl[collect]' ``` ### Development Setup @@ -130,21 +136,23 @@ Display current environment configuration: ```bash # Show all variables -poetry run opensampl config show +uv run opensampl config show # Show with descriptions -poetry run opensampl config show --explain +uv run opensampl config show --explain # Show specific variable -poetry run opensampl config show --var DATABASE_URL +uv run opensampl config show --var DATABASE_URL ``` +Configuration resolution: explicit `--env-file`, then `OPENSAMPL_ENV_FILE`, then `python-dotenv`’s search for `.env`. **Process environment variables override values from the env file** (pydantic-settings precedence). 
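The precedence can be sketched in a few lines of Python (illustrative only; the real resolution is performed by pydantic-settings and python-dotenv, not by this code, and `resolve_env_file` / `effective_value` are hypothetical names):

```python
import os

def resolve_env_file(cli_env_file=None):
    """Sketch of the env-file lookup order described above."""
    # 1. An explicit --env-file always wins.
    if cli_env_file:
        return cli_env_file
    # 2. Fall back to the OPENSAMPL_ENV_FILE variable.
    pointer = os.getenv("OPENSAMPL_ENV_FILE")
    if pointer:
        return pointer
    # 3. Finally, a python-dotenv-style search for a local .env file.
    return ".env" if os.path.exists(".env") else None

def effective_value(name, env_file_values):
    """Process environment overrides values parsed from the env file."""
    return os.environ.get(name, env_file_values.get(name))
```

So exporting `DATABASE_URL` in the shell takes priority over a `DATABASE_URL` line in the env file.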
+ ### Set Configuration Update environment variables: ```bash -poetry run opensampl config set VARIABLE_NAME value +uv run opensampl config set VARIABLE_NAME value ``` ## File Format Support diff --git a/docs/guides/collection.md b/docs/guides/collection.md index 222c4d7..de7fe9f 100644 --- a/docs/guides/collection.md +++ b/docs/guides/collection.md @@ -8,6 +8,7 @@ The collect API enables automated collection of measurement data from network-co - **Microchip TWST Modems** (ATS6502 series): Collect offset and EBNO tracking values along with contextual information - **Microchip TimeProvider® 4100** (TP4100): Collect timing performance metrics from various input channels via web interface +- **NTP** (`opensampl-collect ntp`): Write JSON snapshots for the `NTP` vendor—local host client state (chrony/ntpq/timedatectl chain) or remote UDP queries (install with `pip install 'opensampl[collect]'` for `ntplib`). See [NTP vendor design](ntp_vendor_design.md) and [NTP extension walkthrough](ntp_extension.md). 
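Snapshot files follow a strict naming convention so the loader can derive the probe identity without opening the JSON. The helper below mirrors `snapshot_filename` from `opensampl/collect/ntp_snapshot.py` in this change (dots in the IP become dashes; the timestamp is UTC):

```python
from datetime import datetime, timezone

def snapshot_filename(probe_ip, probe_id, ts=None):
    """Build the canonical ntp_<ip>_<probe_id>_<UTC timestamp>.json name."""
    ts = ts or datetime.now(tz=timezone.utc)
    ip_part = probe_ip.replace(".", "-")  # dashes keep the name filesystem-safe
    return f"ntp_{ip_part}_{probe_id}_{ts.strftime('%Y%m%dT%H%M%SZ')}.json"
```

A snapshot from probe `mock-a` at `192.168.1.10` taken at 2025-01-02 03:04:05 UTC is therefore written as `ntp_192-168-1-10_mock-a_20250102T030405Z.json`.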
## CLI Usage diff --git a/docs/guides/index.md b/docs/guides/index.md index d46d42d..aac1274 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -1,6 +1,8 @@ # Guides -* [Configuration](configuration.md) +* [Configuration](configuration.md) * [Using the `opensampl` CLI](opensampl-cli.md) -* [Using the `opensampl-server` CLI](opensampl-server.md) -* [Using the `opensampl-collect` CLI](opensampl-cli.md) \ No newline at end of file +* [Using the `opensampl-server` CLI](opensampl-server.md) +* [Using the `opensampl-collect` CLI](opensampl-cli.md) +* [NTP vendor design](ntp_vendor_design.md) +* [NTP extension walkthrough](ntp_extension.md) — generator, bootstrap, geolocation, Grafana vs demo repo \ No newline at end of file diff --git a/docs/guides/ntp_extension.md b/docs/guides/ntp_extension.md new file mode 100644 index 0000000..ac3ab93 --- /dev/null +++ b/docs/guides/ntp_extension.md @@ -0,0 +1,125 @@ +# NTP vendor extension (implementation walkthrough) + +This document describes how the **NTP** clock-probe family was added to OpenSAMPL, how it fits the **vendor generator** model, and which pieces remain **manual integration** work. It also separates **upstream OpenSAMPL** behavior from the **syncscope-at-home** demo appliance. + +For field-level design (modes, metadata vs series, local tool chain), see [NTP vendor design](ntp_vendor_design.md). + +--- + +## 1. Why NTP is modeled as a vendor / probe family + +OpenSAMPL organizes ingest around **vendors** (`VendorType`), **probe identity** (`ProbeKey`), **vendor-specific metadata tables**, and **normalized time series** in `probe_data`. NTP sources are not GNSS truth references; they are **network clock observations** (local client state or remote server responses). Modeling NTP as its own vendor keeps: + +- A dedicated **`ntp_metadata`** table for sync/leap/stratum/targets and parser provenance. +- Stable **`ProbeKey`** derivation from snapshot filenames and JSON payloads. 
+- Metrics and references aligned with the rest of the platform (`metric_type`, `reference`) without pretending NTP stratum implies a calibrated physical reference. + +--- + +## 2. Role of the vendor generator (`opensampl create`) + +The CLI command `opensampl create` uses `opensampl.create.create_vendor.VendorConfig` to: + +- Generate or refresh **probe parser scaffolding** and **SQLAlchemy metadata ORM** from YAML. +- Update **`opensampl.vendors.constants`** so the new vendor is registered for routing and CLI. + +The generator is **scaffolding**: it does not implement protocol logic, collectors, or Grafana panels by itself. + +--- + +## 3. `ntp_vendor.yaml` and generated artifacts + +The canonical input is `opensampl/ntp_vendor.yaml` in the package source tree. It declares: + +- `name`, `parser_class` / `parser_module`, `metadata_orm`, `metadata_table` +- `metadata_fields`: typed columns on `ntp_metadata` (mode, targets, sync fields, dispersion, etc.) + +Running `opensampl create` against this file produces/updates generated modules under `opensampl/vendors/` and wires the vendor into constants. Treat **`ntp_vendor.yaml` as the contract** between schema and hand-written Python (`ntp.py`, collectors). + +--- + +## 4. Manual steps after generation + +Typical follow-through (as done for NTP): + +1. **Implement the probe class** (`NtpProbe`): parse snapshot JSON, normalize metadata, emit series rows with correct metric keys. +2. **Implement collectors** (`opensampl-collect ntp`): + - **Local**: shell out to chrony/ntpq/timedatectl with a defined fallback order. + - **Remote**: UDP client via `ntplib` (`opensampl/vendors/ntp_remote.py`), optional probe/geo overrides. +3. **Load path hooks**: `write_to_table` / probe load pipeline must attach NTP-specific behavior (e.g. **geolocation** before `probe_metadata` insert—see below). +4. **Metrics**: register NTP metrics in `opensampl/metrics` so bootstrap seeds `metric_type`. +5.
**References**: NTP demos use **`REF_TYPES.UNKNOWN`**; dashboards label **“Reference”** for SQL joins, **not** as GNSS ground truth. +6. **ORM / migrations**: ensure `opensampl init` (or Alembic, if used) creates `ntp_metadata` and related objects consistently with generated code. + +--- + +## 5. Metrics and reference choices + +- **Offset, delay, stratum, poll, root delay/dispersion**, etc. are stored as first-class metrics where applicable. +- **Jitter**: a **single** remote NTP client response does not expose RFC5905 peer jitter; `ntp_remote` may emit a **positive bound** derived from delay and root dispersion so dashboards have a value—this is an **estimate**, not a sampled Allan deviation. Local chrony/ntpq paths may still expose **measured** jitter when available. +- **Reference**: use **`UNKNOWN`** unless a future model maps NTP reference IDs to calibrated references. Do **not** describe NTP-only demos as validating against **GNSS truth**. + +--- + +## 6. Local vs remote collection + +| Path | Mechanism | Notes | +|------|-----------|--------| +| **Local** | Subprocess chain (chronyc, ntpq, timedatectl, …) | Best-effort; records `observation_source` and partial state when tools are missing. | +| **Remote** | `ntplib` UDP request | High ports supported for lab mocks; production often uses UDP **123**. Timeouts produce degraded metadata, not process crashes. | + +--- + +## 7. Metadata and geolocation + +**Geolocation is applied at metadata ingest** (when building rows for `locations` / `probe_metadata`), not inside Grafana: + +- **`attach_ntp_location`** (`opensampl/load/ntp_geolocation.py`) resolves coordinates from YAML `geo_override`, lab defaults, or **public IP → HTTP** lookup (e.g. ip-api.com) when enabled. +- Grafana maps read **`castdb.locations`** / **`castdb.campus_locations`**; panels do **not** call external geo APIs at query time. + +Disable enrichment with env **`NTP_GEO_ENABLED=false`** if you want probes without new location rows. 
+ +--- + +## 8. Bootstrap and seed requirements + +`opensampl init` and/or load bootstrap (`opensampl/db/bootstrap.py` → `seed_lookup_tables`) must ensure: + +- **`reference_type`** / **`metric_type`** rows exist (including **UNKNOWN**). +- A **`reference`** row and **`defaults`** entries so ORM defaults and `ProbeData` triggers resolve UUIDs. +- **`public.get_default_uuid_for(text)`** exists on PostgreSQL (used by probe data insertion). +- **`castdb.campus_locations`** view (PostGIS lat/lon from `locations.geom`) for **reference-safe** geospatial dashboards when PostGIS is present. + +Skipping bootstrap causes obscure failures during first load; always run **`opensampl init`** against a fresh database before loading probes. + +--- + +## 9. Grafana and SQL hardening + +- Dashboards use **text** template variables aligned with `probe_metadata.uuid` (varchar UUID strings)—avoid numeric formatting that strips leading zeroes. +- Prefer queries that tolerate **empty** or **single-probe** deployments (e.g. NTP-only stacks without legacy GNSS rows). +- **“Reference”** in titles means **OpenSAMPL reference dimension** for joins/filters, not a claim of absolute timing truth. +- **Metadata panels** may **collapse** JSON into compact rows for readability; that is presentation-only. + +--- + +## 10. 
OpenSAMPL vs syncscope-at-home + +| Concern | OpenSAMPL (library) | syncscope-at-home (demo) | +|--------|---------------------|---------------------------| +| Vendor YAML, parsers, collectors, load hooks, bootstrap | Yes | Consumes as submodule | +| Docker Compose, custom **PostGIS + Timescale** DB image, **ntp-ingest** loop | No | Yes (`docker-compose.yaml`, `demo/db`, `demo/ntp-ingest`) | +| Default **NTP targets**, **interval**, spool paths | No | `config/ntp-ingest.yaml`, env `NTP_INGEST_CONFIG` | +| Lab **mock NTP** UDP services | No | Compose services `mock-ntp-*` | +| Opinionated Grafana **dashboards** shipped in repo | Optional / examples | `demo/` Grafana image and provisioning | + +Treat **syncscope-at-home** as an **appliance-style** illustration: it shows how to run continuous collect+load with sane defaults, not a mandatory deployment topology for upstream OpenSAMPL. + +--- + +## See also + +- [NTP vendor design](ntp_vendor_design.md) — probe identity, modes, failure semantics +- [Collection](collection.md) — `opensampl-collect` overview +- [Configuration](configuration.md) — env files and CLI config +- API: [`create_vendor`](../api/helpers/create_vendor.md) diff --git a/docs/guides/ntp_vendor_design.md b/docs/guides/ntp_vendor_design.md new file mode 100644 index 0000000..5bbcc2b --- /dev/null +++ b/docs/guides/ntp_vendor_design.md @@ -0,0 +1,59 @@ +# NTP vendor design (OpenSAMPL) + +This note defines the NTP clock-probe family: identity, storage, local vs remote collection, and lab-demo caveats. + +## Vendor identity + +| Item | Value | +|------|--------| +| Vendor name | `NTP` | +| Probe class | `NtpProbe` | +| Module | `opensampl.vendors.ntp` | +| Metadata ORM / table | `NtpMetadata` / `ntp_metadata` | + +## Probe identity (`ProbeKey`) + +- **`ip_address`**: For `remote_server`, the target server IP or a placeholder derived from the hostname. For `local_host`, typically `127.0.0.1` or the host’s primary IPv4 used for labeling. 
+- **`probe_id`**: Stable slug per logical probe (e.g. `local-chrony`, `mock-a`, `remote-pool-1`). + +Snapshot files use a strict filename pattern so the loader can derive `ProbeKey` without opening the file (see `NtpProbe` docstring). + +## Modes + +| Mode | Meaning | +|------|--------| +| `local_host` | Collector runs on the machine whose NTP client state is observed (Raspberry Pi friendly). | +| `remote_server` | Collector issues NTP client requests to `target_host`:`target_port` (default UDP **123**; high ports supported for demos). | + +## Metadata vs time series + +- **`ntp_metadata`**: Latest normalized fields from the most recent observation (sync/leap/stratum/reach/reference/poll/root metrics, mode, targets, `observation_source`, etc.) plus `additional_metadata` JSONB for raw command output snippets and parser notes. +- **`probe_data`**: One OpenSAMPL row per `(time × metric_type × reference)` as elsewhere. NTP uses dedicated metrics (offset, delay, jitter, stratum, etc.) with `REF_TYPES.UNKNOWN` unless a future reference model is introduced. + +Offset is stored in seconds; Grafana panels may scale to nanoseconds for consistency with existing timing dashboards. + +## Local fallback chain + +Tools are tried in order until one yields usable structured data: + +1. `chronyc tracking` +2. `chronyc -m 'sources -v'` or `chronyc sources -v` +3. `ntpq -p` +4. `timedatectl show-timesync --all` / `timedatectl status` +5. `systemctl show systemd-timesyncd` / `systemctl status systemd-timesyncd` + +Missing binaries are skipped without failing the snapshot; `sync_status` and `observation_source` record partial or unavailable state. + +## Remote collection + +Standard NTP client requests over UDP (default port **123**, configurable). Timeouts and non-responses produce degraded samples and metadata rather than crashing the loader. + +## Failure semantics + +- Loaders and collectors catch per-step failures; snapshots are still written when possible. 
+- Missing numeric fields simply omit that metric series for that timestamp (the row is absent); stored values never contain invalid JSON such as `NaN`. + +## Demo vs production NTP + +- **Lab mock servers** often listen on **high UDP ports** so containers do not require `CAP_NET_BIND_SERVICE`. Real deployments typically use **UDP/123**. +- **Simulated drift / unhealthy behavior** in containers is implemented by manipulating **NTP response fields** (stratum, delay, dispersion, etc.), not by true physical clock Allan deviation. Comparison panels show **protocol-level** differences between mock instances. diff --git a/mkdocs.yaml b/mkdocs.yaml index 848aee3..a634b46 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -14,6 +14,8 @@ nav: - Create: guides/create_probe_type.md - Server: guides/opensampl-server.md - Collect: guides/collection.md + - NTP vendor: guides/ntp_vendor_design.md + - NTP extension (walkthrough): guides/ntp_extension.md - Random Data: guides/random-data-generation.md - API: - index: api/index.md diff --git a/opensampl/cli.py b/opensampl/cli.py index da144d6..5ae7124 100644 --- a/opensampl/cli.py +++ b/opensampl/cli.py @@ -89,7 +89,8 @@ def init(): """ Initialize the database. - Creates all tables as defined in the opensampl.db.orm file. + Creates all tables as defined in the opensampl.db.orm file, then idempotently seeds lookup tables + (reference_type, reference, metric_type, defaults) from REF_TYPES and METRICS. This is not required if you are using `opensampl-server`, as that is done as part of that initialization of the db.
""" logger.debug("Initializing database") diff --git a/opensampl/collect/cli.py b/opensampl/collect/cli.py index e65df6e..171359d 100644 --- a/opensampl/collect/cli.py +++ b/opensampl/collect/cli.py @@ -1,6 +1,8 @@ """Consolidated CLI entry point for opensampl.collect tools.""" import sys +import time +from pathlib import Path from typing import Literal, Optional import click @@ -8,6 +10,9 @@ from opensampl.collect.microchip.tp4100.collect_4100 import main as collect_tp4100_files from opensampl.collect.microchip.twst.generate_twst_files import collect_files as collect_twst_files +from opensampl.collect.ntp_snapshot import write_snapshot +from opensampl.vendors.ntp_parsing import collect_local_snapshot +from opensampl.vendors.ntp_remote import query_ntp_server @click.group() @@ -116,5 +121,84 @@ def tp4100( ) +@cli.group() +def ntp(): + """Collect NTP snapshots (JSON) for ``opensampl load ntp``.""" + pass + + +@ntp.command("local") +@click.option("--probe-id", required=True, help="Stable probe_id slug (e.g. 
local-chrony)") +@click.option("--probe-ip", default="127.0.0.1", show_default=True, help="ip_address for ProbeKey") +@click.option("--probe-name", default="local NTP", show_default=True) +@click.option( + "--output-dir", + default="./ntp-snapshots", + type=click.Path(path_type=Path, file_okay=False), + help="Directory for JSON snapshot files", +) +@click.option( + "--interval", + default=0.0, + type=float, + help="Seconds between samples; 0 = single sample and exit", +) +@click.option("--count", default=1, type=int, help="Samples to collect when interval > 0") +def ntp_local(probe_id: str, probe_ip: str, probe_name: str, output_dir: Path, interval: float, count: int): + """Run local chrony/ntpq/timedatectl chain and write NtpProbe JSON snapshots.""" + import socket + + chost = socket.gethostname() + out = output_dir + + def one() -> None: + doc = collect_local_snapshot( + probe_id=probe_id, + probe_ip=probe_ip, + probe_name=probe_name, + collection_host=chost, + ) + path = write_snapshot(doc, out) + click.echo(str(path)) + + if interval <= 0: + one() + return + + for _ in range(max(count, 1)): + one() + time.sleep(interval) + + +@ntp.command("remote") +@click.option("--host", "-h", required=True, help="NTP server hostname or IP") +@click.option("--port", "-p", default=123, type=int, show_default=True, help="UDP port (use high ports for lab mocks)") +@click.option( + "--output-dir", + default="./ntp-snapshots", + type=click.Path(path_type=Path, file_okay=False), + help="Directory for JSON snapshot files", +) +@click.option("--timeout", default=3.0, type=float, help="UDP request timeout (seconds)") +@click.option("--interval", default=0.0, type=float, help="Seconds between samples; 0 = once") +@click.option("--count", default=1, type=int, help="Samples when interval > 0") +def ntp_remote(host: str, port: int, output_dir: Path, timeout: float, interval: float, count: int): + """Query a remote NTP server with ntplib and write JSON snapshots.""" + out = output_dir + + 
def one() -> None: + doc = query_ntp_server(host, port=port, timeout=timeout) + path = write_snapshot(doc, out) + click.echo(str(path)) + + if interval <= 0: + one() + return + + for _ in range(max(count, 1)): + one() + time.sleep(interval) + + if __name__ == "__main__": cli() diff --git a/opensampl/collect/ntp_snapshot.py b/opensampl/collect/ntp_snapshot.py new file mode 100644 index 0000000..88f7036 --- /dev/null +++ b/opensampl/collect/ntp_snapshot.py @@ -0,0 +1,27 @@ +"""Write NTP JSON snapshots in the format expected by :class:`opensampl.vendors.ntp.NtpProbe`.""" + +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from typing import Any + + +def snapshot_filename(probe_ip: str, probe_id: str, ts: datetime | None = None) -> str: + """Build canonical snapshot filename (UTC timestamp).""" + ts = ts or datetime.now(tz=timezone.utc) + ip_part = probe_ip.replace(".", "-") + return f"ntp_{ip_part}_{probe_id}_{ts.strftime('%Y%m%dT%H%M%SZ')}.json" + + +def write_snapshot(doc: dict[str, Any], output_dir: str | os.PathLike[str]) -> str: + """Serialize *doc* to *output_dir* using :func:`snapshot_filename`.""" + from pathlib import Path + + out = Path(os.fspath(output_dir)) + out.mkdir(parents=True, exist_ok=True) + name = snapshot_filename(doc["probe_ip"], doc["probe_id"]) + path = out / name + path.write_text(json.dumps(doc, indent=2), encoding="utf-8") + return str(path) diff --git a/opensampl/config/server.py b/opensampl/config/server.py index 6478145..0e302e1 100644 --- a/opensampl/config/server.py +++ b/opensampl/config/server.py @@ -35,6 +35,8 @@ class ServerConfig(BaseConfig): COMPOSE_FILE: str = Field(default="", description="Fully resolved path to the Docker Compose file.") + OVERRIDE_FILE: str | None = Field(default=None, description="Override for the compose file") + DOCKER_ENV_FILE: str = Field(default="", description="Fully resolved path to the Docker .env file.") docker_env_values: dict[str, Any] =
Field(default_factory=dict, init=False) @@ -67,6 +69,14 @@ def resolve_compose_file(cls, v: Any) -> str: return get_resolved_resource_path(opensampl.server, "docker-compose.yaml") return str(Path(v).expanduser().resolve()) + @field_validator("OVERRIDE_FILE", mode="before") + @classmethod + def resolve_override_file(cls, v: Any) -> str | None: + """Resolve the provided override compose file to an absolute path, or pass through None when unset""" + if v: + return str(Path(v).expanduser().resolve()) + return v + @field_validator("DOCKER_ENV_FILE", mode="before") @classmethod def resolve_docker_env_file(cls, v: Any) -> str: @@ -89,6 +99,8 @@ def build_docker_compose_base(self): compose_command = self.get_compose_command() command = shlex.split(compose_command) command.extend(["--env-file", self.DOCKER_ENV_FILE, "-f", self.COMPOSE_FILE]) + if self.OVERRIDE_FILE: + command.extend(["-f", self.OVERRIDE_FILE]) return command def set_by_name(self, name: str, value: Any): diff --git a/opensampl/db/bootstrap.py b/opensampl/db/bootstrap.py new file mode 100644 index 0000000..03cb351 --- /dev/null +++ b/opensampl/db/bootstrap.py @@ -0,0 +1,131 @@ +"""Idempotent bootstrap of lookup tables required for load paths (reference_type, metric_type, reference, defaults).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from loguru import logger +from sqlalchemy import text + +from opensampl.load.table_factory import TableFactory +from opensampl.metrics import METRICS +from opensampl.metrics import MetricType as PydanticMetricType +from opensampl.references import REF_TYPES +from opensampl.references import ReferenceType as PydanticReferenceType + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + +def _iter_reference_types() -> list[PydanticReferenceType]: + return [v for v in REF_TYPES.__dict__.values() if isinstance(v, PydanticReferenceType)] + + +def _iter_metrics() -> list[PydanticMetricType]: + return [v for v in METRICS.__dict__.values() if
isinstance(v, PydanticMetricType)] + + +def ensure_get_default_uuid_function(session: Session) -> None: + """ + Ensure public.get_default_uuid_for(text) exists. + + ProbeData.before_insert calls this name without a schema; it must live in a schema on the connection search_path + (typically public). + """ + if session.bind.dialect.name != "postgresql": + return + + session.execute( + text( + """ +CREATE OR REPLACE FUNCTION public.get_default_uuid_for(table_category text) +RETURNS uuid +LANGUAGE sql +STABLE +AS $$ + SELECT d.uuid::uuid + FROM castdb.defaults AS d + WHERE d.table_name = table_category + LIMIT 1; +$$; +""" + ) + ) + session.flush() + + +def seed_lookup_tables(session: Session) -> None: + """ + Populate lookup tables from REF_TYPES, METRICS, and baseline reference/defaults rows. + + Safe to run repeatedly (uses TableFactory with if_exists='ignore'). + """ + ensure_get_default_uuid_function(session) + + rt_factory = TableFactory("reference_type", session=session) + for ref in _iter_reference_types(): + rt_factory.write(data=ref.model_dump(), if_exists="ignore") + + unknown_rt = rt_factory.find_existing(data=REF_TYPES.UNKNOWN.model_dump()) + if unknown_rt is None: + raise RuntimeError("Bootstrap failed: UNKNOWN reference_type missing after seed") + + ref_factory = TableFactory("reference", session=session) + ref_factory.write( + data={"reference_type_uuid": unknown_rt.uuid, "compound_reference_uuid": None}, + if_exists="ignore", + ) + + mt_factory = TableFactory("metric_type", session=session) + for m in _iter_metrics(): + mt_factory.write(data=m.model_dump(), if_exists="ignore") + + unknown_mt = mt_factory.find_existing(data=METRICS.UNKNOWN.model_dump()) + if unknown_mt is None: + raise RuntimeError("Bootstrap failed: UNKNOWN metric_type missing after seed") + + default_ref = ref_factory.find_existing( + data={"reference_type_uuid": unknown_rt.uuid, "compound_reference_uuid": None} + ) + if default_ref is None: + raise RuntimeError("Bootstrap failed: 
default reference row missing") + + def_factory = TableFactory("defaults", session=session) + def_factory.write(data={"table_name": "reference", "uuid": default_ref.uuid}, if_exists="ignore") + def_factory.write(data={"table_name": "metric_type", "uuid": unknown_mt.uuid}, if_exists="ignore") + + ensure_campus_locations_view(session) + + session.flush() + logger.info( + "Lookup tables bootstrapped (defaults: reference={}, metric_type={})", + default_ref.uuid, + unknown_mt.uuid, + ) + + +def ensure_campus_locations_view(session: Session) -> None: + """ + Create castdb.campus_locations view expected by the public geospatial Grafana dashboard. + + Maps ORM ``locations`` (PostGIS geom) to latitude/longitude/campus columns. + """ + if session.bind.dialect.name != "postgresql": + return + + session.execute( + text( + """ +CREATE OR REPLACE VIEW castdb.campus_locations AS +SELECT + l.uuid, + l.name, + ST_Y(l.geom::geometry) AS latitude, + ST_X(l.geom::geometry) AS longitude, + l.name AS campus, + l.public +FROM castdb.locations l; +""" + ) + ) + session.flush() diff --git a/opensampl/db/orm.py b/opensampl/db/orm.py index 5d645b1..9c7f9c7 100644 --- a/opensampl/db/orm.py +++ b/opensampl/db/orm.py @@ -181,6 +181,7 @@ class ProbeMetadata(Base): adva_metadata = relationship("AdvaMetadata", back_populates="probe", uselist=False) microchip_twst_metadata = relationship("MicrochipTWSTMetadata", back_populates="probe", uselist=False) microchip_tp4100_metadata = relationship("MicrochipTP4100Metadata", back_populates="probe", uselist=False) + ntp_metadata = relationship("NtpMetadata", back_populates="probe", uselist=False) # --- CUSTOM PROBE METADATA RELATIONSHIP --- @@ -434,6 +435,32 @@ class MicrochipTP4100Metadata(Base): # --- CUSTOM TABLES --- !! 
Do not remove line, used as reference when inserting metadata table +class NtpMetadata(Base): + """NTP-specific probe metadata (local client or remote server target).""" + + __tablename__ = "ntp_metadata" + + probe_uuid = Column(String, ForeignKey("probe_metadata.uuid"), primary_key=True) + mode = Column(Text) + probe_name = Column(Text) + target_host = Column(Text) + target_port = Column(Integer) + sync_status = Column(Text) + leap_status = Column(Text) + stratum = Column(Integer) + reachability = Column(Integer) + offset_last_s = Column(Float) + delay_s = Column(Float) + jitter_s = Column(Float) + dispersion_s = Column(Float) + root_delay_s = Column(Float) + root_dispersion_s = Column(Float) + poll_interval_s = Column(Float) + reference_id = Column(Text) + observation_source = Column(Text) + collection_host = Column(Text) + additional_metadata = Column(JSONB) + probe = relationship("ProbeMetadata", back_populates="ntp_metadata") # --- TABLE FUNCTIONS --- diff --git a/opensampl/load/ntp_geolocation.py b/opensampl/load/ntp_geolocation.py new file mode 100644 index 0000000..dab707d --- /dev/null +++ b/opensampl/load/ntp_geolocation.py @@ -0,0 +1,127 @@ +"""Associate NTP probes with ``castdb.locations`` for the geospatial Grafana dashboard.""" + +from __future__ import annotations + +import ipaddress +import json +import os +import socket +import urllib.request +from typing import TYPE_CHECKING, Any + +from loguru import logger + +from opensampl.load.table_factory import TableFactory + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from opensampl.vendors.constants import ProbeKey + +_GEO_CACHE: dict[str, tuple[float, float, str]] = {} + + +def _env_bool(name: str, default: bool) -> bool: + v = os.getenv(name) + if v is None: + return default + return v.strip().lower() in ("1", "true", "yes", "on") + + +def _default_lab_coords() -> tuple[float, float]: + lat = float(os.getenv("NTP_GEO_DEFAULT_LAT", "37.4419")) + lon = 
float(os.getenv("NTP_GEO_DEFAULT_LON", "-122.1430")) + return lat, lon + + +def _is_private_or_loopback(ip: str) -> bool: + try: + addr = ipaddress.ip_address(ip) + except ValueError: + return True + return bool(addr.is_private or addr.is_loopback or addr.is_link_local or addr.is_reserved) + + +def _lookup_geo_ipapi(ip: str) -> tuple[float, float, str] | None: + if ip in _GEO_CACHE: + return _GEO_CACHE[ip] + url = f"http://ip-api.com/json/{ip}?fields=status,lat,lon,city,country" + try: + with urllib.request.urlopen(url, timeout=4.0) as resp: # noqa: S310 + body = json.loads(resp.read().decode("utf-8")) + except Exception as e: + logger.warning("ip-api geolocation failed for {}: {}", ip, e) + return None + + if body.get("status") != "success" or body.get("lat") is None or body.get("lon") is None: + logger.warning("ip-api returned no coordinates for {}", ip) + return None + + city = body.get("city") or "" + country = body.get("country") or "" + label = ", ".join(x for x in (city, country) if x) + out = (float(body["lat"]), float(body["lon"]), label or ip) + _GEO_CACHE[ip] = out + return out + + +def attach_ntp_location(session: Session, probe_key: ProbeKey, data: dict[str, Any]) -> None: + """ + Set probe ``name``, ``public``, and ``location_uuid`` on NTP metadata before ``probe_metadata`` insert. + + Uses ``additional_metadata.geo_override`` when present (lat/lon/label). Otherwise resolves the remote + host, uses RFC1918/loopback defaults from env, or ip-api.com for public IPs (HTTP, no API key). 
+ """ + if not _env_bool("NTP_GEO_ENABLED", True): + data.setdefault("name", data.get("probe_name") or f"NTP {probe_key.probe_id}") + return + + extras = data.get("additional_metadata") or {} + if not isinstance(extras, dict): + extras = {} + geo_override = extras.get("geo_override") + + mode = data.get("mode") + target_host = (data.get("target_host") or "").strip() + target_port = data.get("target_port") + probe_name = data.get("probe_name") or f"NTP {probe_key.probe_id}" + + lat: float | None = None + lon: float | None = None + + if isinstance(geo_override, dict) and geo_override.get("lat") is not None and geo_override.get("lon") is not None: + lat = float(geo_override["lat"]) + lon = float(geo_override["lon"]) + elif mode == "remote_server" and target_host: + ip_for_geo = target_host + try: + ip_for_geo = socket.gethostbyname(target_host) + except OSError as e: + logger.debug("Could not resolve {}: {}", target_host, e) + + if _is_private_or_loopback(ip_for_geo): + lat, lon = _default_lab_coords() + else: + geo = _lookup_geo_ipapi(ip_for_geo) + if geo: + lat, lon, _ = geo + else: + lat, lon = _default_lab_coords() + else: + lat, lon = _default_lab_coords() + + loc_name = f"NTP: {target_host}:{target_port}" if target_host and target_port is not None else f"NTP: {probe_key}" + + loc_factory = TableFactory("locations", session=session) + existing = loc_factory.find_existing({"name": loc_name}) + if existing is not None: + loc = existing + else: + loc = loc_factory.write( + {"name": loc_name, "lat": lat, "lon": lon, "public": True}, + if_exists="ignore", + ) + + data["location_uuid"] = loc.uuid + data["name"] = probe_name + data["public"] = True diff --git a/opensampl/load_data.py b/opensampl/load_data.py index f427167..b8b854a 100644 --- a/opensampl/load_data.py +++ b/opensampl/load_data.py @@ -9,7 +9,9 @@ from sqlalchemy.orm import Session from opensampl.config.base import BaseConfig +from opensampl.db.bootstrap import seed_lookup_tables from opensampl.db.orm 
import Base, ProbeData +from opensampl.load.ntp_geolocation import attach_ntp_location from opensampl.load.routing import route from opensampl.load.table_factory import TableFactory from opensampl.metrics import MetricType @@ -125,9 +127,10 @@ def load_time_data( strict=strict, session=session, ) + probe = data_definition.probe # ty: ignore[possibly-unbound-attribute] probe_readable = ( - data_definition.probe.name # ty: ignore[possibly-unbound-attribute] - or f"{data_definition.probe.ip_address} ({data_definition.probe.probe_id})" # ty: ignore[possibly-unbound-attribute] + probe.name + or f"{probe.ip_address} ({probe.probe_id})" # ty: ignore[possibly-unbound-attribute] ) if any(x is None for x in [data_definition.probe, data_definition.metric, data_definition.reference]): @@ -197,6 +200,9 @@ def load_probe_metadata( try: pm_factory = TableFactory(name="probe_metadata", session=session) + if vendor.name == "NTP": + attach_ntp_location(session, probe_key, data) + pm_cols = {col.name for col in pm_factory.inspector.columns} probe_info = {k: data.pop(k) for k in list(data.keys()) if k in pm_cols} probe_info.update({"probe_id": probe_key.probe_id, "ip_address": probe_key.ip_address, "vendor": vendor.name}) @@ -227,6 +233,8 @@ def create_new_tables(*, _config: BaseConfig, create_schema: bool = True, sessio session.execute(text(f"CREATE SCHEMA IF NOT EXISTS {Base.metadata.schema}")) session.commit() Base.metadata.create_all(session.bind) + seed_lookup_tables(session) + session.commit() except Exception as e: session.rollback() logger.error(f"Error writing to table: {e}") diff --git a/opensampl/metrics.py b/opensampl/metrics.py index 017deb5..bc7d016 100644 --- a/opensampl/metrics.py +++ b/opensampl/metrics.py @@ -62,3 +62,60 @@ class METRICS: ) # --- CUSTOM METRICS --- !! 
Do not remove line, used as reference when inserting metric + NTP_DELAY = MetricType( + name="NTP Delay", + description="Round-trip delay (RTT) to the NTP server or observed path delay in seconds", + unit="s", + value_type=float, + ) + NTP_JITTER = MetricType( + name="NTP Jitter", + description=( + "Jitter or offset variation for NTP in seconds (true value from chrony/ntpq when available; " + "remote single-packet collection may use a delay/dispersion bound estimate)" + ), + unit="s", + value_type=float, + ) + NTP_STRATUM = MetricType( + name="NTP Stratum", + description="NTP stratum level (distance from reference clock)", + unit="level", + value_type=float, + ) + NTP_REACHABILITY = MetricType( + name="NTP Reachability", + description="NTP reachability register (0-255) as a scalar for plotting", + unit="count", + value_type=float, + ) + NTP_DISPERSION = MetricType( + name="NTP Dispersion", + description="Combined error budget / dispersion in seconds", + unit="s", + value_type=float, + ) + NTP_ROOT_DELAY = MetricType( + name="NTP Root Delay", + description="Root delay from NTP packet or local estimate in seconds", + unit="s", + value_type=float, + ) + NTP_ROOT_DISPERSION = MetricType( + name="NTP Root Dispersion", + description="Root dispersion from NTP packet or local estimate in seconds", + unit="s", + value_type=float, + ) + NTP_POLL_INTERVAL = MetricType( + name="NTP Poll Interval", + description="Poll interval in seconds", + unit="s", + value_type=float, + ) + NTP_SYNC_HEALTH = MetricType( + name="NTP Sync Health", + description="1.0 if synchronized/healthy, 0.0 otherwise (probe-defined)", + unit="ratio", + value_type=float, + ) diff --git a/opensampl/ntp_vendor.yaml b/opensampl/ntp_vendor.yaml new file mode 100644 index 0000000..acf65db --- /dev/null +++ b/opensampl/ntp_vendor.yaml @@ -0,0 +1,42 @@ +name: NTP +parser_class: NtpProbe +parser_module: ntp +metadata_orm: NtpMetadata +metadata_table: ntp_metadata +metadata_fields: + - name: mode + type: Text + - 
name: probe_name + type: Text + - name: target_host + type: Text + - name: target_port + type: Integer + - name: sync_status + type: Text + - name: leap_status + type: Text + - name: stratum + type: Integer + - name: reachability + type: Integer + - name: offset_last_s + type: Float + - name: delay_s + type: Float + - name: jitter_s + type: Float + - name: dispersion_s + type: Float + - name: root_delay_s + type: Float + - name: root_dispersion_s + type: Float + - name: poll_interval_s + type: Float + - name: reference_id + type: Text + - name: observation_source + type: Text + - name: collection_host + type: Text diff --git a/opensampl/server/grafana/grafana-dashboards/ntp-dashboard.json b/opensampl/server/grafana/grafana-dashboards/ntp-dashboard.json new file mode 100644 index 0000000..79f7104 --- /dev/null +++ b/opensampl/server/grafana/grafana-dashboards/ntp-dashboard.json @@ -0,0 +1,174 @@ +{ + "annotations": {"list": []}, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "ns"}, + "overrides": [] + }, + "gridPos": {"h": 9, "w": 12, "x": 0, "y": 0}, + "id": 1, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) * 1e9 AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = 
mt.uuid\nWHERE $__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'Phase Offset'\n AND (trim('${ntp_probe:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${ntp_probe:csv}'), ',')))\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "title": "NTP phase offset (Phase Offset metric)", + "type": "timeseries" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "ns"}, + "overrides": [] + }, + "gridPos": {"h": 9, "w": 12, "x": 12, "y": 0}, + "id": 2, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) * 1e9 AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = mt.uuid\nWHERE $__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'NTP Jitter'\n AND (trim('${ntp_probe:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${ntp_probe:csv}'), ',')))\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "description": "Remote single-packet paths use a conservative jitter estimate from delay and root dispersion when peer RMS jitter is unavailable; local chrony/ntpq snapshots may supply measured jitter.", + "title": "NTP jitter (delay/dispersion estimate or measured)", + "type": "timeseries" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "none"}, + 
"overrides": [] + }, + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 9}, + "id": 3, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = mt.uuid\nWHERE $__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'NTP Stratum'\n AND (trim('${ntp_probe:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${ntp_probe:csv}'), ',')))\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "title": "NTP stratum", + "type": "timeseries" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "percentunit"}, + "overrides": [] + }, + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 9}, + "id": 4, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = mt.uuid\nWHERE 
$__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'NTP Sync Health'\n AND (trim('${ntp_probe:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${ntp_probe:csv}'), ',')))\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "title": "NTP sync health (1=healthy)", + "type": "timeseries" + }, + { + "collapsed": true, + "gridPos": {"h": 1, "w": 24, "x": 0, "y": 17}, + "id": 50, + "panels": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "description": "Phase metrics use OpenSAMPL\u2019s default reference row (UNKNOWN reference type). NTP **observation** context is the configured server in `ntp_metadata` (not GNSS unless a GNSS-backed probe is present).", + "fieldConfig": {"defaults": {}, "overrides": []}, + "gridPos": {"h": 9, "w": 24, "x": 0, "y": 0}, + "id": 5, + "options": {"cellHeight": "sm", "showHeader": true, "sortBy": [{"desc": false, "displayName": "probe"}]}, + "pluginVersion": "12.0.0", + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "table", + "rawQuery": true, + "rawSql": "SELECT\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS probe,\n pm.vendor,\n COALESCE(rt.name, '') AS reference_type,\n COALESCE(nm.target_host::text, '') AS ntp_server,\n COALESCE(nm.mode::text, '') AS ntp_mode,\n COALESCE(nm.reference_id::text, '') AS ntp_ref_id,\n COALESCE(l.name, '') AS location,\n COALESCE(pm.public::text, '') AS public\nFROM castdb.probe_metadata pm\nLEFT JOIN castdb.ntp_metadata nm ON nm.probe_uuid = pm.uuid\nLEFT JOIN castdb.locations l ON l.uuid = pm.location_uuid\nLEFT JOIN LATERAL (\n SELECT pd.reference_uuid FROM castdb.probe_data pd WHERE pd.probe_uuid = pm.uuid LIMIT 1\n) rp ON true\nLEFT JOIN castdb.reference r ON r.uuid = rp.reference_uuid\nLEFT JOIN castdb.reference_type rt ON rt.uuid = r.reference_type_uuid\nWHERE pm.vendor = 'NTP'\n AND (trim('${ntp_probe:csv}') = '' OR pm.uuid = 
ANY(string_to_array(trim('${ntp_probe:csv}'), ',')))\nORDER BY 1;", + "refId": "A" + } + ], + "title": "Probe reference & source (stored metadata)", + "transformations": [], + "type": "table" + } + ], + "title": "Reference & source metadata", + "type": "row" + } + ], + "refresh": "30s", + "schemaVersion": 38, + "style": "dark", + "tags": ["ntp", "opensampl", "reference"], + "templating": { + "list": [ + { + "current": {"selected": true, "text": "All", "value": "$__all"}, + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "definition": "SELECT pm.uuid::text AS __value, COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS __text FROM castdb.probe_metadata pm WHERE pm.vendor = 'NTP' ORDER BY 2", + "hide": 0, + "includeAll": true, + "multi": true, + "name": "ntp_probe", + "options": [], + "query": "SELECT pm.uuid::text AS __value, COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS __text FROM castdb.probe_metadata pm WHERE pm.vendor = 'NTP' ORDER BY 2", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + } + ] + }, + "time": {"from": "now-6h", "to": "now"}, + "timepicker": {}, + "timezone": "", + "description": "NTP reference path: measurements are relative to OpenSAMPL\u2019s configured default reference (UNKNOWN type) unless you add GNSS-backed probes; timing vs GNSS is not implied for these series.", + "title": "NTP probes (NTP server reference path)", + "uid": "ntp-opensampl", + "version": 1, + "weekStart": "" +} diff --git a/opensampl/server/grafana/grafana-dashboards/public-timing-dashboard.json b/opensampl/server/grafana/grafana-dashboards/public-timing-dashboard.json index 687ceae..24b7a6f 100644 --- a/opensampl/server/grafana/grafana-dashboards/public-timing-dashboard.json +++ b/opensampl/server/grafana/grafana-dashboards/public-timing-dashboard.json @@ -338,7 +338,7 @@ "group": [], "metricColumn": "none", "rawQuery": true, - "rawSql": "select\n pm.name as 
\"Clock\",\n l.name as \"Location Name\",\n l.latitude,\n l.longitude,\n l.campus\nfrom castdb.campus_locations l left join castdb.probe_metadata pm on l.uuid = pm.location_uuid\nwhere pm.uuid in (${clock_name:sqlstring});", + "rawSql": "select\n pm.name as \"Clock\",\n l.name as \"Location Name\",\n l.latitude,\n l.longitude,\n l.campus\nfrom castdb.campus_locations l left join castdb.probe_metadata pm on l.uuid = pm.location_uuid\nwhere (trim('${clock_name:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${clock_name:csv}'), ',')));", "refId": "ClockProbes", "select": [ [ @@ -379,13 +379,13 @@ { "datasource": { "type": "grafana-postgresql-datasource", - "uid": "P55EB97F79F5EB88E" + "uid": "castdb-datasource" }, "editorMode": "code", "format": "table", "hide": false, "rawQuery": true, - "rawSql": "SELECT\n l.latitude,\n l.longitude,\n l.campus,\n sum(\n CASE\n when pm.public = True and pm.vendor in ('ADVA', 'MicrochipTP4100') then 1 else 0\n end \n ) as visible_clocks,\n sum(\n CASE\n when pm.uuid in (${clock_name:sqlstring}) then 1 else 0\n end \n ) as selected_clocks\n from castdb.campus_locations l left join castdb.probe_metadata pm on l.uuid = pm.location_uuid\n where l.public = True\n group by\n l.latitude, l.longitude, l.campus;", + "rawSql": "SELECT\n l.latitude,\n l.longitude,\n l.campus,\n sum(\n CASE\n when pm.public = True and pm.vendor in ('ADVA', 'MicrochipTP4100', 'NTP') then 1 else 0\n end \n ) as visible_clocks,\n sum(\n CASE\n when (trim('${clock_name:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${clock_name:csv}'), ','))) then 1 else 0\n end \n ) as selected_clocks\n from castdb.campus_locations l left join castdb.probe_metadata pm on l.uuid = pm.location_uuid\n where l.public = True\n group by\n l.latitude, l.longitude, l.campus;", "refId": "A", "sql": { "columns": [ @@ -465,7 +465,7 @@ }, "format": "table", "rawQuery": true, - "rawSql": "SELECT COUNT(*) as \"Total Clock Probes\" FROM castdb.probe_metadata where uuid in ($clock_name)", + 
"rawSql": "SELECT COUNT(*)::bigint AS \"Total Clock Probes\" FROM castdb.probe_metadata pm WHERE pm.vendor IN ('ADVA', 'MicrochipTP4100', 'NTP') AND coalesce(pm.public, true) AND (trim('${clock_name:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${clock_name:csv}'), ',')))", "refId": "A" } ], @@ -539,7 +539,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT \n coalesce(pm.name, concat(pm.ip_address, 'Inteface', pm.probe_id)) as \"Clock Probe\", \n COUNT(*) as \"Total Records\" \nFROM castdb.probe_data pd\njoin castdb.probe_metadata pm on pd.probe_uuid = pm.uuid \nwhere pm.uuid in (${clock_name:sqlstring})\nAND pd.\"time\" >= $__timeFrom()\nAND pd.\"time\" <= $__timeTo()\ngroup by pm.uuid, pm.name, pm.ip_address, pm.probe_id;", + "rawSql": "SELECT \n coalesce(pm.name, concat(pm.ip_address, ' Interface ', pm.probe_id)) AS \"Clock Probe\", \n COUNT(*)::bigint AS \"Total Records\" \nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid \nWHERE pm.vendor IN ('ADVA', 'MicrochipTP4100', 'NTP')\n AND coalesce(pm.public, true)\n AND (trim('${clock_name:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${clock_name:csv}'), ',')))\n AND pd.\"time\" >= $__timeFrom()\n AND pd.\"time\" <= $__timeTo()\nGROUP BY pm.uuid, pm.name, pm.ip_address, pm.probe_id;", "refId": "A", "sql": { "columns": [ @@ -568,7 +568,7 @@ "type": "grafana-postgresql-datasource", "uid": "castdb-datasource" }, - "description": "Average time error \n(averaged on selected resolution)", + "description": "Average time error vs stored reference (resolution as selected). 
GNSS-specific labeling applies only when the probe/reference model is GNSS-backed.", "fieldConfig": { "defaults": { "color": { @@ -650,7 +650,7 @@ "editorMode": "code", "format": "time_series", "rawQuery": true, - "rawSql": "SELECT \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC') AS time,\n coalesce(pm.name, concat(pm.ip_address, ' Interface ', pm.probe_id)),\n AVG(pd.value::FLOAT) * 1e9 AS value\nFROM castdb.probe_data pd join castdb.probe_metadata pm on pd.probe_uuid = pm.uuid\nWHERE\n $__timeFilter(pd.time)\n AND pd.probe_uuid IN (${clock_name:sqlstring})\nGROUP BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC'),\n pd.probe_uuid,\n pm.name,\n pm.ip_address,\n pm.probe_id\nORDER BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC')\n", + "rawSql": "SELECT \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC') AS time,\n coalesce(pm.name, concat(pm.ip_address, ' Interface ', pm.probe_id)),\n AVG(pd.value::FLOAT) * 1e9 AS value\nFROM castdb.probe_data pd JOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nWHERE\n $__timeFilter(pd.time)\n AND pm.vendor IN ('ADVA', 'MicrochipTP4100', 'NTP')\n AND coalesce(pm.public, true)\n AND (trim('${clock_name:csv}') = '' OR pd.probe_uuid = ANY(string_to_array(trim('${clock_name:csv}'), ',')))\nGROUP BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC'),\n pd.probe_uuid,\n pm.name,\n pm.ip_address,\n pm.probe_id\nORDER BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC')\n", "refId": "A", "sql": { "columns": [ @@ -671,7 +671,7 @@ } } ], - "title": "Time Error - Clock Time vs GNSS", + "title": "Time Error - Clock Time vs Reference", "transformations": [ { "id": "prepareTimeSeries", @@ -776,7 +776,7 @@ "editorMode": "code", "format": "time_series", "rawQuery": true, - "rawSql": "SELECT \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(name, CONCAT(ip_address, ' Interface ', 
probe_id)),\n (MAX(pd.value::FLOAT) - MIN(pd.value::FLOAT)) * 1e9 AS value\nFROM castdb.probe_data pd join castdb.probe_metadata pm on pd.probe_uuid = pm.uuid\nWHERE\n $__timeFilter(pd.time)\n AND pd.probe_uuid in (${clock_name:sqlstring})\nGROUP BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC'),\n pd.probe_uuid,\n pm.name,\n pm.ip_address,\n pm.probe_id\nORDER BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC')\n", + "rawSql": "SELECT \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' Interface ', pm.probe_id)),\n (MAX(pd.value::FLOAT) - MIN(pd.value::FLOAT)) * 1e9 AS value\nFROM castdb.probe_data pd JOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nWHERE\n $__timeFilter(pd.time)\n AND pm.vendor IN ('ADVA', 'MicrochipTP4100', 'NTP')\n AND coalesce(pm.public, true)\n AND (trim('${clock_name:csv}') = '' OR pd.probe_uuid = ANY(string_to_array(trim('${clock_name:csv}'), ',')))\nGROUP BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC'),\n pd.probe_uuid,\n pm.name,\n pm.ip_address,\n pm.probe_id\nORDER BY \n time_bucket(${resolution:sqlstring}, pd.time AT TIME ZONE 'UTC')\n", "refId": "A", "sql": { "columns": [ @@ -797,7 +797,7 @@ } } ], - "title": "Maximum Time Interval Error VS GNSS", + "title": "Maximum Time Interval Error vs Reference", "transformations": [ { "id": "prepareTimeSeries", @@ -834,7 +834,7 @@ "type": "grafana-postgresql-datasource", "uid": "castdb-datasource" }, - "description": "Average time error \n(averaged on selected resolution)", + "description": "Average time error vs stored reference (resolution as selected). 
GNSS-specific labeling applies only when the probe/reference model is GNSS-backed.", "fieldConfig": { "defaults": { "color": { @@ -937,7 +937,7 @@ } } ], - "title": "Time Error - Clock Time vs GNSS", + "title": "Time Error - Clock Time vs Reference", "type": "timeseries" }, { @@ -1048,14 +1048,44 @@ } } ], - "title": "Maximum Time Interval Error", + "title": "Maximum Time Interval Error vs Reference", "type": "timeseries" + }, + { + "collapsed": true, + "gridPos": {"h": 1, "w": 24, "x": 0, "y": 29}, + "id": 101, + "panels": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "description": "Rows reflect stored `probe_metadata`, `ntp_metadata` (when vendor is NTP), `locations`, and one sample `reference`/`reference_type` from `probe_data` per probe.", + "fieldConfig": {"defaults": {}, "overrides": []}, + "gridPos": {"h": 10, "w": 24, "x": 0, "y": 0}, + "id": 100, + "options": {"cellHeight": "sm", "showHeader": true}, + "pluginVersion": "12.0.0", + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "table", + "rawQuery": true, + "rawSql": "SELECT\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS probe,\n pm.vendor,\n COALESCE(rt.name, '') AS reference_type,\n COALESCE(nm.target_host::text, '') AS ntp_server,\n COALESCE(nm.mode::text, '') AS ntp_mode,\n COALESCE(nm.reference_id::text, '') AS ntp_ref_id,\n COALESCE(l.name, '') AS location,\n COALESCE(pm.public::text, '') AS public\nFROM castdb.probe_metadata pm\nLEFT JOIN castdb.ntp_metadata nm ON nm.probe_uuid = pm.uuid\nLEFT JOIN castdb.locations l ON l.uuid = pm.location_uuid\nLEFT JOIN LATERAL (\n SELECT pd.reference_uuid FROM castdb.probe_data pd WHERE pd.probe_uuid = pm.uuid LIMIT 1\n) rp ON true\nLEFT JOIN castdb.reference r ON r.uuid = rp.reference_uuid\nLEFT JOIN castdb.reference_type rt ON rt.uuid = r.reference_type_uuid\nWHERE pm.vendor IN ('ADVA', 
'MicrochipTP4100', 'NTP')\n AND coalesce(pm.public, true)\n AND (trim('${clock_name:csv}') = '' OR pm.uuid = ANY(string_to_array(trim('${clock_name:csv}'), ',')))\nORDER BY 1;", + "refId": "A" + } + ], + "title": "Probe reference & source (stored metadata)", + "type": "table" + } + ], + "title": "Reference & source metadata", + "type": "row" } ], "preload": false, "refresh": "", "schemaVersion": 41, - "tags": [], + "tags": ["opensampl", "reference", "geospatial"], "templating": { "list": [ { @@ -1067,12 +1097,12 @@ "type": "grafana-postgresql-datasource", "uid": "castdb-datasource" }, - "definition": "SELECT uuid AS __value, COALESCE(name, CONCAT(ip_address, ' Interface ', probe_id)) AS __text FROM castdb.probe_metadata WHERE vendor in ('ADVA', 'MicrochipTP4100') and public;", + "definition": "SELECT pm.uuid::text AS __value, COALESCE(pm.name, CONCAT(pm.ip_address, ' Interface ', pm.probe_id)) AS __text FROM castdb.probe_metadata pm WHERE pm.vendor in ('ADVA', 'MicrochipTP4100', 'NTP') AND coalesce(pm.public, true) ORDER BY 2;", "includeAll": true, "multi": true, "name": "clock_name", "options": [], - "query": "SELECT uuid AS __value, COALESCE(name, CONCAT(ip_address, ' Interface ', probe_id)) AS __text FROM castdb.probe_metadata WHERE vendor in ('ADVA', 'MicrochipTP4100') and public;", + "query": "SELECT pm.uuid::text AS __value, COALESCE(pm.name, CONCAT(pm.ip_address, ' Interface ', pm.probe_id)) AS __text FROM castdb.probe_metadata pm WHERE pm.vendor in ('ADVA', 'MicrochipTP4100', 'NTP') AND coalesce(pm.public, true) ORDER BY 2;", "refresh": 1, "regex": "", "type": "query" @@ -1130,7 +1160,8 @@ }, "timepicker": {}, "timezone": "utc", - "title": "Public Geospatial and Timing Combined Dashboard", + "description": "Geospatial views use stored `locations` geometry. 
Timing series are relative to each probe\u2019s stored reference (OpenSAMPL `reference` / `reference_type`) and are **not** GNSS-truth unless a GNSS-backed probe supplies that semantics.", + "title": "Public Geospatial and Timing (Reference)", "uid": "public-geospatial-dashboard", "version": 10 } \ No newline at end of file diff --git a/opensampl/server/grafana/grafana-provisioning/dashboards/dashboard.yaml b/opensampl/server/grafana/grafana-provisioning/dashboards/dashboard.yaml index 0109b1e..837835b 100644 --- a/opensampl/server/grafana/grafana-provisioning/dashboards/dashboard.yaml +++ b/opensampl/server/grafana/grafana-provisioning/dashboards/dashboard.yaml @@ -1,13 +1,13 @@ apiVersion: 1 providers: - - name: 'Clock Dashboards' + - name: "Clock Dashboards" orgId: 1 - folder: 'Timing' + folder: "Timing" type: file disableDeletion: false updateIntervalSeconds: 10 allowUiUpdates: true options: path: /var/lib/grafana/dashboards - foldersFromFilesStructure: true \ No newline at end of file + foldersFromFilesStructure: true diff --git a/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml b/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml index df8d725..355c72e 100644 --- a/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml +++ b/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml @@ -2,6 +2,7 @@ apiVersion: 1 datasources: - name: ${POSTGRES_DB}-datasource + uid: castdb-datasource type: grafana-postgresql-datasource url: db:5432 database: ${POSTGRES_DB} diff --git a/opensampl/vendors/constants.py b/opensampl/vendors/constants.py index 4725b05..e2bc68a 100644 --- a/opensampl/vendors/constants.py +++ b/opensampl/vendors/constants.py @@ -71,6 +71,13 @@ class VENDORS: ) # --- CUSTOM VENDORS --- !! 
Do not remove line, used as reference when inserting vendor + NTP = VendorType( + name="NTP", + parser_class="NtpProbe", + parser_module="ntp", + metadata_table="ntp_metadata", + metadata_orm="NtpMetadata", + ) # --- VENDOR FUNCTIONS --- diff --git a/opensampl/vendors/ntp.py b/opensampl/vendors/ntp.py new file mode 100644 index 0000000..2097525 --- /dev/null +++ b/opensampl/vendors/ntp.py @@ -0,0 +1,250 @@ +"""NTP clock probe: JSON snapshot files from local tooling or remote NTP queries.""" + +from __future__ import annotations + +import json +import math +import random +import re +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, ClassVar + +import numpy as np +import pandas as pd +from loguru import logger +from pydantic import Field + +from opensampl.metrics import METRICS +from opensampl.references import REF_TYPES +from opensampl.vendors.base_probe import BaseProbe +from opensampl.vendors.constants import VENDORS, ProbeKey + +# Filename: ntp_<ip>_<probe_id>_<timestamp>.json (IPv4 uses dashes, e.g. 
127-0-0-1) +_FILE_RE = re.compile( + r"^ntp_(?P<ip>[0-9A-Za-z.-]+)_(?P<probe_id>[a-zA-Z0-9-]+)_(?P<y>\d{4})(?P<mo>\d{2})(?P<d>\d{2})T" + r"(?P<h>\d{2})(?P<mi>\d{2})(?P<s>\d{2})Z\.json$" +) + + +def _dashed_ip_to_address(ip_part: str) -> str: + """Turn 127-0-0-1 into 127.0.0.1; leave hostnames unchanged.""" + if re.match(r"^[\d-]+$", ip_part) and ip_part.count("-") == 3: + return ip_part.replace("-", ".") + return ip_part + + +class NtpProbe(BaseProbe): + """Load NTP snapshots from JSON files produced by ``opensampl-collect ntp`` or tests.""" + + vendor = VENDORS.NTP + file_pattern: ClassVar = _FILE_RE + + class RandomDataConfig(BaseProbe.RandomDataConfig): + """Random NTP-like test data.""" + + base_value: float = Field( + default_factory=lambda: random.uniform(-1e-4, 1e-4), + description="random.uniform(-1e-4, 1e-4)", + ) + noise_amplitude: float = Field( + default_factory=lambda: random.uniform(1e-9, 1e-7), + description="random.uniform(1e-9, 1e-7)", + ) + drift_rate: float = Field( + default_factory=lambda: random.uniform(-1e-12, 1e-12), + description="random.uniform(-1e-12, 1e-12)", + ) + + def __init__(self, input_file: str | Path, **kwargs: Any): + """Load JSON snapshot; optionally override ``ProbeKey`` from ``probe_id`` / ``probe_ip`` in the document.""" + super().__init__(input_file=input_file, **kwargs) + self._doc: dict[str, Any] = {} + raw = Path(input_file).read_text(encoding="utf-8") + self._doc = json.loads(raw) + fk = self._doc.get("probe_id") + fa = self._doc.get("probe_ip") + if isinstance(fk, str) and isinstance(fa, str): + self.probe_key = ProbeKey(probe_id=fk, ip_address=fa) + else: + self.probe_key, _ = self.parse_file_name(Path(input_file)) + + @classmethod + def filter_files(cls, files: list[Path]) -> list[Path]: + """Keep only JSON files matching :attr:`file_pattern`.""" + return [f for f in files if cls.file_pattern.fullmatch(f.name)] + + @classmethod + def parse_file_name(cls, file_name: Path) -> tuple[ProbeKey, datetime]: + """ + Parse ``ntp_<ip>_<probe_id>_<timestamp>.json`` into probe key and 
file timestamp. + + IPv4 addresses are written with dashes between octets (``127-0-0-1``). + """ + m = cls.file_pattern.fullmatch(file_name.name) + if not m: + raise ValueError(f"NTP snapshot file name not recognized: {file_name.name}") + ip = _dashed_ip_to_address(m.group("ip")) + probe_id = m.group("probe_id") + ts = datetime( + int(m.group("y")), + int(m.group("mo")), + int(m.group("d")), + int(m.group("h")), + int(m.group("mi")), + int(m.group("s")), + tzinfo=timezone.utc, + ) + return ProbeKey(probe_id=probe_id, ip_address=ip), ts + + def process_metadata(self) -> dict[str, Any]: + """Return vendor metadata row for ``ntp_metadata``.""" + meta = dict(self._doc.get("metadata") or {}) + # Drop keys not in ORM (extra safety) + allowed = { + "mode", + "probe_name", + "target_host", + "target_port", + "sync_status", + "leap_status", + "stratum", + "reachability", + "offset_last_s", + "delay_s", + "jitter_s", + "dispersion_s", + "root_delay_s", + "root_dispersion_s", + "poll_interval_s", + "reference_id", + "observation_source", + "collection_host", + "additional_metadata", + } + meta = {k: v for k, v in meta.items() if k in allowed} + if not meta.get("probe_name"): + meta["probe_name"] = f"NTP {self.probe_key.probe_id}" + if "additional_metadata" not in meta: + meta["additional_metadata"] = {} + self.metadata_parsed = True + return meta + + def process_time_data(self) -> pd.DataFrame: + """Send time series for each metric present in each series row.""" + if not self.metadata_parsed: + self.process_metadata() + + series = self._doc.get("series") or [] + if not series: + logger.warning(f"NTP snapshot has empty series: {self.input_file}") + return pd.DataFrame() + + metric_map = { + "phase_offset_s": METRICS.PHASE_OFFSET, + "delay_s": METRICS.NTP_DELAY, + "jitter_s": METRICS.NTP_JITTER, + "stratum": METRICS.NTP_STRATUM, + "reachability": METRICS.NTP_REACHABILITY, + "dispersion_s": METRICS.NTP_DISPERSION, + "root_delay_s":
METRICS.NTP_ROOT_DELAY, + "root_dispersion_s": METRICS.NTP_ROOT_DISPERSION, + "poll_interval_s": METRICS.NTP_POLL_INTERVAL, + "sync_health": METRICS.NTP_SYNC_HEALTH, + } + + for row in series: + t_raw = row.get("time") + if t_raw is None: + continue + t = pd.to_datetime(t_raw, utc=True) + for key, mtype in metric_map.items(): + if key not in row: + continue + val = row[key] + if val is None or (isinstance(val, float) and (math.isnan(val) or math.isinf(val))): + continue + df = pd.DataFrame({"time": [t], "value": [float(val)]}) + self.send_data( + data=df, + metric=mtype, + reference_type=REF_TYPES.UNKNOWN, + ) + + return pd.DataFrame() + + @classmethod + def generate_random_data( + cls, + config: RandomDataConfig, + probe_key: ProbeKey, + ) -> ProbeKey: + """Generate synthetic NTP-like metrics for testing.""" + cls._setup_random_seed(config.seed) + logger.info(f"Generating random NTP data for {probe_key}") + + meta = { + "mode": "local_host", + "probe_name": f"Random NTP {probe_key.probe_id}", + "target_host": "", + "target_port": 0, + "sync_status": "tracking", + "leap_status": "no_warning", + "stratum": 2, + "reachability": 377, + "observation_source": "random", + "collection_host": "", + "additional_metadata": {"test_data": True}, + } + cls._send_metadata_to_db(probe_key, meta) + + total_seconds = config.duration_hours * 3600 + num_samples = int(total_seconds / config.sample_interval) + + for i in range(num_samples): + sample_time = config.start_time + timedelta(seconds=i * config.sample_interval) + time_offset = i * config.sample_interval + drift_component = config.drift_rate * time_offset + noise = float(np.random.normal(0, config.noise_amplitude)) + offset = config.base_value + drift_component + noise + if random.random() < config.outlier_probability: + offset += float(np.random.normal(0, config.noise_amplitude * config.outlier_multiplier)) + + delay_s = 0.02 + abs(0.0001 * random.random()) + jitter_s = abs(float(config.noise_amplitude * 5)) + stratum = 2.0 + 
(1.0 if random.random() < 0.05 else 0.0) + sync_health = 1.0 + + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [offset]}), + metric=METRICS.PHASE_OFFSET, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [delay_s]}), + metric=METRICS.NTP_DELAY, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [jitter_s]}), + metric=METRICS.NTP_JITTER, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [stratum]}), + metric=METRICS.NTP_STRATUM, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [sync_health]}), + metric=METRICS.NTP_SYNC_HEALTH, + reference_type=REF_TYPES.UNKNOWN, + ) + + logger.info(f"Finished random NTP generation for {probe_key}") + return probe_key diff --git a/opensampl/vendors/ntp_parsing.py b/opensampl/vendors/ntp_parsing.py new file mode 100644 index 0000000..505ca63 --- /dev/null +++ b/opensampl/vendors/ntp_parsing.py @@ -0,0 +1,342 @@ +"""Parse local NTP client tooling output into a normalized snapshot dict.""" + +from __future__ import annotations + +import re +import shutil +import subprocess +from datetime import datetime, timezone +from typing import Any, Optional + +from loguru import logger + +_CMD_TIMEOUT = 8.0 + + +def _run(cmd: list[str]) -> Optional[str]: + """Run command; return stdout or None if missing/failed.""" + bin0 = cmd[0] + if shutil.which(bin0) is None: + return None + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=_CMD_TIMEOUT, + check=False, + ) + except (OSError, subprocess.SubprocessError) as e: + logger.debug(f"ntp local: command {cmd!r} failed: {e}") + return None + if proc.returncode != 0: + logger.debug(f"ntp local: {cmd!r} 
exit {proc.returncode}: {proc.stderr!r}") + return None + return proc.stdout or "" + + + def _parse_chronyc_tracking(text: str) -> dict[str, Any]: + """Parse `chronyc tracking` key: value output.""" + out: dict[str, Any] = {} + for line in text.splitlines(): + line = line.strip() + if not line or ":" not in line: + continue + key, _, rest = line.partition(":") + key = key.strip().lower().replace(" ", "_") + val = rest.strip() + out[key] = val + + offset_s: Optional[float] = None + jitter_s: Optional[float] = None + stratum: Optional[int] = None + ref = None + + # Last offset : +0.000000123 seconds + m = re.search(r"last offset\s*:\s*([+-]?[\d.eE+-]+)\s*seconds?", text, re.I) + if m: + try: + offset_s = float(m.group(1)) + except ValueError: + pass + m = re.search(r"rms offset\s*:\s*([+-]?[\d.eE+-]+)\s*seconds?", text, re.I) + if m: + try: + jitter_s = float(m.group(1)) + except ValueError: + pass + m = re.search(r"stratum\s*:\s*(\d+)", text, re.I) + if m: + try: + stratum = int(m.group(1)) + except ValueError: + pass + m = re.search(r"reference id\s*:\s*(\S+)(?:\s*\(([^)]+)\))?", text, re.I) + if m: + ref = m.group(2) or m.group(1) + + sync_status = "unsynchronized" + if "normal" in text.lower() or offset_s is not None: + sync_status = "tracking" + + return { + "raw_tracking": out, + "offset_s": offset_s, + "jitter_s": jitter_s, + "stratum": stratum, + "reference_id": ref, + "sync_status": sync_status, + "observation_source": "chronyc_tracking", + } + + + def _parse_chronyc_sources(text: str) -> dict[str, Any]: + """Parse `chronyc sources` for reach and selected source.""" + reach: Optional[int] = None + selected: Optional[str] = None + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("MS") or line.startswith("="): + continue + # Source lines start with a joined mode/state pair, e.g. "^* host 2 6 377 ..."; + # ^* / ^+ (or #* / #+ for reference clocks) mark the selected/accepted source. + if line.startswith(("^*", "^+", "#*", "#+")): + parts = line.split() + # Columns: MS, Name/IP, Stratum, Poll, Reach, LastRx, Last sample + if len(parts) >= 5: + try: + reach = int(parts[4], 8) # Reach column is octal + except ValueError: + pass + selected = parts[1] + break + if reach is None: + # Fallback: any 377-style octal token + m = re.search(r"\b([0-7]{3})\b", text) + if m: + try: + reach = int(m.group(1), 8) + except ValueError: + pass + + return { + "reachability": reach, + "selected_source": selected, + "observation_source": "chronyc_sources", + } + + + def _parse_ntpq(text: str) -> dict[str, Any]: + """Parse `ntpq -p` / `ntpq -pn` output.""" + offset_s: Optional[float] = None + delay_s: Optional[float] = None + jitter_s: Optional[float] = None + stratum: Optional[int] = None + reach: Optional[int] = None + ref = None + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("remote") or line.startswith("="): + continue + if line.startswith("*") or line.startswith("+") or line.startswith("-"): + parts = line.split() + # remote refid st t when poll reach delay offset jitter + if len(parts) >= 10: + try: + stratum = int(parts[2]) + except ValueError: + pass + try: + delay_s = float(parts[7]) / 1000.0 # ms -> s + offset_s = float(parts[8]) / 1000.0 + jitter_s = float(parts[9]) / 1000.0 + except (ValueError, IndexError): + pass + try: + reach = int(parts[6], 8) # ntpq prints reach in octal (e.g. 377) + except ValueError: + try: + reach = int(parts[6]) + except ValueError: + pass + ref = parts[1] + break + + return { + "offset_s": offset_s, + "delay_s": delay_s, + "jitter_s": jitter_s, + "stratum": stratum, + "reachability": reach, + "reference_id": ref, + "sync_status": "synced" if offset_s is not None else "unknown", + "observation_source": "ntpq", + } + + + def _parse_timedatectl(text: str) -> dict[str, Any]: + """Parse `timedatectl status` / `show-timesync --all`.""" + sync = None + for line in text.splitlines():
+ low = line.lower() + if "system clock synchronized" in low or "ntp synchronized" in low: + if "yes" in low: + sync = True + elif "no" in low: + sync = False + sync_status = "unknown" + if sync is True: + sync_status = "synchronized" + elif sync is False: + sync_status = "unsynchronized" + + return { + "sync_status": sync_status, + "observation_source": "timedatectl", + } + + +def _parse_systemctl_show(text: str) -> dict[str, Any]: + """Parse `systemctl show` / `systemctl status` for systemd-timesyncd.""" + active = None + for line in text.splitlines(): + if line.strip().lower().startswith("activestate="): + active = line.split("=", 1)[1].strip().lower() == "active" + break + if active is None and "active (running)" in text.lower(): + active = True + sync_status = "unknown" + if active is True: + sync_status = "service_active" + elif active is False: + sync_status = "service_inactive" + + return {"sync_status": sync_status, "observation_source": "systemctl_timesyncd"} + + +def _merge_int(a: Optional[int], b: Optional[int]) -> Optional[int]: + return a if a is not None else b + + +def _merge_float(a: Optional[float], b: Optional[float]) -> Optional[float]: + return a if a is not None else b + + +def collect_local_snapshot( + probe_id: str, + probe_ip: str, + probe_name: str, + collection_host: str, +) -> dict[str, Any]: + """ + Run the local fallback chain and return a snapshot document (metadata + one series row). + + probe_ip: IP used in ProbeKey (e.g. 127.0.0.1 for local). 
+ """ + merged: dict[str, Any] = { + "mode": "local_host", + "probe_name": probe_name, + "target_host": "", + "target_port": 0, + "sync_status": "unknown", + "leap_status": "unknown", + "stratum": None, + "reachability": None, + "offset_last_s": None, + "delay_s": None, + "jitter_s": None, + "dispersion_s": None, + "root_delay_s": None, + "root_dispersion_s": None, + "poll_interval_s": None, + "reference_id": None, + "observation_source": "none", + "collection_host": collection_host, + } + extras: dict[str, Any] = {} + + t = _run(["chronyc", "tracking"]) + if t: + p = _parse_chronyc_tracking(t) + extras["chronyc_tracking"] = p.get("raw_tracking", {}) + merged["offset_last_s"] = _merge_float(merged["offset_last_s"], p.get("offset_s")) + merged["jitter_s"] = _merge_float(merged["jitter_s"], p.get("jitter_s")) + merged["stratum"] = _merge_int(merged["stratum"], p.get("stratum")) + merged["reference_id"] = p.get("reference_id") or merged["reference_id"] + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + + t = _run(["chronyc", "sources", "-v"]) or _run(["chronyc", "sources"]) + if t: + p = _parse_chronyc_sources(t) + merged["reachability"] = _merge_int(merged["reachability"], p.get("reachability")) + if p.get("selected_source"): + merged["reference_id"] = merged["reference_id"] or p["selected_source"] + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + + if merged["offset_last_s"] is None and merged["stratum"] is None: + t = _run(["ntpq", "-pn"]) or _run(["ntpq", "-p"]) + if t: + p = _parse_ntpq(t) + merged["offset_last_s"] = _merge_float(merged["offset_last_s"], p.get("offset_s")) + merged["delay_s"] = _merge_float(merged["delay_s"], p.get("delay_s")) + merged["jitter_s"] = _merge_float(merged["jitter_s"], p.get("jitter_s")) + merged["stratum"] = _merge_int(merged["stratum"], p.get("stratum")) + merged["reachability"] 
= _merge_int(merged["reachability"], p.get("reachability")) + merged["reference_id"] = merged["reference_id"] or p.get("reference_id") + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + + t = _run(["timedatectl", "show-timesync", "--all"]) or _run(["timedatectl", "status"]) + if t: + p = _parse_timedatectl(t) + if merged["sync_status"] == "unknown": + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + extras["timedatectl"] = t[:2000] + + t = _run(["systemctl", "show", "systemd-timesyncd", "--property=ActiveState"]) + if not t: + t = _run(["systemctl", "status", "systemd-timesyncd", "--no-pager"]) + if t: + p = _parse_systemctl_show(t) + if merged["sync_status"] == "unknown": + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + extras["systemctl"] = t[:2000] + + now = datetime.now(tz=timezone.utc) + row: dict[str, Any] = { + "time": now.isoformat(), + "phase_offset_s": merged["offset_last_s"], + "delay_s": merged["delay_s"], + "jitter_s": merged["jitter_s"], + "stratum": float(merged["stratum"]) if merged["stratum"] is not None else None, + "reachability": float(merged["reachability"]) if merged["reachability"] is not None else None, + "dispersion_s": merged["dispersion_s"], + "root_delay_s": merged["root_delay_s"], + "root_dispersion_s": merged["root_dispersion_s"], + "poll_interval_s": merged["poll_interval_s"], + "sync_health": 1.0 if merged["sync_status"] in ("tracking", "synchronized", "synced") else 0.0, + } + + for k in list(row.keys()): + if row[k] is None: + del row[k] + + meta = {k: v for k, v in merged.items() if v is not None} + meta["additional_metadata"] = extras + + return { + "format_version": 1, + "probe_id": probe_id, + 
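For orientation, the snapshot document assembled by `collect_local_snapshot` (top-level identity keys plus `metadata` and a one-row `series`) can be sketched standalone. The field values below are hypothetical; only the shape follows the code in this diff.

```python
import json
from datetime import datetime, timezone

# Minimal snapshot in the shape NtpProbe consumes: format_version, probe
# identity, a metadata dict, and a series of per-sample metric rows.
snapshot = {
    "format_version": 1,
    "probe_id": "lab-probe-1",   # hypothetical identifier
    "probe_ip": "127.0.0.1",
    "metadata": {
        "mode": "local_host",
        "probe_name": "lab-probe-1",
        "sync_status": "tracking",
        "additional_metadata": {},
    },
    "series": [
        {
            "time": datetime(2025, 4, 11, 12, 0, 0, tzinfo=timezone.utc).isoformat(),
            "phase_offset_s": 5e-8,
            "sync_health": 1.0,
        }
    ],
}

# The document is plain JSON and survives a round trip unchanged.
roundtrip = json.loads(json.dumps(snapshot))
assert roundtrip == snapshot
```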
"probe_ip": probe_ip, + "metadata": meta, + "series": [row], + } diff --git a/opensampl/vendors/ntp_remote.py b/opensampl/vendors/ntp_remote.py new file mode 100644 index 0000000..b31785f --- /dev/null +++ b/opensampl/vendors/ntp_remote.py @@ -0,0 +1,182 @@ +"""Remote NTP client queries (UDP).""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from loguru import logger + + +def _estimate_jitter_s(delay_s: float | None, root_dispersion_s: float | None) -> float | None: + """ + Single NTP client response does not include RFC5905 peer jitter (that needs multiple samples). + + Emit a conservative positive bound from round-trip delay and root dispersion so downstream + ``NTP Jitter`` metrics and dashboards have a value; chrony/ntpq local paths still supply true jitter when available. + """ + if delay_s is None and root_dispersion_s is None: + return None + d = float(delay_s) if delay_s is not None else 0.0 + r = float(root_dispersion_s) if root_dispersion_s is not None else 0.0 + est = 0.05 * d + 0.25 * r + return est if est > 0 else None + + +def _apply_probe_overrides( + doc: dict[str, Any], + probe_id: str | None, + probe_ip: str | None, + probe_name: str | None, + geo_override: dict[str, Any] | None, +) -> None: + """Apply stable probe identity and optional lab geolocation overrides for ingest/config workflows.""" + meta = doc.setdefault("metadata", {}) + add = meta.setdefault("additional_metadata", {}) + if not isinstance(add, dict): + add = {} + meta["additional_metadata"] = add + if probe_id: + doc["probe_id"] = probe_id + if probe_ip: + doc["probe_ip"] = probe_ip + if probe_name: + meta["probe_name"] = probe_name + if geo_override: + add["geo_override"] = geo_override + + +def query_ntp_server( + host: str, + port: int = 123, + timeout: float = 3.0, + *, + probe_id: str | None = None, + probe_ip: str | None = None, + probe_name: str | None = None, + geo_override: dict[str, Any] | None = None, +) -> 
dict[str, Any]: + """ + Perform one NTP client request and return a snapshot document (metadata + one series row). + + Requires optional dependency ``ntplib``. + """ + try: + import ntplib # type: ignore[import-untyped] + except ImportError as e: + raise ImportError("Remote NTP collection requires the 'ntplib' package (install opensampl[collect]).") from e + + client = ntplib.NTPClient() + try: + resp = client.request(host, port=port, version=3, timeout=timeout) + except Exception as e: + logger.warning(f"NTP request to {host}:{port} failed: {e}") + now = datetime.now(tz=timezone.utc) + meta = { + "mode": "remote_server", + "probe_name": f"ntp-{host}-{port}", + "target_host": host, + "target_port": port, + "sync_status": "unreachable", + "leap_status": "unknown", + "stratum": None, + "reachability": None, + "offset_last_s": None, + "delay_s": None, + "jitter_s": None, + "dispersion_s": None, + "root_delay_s": None, + "root_dispersion_s": None, + "poll_interval_s": None, + "reference_id": None, + "observation_source": "ntplib_error", + "collection_host": "", + "additional_metadata": {"error": str(e)}, + } + row = { + "time": now.isoformat(), + "sync_health": 0.0, + } + doc = { + "format_version": 1, + "probe_id": f"remote-{host}-{port}", + "probe_ip": host, + "metadata": meta, + "series": [row], + } + _apply_probe_overrides(doc, probe_id, probe_ip, probe_name, geo_override) + return doc + + leap = int(resp.leap) + leap_map = {0: "no_warning", 1: "add_second", 2: "del_second", 3: "alarm"} + stratum = int(resp.stratum) + # poll is log2 seconds in RFC5905 + try: + poll_s = float(2 ** int(resp.poll)) + except (TypeError, ValueError, OverflowError): + poll_s = None + + root_delay_s = float(resp.root_delay) if resp.root_delay is not None else None + root_dispersion_s = float(resp.root_dispersion) if resp.root_dispersion is not None else None + delay_s = float(resp.delay) if resp.delay is not None else None + offset_s = float(resp.offset) if resp.offset is not None else 
None + + ref_id = getattr(resp, "ref_id", None) + if hasattr(ref_id, "decode"): + try: + ref_id = ref_id.decode("ascii", errors="replace") + except Exception: + ref_id = str(ref_id) + ref_id = str(ref_id) if ref_id is not None else None + + now = datetime.now(tz=timezone.utc) + sync_ok = stratum < 16 and offset_s is not None + meta = { + "mode": "remote_server", + "probe_name": f"ntp-{host}-{port}", + "target_host": host, + "target_port": port, + "sync_status": "synchronized" if sync_ok else "unsynchronized", + "leap_status": leap_map.get(leap, str(leap)), + "stratum": stratum, + "reachability": None, + "offset_last_s": offset_s, + "delay_s": delay_s, + "jitter_s": _estimate_jitter_s(delay_s, root_dispersion_s), + "dispersion_s": None, + "root_delay_s": root_delay_s, + "root_dispersion_s": root_dispersion_s, + "poll_interval_s": poll_s, + "reference_id": ref_id, + "observation_source": "ntplib", + "collection_host": "", + "additional_metadata": {"version": getattr(resp, "version", None)}, + } + + jitter_est = _estimate_jitter_s(delay_s, root_dispersion_s) + + row: dict[str, Any] = { + "time": now.isoformat(), + "phase_offset_s": offset_s, + "delay_s": delay_s, + "stratum": float(stratum), + "root_delay_s": root_delay_s, + "root_dispersion_s": root_dispersion_s, + "poll_interval_s": poll_s, + "sync_health": 1.0 if sync_ok else 0.0, + } + if jitter_est is not None: + row["jitter_s"] = jitter_est + for k in list(row.keys()): + if row[k] is None: + del row[k] + + doc = { + "format_version": 1, + "probe_id": f"remote-{host}-{port}", + "probe_ip": host, + "metadata": meta, + "series": [row], + } + _apply_probe_overrides(doc, probe_id, probe_ip, probe_name, geo_override) + return doc diff --git a/pyproject.toml b/pyproject.toml index b9500f6..3c2639d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,7 @@ backend = [ "uvicorn", "prometheus-client", ] -collect = ["telnetlib3==2.0.4"] +collect = ["telnetlib3==2.0.4", "ntplib>=0.4.0,<0.5"] [project.scripts] 
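The remote path's documented jitter bound reduces to a small pure function: a single NTP packet carries no RFC 5905 peer jitter, so `_estimate_jitter_s` above weights round-trip delay and root dispersion instead. This sketch restates that 0.05·delay + 0.25·root-dispersion bound in isolation.

```python
def estimate_jitter_s(delay_s, root_dispersion_s):
    """Conservative positive jitter bound from one NTP response (mirrors _estimate_jitter_s)."""
    if delay_s is None and root_dispersion_s is None:
        return None
    d = float(delay_s) if delay_s is not None else 0.0
    r = float(root_dispersion_s) if root_dispersion_s is not None else 0.0
    est = 0.05 * d + 0.25 * r  # weighted bound, not measured peer jitter
    return est if est > 0 else None

# A 20 ms round trip with 2 ms root dispersion yields roughly a 1.5 ms bound.
bound = estimate_jitter_s(0.020, 0.002)
```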
opensampl = "opensampl.cli:cli" @@ -136,7 +136,20 @@ ignore = ["D203", "D212", "D400", "D415", "ANN401", "S101", "PLR2004", "COM812", [tool.ruff.lint.per-file-ignores] "opensampl/vendors/**/*.py" = ['S311'] # we want to ignore the errors about random "opensampl/server/backend/main.py" = ['B008', 'ARG001'] # ignore complaints about calling functions in args +"opensampl/vendors/ntp_parsing.py" = [ + "C901", + "PLR0915", + "S603", + "PLW2901", + "D103", + "UP007", + "SIM105", + "FURB167", + "PIE810", +] +"opensampl/vendors/ntp_remote.py" = ["S603"] + [tool.ruff.lint.pylint] max-args = 10 diff --git a/tests/test_cli.py b/tests/test_cli.py index f5d5e46..d305d56 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -47,8 +47,15 @@ def test_cli_config_with_env_vars(self): assert config.ROUTE_TO_BACKEND is True @patch('opensampl.cli.find_dotenv') - def test_cli_config_auto_find_env_file(self, mock_find_dotenv, tmp_path): - """Test CLIConfig auto-finding .env file.""" + def test_cli_config_auto_find_env_file(self, mock_find_dotenv, tmp_path, monkeypatch): + """Test CLIConfig auto-finding .env file when env vars do not override. + + pydantic-settings gives precedence to process environment variables over + values loaded from the env file. Clear DATABASE_URL so the test asserts + .env resolution in isolation from the developer machine.
+ """ + monkeypatch.delenv("DATABASE_URL", raising=False) + monkeypatch.delenv("OPENSAMPL_ENV_FILE", raising=False) env_file = tmp_path / ".env" env_file.write_text("DATABASE_URL=postgresql://auto:5432/autodb") mock_find_dotenv.return_value = str(env_file) @@ -56,6 +63,18 @@ def test_cli_config_auto_find_env_file(self, mock_find_dotenv, tmp_path): config = load_config() assert config.DATABASE_URL == "postgresql://auto:5432/autodb" + @patch('opensampl.cli.find_dotenv') + def test_cli_config_database_url_env_overrides_auto_found_dotenv(self, mock_find_dotenv, tmp_path, monkeypatch): + """Process DATABASE_URL overrides the same key in an auto-found .env file.""" + env_file = tmp_path / ".env" + env_file.write_text("DATABASE_URL=postgresql://from-dotenv:5432/db") + mock_find_dotenv.return_value = str(env_file) + monkeypatch.delenv("OPENSAMPL_ENV_FILE", raising=False) + monkeypatch.setenv("DATABASE_URL", "postgresql://from-env:5432/db") + + config = load_config() + assert config.DATABASE_URL == "postgresql://from-env:5432/db" + def test_cli_config_validation(self): """Test CLIConfig validation.""" # Should work with valid data diff --git a/tests/test_ntp_probe.py b/tests/test_ntp_probe.py new file mode 100644 index 0000000..e27a162 --- /dev/null +++ b/tests/test_ntp_probe.py @@ -0,0 +1,93 @@ +"""Tests for NTP vendor, parsers, and snapshot format.""" + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from opensampl.vendors.ntp import NtpProbe +from opensampl.vendors.ntp_parsing import _parse_chronyc_tracking, _parse_ntpq + + +def test_parse_chronyc_tracking_basic(): + text = """ +Reference ID : A1B2C3D4 (pool.ntp.org) +Stratum : 3 +Ref time (UTC) : Thu Apr 11 12:00:00 2025 +System time : 0.000000100 seconds slow of NTP time +Last offset : +0.000000050 seconds +RMS offset : 0.000000200 seconds +Frequency : 0.123 ppm slow +Residual freq : 0.001 ppm +Skew : 0.050 ppm +Root delay : 0.010234 seconds +Root dispersion : 
0.001000 seconds +Update interval : 64.0 seconds +""" + p = _parse_chronyc_tracking(text) + assert p["offset_s"] == pytest.approx(5e-8) + assert p["jitter_s"] == pytest.approx(2e-7) + assert p["stratum"] == 3 + + +def test_parse_ntpq_line(): + text = """ + remote refid st t when poll reach delay offset jitter +============================================================================== +*192.168.1.1 .GPS. 1 u 12 64 377 1.234 0.567 0.089 +""" + p = _parse_ntpq(text) + assert p["offset_s"] == pytest.approx(0.000567) + assert p["delay_s"] == pytest.approx(0.001234) + assert p["stratum"] == 1 + + +def test_ntp_probe_json_roundtrip(tmp_path: Path): + doc = { + "format_version": 1, + "probe_id": "test-probe", + "probe_ip": "127.0.0.1", + "metadata": { + "mode": "remote_server", + "probe_name": "unit-test", + "target_host": "pool.ntp.org", + "target_port": 123, + "sync_status": "synchronized", + "leap_status": "no_warning", + "stratum": 2, + "reachability": 255, + "offset_last_s": 0.001, + "delay_s": 0.02, + "jitter_s": 0.0001, + "dispersion_s": None, + "root_delay_s": 0.001, + "root_dispersion_s": 0.0002, + "poll_interval_s": 64.0, + "reference_id": "GPS", + "observation_source": "test", + "collection_host": "", + "additional_metadata": {}, + }, + "series": [ + { + "time": datetime(2025, 4, 11, 12, 0, 0, tzinfo=timezone.utc).isoformat(), + "phase_offset_s": 0.001, + "delay_s": 0.02, + "jitter_s": 0.0001, + "stratum": 2.0, + "reachability": 255.0, + "sync_health": 1.0, + } + ], + } + name = "ntp_127-0-0-1_test-probe_20250411T120000Z.json" + fp = tmp_path / name + fp.write_text(json.dumps(doc), encoding="utf-8") + + probe = NtpProbe(fp) + assert probe.probe_key.probe_id == "test-probe" + assert probe.probe_key.ip_address == "127.0.0.1" + meta = probe.process_metadata() + assert meta["mode"] == "remote_server" + assert meta["stratum"] == 2 diff --git a/uv.lock b/uv.lock index 0ba575b..0896373 100644 --- a/uv.lock +++ b/uv.lock @@ -1117,6 +1117,15 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/cb/c4/ffa32f2c7cdb1728026c7a34aab87796b895767893aaa54611a79b4eef45/mkdocstrings_python-1.16.11-py3-none-any.whl", hash = "sha256:25d96cc9c1f9c272ea1bd8222c900b5f852bf46c984003e9c7c56eaa4696190f", size = 124282, upload-time = "2025-05-24T10:41:30.008Z" }, ] +[[package]] +name = "ntplib" +version = "0.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b4/14/6b018fb602602d9f6cc7485cbad7c1be3a85d25cea18c233854f05284aed/ntplib-0.4.0.tar.gz", hash = "sha256:899d8fb5f8c2555213aea95efca02934c7343df6ace9d7628a5176b176906267", size = 7135, upload-time = "2021-05-28T19:08:54.394Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/58/8c/41da70f6feaca807357206a376b6de2001b439c7f78f53473a914a6dbd1e/ntplib-0.4.0-py2.py3-none-any.whl", hash = "sha256:8d27375329ed7ff38755f7b6d4658b28edc147cadf40338a63a0da8133469d60", size = 6849, upload-time = "2021-05-28T19:08:53.323Z" }, +] + [[package]] name = "numpy" version = "1.26.4" @@ -1178,7 +1187,7 @@ wheels = [ [[package]] name = "opensampl" -version = "1.1.0" +version = "1.1.5" source = { editable = "." 
} dependencies = [ { name = "allantools" }, @@ -1200,11 +1209,13 @@ dependencies = [ { name = "pyyaml" }, { name = "requests" }, { name = "sqlalchemy" }, + { name = "tabulate" }, { name = "tqdm" }, ] [package.optional-dependencies] collect = [ + { name = "ntplib" }, { name = "telnetlib3" }, ] @@ -1231,6 +1242,7 @@ requires-dist = [ { name = "jinja2", specifier = ">=3.1.6" }, { name = "libcst" }, { name = "loguru", specifier = ">=0.7.0,<0.8" }, + { name = "ntplib", marker = "extra == 'collect'", specifier = ">=0.4.0,<0.5" }, { name = "numpy", specifier = ">=1.26.4,<2" }, { name = "pandas", specifier = ">=2.2.1,<3" }, { name = "psycopg2-binary", specifier = ">=2.9.0,<3" }, @@ -1242,6 +1254,7 @@ requires-dist = [ { name = "pyyaml", specifier = ">=6.0.0,<7" }, { name = "requests", specifier = ">=2.31.0,<3" }, { name = "sqlalchemy", specifier = ">=2.0.39,<3" }, + { name = "tabulate" }, { name = "telnetlib3", marker = "extra == 'collect'", specifier = "==2.0.4" }, { name = "tqdm", specifier = ">=4.66.2,<5" }, ]
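The filename convention exercised by the round-trip test above (`ntp_127-0-0-1_test-probe_20250411T120000Z.json`) can be parsed with a named-group regex. This sketch uses a simplified single-group timestamp rather than the vendor's per-field groups, so it is illustrative only.

```python
import re
from datetime import datetime, timezone

# ntp_<ip>_<probe_id>_<UTC timestamp>Z.json, IPv4 octets joined with dashes.
FILE_RE = re.compile(
    r"^ntp_(?P<ip>[0-9A-Za-z.-]+)_(?P<probe_id>[a-zA-Z0-9-]+)_"
    r"(?P<ts>\d{8}T\d{6})Z\.json$"
)

m = FILE_RE.fullmatch("ntp_127-0-0-1_test-probe_20250411T120000Z.json")
assert m is not None
# Restore dotted IPv4 only when the token looks like four dashed octets.
ip = m.group("ip").replace("-", ".") if m.group("ip").count("-") == 3 else m.group("ip")
ts = datetime.strptime(m.group("ts"), "%Y%m%dT%H%M%S").replace(tzinfo=timezone.utc)
```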