From d13bccf9477c2cc5e427147687144d45ff7135ac Mon Sep 17 00:00:00 2001 From: "MacFarland, Midgie" Date: Mon, 13 Apr 2026 13:22:53 -0400 Subject: [PATCH 1/5] resolve conflict --- docs/guides/collection.md | 1 + docs/guides/ntp_vendor_design.md | 59 +++ mkdocs.yaml | 1 + opensampl/collect/cli.py | 84 +++++ opensampl/collect/ntp_snapshot.py | 27 ++ opensampl/db/orm.py | 27 ++ opensampl/metrics.py | 54 +++ opensampl/ntp_vendor.yaml | 42 +++ .../grafana-dashboards/ntp-dashboard.json | 141 ++++++++ .../datasources/postgresql.yaml | 1 + opensampl/vendors/constants.py | 7 + opensampl/vendors/ntp.py | 250 +++++++++++++ opensampl/vendors/ntp_parsing.py | 342 ++++++++++++++++++ opensampl/vendors/ntp_remote.py | 127 +++++++ pyproject.toml | 18 +- tests/test_ntp_probe.py | 93 +++++ uv.lock | 15 +- 17 files changed, 1287 insertions(+), 2 deletions(-) create mode 100644 docs/guides/ntp_vendor_design.md create mode 100644 opensampl/collect/ntp_snapshot.py create mode 100644 opensampl/ntp_vendor.yaml create mode 100644 opensampl/server/grafana/grafana-dashboards/ntp-dashboard.json create mode 100644 opensampl/vendors/ntp.py create mode 100644 opensampl/vendors/ntp_parsing.py create mode 100644 opensampl/vendors/ntp_remote.py create mode 100644 tests/test_ntp_probe.py diff --git a/docs/guides/collection.md b/docs/guides/collection.md index 222c4d7..1794071 100644 --- a/docs/guides/collection.md +++ b/docs/guides/collection.md @@ -8,6 +8,7 @@ The collect API enables automated collection of measurement data from network-co - **Microchip TWST Modems** (ATS6502 series): Collect offset and EBNO tracking values along with contextual information - **Microchip TimeProvider® 4100** (TP4100): Collect timing performance metrics from various input channels via web interface +- **NTP** (`opensampl-collect ntp`): Write JSON snapshots for the `NTP` vendor—local host client state (chrony/ntpq/timedatectl chain) or remote UDP queries (install with `pip install 'opensampl[collect]'` for `ntplib`). 
See [NTP vendor design](ntp_vendor_design.md). ## CLI Usage diff --git a/docs/guides/ntp_vendor_design.md b/docs/guides/ntp_vendor_design.md new file mode 100644 index 0000000..5bbcc2b --- /dev/null +++ b/docs/guides/ntp_vendor_design.md @@ -0,0 +1,59 @@ +# NTP vendor design (OpenSAMPL) + +This note defines the NTP clock-probe family: identity, storage, local vs remote collection, and lab-demo caveats. + +## Vendor identity + +| Item | Value | +|------|--------| +| Vendor name | `NTP` | +| Probe class | `NtpProbe` | +| Module | `opensampl.vendors.ntp` | +| Metadata ORM / table | `NtpMetadata` / `ntp_metadata` | + +## Probe identity (`ProbeKey`) + +- **`ip_address`**: For `remote_server`, the target server IP or a placeholder derived from the hostname. For `local_host`, typically `127.0.0.1` or the host’s primary IPv4 used for labeling. +- **`probe_id`**: Stable slug per logical probe (e.g. `local-chrony`, `mock-a`, `remote-pool-1`). + +Snapshot files use a strict filename pattern so the loader can derive `ProbeKey` without opening the file (see `NtpProbe` docstring). + +## Modes + +| Mode | Meaning | +|------|--------| +| `local_host` | Collector runs on the machine whose NTP client state is observed (Raspberry Pi friendly). | +| `remote_server` | Collector issues NTP client requests to `target_host`:`target_port` (default UDP **123**; high ports supported for demos). | + +## Metadata vs time series + +- **`ntp_metadata`**: Latest normalized fields from the most recent observation (sync/leap/stratum/reach/reference/poll/root metrics, mode, targets, `observation_source`, etc.) plus `additional_metadata` JSONB for raw command output snippets and parser notes. +- **`probe_data`**: One OpenSAMPL row per `(time × metric_type × reference)` as elsewhere. NTP uses dedicated metrics (offset, delay, jitter, stratum, etc.) with `REF_TYPES.UNKNOWN` unless a future reference model is introduced. 
+
+Offset is stored in seconds; Grafana panels may scale to nanoseconds for consistency with existing timing dashboards. + +## Local fallback chain + +Tools are tried in order until one yields usable structured data: + +1. `chronyc tracking` +2. `chronyc -m 'sources -v'` or `chronyc sources -v` +3. `ntpq -p` +4. `timedatectl show-timesync --all` / `timedatectl status` +5. `systemctl show systemd-timesyncd` / `systemctl status systemd-timesyncd` + +Missing binaries are skipped without failing the snapshot; `sync_status` and `observation_source` record partial or unavailable state. + +## Remote collection + +The collector issues standard NTP client requests over UDP (default port **123**, configurable). Timeouts and non-responses produce degraded samples and metadata rather than crashing the loader. + +## Failure semantics + +- Loaders and collectors catch per-step failures; snapshots are still written when possible. +- When a numeric field is missing, the corresponding metric series simply has no row for that timestamp; invalid JSON values such as `NaN` are never written to stored values. + +## Demo vs production NTP + +- **Lab mock servers** often listen on **high UDP ports** so containers do not require `CAP_NET_BIND_SERVICE`. Real deployments typically use **UDP/123**. +- **Simulated drift / unhealthy behavior** in containers is implemented by manipulating **NTP response fields** (stratum, delay, dispersion, etc.), not by true physical clock Allan deviation. Comparison panels show **protocol-level** differences between mock instances. 
diff --git a/mkdocs.yaml b/mkdocs.yaml index 848aee3..4efb1b9 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -14,6 +14,7 @@ nav: - Create: guides/create_probe_type.md - Server: guides/opensampl-server.md - Collect: guides/collection.md + - NTP vendor: guides/ntp_vendor_design.md - Random Data: guides/random-data-generation.md - API: - index: api/index.md diff --git a/opensampl/collect/cli.py b/opensampl/collect/cli.py index e65df6e..171359d 100644 --- a/opensampl/collect/cli.py +++ b/opensampl/collect/cli.py @@ -1,6 +1,8 @@ """Consolidated CLI entry point for opensampl.collect tools.""" import sys +import time +from pathlib import Path from typing import Literal, Optional import click @@ -8,6 +10,9 @@ from opensampl.collect.microchip.tp4100.collect_4100 import main as collect_tp4100_files from opensampl.collect.microchip.twst.generate_twst_files import collect_files as collect_twst_files +from opensampl.collect.ntp_snapshot import write_snapshot +from opensampl.vendors.ntp_parsing import collect_local_snapshot +from opensampl.vendors.ntp_remote import query_ntp_server @click.group() @@ -116,5 +121,84 @@ def tp4100( ) +@cli.group() +def ntp(): + """Collect NTP snapshots (JSON) for ``opensampl load ntp``.""" + pass + + +@ntp.command("local") +@click.option("--probe-id", required=True, help="Stable probe_id slug (e.g. 
local-chrony)") +@click.option("--probe-ip", default="127.0.0.1", show_default=True, help="ip_address for ProbeKey") +@click.option("--probe-name", default="local NTP", show_default=True) +@click.option( + "--output-dir", + default="./ntp-snapshots", + type=click.Path(path_type=Path, file_okay=False), + help="Directory for JSON snapshot files", +) +@click.option( + "--interval", + default=0.0, + type=float, + help="Seconds between samples; 0 = single sample and exit", +) +@click.option("--count", default=1, type=int, help="Samples to collect when interval > 0") +def ntp_local(probe_id: str, probe_ip: str, probe_name: str, output_dir: Path, interval: float, count: int): + """Run local chrony/ntpq/timedatectl chain and write NtpProbe JSON snapshots.""" + import socket + + chost = socket.gethostname() + out = output_dir + + def one() -> None: + doc = collect_local_snapshot( + probe_id=probe_id, + probe_ip=probe_ip, + probe_name=probe_name, + collection_host=chost, + ) + path = write_snapshot(doc, out) + click.echo(str(path)) + + if interval <= 0: + one() + return + + for _ in range(max(count, 1)): + one() + time.sleep(interval) + + +@ntp.command("remote") +@click.option("--host", "-h", required=True, help="NTP server hostname or IP") +@click.option("--port", "-p", default=123, type=int, show_default=True, help="UDP port (use high ports for lab mocks)") +@click.option( + "--output-dir", + default="./ntp-snapshots", + type=click.Path(path_type=Path, file_okay=False), + help="Directory for JSON snapshot files", +) +@click.option("--timeout", default=3.0, type=float, help="UDP request timeout (seconds)") +@click.option("--interval", default=0.0, type=float, help="Seconds between samples; 0 = once") +@click.option("--count", default=1, type=int, help="Samples when interval > 0") +def ntp_remote(host: str, port: int, output_dir: Path, timeout: float, interval: float, count: int): + """Query a remote NTP server with ntplib and write JSON snapshots.""" + out = output_dir + + 
def one() -> None: + doc = query_ntp_server(host, port=port, timeout=timeout) + path = write_snapshot(doc, out) + click.echo(str(path)) + + if interval <= 0: + one() + return + + for _ in range(max(count, 1)): + one() + time.sleep(interval) + + if __name__ == "__main__": cli() diff --git a/opensampl/collect/ntp_snapshot.py b/opensampl/collect/ntp_snapshot.py new file mode 100644 index 0000000..88f7036 --- /dev/null +++ b/opensampl/collect/ntp_snapshot.py @@ -0,0 +1,27 @@ +"""Write NTP JSON snapshots in the format expected by :class:`opensampl.vendors.ntp.NtpProbe`.""" + +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from typing import Any + + +def snapshot_filename(probe_ip: str, probe_id: str, ts: datetime | None = None) -> str: + """Build canonical snapshot filename (UTC timestamp).""" + ts = ts or datetime.now(tz=timezone.utc) + ip_part = probe_ip.replace(".", "-") + return f"ntp_{ip_part}_{probe_id}_{ts.strftime('%Y%m%dT%H%M%SZ')}.json" + + +def write_snapshot(doc: dict[str, Any], output_dir: str | os.PathLike[str]) -> str: + """Serialize *doc* to *output_dir* using :func:`snapshot_filename`.""" + from pathlib import Path + + out = Path(os.fspath(output_dir)) + out.mkdir(parents=True, exist_ok=True) + name = snapshot_filename(doc["probe_ip"], doc["probe_id"]) + path = out / name + path.write_text(json.dumps(doc, indent=2), encoding="utf-8") + return str(path) diff --git a/opensampl/db/orm.py b/opensampl/db/orm.py index 5d645b1..9c7f9c7 100644 --- a/opensampl/db/orm.py +++ b/opensampl/db/orm.py @@ -181,6 +181,7 @@ class ProbeMetadata(Base): adva_metadata = relationship("AdvaMetadata", back_populates="probe", uselist=False) microchip_twst_metadata = relationship("MicrochipTWSTMetadata", back_populates="probe", uselist=False) microchip_tp4100_metadata = relationship("MicrochipTP4100Metadata", back_populates="probe", uselist=False) + ntp_metadata = relationship("NtpMetadata", back_populates="probe", 
uselist=False) # --- CUSTOM PROBE METADATA RELATIONSHIP --- @@ -434,6 +435,32 @@ class MicrochipTP4100Metadata(Base): # --- CUSTOM TABLES --- !! Do not remove line, used as reference when inserting metadata table +class NtpMetadata(Base): + """NTP-specific probe metadata (local client or remote server target).""" + + __tablename__ = "ntp_metadata" + + probe_uuid = Column(String, ForeignKey("probe_metadata.uuid"), primary_key=True) + mode = Column(Text) + probe_name = Column(Text) + target_host = Column(Text) + target_port = Column(Integer) + sync_status = Column(Text) + leap_status = Column(Text) + stratum = Column(Integer) + reachability = Column(Integer) + offset_last_s = Column(Float) + delay_s = Column(Float) + jitter_s = Column(Float) + dispersion_s = Column(Float) + root_delay_s = Column(Float) + root_dispersion_s = Column(Float) + poll_interval_s = Column(Float) + reference_id = Column(Text) + observation_source = Column(Text) + collection_host = Column(Text) + additional_metadata = Column(JSONB) + probe = relationship("ProbeMetadata", back_populates="ntp_metadata") # --- TABLE FUNCTIONS --- diff --git a/opensampl/metrics.py b/opensampl/metrics.py index 017deb5..55ecb26 100644 --- a/opensampl/metrics.py +++ b/opensampl/metrics.py @@ -62,3 +62,57 @@ class METRICS: ) # --- CUSTOM METRICS --- !! 
Do not remove line, used as reference when inserting metric + NTP_DELAY = MetricType( + name="NTP Delay", + description="Round-trip delay (RTT) to the NTP server or observed path delay in seconds", + unit="s", + value_type=float, + ) + NTP_JITTER = MetricType( + name="NTP Jitter", + description="Estimated jitter or RMS offset variation for NTP in seconds", + unit="s", + value_type=float, + ) + NTP_STRATUM = MetricType( + name="NTP Stratum", + description="NTP stratum level (distance from reference clock)", + unit="level", + value_type=float, + ) + NTP_REACHABILITY = MetricType( + name="NTP Reachability", + description="NTP reachability register (0-255) as a scalar for plotting", + unit="count", + value_type=float, + ) + NTP_DISPERSION = MetricType( + name="NTP Dispersion", + description="Combined error budget / dispersion in seconds", + unit="s", + value_type=float, + ) + NTP_ROOT_DELAY = MetricType( + name="NTP Root Delay", + description="Root delay from NTP packet or local estimate in seconds", + unit="s", + value_type=float, + ) + NTP_ROOT_DISPERSION = MetricType( + name="NTP Root Dispersion", + description="Root dispersion from NTP packet or local estimate in seconds", + unit="s", + value_type=float, + ) + NTP_POLL_INTERVAL = MetricType( + name="NTP Poll Interval", + description="Poll interval in seconds", + unit="s", + value_type=float, + ) + NTP_SYNC_HEALTH = MetricType( + name="NTP Sync Health", + description="1.0 if synchronized/healthy, 0.0 otherwise (probe-defined)", + unit="ratio", + value_type=float, + ) diff --git a/opensampl/ntp_vendor.yaml b/opensampl/ntp_vendor.yaml new file mode 100644 index 0000000..acf65db --- /dev/null +++ b/opensampl/ntp_vendor.yaml @@ -0,0 +1,42 @@ +name: NTP +parser_class: NtpProbe +parser_module: ntp +metadata_orm: NtpMetadata +metadata_table: ntp_metadata +metadata_fields: + - name: mode + type: Text + - name: probe_name + type: Text + - name: target_host + type: Text + - name: target_port + type: Integer + - name: 
sync_status + type: Text + - name: leap_status + type: Text + - name: stratum + type: Integer + - name: reachability + type: Integer + - name: offset_last_s + type: Float + - name: delay_s + type: Float + - name: jitter_s + type: Float + - name: dispersion_s + type: Float + - name: root_delay_s + type: Float + - name: root_dispersion_s + type: Float + - name: poll_interval_s + type: Float + - name: reference_id + type: Text + - name: observation_source + type: Text + - name: collection_host + type: Text diff --git a/opensampl/server/grafana/grafana-dashboards/ntp-dashboard.json b/opensampl/server/grafana/grafana-dashboards/ntp-dashboard.json new file mode 100644 index 0000000..d918498 --- /dev/null +++ b/opensampl/server/grafana/grafana-dashboards/ntp-dashboard.json @@ -0,0 +1,141 @@ +{ + "annotations": {"list": []}, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "ns"}, + "overrides": [] + }, + "gridPos": {"h": 9, "w": 12, "x": 0, "y": 0}, + "id": 1, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) * 1e9 AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = mt.uuid\nWHERE $__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'Phase Offset'\n AND pm.uuid IN 
(${ntp_probe:sqlstring})\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "title": "NTP phase offset (Phase Offset metric)", + "type": "timeseries" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "ns"}, + "overrides": [] + }, + "gridPos": {"h": 9, "w": 12, "x": 12, "y": 0}, + "id": 2, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) * 1e9 AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = mt.uuid\nWHERE $__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'NTP Jitter'\n AND pm.uuid IN (${ntp_probe:sqlstring})\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "title": "NTP jitter", + "type": "timeseries" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "none"}, + "overrides": [] + }, + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 9}, + "id": 3, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 
'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = mt.uuid\nWHERE $__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'NTP Stratum'\n AND pm.uuid IN (${ntp_probe:sqlstring})\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "title": "NTP stratum", + "type": "timeseries" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "fieldConfig": { + "defaults": {"color": {"mode": "palette-classic"}, "unit": "percentunit"}, + "overrides": [] + }, + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 9}, + "id": 4, + "options": { + "legend": {"displayMode": "list", "placement": "bottom", "showLegend": true}, + "tooltip": {"mode": "single", "sort": "none"} + }, + "targets": [ + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "editorMode": "code", + "format": "time_series", + "rawQuery": true, + "rawSql": "SELECT\n time_bucket('1 minute'::interval, pd.time AT TIME ZONE 'UTC') AS time,\n COALESCE(pm.name, CONCAT(pm.ip_address, ' ', pm.probe_id)) AS metric,\n AVG((pd.value)::float) AS value\nFROM castdb.probe_data pd\nJOIN castdb.probe_metadata pm ON pd.probe_uuid = pm.uuid\nJOIN castdb.metric_type mt ON pd.metric_type_uuid = mt.uuid\nWHERE $__timeFilter(pd.time)\n AND pm.vendor = 'NTP'\n AND mt.name = 'NTP Sync Health'\n AND pm.uuid IN (${ntp_probe:sqlstring})\nGROUP BY 1, 2\nORDER BY 1\n", + "refId": "A" + } + ], + "title": "NTP sync health (1=healthy)", + "type": "timeseries" + } + ], + "refresh": "30s", + "schemaVersion": 38, + "style": "dark", + "tags": ["ntp", "opensampl"], + "templating": { + "list": [ + { + "current": {"selected": true, "text": "All", "value": "$__all"}, + "datasource": {"type": "grafana-postgresql-datasource", "uid": "castdb-datasource"}, + "definition": 
"SELECT uuid AS __value, COALESCE(name, CONCAT(ip_address, ' ', probe_id)) AS __text FROM castdb.probe_metadata WHERE vendor = 'NTP' ORDER BY 2", + "hide": 0, + "includeAll": true, + "multi": true, + "name": "ntp_probe", + "options": [], + "query": "SELECT uuid AS __value, COALESCE(name, CONCAT(ip_address, ' ', probe_id)) AS __text FROM castdb.probe_metadata WHERE vendor = 'NTP' ORDER BY 2", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + } + ] + }, + "time": {"from": "now-6h", "to": "now"}, + "timepicker": {}, + "timezone": "", + "title": "NTP probes", + "uid": "ntp-opensampl", + "version": 1, + "weekStart": "" +} diff --git a/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml b/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml index df8d725..355c72e 100644 --- a/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml +++ b/opensampl/server/grafana/grafana-provisioning/datasources/postgresql.yaml @@ -2,6 +2,7 @@ apiVersion: 1 datasources: - name: ${POSTGRES_DB}-datasource + uid: castdb-datasource type: grafana-postgresql-datasource url: db:5432 database: ${POSTGRES_DB} diff --git a/opensampl/vendors/constants.py b/opensampl/vendors/constants.py index 4725b05..e2bc68a 100644 --- a/opensampl/vendors/constants.py +++ b/opensampl/vendors/constants.py @@ -71,6 +71,13 @@ class VENDORS: ) # --- CUSTOM VENDORS --- !! 
Do not remove line, used as reference when inserting vendor + NTP = VendorType( + name="NTP", + parser_class="NtpProbe", + parser_module="ntp", + metadata_table="ntp_metadata", + metadata_orm="NtpMetadata", + ) # --- VENDOR FUNCTIONS --- diff --git a/opensampl/vendors/ntp.py b/opensampl/vendors/ntp.py new file mode 100644 index 0000000..2097525 --- /dev/null +++ b/opensampl/vendors/ntp.py @@ -0,0 +1,250 @@ +"""NTP clock probe: JSON snapshot files from local tooling or remote NTP queries.""" + +from __future__ import annotations + +import json +import math +import random +import re +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, ClassVar + +import numpy as np +import pandas as pd +from loguru import logger +from pydantic import Field + +from opensampl.metrics import METRICS +from opensampl.references import REF_TYPES +from opensampl.vendors.base_probe import BaseProbe +from opensampl.vendors.constants import VENDORS, ProbeKey + +# Filename: ntp___.json (IPv4 uses dashes, e.g. 
127-0-0-1) +_FILE_RE = re.compile( + r"^ntp_(?P[0-9A-Za-z.-]+)_(?P[a-zA-Z0-9-]+)_(?P\d{4})(?P\d{2})(?P\d{2})T" + r"(?P\d{2})(?P\d{2})(?P\d{2})Z\.json$" +) + + +def _dashed_ip_to_address(ip_part: str) -> str: + """Turn 127-0-0-1 into 127.0.0.1; leave hostnames unchanged.""" + if re.match(r"^[\d-]+$", ip_part) and ip_part.count("-") == 3: + return ip_part.replace("-", ".") + return ip_part + + +class NtpProbe(BaseProbe): + """Load NTP snapshots from JSON files produced by ``opensampl-collect ntp`` or tests.""" + + vendor = VENDORS.NTP + file_pattern: ClassVar = _FILE_RE + + class RandomDataConfig(BaseProbe.RandomDataConfig): + """Random NTP-like test data.""" + + base_value: float = Field( + default_factory=lambda: random.uniform(-1e-4, 1e-4), + description="random.uniform(-1e-4, 1e-4)", + ) + noise_amplitude: float = Field( + default_factory=lambda: random.uniform(1e-9, 1e-7), + description="random.uniform(1e-9, 1e-7)", + ) + drift_rate: float = Field( + default_factory=lambda: random.uniform(-1e-12, 1e-12), + description="random.uniform(-1e-12, 1e-12)", + ) + + def __init__(self, input_file: str | Path, **kwargs: Any): + """Load JSON snapshot; optionally override ``ProbeKey`` from ``probe_id`` / ``probe_ip`` in the document.""" + super().__init__(input_file=input_file, **kwargs) + self._doc: dict[str, Any] = {} + raw = Path(input_file).read_text(encoding="utf-8") + self._doc = json.loads(raw) + fk = self._doc.get("probe_id") + fa = self._doc.get("probe_ip") + if isinstance(fk, str) and isinstance(fa, str): + self.probe_key = ProbeKey(probe_id=fk, ip_address=fa) + else: + self.probe_key, _ = self.parse_file_name(Path(input_file)) + + @classmethod + def filter_files(cls, files: list[Path]) -> list[Path]: + """Keep only JSON files matching :attr:`file_pattern`.""" + return [f for f in files if cls.file_pattern.fullmatch(f.name)] + + @classmethod + def parse_file_name(cls, file_name: Path) -> tuple[ProbeKey, datetime]: + """ + Parse ``ntp___.json`` into probe key and 
file timestamp. + + IPv4 addresses are written with dashes between octets (``127-0-0-1``). + """ + m = cls.file_pattern.fullmatch(file_name.name) + if not m: + raise ValueError(f"NTP snapshot file name not recognized: {file_name.name}") + ip = _dashed_ip_to_address(m.group("ip")) + probe_id = m.group("probe_id") + ts = datetime( + int(m.group("y")), + int(m.group("mo")), + int(m.group("d")), + int(m.group("h")), + int(m.group("mi")), + int(m.group("s")), + tzinfo=timezone.utc, + ) + return ProbeKey(probe_id=probe_id, ip_address=ip), ts + + def process_metadata(self) -> dict[str, Any]: + """Return vendor metadata row for ``ntp_metadata``.""" + meta = dict(self._doc.get("metadata") or {}) + # Drop keys not in ORM (extra safety) + allowed = { + "mode", + "probe_name", + "target_host", + "target_port", + "sync_status", + "leap_status", + "stratum", + "reachability", + "offset_last_s", + "delay_s", + "jitter_s", + "dispersion_s", + "root_delay_s", + "root_dispersion_s", + "poll_interval_s", + "reference_id", + "observation_source", + "collection_host", + "additional_metadata", + } + meta = {k: v for k, v in meta.items() if k in allowed} + if "probe_name" not in meta or not meta.get("probe_name"): + meta["probe_name"] = f"NTP {self.probe_key.probe_id}" + if "additional_metadata" not in meta: + meta["additional_metadata"] = {} + self.metadata_parsed = True + return meta + + def process_time_data(self) -> pd.DataFrame: + """Send time series for each metric present in each series row.""" + if not self.metadata_parsed: + self.process_metadata() + + series = self._doc.get("series") or [] + if not series: + logger.warning("NTP snapshot has empty series: %s", self.input_file) + return pd.DataFrame() + + metric_map = { + "phase_offset_s": METRICS.PHASE_OFFSET, + "delay_s": METRICS.NTP_DELAY, + "jitter_s": METRICS.NTP_JITTER, + "stratum": METRICS.NTP_STRATUM, + "reachability": METRICS.NTP_REACHABILITY, + "dispersion_s": METRICS.NTP_DISPERSION, + "root_delay_s": 
METRICS.NTP_ROOT_DELAY, + "root_dispersion_s": METRICS.NTP_ROOT_DISPERSION, + "poll_interval_s": METRICS.NTP_POLL_INTERVAL, + "sync_health": METRICS.NTP_SYNC_HEALTH, + } + + for row in series: + t_raw = row.get("time") + if t_raw is None: + continue + t = pd.to_datetime(t_raw, utc=True) + for key, mtype in metric_map.items(): + if key not in row: + continue + val = row[key] + if val is None or (isinstance(val, float) and (math.isnan(val) or math.isinf(val))): + continue + df = pd.DataFrame({"time": [t], "value": [float(val)]}) + self.send_data( + data=df, + metric=mtype, + reference_type=REF_TYPES.UNKNOWN, + ) + + return pd.DataFrame() + + @classmethod + def generate_random_data( + cls, + config: RandomDataConfig, + probe_key: ProbeKey, + ) -> ProbeKey: + """Generate synthetic NTP-like metrics for testing.""" + cls._setup_random_seed(config.seed) + logger.info(f"Generating random NTP data for {probe_key}") + + meta = { + "mode": "local_host", + "probe_name": f"Random NTP {probe_key.probe_id}", + "target_host": "", + "target_port": 0, + "sync_status": "tracking", + "leap_status": "no_warning", + "stratum": 2, + "reachability": 377, + "observation_source": "random", + "collection_host": "", + "additional_metadata": {"test_data": True}, + } + cls._send_metadata_to_db(probe_key, meta) + + total_seconds = config.duration_hours * 3600 + num_samples = int(total_seconds / config.sample_interval) + + for i in range(num_samples): + sample_time = config.start_time + timedelta(seconds=i * config.sample_interval) + time_offset = i * config.sample_interval + drift_component = config.drift_rate * time_offset + noise = float(np.random.normal(0, config.noise_amplitude)) + offset = config.base_value + drift_component + noise + if random.random() < config.outlier_probability: + offset += float(np.random.normal(0, config.noise_amplitude * config.outlier_multiplier)) + + delay_s = 0.02 + abs(0.0001 * random.random()) + jitter_s = abs(float(config.noise_amplitude * 5)) + stratum = 2.0 + 
(1.0 if random.random() < 0.05 else 0.0) + sync_health = 1.0 + + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [offset]}), + metric=METRICS.PHASE_OFFSET, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [delay_s]}), + metric=METRICS.NTP_DELAY, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [jitter_s]}), + metric=METRICS.NTP_JITTER, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [stratum]}), + metric=METRICS.NTP_STRATUM, + reference_type=REF_TYPES.UNKNOWN, + ) + cls.send_data( + probe_key=probe_key, + data=pd.DataFrame({"time": [sample_time], "value": [sync_health]}), + metric=METRICS.NTP_SYNC_HEALTH, + reference_type=REF_TYPES.UNKNOWN, + ) + + logger.info(f"Finished random NTP generation for {probe_key}") + return probe_key diff --git a/opensampl/vendors/ntp_parsing.py b/opensampl/vendors/ntp_parsing.py new file mode 100644 index 0000000..505ca63 --- /dev/null +++ b/opensampl/vendors/ntp_parsing.py @@ -0,0 +1,342 @@ +"""Parse local NTP client tooling output into a normalized snapshot dict.""" + +from __future__ import annotations + +import re +import shutil +import subprocess +from datetime import datetime, timezone +from typing import Any, Optional + +from loguru import logger + +_CMD_TIMEOUT = 8.0 + + +def _run(cmd: list[str]) -> Optional[str]: + """Run command; return stdout or None if missing/failed.""" + bin0 = cmd[0] + if shutil.which(bin0) is None: + return None + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=_CMD_TIMEOUT, + check=False, + ) + except (OSError, subprocess.SubprocessError) as e: + logger.debug(f"ntp local: command {cmd!r} failed: {e}") + return None + if proc.returncode != 0: + logger.debug(f"ntp local: {cmd!r} 
exit {proc.returncode}: {proc.stderr!r}") + return None + return proc.stdout or "" + + +def _parse_chronyc_tracking(text: str) -> dict[str, Any]: + """Parse `chronyc tracking` key: value output.""" + out: dict[str, Any] = {} + for line in text.splitlines(): + line = line.strip() + if not line or ":" not in line: + continue + key, _, rest = line.partition(":") + key = key.strip().lower().replace(" ", "_") + val = rest.strip() + out[key] = val + + offset_s: Optional[float] = None + jitter_s: Optional[float] = None + stratum: Optional[int] = None + ref = None + + # Last offset : +0.000000123 seconds + m = re.search(r"last offset\s*:\s*([+-]?[\d.eE+-]+)\s*seconds?", text, re.I) + if m: + try: + offset_s = float(m.group(1)) + except ValueError: + pass + m = re.search(r"rms offset\s*:\s*([+-]?[\d.eE+-]+)\s*seconds?", text, re.I) + if m: + try: + jitter_s = float(m.group(1)) + except ValueError: + pass + m = re.search(r"stratum\s*:\s*(\d+)", text, re.I) + if m: + try: + stratum = int(m.group(1)) + except ValueError: + pass + m = re.search(r"reference id\s*:\s*(\S+)(?:\s*\(([^)]+)\))?", text, re.I) + if m: + ref = m.group(2) or m.group(1) + + sync_status = "unsynchronized" + if "normal" in text.lower() or offset_s is not None: + sync_status = "tracking" + + return { + "raw_tracking": out, + "offset_s": offset_s, + "jitter_s": jitter_s, + "stratum": stratum, + "reference_id": ref, + "sync_status": sync_status, + "observation_source": "chronyc_tracking", + } + + +def _parse_chronyc_sources(text: str) -> dict[str, Any]: + """Parse `chronyc sources` for reach and selected source.""" + reach: Optional[int] = None + selected: Optional[str] = None + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("MS") or line.startswith("="): + continue + # ^* or ^+ prefix indicates selected/accepted + if line.startswith("*") or line.startswith("+"): + parts = line.split() + if len(parts) >= 7: + try: + reach = int(parts[5], 8) if parts[5].startswith("0") 
else int(parts[5]) + except ValueError: + try: + reach = int(parts[5]) + except ValueError: + pass + selected = parts[1] + break + # Fallback: last column often reach (octal) + parts = line.split() + if len(parts) >= 7 and parts[0] in ("^*", "^+", "*", "+"): + # already handled + pass + if reach is None: + # Try any line with 377 octal style + m = re.search(r"\b([0-7]{3})\b", text) + if m: + try: + reach = int(m.group(1), 8) + except ValueError: + pass + + return { + "reachability": reach, + "selected_source": selected, + "observation_source": "chronyc_sources", + } + + +def _parse_ntpq(text: str) -> dict[str, Any]: + """Parse `ntpq -p` / `ntpq -pn` output.""" + offset_s: Optional[float] = None + delay_s: Optional[float] = None + jitter_s: Optional[float] = None + stratum: Optional[int] = None + reach: Optional[int] = None + ref = None + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("remote") or line.startswith("="): + continue + if line.startswith("*") or line.startswith("+") or line.startswith("-"): + parts = line.split() + # remote refid st t when poll reach delay offset jitter + if len(parts) >= 10: + try: + stratum = int(parts[2]) + except ValueError: + pass + try: + delay_s = float(parts[7]) / 1000.0 # ms -> s + offset_s = float(parts[8]) / 1000.0 + jitter_s = float(parts[9]) / 1000.0 + except (ValueError, IndexError): + pass + try: + reach = int(parts[6], 8) if parts[6].startswith("0") else int(parts[6]) + except ValueError: + try: + reach = int(parts[6]) + except ValueError: + pass + ref = parts[1] + break + + return { + "offset_s": offset_s, + "delay_s": delay_s, + "jitter_s": jitter_s, + "stratum": stratum, + "reachability": reach, + "reference_id": ref, + "sync_status": "synced" if offset_s is not None else "unknown", + "observation_source": "ntpq", + } + + +def _parse_timedatectl(text: str) -> dict[str, Any]: + """Parse `timedatectl status` / `show-timesync --all`.""" + sync = None + for line in text.splitlines(): 
+ low = line.lower() + if "system clock synchronized" in low or "ntp synchronized" in low: + if "yes" in low: + sync = True + elif "no" in low: + sync = False + sync_status = "unknown" + if sync is True: + sync_status = "synchronized" + elif sync is False: + sync_status = "unsynchronized" + + return { + "sync_status": sync_status, + "observation_source": "timedatectl", + } + + +def _parse_systemctl_show(text: str) -> dict[str, Any]: + """Parse `systemctl show` / `systemctl status` for systemd-timesyncd.""" + active = None + for line in text.splitlines(): + if line.strip().lower().startswith("activestate="): + active = line.split("=", 1)[1].strip().lower() == "active" + break + if active is None and "active (running)" in text.lower(): + active = True + sync_status = "unknown" + if active is True: + sync_status = "service_active" + elif active is False: + sync_status = "service_inactive" + + return {"sync_status": sync_status, "observation_source": "systemctl_timesyncd"} + + +def _merge_int(a: Optional[int], b: Optional[int]) -> Optional[int]: + return a if a is not None else b + + +def _merge_float(a: Optional[float], b: Optional[float]) -> Optional[float]: + return a if a is not None else b + + +def collect_local_snapshot( + probe_id: str, + probe_ip: str, + probe_name: str, + collection_host: str, +) -> dict[str, Any]: + """ + Run the local fallback chain and return a snapshot document (metadata + one series row). + + probe_ip: IP used in ProbeKey (e.g. 127.0.0.1 for local). 
+ """ + merged: dict[str, Any] = { + "mode": "local_host", + "probe_name": probe_name, + "target_host": "", + "target_port": 0, + "sync_status": "unknown", + "leap_status": "unknown", + "stratum": None, + "reachability": None, + "offset_last_s": None, + "delay_s": None, + "jitter_s": None, + "dispersion_s": None, + "root_delay_s": None, + "root_dispersion_s": None, + "poll_interval_s": None, + "reference_id": None, + "observation_source": "none", + "collection_host": collection_host, + } + extras: dict[str, Any] = {} + + t = _run(["chronyc", "tracking"]) + if t: + p = _parse_chronyc_tracking(t) + extras["chronyc_tracking"] = p.get("raw_tracking", {}) + merged["offset_last_s"] = _merge_float(merged["offset_last_s"], p.get("offset_s")) + merged["jitter_s"] = _merge_float(merged["jitter_s"], p.get("jitter_s")) + merged["stratum"] = _merge_int(merged["stratum"], p.get("stratum")) + merged["reference_id"] = p.get("reference_id") or merged["reference_id"] + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + + t = _run(["chronyc", "sources", "-v"]) or _run(["chronyc", "sources"]) + if t: + p = _parse_chronyc_sources(t) + merged["reachability"] = _merge_int(merged["reachability"], p.get("reachability")) + if p.get("selected_source"): + merged["reference_id"] = merged["reference_id"] or p["selected_source"] + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + + if merged["offset_last_s"] is None and merged["stratum"] is None: + t = _run(["ntpq", "-pn"]) or _run(["ntpq", "-p"]) + if t: + p = _parse_ntpq(t) + merged["offset_last_s"] = _merge_float(merged["offset_last_s"], p.get("offset_s")) + merged["delay_s"] = _merge_float(merged["delay_s"], p.get("delay_s")) + merged["jitter_s"] = _merge_float(merged["jitter_s"], p.get("jitter_s")) + merged["stratum"] = _merge_int(merged["stratum"], p.get("stratum")) + merged["reachability"] 
= _merge_int(merged["reachability"], p.get("reachability")) + merged["reference_id"] = merged["reference_id"] or p.get("reference_id") + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + + t = _run(["timedatectl", "show-timesync", "--all"]) or _run(["timedatectl", "status"]) + if t: + p = _parse_timedatectl(t) + if merged["sync_status"] == "unknown": + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + extras["timedatectl"] = t[:2000] + + t = _run(["systemctl", "show", "systemd-timesyncd", "--property=ActiveState"]) + if not t: + t = _run(["systemctl", "status", "systemd-timesyncd", "--no-pager"]) + if t: + p = _parse_systemctl_show(t) + if merged["sync_status"] == "unknown": + merged["sync_status"] = p.get("sync_status", merged["sync_status"]) + merged["observation_source"] = p.get("observation_source", merged["observation_source"]) + extras["systemctl"] = t[:2000] + + now = datetime.now(tz=timezone.utc) + row: dict[str, Any] = { + "time": now.isoformat(), + "phase_offset_s": merged["offset_last_s"], + "delay_s": merged["delay_s"], + "jitter_s": merged["jitter_s"], + "stratum": float(merged["stratum"]) if merged["stratum"] is not None else None, + "reachability": float(merged["reachability"]) if merged["reachability"] is not None else None, + "dispersion_s": merged["dispersion_s"], + "root_delay_s": merged["root_delay_s"], + "root_dispersion_s": merged["root_dispersion_s"], + "poll_interval_s": merged["poll_interval_s"], + "sync_health": 1.0 if merged["sync_status"] in ("tracking", "synchronized", "synced") else 0.0, + } + + for k in list(row.keys()): + if row[k] is None: + del row[k] + + meta = {k: v for k, v in merged.items() if v is not None} + meta["additional_metadata"] = extras + + return { + "format_version": 1, + "probe_id": probe_id, + 
"probe_ip": probe_ip, + "metadata": meta, + "series": [row], + } diff --git a/opensampl/vendors/ntp_remote.py b/opensampl/vendors/ntp_remote.py new file mode 100644 index 0000000..24db661 --- /dev/null +++ b/opensampl/vendors/ntp_remote.py @@ -0,0 +1,127 @@ +"""Remote NTP client queries (UDP).""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from loguru import logger + + +def query_ntp_server(host: str, port: int = 123, timeout: float = 3.0) -> dict[str, Any]: + """ + Perform one NTP client request and return a snapshot document (metadata + one series row). + + Requires optional dependency ``ntplib``. + """ + try: + import ntplib # type: ignore[import-untyped] + except ImportError as e: + raise ImportError("Remote NTP collection requires the 'ntplib' package (install opensampl[collect]).") from e + + client = ntplib.NTPClient() + try: + resp = client.request(host, port=port, version=3, timeout=timeout) + except Exception as e: + logger.warning(f"NTP request to {host}:{port} failed: {e}") + now = datetime.now(tz=timezone.utc) + meta = { + "mode": "remote_server", + "probe_name": f"ntp-{host}-{port}", + "target_host": host, + "target_port": port, + "sync_status": "unreachable", + "leap_status": "unknown", + "stratum": None, + "reachability": None, + "offset_last_s": None, + "delay_s": None, + "jitter_s": None, + "dispersion_s": None, + "root_delay_s": None, + "root_dispersion_s": None, + "poll_interval_s": None, + "reference_id": None, + "observation_source": "ntplib_error", + "collection_host": "", + "additional_metadata": {"error": str(e)}, + } + row = { + "time": now.isoformat(), + "sync_health": 0.0, + } + return { + "format_version": 1, + "probe_id": f"remote-{host}-{port}", + "probe_ip": host, + "metadata": meta, + "series": [row], + } + + leap = int(resp.leap) + leap_map = {0: "no_warning", 1: "add_second", 2: "del_second", 3: "alarm"} + stratum = int(resp.stratum) + # poll is log2 seconds in 
RFC5905 + try: + poll_s = float(2 ** int(resp.poll)) + except (TypeError, ValueError, OverflowError): + poll_s = None + + root_delay_s = float(resp.root_delay) if resp.root_delay is not None else None + root_dispersion_s = float(resp.root_dispersion) if resp.root_dispersion is not None else None + delay_s = float(resp.delay) if resp.delay is not None else None + offset_s = float(resp.offset) if resp.offset is not None else None + + ref_id = getattr(resp, "ref_id", None) + if hasattr(ref_id, "decode"): + try: + ref_id = ref_id.decode("ascii", errors="replace") + except Exception: + ref_id = str(ref_id) + ref_id = str(ref_id) if ref_id is not None else None + + now = datetime.now(tz=timezone.utc) + sync_ok = stratum < 16 and offset_s is not None + meta = { + "mode": "remote_server", + "probe_name": f"ntp-{host}-{port}", + "target_host": host, + "target_port": port, + "sync_status": "synchronized" if sync_ok else "unsynchronized", + "leap_status": leap_map.get(leap, str(leap)), + "stratum": stratum, + "reachability": None, + "offset_last_s": offset_s, + "delay_s": delay_s, + "jitter_s": None, + "dispersion_s": None, + "root_delay_s": root_delay_s, + "root_dispersion_s": root_dispersion_s, + "poll_interval_s": poll_s, + "reference_id": ref_id, + "observation_source": "ntplib", + "collection_host": "", + "additional_metadata": {"version": getattr(resp, "version", None)}, + } + + row: dict[str, Any] = { + "time": now.isoformat(), + "phase_offset_s": offset_s, + "delay_s": delay_s, + "stratum": float(stratum), + "root_delay_s": root_delay_s, + "root_dispersion_s": root_dispersion_s, + "poll_interval_s": poll_s, + "sync_health": 1.0 if sync_ok else 0.0, + } + for k in list(row.keys()): + if row[k] is None: + del row[k] + + return { + "format_version": 1, + "probe_id": f"remote-{host}-{port}", + "probe_ip": host, + "metadata": meta, + "series": [row], + } diff --git a/pyproject.toml b/pyproject.toml index b9500f6..3c2639d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ 
-76,7 +76,7 @@ backend = [ "uvicorn", "prometheus-client", ] -collect = ["telnetlib3==2.0.4"] +collect = ["telnetlib3==2.0.4", "ntplib>=0.4.0,<0.5"] [project.scripts] opensampl = "opensampl.cli:cli" @@ -136,7 +136,20 @@ ignore = ["D203", "D212", "D400", "D415", "ANN401", "S101", "PLR2004", "COM812", [tool.ruff.lint.per-file-ignores] "opensampl/vendors/**/*.py" = ['S311'] # we want to ignore the errors about random "opensampl/server/backend/main.py" = ['B008', 'ARG001'] #ignore complaints about calling functions in args +"opensampl/vendors/ntp_parsing.py" = [ + "C901", + "PLR0915", + "S603", + "PLW2901", + "D103", + "UP007", + "SIM105", + "FURB167", + "PIE810", +] +"opensampl/vendors/ntp_remote.py" = ["S603"] + [tool.ruff.lint.pylint] max-args = 10 diff --git a/tests/test_ntp_probe.py b/tests/test_ntp_probe.py new file mode 100644 index 0000000..e27a162 --- /dev/null +++ b/tests/test_ntp_probe.py @@ -0,0 +1,93 @@ +"""Tests for NTP vendor, parsers, and snapshot format.""" + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from opensampl.vendors.ntp import NtpProbe +from opensampl.vendors.ntp_parsing import _parse_chronyc_tracking, _parse_ntpq + + +def test_parse_chronyc_tracking_basic(): + text = """ +Reference ID : A1B2C3D4 (pool.ntp.org) +Stratum : 3 +Ref time (UTC) : Thu Apr 11 12:00:00 2025 +System time : 0.000000100 seconds slow of NTP time +Last offset : +0.000000050 seconds +RMS offset : 0.000000200 seconds +Frequency : 0.123 ppm slow +Residual freq : 0.001 ppm +Skew : 0.050 ppm +Root delay : 0.010234 seconds +Root dispersion : 0.001000 seconds +Update interval : 64.0 seconds +""" + p = _parse_chronyc_tracking(text) + assert p["offset_s"] == pytest.approx(5e-8) + assert p["jitter_s"] == pytest.approx(2e-7) + assert p["stratum"] == 3 + + +def test_parse_ntpq_line(): + text = """ + remote refid st t when poll reach delay offset jitter 
+============================================================================== +*192.168.1.1 .GPS. 1 u 12 64 377 1.234 0.567 0.089 +""" + p = _parse_ntpq(text) + assert p["offset_s"] == pytest.approx(0.000567) + assert p["delay_s"] == pytest.approx(0.001234) + assert p["stratum"] == 1 + + +def test_ntp_probe_json_roundtrip(tmp_path: Path): + doc = { + "format_version": 1, + "probe_id": "test-probe", + "probe_ip": "127.0.0.1", + "metadata": { + "mode": "remote_server", + "probe_name": "unit-test", + "target_host": "pool.ntp.org", + "target_port": 123, + "sync_status": "synchronized", + "leap_status": "no_warning", + "stratum": 2, + "reachability": 255, + "offset_last_s": 0.001, + "delay_s": 0.02, + "jitter_s": 0.0001, + "dispersion_s": None, + "root_delay_s": 0.001, + "root_dispersion_s": 0.0002, + "poll_interval_s": 64.0, + "reference_id": "GPS", + "observation_source": "test", + "collection_host": "", + "additional_metadata": {}, + }, + "series": [ + { + "time": datetime(2025, 4, 11, 12, 0, 0, tzinfo=timezone.utc).isoformat(), + "phase_offset_s": 0.001, + "delay_s": 0.02, + "jitter_s": 0.0001, + "stratum": 2.0, + "reachability": 255.0, + "sync_health": 1.0, + } + ], + } + name = "ntp_127-0-0-1_test-probe_20250411T120000Z.json" + fp = tmp_path / name + fp.write_text(json.dumps(doc), encoding="utf-8") + + probe = NtpProbe(fp) + assert probe.probe_key.probe_id == "test-probe" + assert probe.probe_key.ip_address == "127.0.0.1" + meta = probe.process_metadata() + assert meta["mode"] == "remote_server" + assert meta["stratum"] == 2 diff --git a/uv.lock b/uv.lock index 0ba575b..0896373 100644 --- a/uv.lock +++ b/uv.lock @@ -1117,6 +1117,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cb/c4/ffa32f2c7cdb1728026c7a34aab87796b895767893aaa54611a79b4eef45/mkdocstrings_python-1.16.11-py3-none-any.whl", hash = "sha256:25d96cc9c1f9c272ea1bd8222c900b5f852bf46c984003e9c7c56eaa4696190f", size = 124282, upload-time = "2025-05-24T10:41:30.008Z" }, ] +[[package]] 
+name = "ntplib" +version = "0.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b4/14/6b018fb602602d9f6cc7485cbad7c1be3a85d25cea18c233854f05284aed/ntplib-0.4.0.tar.gz", hash = "sha256:899d8fb5f8c2555213aea95efca02934c7343df6ace9d7628a5176b176906267", size = 7135, upload-time = "2021-05-28T19:08:54.394Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/58/8c/41da70f6feaca807357206a376b6de2001b439c7f78f53473a914a6dbd1e/ntplib-0.4.0-py2.py3-none-any.whl", hash = "sha256:8d27375329ed7ff38755f7b6d4658b28edc147cadf40338a63a0da8133469d60", size = 6849, upload-time = "2021-05-28T19:08:53.323Z" }, +] + [[package]] name = "numpy" version = "1.26.4" @@ -1178,7 +1187,7 @@ wheels = [ [[package]] name = "opensampl" -version = "1.1.0" +version = "1.1.5" source = { editable = "." } dependencies = [ { name = "allantools" }, @@ -1200,11 +1209,13 @@ dependencies = [ { name = "pyyaml" }, { name = "requests" }, { name = "sqlalchemy" }, + { name = "tabulate" }, { name = "tqdm" }, ] [package.optional-dependencies] collect = [ + { name = "ntplib" }, { name = "telnetlib3" }, ] @@ -1231,6 +1242,7 @@ requires-dist = [ { name = "jinja2", specifier = ">=3.1.6" }, { name = "libcst" }, { name = "loguru", specifier = ">=0.7.0,<0.8" }, + { name = "ntplib", marker = "extra == 'collect'", specifier = ">=0.4.0,<0.5" }, { name = "numpy", specifier = ">=1.26.4,<2" }, { name = "pandas", specifier = ">=2.2.1,<3" }, { name = "psycopg2-binary", specifier = ">=2.9.0,<3" }, @@ -1242,6 +1254,7 @@ requires-dist = [ { name = "pyyaml", specifier = ">=6.0.0,<7" }, { name = "requests", specifier = ">=2.31.0,<3" }, { name = "sqlalchemy", specifier = ">=2.0.39,<3" }, + { name = "tabulate" }, { name = "telnetlib3", marker = "extra == 'collect'", specifier = "==2.0.4" }, { name = "tqdm", specifier = ">=4.66.2,<5" }, ] From 1a3faa439fd3909d14e98755951a4fc9e0e43f8d Mon Sep 17 00:00:00 2001 From: sempervent Date: Sat, 11 Apr 2026 15:02:54 
-0400 Subject: [PATCH 2/5] add working data paths for testing to the .gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 41e29a2..8d8939a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# OpenSAMPL data paths +archive/ +ntp-snapshots/ # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] From 634e1f8490210f9a89df29f06e3e3491b6c7fc8f Mon Sep 17 00:00:00 2001 From: sempervent Date: Sat, 11 Apr 2026 15:32:26 -0400 Subject: [PATCH 3/5] feat: add end-to-end NTP demo appliance and dashboard support - add OpenSAMPL NTP vendor, collection, metrics, and metadata support - bootstrap lookup tables and defaults during init - add PostGIS/Timescale demo database image and continuous ingest service - enrich NTP probe metadata with stored geolocation at ingest time - harden Grafana dashboards for text UUID filters and reference-safe wording - add NTP-specific and public geospatial timing dashboard improvements - document 1.2.0 unreleased changes and collapse metadata tables by default --- CHANGELOG.md | 23 +++ README.md | 18 ++- docs/guides/collection.md | 2 +- docs/guides/index.md | 8 +- docs/guides/ntp_extension.md | 125 +++++++++++++++++ mkdocs.yaml | 1 + opensampl/cli.py | 3 +- opensampl/db/bootstrap.py | 131 ++++++++++++++++++ opensampl/load/ntp_geolocation.py | 127 +++++++++++++++++ opensampl/load_data.py | 12 +- opensampl/metrics.py | 5 +- .../grafana-dashboards/ntp-dashboard.json | 51 +++++-- .../public-timing-dashboard.json | 65 ++++++--- .../dashboards/dashboard.yaml | 6 +- opensampl/vendors/ntp_remote.py | 63 ++++++++- tests/test_cli.py | 23 ++- 16 files changed, 615 insertions(+), 48 deletions(-) create mode 100644 docs/guides/ntp_extension.md create mode 100644 opensampl/db/bootstrap.py create mode 100644 opensampl/load/ntp_geolocation.py diff --git a/CHANGELOG.md b/CHANGELOG.md index dfb7252..80a9825 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,29 @@ This project adheres to 
[Semantic Versioning](https://semver.org/). --- +## [1.2.0] - Unreleased +### Added +- 🔥 NTP vendor probe family (`NtpProbe`) with JSON snapshot format, filename convention, and `ntp_metadata` ORM table +- 🔥 `opensampl-collect ntp` entry point: local chrony/ntpq/timedatectl-style collection and remote UDP queries via `ntplib` +- 🔥 NTP-focused metrics in `METRICS` (phase offset, delay, jitter, stratum, reachability, dispersion, root delay/dispersion, poll interval, sync health) +- 🔥 Idempotent database bootstrap after schema creation: seed `reference_type`, `metric_type`, default `reference` and `defaults` rows from `REF_TYPES` / `METRICS`; `public.get_default_uuid_for()` for `ProbeData` defaults; `castdb.campus_locations` view for geospatial dashboards backed by `locations.geom` +- 🔥 Grafana: NTP probes dashboard (`ntp-opensampl`), public geospatial timing dashboard updates, datasource/dashboard provisioning alignment +- 🔥 Grafana table panels joining stored `probe_metadata`, `ntp_metadata`, `locations`, and `reference` / `reference_type` for probe reference & source context (no runtime geolocation in panels) +- 🔥 Remote NTP snapshot identity overrides (`probe_id`, `probe_ip`, `probe_name`, optional lab `geolocation` hints) for stable ingest keys + +### Changed +- ⚡ Grafana timing panel titles and dashboard copy to **reference-safe** wording (NTP / configured reference vs implying GNSS truth where not applicable); extensible for future GNSS-backed probes +- ⚡ `METRICS.NTP_JITTER` description to distinguish measured jitter (local parsers) from conservative remote estimates +- ⚡ Remote `query_ntp_server`: emit `jitter_s` for time series using a documented delay/root-dispersion bound when RFC peer jitter is unavailable from a single packet +- ⚡ `load_probe_metadata`: NTP path attaches stored `locations` rows for dashboard geospatial joins (one-time at metadata load; not repeated in Grafana queries) + +### Fixed +- 🩹 `opensampl init` / `create_new_tables` leaving 
lookup tables empty (load path now seeds baseline rows and defaults) +- 🩹 Grafana PostgreSQL variables and panel filters: text-safe UUID handling for `varchar` `probe_metadata.uuid` (avoid `varchar = uuid` / empty `IN ()` failures) +- 🩹 Public geospatial dashboard map layer using the provisioned `castdb-datasource` UID consistently + +--- + -## [Unreleased] - YYYY-MM-DD +## [1.2.0] - Unreleased ### Added - 🔥 Moved alembic migration code into openSAMPL along with Docker image information - 🔥 Moved backend api code into openSAMPL along with Docker image information - 🔥 Docker-compose for developers which installs openSAMPL as editable on backend image +- 🔥 NTP vendor probe family (`NtpProbe`) with JSON snapshot format, filename convention, and `ntp_metadata` ORM table +- 🔥 `opensampl-collect ntp` entry point: local chrony/ntpq/timedatectl-style collection and remote UDP queries via `ntplib` +- 🔥 NTP-focused metrics in `METRICS` (phase offset, delay, jitter, stratum, reachability, dispersion, root delay/dispersion, poll interval, sync health) +- 🔥 Idempotent database bootstrap after schema creation: seed `reference_type`, `metric_type`, default `reference` and `defaults` rows from `REF_TYPES` / `METRICS`; `public.get_default_uuid_for()` for `ProbeData` defaults; `castdb.campus_locations` view for geospatial dashboards backed by `locations.geom` +- 🔥 Grafana: NTP probes dashboard (`ntp-opensampl`), public geospatial timing dashboard updates, datasource/dashboard provisioning alignment +- 🔥 Grafana table panels joining stored `probe_metadata`, `ntp_metadata`, `locations`, and `reference` / `reference_type` for probe reference & source context (no runtime geolocation in panels) +- 🔥 Remote NTP snapshot identity overrides (`probe_id`, `probe_ip`, `probe_name`, optional lab `geolocation` hints) for stable ingest keys + +### Changed +- ⚡ Grafana timing panel titles and dashboard copy to **reference-safe** wording (NTP / configured reference vs implying GNSS truth where not 
applicable); extensible for future GNSS-backed probes +- ⚡ `METRICS.NTP_JITTER` description to distinguish measured jitter (local parsers) from conservative remote estimates +- ⚡ Remote `query_ntp_server`: emit `jitter_s` for time series using a documented delay/root-dispersion bound when RFC peer jitter is unavailable from a single packet +- ⚡ `load_probe_metadata`: NTP path attaches stored `locations` rows for dashboard geospatial joins (one-time at metadata load; not repeated in Grafana queries) ### Fixed +- 🩹 `opensampl init` / `create_new_tables` leaving lookup tables empty (load path now seeds baseline rows and defaults) +- 🩹 Grafana PostgreSQL variables and panel filters: text-safe UUID handling for `varchar` `probe_metadata.uuid` (avoid `varchar = uuid` / empty `IN ()` failures) +- 🩹 Public geospatial dashboard map layer using the provisioned `castdb-datasource` UID consistently - 🩹 Bug which caused random data duration to always be 1 hour ## [1.1.5] - 2025-09-22 From 9f70c3adec722b81b31f7517082b89936a1d212b Mon Sep 17 00:00:00 2001 From: "MacFarland, Midgie" Date: Mon, 13 Apr 2026 12:49:55 -0400 Subject: [PATCH 5/5] adding override option for server --- opensampl/config/server.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/opensampl/config/server.py b/opensampl/config/server.py index 6478145..0e302e1 100644 --- a/opensampl/config/server.py +++ b/opensampl/config/server.py @@ -35,6 +35,8 @@ class ServerConfig(BaseConfig): COMPOSE_FILE: str = Field(default="", description="Fully resolved path to the Docker Compose file.") + OVERRIDE_FILE: str | None = Field(default=None, description="Override for the compose file") + DOCKER_ENV_FILE: str = Field(default="", description="Fully resolved path to the Docker .env file.") docker_env_values: dict[str, Any] = Field(default_factory=dict, init=False) @@ -67,6 +69,14 @@ def resolve_compose_file(cls, v: Any) -> str: return get_resolved_resource_path(opensampl.server, "docker-compose.yaml") return 
str(Path(v).expanduser().resolve()) + @field_validator("OVERRIDE_FILE", mode="before") + @classmethod + def resolve_override_file(cls, v: Any) -> str | None: + """Resolve the provided override compose file to an absolute path, or pass the unset value through unchanged""" + if v: + return str(Path(v).expanduser().resolve()) + return v + @field_validator("DOCKER_ENV_FILE", mode="before") @classmethod def resolve_docker_env_file(cls, v: Any) -> str: @@ -89,6 +99,8 @@ def build_docker_compose_base(self): compose_command = self.get_compose_command() command = shlex.split(compose_command) command.extend(["--env-file", self.DOCKER_ENV_FILE, "-f", self.COMPOSE_FILE]) + if self.OVERRIDE_FILE: + command.extend(["-f", self.OVERRIDE_FILE]) return command def set_by_name(self, name: str, value: Any):