Merged
17 commits
ed220d8
docs: result-sink spec and implementation plan
aaronbrethorst May 26, 2026
365d082
feat(sink): scaffold Config with Configured()
aaronbrethorst May 26, 2026
c6042df
style: apply gofmt
aaronbrethorst May 26, 2026
3e92d5a
feat(sink): Validate enforces missing-sibling and table allow-list
aaronbrethorst May 26, 2026
1e48324
fix(sink): gofmt sink_test.go and derive allow-list from map
aaronbrethorst May 26, 2026
e054aa8
feat(sink): normalizeDSN strips jdbc prefix, defaults sslmode and con…
aaronbrethorst May 26, 2026
5b126d1
feat(sink): implement Write against pgx/v5
aaronbrethorst May 26, 2026
22ee3b4
fix(sink): redactErr also scrubs URL-encoded DBPass; document Validat…
aaronbrethorst May 26, 2026
418838e
test(sink): env-gated integration test for Write
aaronbrethorst May 26, 2026
d168600
feat(config): parse and validate optional result-sink fields
aaronbrethorst May 26, 2026
636056f
refactor(report): expose RenderJSON / RenderErrorJSON bytes helpers
aaronbrethorst May 26, 2026
1d48abd
feat(cli): write result row to sink on completed and error paths
aaronbrethorst May 26, 2026
4079765
fix(cli): emit sink error row when report rendering fails; fix err sh…
aaronbrethorst May 26, 2026
e7fb5cc
docs: mention optional result sink in CLAUDE.md
aaronbrethorst May 26, 2026
5bf4282
docs(readme): document optional result sink fields
aaronbrethorst May 26, 2026
4aa15b4
fix(sink): error-check admin.Close and reject unsupported status values
aaronbrethorst May 26, 2026
aa6f60d
refactor: address SonarCloud findings on PR #4
aaronbrethorst May 26, 2026
3 changes: 2 additions & 1 deletion CLAUDE.md
@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

## What this is

A Go CLI + library that validates a running OneBusAway (OBA) server by cross-referencing its REST API against the *authoritative* sources of truth: the operator's static GTFS feed and GTFS-realtime feeds (vehicle positions, trip updates, service alerts). It answers "is this OBA server telling the truth about what the feeds say?"
A Go CLI + library that validates a running OneBusAway (OBA) server by cross-referencing its REST API against the *authoritative* sources of truth: the operator's static GTFS feed and GTFS-realtime feeds (vehicle positions, trip updates, service alerts). It answers "is this OBA server telling the truth about what the feeds say?" An optional Postgres result sink (`sink/`) writes one row per run keyed by `correlation_id` when the invocation payload includes `db_url` and its siblings — see `docs/superpowers/specs/2026-05-25-result-sink-design.md`.

## Commands

@@ -33,6 +33,7 @@ The flow is **config → prepare (fetch) → checks → report**:
2. **`feeds`** — fetching + parsing. `Fetcher` downloads feeds; static GTFS goes through an on-disk **conditional-GET `Cache`** (ETag/Last-Modified, atomic body-then-meta writes), realtime feeds are always fetched fresh. `ParsedStatic` wraps go-gtfs's `Static` with the lookup indexes checks need (agency IDs/names, raw trip→agency, raw route→agency).
3. **`validator`** — the engine. `validator.Run()` calls `prepare()`, then runs every check.
4. **`report`** — renders a `Report` as grouped text (`WriteText`) or, via `WriteJSON`, a UI-oriented JSON `Document` (meta + summary + grouped results; schema at `schema/oba-validator-report.schema.json`). `WriteErrorJSON` emits the error variant. The `Document` view model is built by the pure `BuildDocument(report, config, now)` so output is deterministic in tests.
5. **`sink`** — optional Postgres writer. When the invocation payload includes `db_url`/`db_user`/`db_pass`/`correlation_id`/`result_table`, `main.go` calls `sink.Write` after stdout is written. `status` is `"completed"` for both PASS and FAIL verdicts (the verdict lives inside `result_data` at `summary.verdict`); `"error"` is reserved for the `errorDocument` variant. A sink write failure is logged to stderr and never changes the validator's exit code.

`prepare()` (`validator/validator.go`) builds the shared `ValidationContext`: it constructs the OBA SDK client, fetches `AgenciesWithCoverage` once, and **fans out concurrently** (bounded by `MaxConcurrency`, default 4) to download/parse each data source's feeds into a `SourceContext`. A per-feed fetch/parse failure is recorded in `SourceContext.PrepErrors[feedName]` rather than aborting the run — checks inspect that map and decide severity themselves.

34 changes: 34 additions & 0 deletions README.md
@@ -36,6 +36,11 @@ error. Warnings and skips do not affect the exit code.
variable. `agencyMapping` (optional, per data source) maps each GTFS `agency_id`
to the `agencyId` the OBA server exposes; unmapped agencies default to identity.

Five optional top-level fields (`db_url`, `db_user`, `db_pass`, `correlation_id`,
`result_table`) activate the result sink described under [Reading the result from
a database](#reading-the-result-from-a-database) below; when absent the validator
behaves exactly as today.

## Library

```go
@@ -120,3 +125,32 @@ job's exit status is the validator's exit code (`0` no failures, `1` ≥1 failur
See `docs/superpowers/specs/2026-05-24-oba-validator-design.md` for the validator
design and `docs/superpowers/specs/2026-05-25-render-deployment-design.md` for the
deployment design.

### Reading the result from a database

When the validator runs as a Render one-off job, the job status exposes whether
it succeeded but not the report itself. To let a caller (e.g. obacloud's
`ServerValidationJob`) read the report back without scraping stdout, supply five
additional fields in the same JSON payload and the validator will write one row
to a Postgres "results" table after stdout, keyed by `correlation_id`:

| Field | Description |
|---|---|
| `db_url` | JDBC-style URL, e.g. `jdbc:postgresql://host:5432/dbname`. Activates the sink when non-blank. |
| `db_user` / `db_pass` | DB credentials. |
| `correlation_id` | UUID the caller chooses; row key. |
| `result_table` | Table name. Must be `oba_validator_results` (allow-listed). |
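
To make the `db_url` format concrete, here is one way a JDBC-style URL plus separate credentials could be turned into a connection string a Go Postgres driver accepts. This is a sketch under assumptions: `buildDSN` is a hypothetical helper, not the PR's `normalizeDSN`, and it deliberately leaves out any `sslmode` or other defaults because this page does not show their values.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildDSN is a hypothetical helper: it strips the "jdbc:" prefix from a
// JDBC-style URL and embeds the credentials as URL userinfo.
func buildDSN(jdbcURL, user, pass string) (string, error) {
	raw := strings.TrimPrefix(strings.TrimSpace(jdbcURL), "jdbc:")
	u, err := url.Parse(raw)
	if err != nil {
		return "", fmt.Errorf("parse db_url: %w", err)
	}
	if u.Scheme != "postgresql" && u.Scheme != "postgres" {
		return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	// url.UserPassword percent-encodes reserved characters when the URL is
	// rendered, so a password such as "p@ss" appears as "p%40ss".
	u.User = url.UserPassword(user, pass)
	return u.String(), nil
}

func main() {
	dsn, err := buildDSN("jdbc:postgresql://host:5432/dbname", "u", "p@ss")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(dsn)
}
```

Because the password can end up percent-encoded inside the connection string, any error redaction has to look for the encoded form as well as the raw one, which is presumably what commit 22ee3b4 addresses.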

The validator creates the table on first write (`CREATE TABLE IF NOT EXISTS`)
with columns `correlation_id TEXT PRIMARY KEY, status TEXT NOT NULL, result_data
TEXT, error_message TEXT`, then `INSERT ... ON CONFLICT (correlation_id) DO
NOTHING` — so retries are idempotent.
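
The two statements described above might be assembled along these lines. This is a sketch, not the code in `sink/`: `buildStatements` and `allowedTables` are hypothetical names, and only the column list and the `ON CONFLICT` clause come from the README text. A table name cannot be passed as a bind parameter in Postgres, so it has to be interpolated into the SQL text, which is why checking it against an allow-list first matters.

```go
package main

import (
	"fmt"
)

// allowedTables is a hypothetical allow-list; the README names a single
// permitted table.
var allowedTables = map[string]bool{"oba_validator_results": true}

// buildStatements returns the DDL and the idempotent INSERT for an
// allow-listed table. The table name is interpolated only after the check;
// the row values stay as bind parameters ($1..$4).
func buildStatements(table string) (createSQL, insertSQL string, err error) {
	if !allowedTables[table] {
		return "", "", fmt.Errorf("result_table %q is not allow-listed", table)
	}
	createSQL = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
  correlation_id TEXT PRIMARY KEY,
  status TEXT NOT NULL,
  result_data TEXT,
  error_message TEXT
)`, table)
	insertSQL = fmt.Sprintf(`INSERT INTO %s (correlation_id, status, result_data, error_message)
VALUES ($1, $2, $3, $4)
ON CONFLICT (correlation_id) DO NOTHING`, table)
	return createSQL, insertSQL, nil
}

func main() {
	createSQL, insertSQL, err := buildStatements("oba_validator_results")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(createSQL)
	fmt.Println(insertSQL)
}
```

With `DO NOTHING`, a retried run that reuses the same `correlation_id` leaves the first row untouched, so the caller always reads the earliest result for that key.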

Behavior is purely additive: when `db_url` is absent the validator behaves
exactly as today. A row is always written after stdout, with `status="completed"`
for both PASS and FAIL verdicts (the verdict lives at `summary.verdict` inside
`result_data`) and `status="error"` reserved for the `errorDocument` variant.
Sink failures are logged to stderr but never change the exit code.

See `docs/superpowers/specs/2026-05-25-result-sink-design.md` for the full
contract.
144 changes: 116 additions & 28 deletions cmd/oba-validator/main.go
@@ -11,25 +11,66 @@ import (

"github.com/onebusaway/oba-validator/config"
"github.com/onebusaway/oba-validator/report"
"github.com/onebusaway/oba-validator/sink"
"github.com/onebusaway/oba-validator/validator"
)

// apiKeyInJSON matches an "apiKey" string field in a (possibly malformed) JSON
// argument so its value can be scrubbed from error output.
var apiKeyInJSON = regexp.MustCompile(`"apiKey"\s*:\s*"((?:\\.|[^"\\])*)"`)

// redactionKey returns the apiKey to scrub from a config-load error. config.Load
// can fail before it parses the key and echo the raw argument (and thus an inline
// apiKey) into its error — either when a raw-JSON argument that does not start
// with '{' is misread as a file path (the os.ReadFile error wraps the input), or
// when a malformed object fails to parse. config.Load returns an empty Config in
// both cases, so prefer a key sniffed straight from the argument, falling back to
// the environment.
func redactionKey(arg string) string {
// dbPassInJSON matches a "db_pass" field in a (possibly malformed) JSON argument
// so its value can be scrubbed when config.Load echoes the raw input back to
// the user (see redactionKey's rationale for apiKey).
var dbPassInJSON = regexp.MustCompile(`"db_pass"\s*:\s*"((?:\\.|[^"\\])*)"`)

// redactionSecrets returns every secret value that must be removed from an
// error string. Inline credentials sniffed straight from the raw argument win
// over environment fallbacks because config.Load can fail before parsing the
// JSON (an os.ReadFile error wraps the input as a file path) and echo the raw
// blob — including any apiKey or db_pass inside it.
func redactionSecrets(arg string) []string {
var out []string
if m := apiKeyInJSON.FindStringSubmatch(arg); m != nil && m[1] != "" {
return m[1]
out = append(out, m[1])
} else if env := os.Getenv("ONEBUSAWAY_API_KEY"); env != "" {
out = append(out, env)
}
if m := dbPassInJSON.FindStringSubmatch(arg); m != nil && m[1] != "" {
out = append(out, m[1])
}
return out
}

// scrub replaces every non-empty secret in s with "***". Empty secrets are
// no-ops so callers don't need to filter before calling.
func scrub(s string, secrets []string) string {
for _, sec := range secrets {
if sec != "" {
s = strings.ReplaceAll(s, sec, "***")
}
}
return os.Getenv("ONEBUSAWAY_API_KEY")
return s
}

// sinkWriteFailedMsg is the stderr prefix used whenever a result-sink write
// returns an error. Centralized so all four call sites (validator-error path,
// JSON-render-failure fallback in both o.jsonOut branches, and the unified
// success-path write) stay in sync.
const sinkWriteFailedMsg = "result sink write failed:"

// renderJSON is the function used to render the report to JSON bytes. It is a
// package-level var so tests can replace it with a stub that returns an error,
// exercising the sink "error" fallback when rendering itself fails. Production
// callers use the default (report.RenderJSON).
var renderJSON = report.RenderJSON

// sinkWrite is the function used to write the run's result row to the optional
// Postgres sink. It is a package-level var so tests can replace it with a
// recorder, avoiding a real DB dependency in unit tests. Production callers
// use the default (sink.Config.Write).
var sinkWrite = func(ctx context.Context, c sink.Config, status, data, errMsg string) error {
return c.Write(ctx, status, data, errMsg)
}

type overrides struct {
@@ -88,47 +129,94 @@ func run(args []string, stdout, stderr io.Writer) int {

cfg, err := config.Load(fs.Arg(0))
if err != nil {
key := redactionKey(fs.Arg(0))
secrets := redactionSecrets(fs.Arg(0))
msg := scrub(err.Error(), secrets)
if o.jsonOut {
if werr := report.WriteErrorJSON(stdout, err.Error(), key); werr != nil {
// WriteErrorJSON does an extra apiKey scrub of its own; passing
// the already-scrubbed msg through is idempotent.
if werr := report.WriteErrorJSON(stdout, msg, ""); werr != nil {
fmt.Fprintln(stderr, "output error:", werr)
}
} else {
msg := err.Error()
if key != "" {
msg = strings.ReplaceAll(msg, key, "***")
}
fmt.Fprintln(stderr, "config error:", msg)
}
// No sink write here: the sink config could not be parsed, so there's
// no correlation_id to key the row by. The caller's polling timeout
// is the safety net (see spec §Deployment).
return 2
}
applyOverrides(&cfg, o)

rep, err := validator.Run(context.Background(), cfg)
ctx := context.Background()
rep, err := validator.Run(ctx, cfg)
if err != nil {
errMsg := scrub(err.Error(), []string{cfg.APIKey, cfg.DBPass})
if o.jsonOut {
if werr := report.WriteErrorJSON(stdout, err.Error(), cfg.APIKey); werr != nil {
if werr := report.WriteErrorJSON(stdout, errMsg, ""); werr != nil {
fmt.Fprintln(stderr, "output error:", werr)
}
} else {
msg := err.Error()
if cfg.APIKey != "" {
msg = strings.ReplaceAll(msg, cfg.APIKey, "***")
fmt.Fprintln(stderr, "run error:", errMsg)
}
// Validator-error path: we DO have a parsed sink config (config.Load
// succeeded). Write status="error" so the caller learns the run failed
// rather than timing out.
if sc := cfg.SinkConfig(); sc.Configured() {
if werr := sinkWrite(ctx, sc, "error", "", errMsg); werr != nil {
fmt.Fprintln(stderr, sinkWriteFailedMsg, werr)
}
fmt.Fprintln(stderr, "run error:", msg)
}
return 2
}

var werr error
// Success path: render once, write twice (stdout + optional sink).
var reportBytes []byte
if o.jsonOut {
werr = report.WriteJSON(stdout, rep, cfg)
var renderErr error
reportBytes, renderErr = renderJSON(rep, cfg)
if renderErr != nil {
fmt.Fprintln(stderr, "output error:", renderErr)
// Render failed before stdout: fall back to a sink "error" row so the
// caller doesn't poll until its 15-minute timeout. Stdout consumers
// get nothing on this path, but Render logs will carry the stderr line.
if sc := cfg.SinkConfig(); sc.Configured() {
if werr := sinkWrite(ctx, sc, "error", "", "internal: render JSON failed: "+renderErr.Error()); werr != nil {
fmt.Fprintln(stderr, sinkWriteFailedMsg, werr)
}
}
return 2
}
if _, werr := stdout.Write(reportBytes); werr != nil {
fmt.Fprintln(stderr, "output error:", werr)
return 2
}
} else {
werr = report.WriteText(stdout, rep)
if werr := report.WriteText(stdout, rep); werr != nil {
fmt.Fprintln(stderr, "output error:", werr)
return 2
}
// Text path still needs JSON bytes for the sink (the contract is
// fixed: result_data is the JSON report). Render after stdout so a
// rendering failure here can't suppress the text output the user already
// saw — but if it does fail, write a sink "error" row so the caller
// doesn't poll until its 15-minute timeout.
if sc := cfg.SinkConfig(); sc.Configured() {
var renderErr error
reportBytes, renderErr = renderJSON(rep, cfg)
if renderErr != nil {
fmt.Fprintln(stderr, "result sink: render JSON failed:", renderErr)
if werr := sinkWrite(ctx, sc, "error", "", "internal: render JSON failed: "+renderErr.Error()); werr != nil {
fmt.Fprintln(stderr, sinkWriteFailedMsg, werr)
}
return rep.ExitCode()
}
}
}
if werr != nil {
fmt.Fprintln(stderr, "output error:", werr)
return 2

if sc := cfg.SinkConfig(); sc.Configured() && reportBytes != nil {
if werr := sinkWrite(ctx, sc, "completed", string(reportBytes), ""); werr != nil {
fmt.Fprintln(stderr, sinkWriteFailedMsg, werr)
}
}
return rep.ExitCode()
}