diff --git a/CHANGELOG.md b/CHANGELOG.md index 27a38eea..33d760ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Go server: select the transport with the `transport` option on the `go_server` outputter to support gradual logging->pubsub migrations. ([DENG-9533](https://mozilla-hub.atlassian.net/browse/DENG-9533)) + ## 19.2.0 - Go server template: add `go_server_pubsub` outputter to support direct diff --git a/glean_parser/go_server.py b/glean_parser/go_server.py index 238ebf94..aecc97ba 100644 --- a/glean_parser/go_server.py +++ b/glean_parser/go_server.py @@ -13,9 +13,11 @@ window and connectivity don't hold. Generated code takes care of assembling pings with metrics, and serializing to messages -conforming to Glean schema. Two transport modes are supported: -- Cloud Logging (go_server): Logs to stdout in MozLog format for ingestion via GCP log routing -- Pub/Sub (go_server_pubsub): Publishes directly to GCP Pub/Sub topics +conforming to Glean schema. The transport is selected with the `transport` option +on the `go_server` outputter (`-s transport=...`): +- `logging` (default): Logs to stdout in MozLog format for ingestion via GCP log routing +- `pubsub`: Builds messages for direct publishing to GCP Pub/Sub topics +- `combined`: Emits both in a single file (sharing common types) for gradual migration Warning: this outputter supports limited set of metrics, see `SUPPORTED_METRIC_TYPES` below. @@ -128,8 +130,9 @@ def output_go( :param objects: A tree of objects (metrics and pings) as returned from `parser.parse_objects`. :param output_dir: Path to an output directory to write to. - :param transport: Transport mode - either "logging" (Cloud Logging) or - "pubsub" (Pub/Sub direct publishing). Default is "logging". + :param transport: Transport mode - one of "logging" (Cloud Logging, the + default), "pubsub" (Pub/Sub direct publishing), or "combined" (both in + a single file, sharing common types). """ template = util.get_jinja2_template( @@ -211,29 +214,36 @@ def output_go( ) -def output_go_logger( - objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None -) -> None: - """ - Given a tree of objects, output Go code using Cloud Logging transport. - - :param objects: A tree of objects (metrics and pings) as returned from - `parser.parse_objects`. - :param output_dir: Path to an output directory to write to. - :param options: options dictionary (currently unused for Go). - """ - output_go(objs, output_dir, options, transport="logging") +def _resolve_transport(default: str, options: Optional[Dict[str, Any]]) -> str: + if not options: + return default + transport = options.get("transport") + if transport is None: + return default + if transport not in ("logging", "pubsub", "combined"): + raise ValueError( + f"Invalid transport '{transport}'." + " Must be one of: logging, pubsub, combined." + ) + return transport -def output_go_pubsub( +def output_go_logger( objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None ) -> None: """ - Given a tree of objects, output Go code using Pub/Sub transport. + Given a tree of objects, output server Go code. + + Defaults to the Cloud Logging transport; pass `-s transport=pubsub` for + direct Pub/Sub publishing, or `-s transport=combined` to emit both in a + single file (sharing common types) for a gradual logging->pubsub migration. :param objects: A tree of objects (metrics and pings) as returned from `parser.parse_objects`. :param output_dir: Path to an output directory to write to. - :param options: options dictionary (currently unused for Go). + :param options: options dictionary. Supports `transport` key with values + `logging` (default), `pubsub`, or `combined`. """ - output_go(objs, output_dir, options, transport="pubsub") + output_go( + objs, output_dir, options, transport=_resolve_transport("logging", options) + ) diff --git a/glean_parser/templates/go_server.jinja2 b/glean_parser/templates/go_server.jinja2 index 88f082e7..465499c7 100644 --- a/glean_parser/templates/go_server.jinja2 +++ b/glean_parser/templates/go_server.jinja2 @@ -1,4 +1,6 @@ {# The final Go code is autogenerated, but this template is not. Please file bugs! #} +{% set include_pubsub = transport in ["pubsub", "combined"] %} +{% set include_logging = transport in ["logging", "combined"] %} package glean // This Source Code Form is subject to the terms of the Mozilla Public @@ -9,30 +11,31 @@ package glean // required imports import ( -{% if transport == "pubsub" %} +{% if include_pubsub %} "bytes" "compress/gzip" +{% endif %} "encoding/json" - "fmt" - "io" - "sync" - "time" - - pubsub "cloud.google.com/go/pubsub/v2" - "github.com/google/uuid" -{% else %} - "encoding/json" +{% if include_logging %} "errors" +{% endif %} "fmt" "io" +{% if include_logging %} "strconv" +{% endif %} +{% if include_pubsub %} + "sync" +{% endif %} "time" - "github.com/google/uuid" +{% if include_pubsub %} + pubsub "cloud.google.com/go/pubsub/v2" {% endif %} + "github.com/google/uuid" ) -{% if transport == "pubsub" %} +{% if include_pubsub %} // GleanEventsBuilder constructs Pub/Sub messages carrying Glean pings. // Builder is stateless beyond its app-identity fields; callers own the // pubsub.Client, the pubsub.Publisher, batching settings, retries, shutdown sequencing, and any @@ -74,7 +77,8 @@ func compressPayload(data []byte) ([]byte, error) { gzipPool.Put(gz) return buf.Bytes(), nil } -{% else %} +{% endif %} +{% if include_logging %} // log type string used to identify logs to process in the Moz Data Pipeline var gleanEventMozlogType string = "glean-server-event" @@ -120,7 +124,7 @@ type pingInfo struct { EndTime string `json:"end_time"` } -{% if transport == "logging" %} +{% if include_logging %} type ping struct { DocumentNamespace string `json:"document_namespace"` DocumentType string `json:"document_type"` @@ -148,7 +152,7 @@ type gleanEvent struct { Extra map[string]string `json:"extra"` } -{% if transport == "logging" %} +{% if include_logging %} type logEnvelope struct { Timestamp string Logger string @@ -157,11 +161,23 @@ type logEnvelope struct { } {% endif %} -{% if transport == "pubsub" %} +{% if include_pubsub %} func (g GleanEventsBuilder) createClientInfo() clientInfo { -{% else %} -func (g GleanEventsLogger) createClientInfo() clientInfo { + // Fields with default values are required in the Glean schema, but not used in server context + return clientInfo{ + TelemetrySDKBuild: "glean_parser v{{ parser_version }}", + FirstRunDate: "Unknown", + OS: "Unknown", + OSVersion: "Unknown", + Architecture: "Unknown", + AppBuild: "Unknown", + AppDisplayVersion: g.AppDisplayVersion, + AppChannel: g.AppChannel, + } +} {% endif %} +{% if include_logging %} +func (g GleanEventsLogger) createClientInfo() clientInfo { // Fields with default values are required in the Glean schema, but not used in server context return clientInfo{ TelemetrySDKBuild: "glean_parser v{{ parser_version }}", @@ -174,6 +190,7 @@ func (g GleanEventsLogger) createClientInfo() clientInfo { AppChannel: g.AppChannel, } } +{% endif %} func createPingInfo() pingInfo { {# times are ISO-8601 strings, e.g. "2023-12-19T22:09:17.440Z" #} @@ -185,7 +202,7 @@ func createPingInfo() pingInfo { } } -{% if transport == "logging" %} +{% if include_logging %} func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, payload pingPayload) (ping, error) { payloadJson, err := json.Marshal(payload) if err != nil { @@ -332,7 +349,7 @@ type {{ ping|ping_type_name }} struct { {% endif %} } -{% if transport == "pubsub" %} +{% if include_pubsub %} // Build{{ ping|ping_type_name }}Message constructs a Pub/Sub message carrying // the given `{{ ping }}` ping. The caller publishes the returned message // (e.g., topic.Publish(ctx, msg)) and owns batching, retries, shutdown @@ -426,7 +443,8 @@ func (g GleanEventsBuilder) Build{{ ping|ping_type_name }}MessageWithoutUserInfo ) (*pubsub.Message, error) { return g.Build{{ ping|ping_type_name }}Message(defaultRequestInfo, params) } -{% else %} +{% endif %} +{% if include_logging %} // Record and submit `{{ ping }}` ping func (g GleanEventsLogger) Record{{ ping|ping_type_name }}( requestInfo RequestInfo, diff --git a/glean_parser/translate.py b/glean_parser/translate.py index 59612c25..e9c35ae4 100644 --- a/glean_parser/translate.py +++ b/glean_parser/translate.py @@ -58,7 +58,6 @@ def __init__( OUTPUTTERS = { "go_server": Outputter(go_server.output_go_logger, []), - "go_server_pubsub": Outputter(go_server.output_go_pubsub, []), "javascript": Outputter(javascript.output_javascript, []), "typescript": Outputter(javascript.output_typescript, []), "javascript_server": Outputter(javascript_server.output_javascript, []), diff --git a/glean_parser/translation_options.py b/glean_parser/translation_options.py index 48774fee..db989332 100755 --- a/glean_parser/translation_options.py +++ b/glean_parser/translation_options.py @@ -45,6 +45,11 @@ def translate_options(ctx, param, value): Markdown: - `project_title`: The project's title. +Go server (`go_server`): +- `transport`: Transport mode. One of `logging` (Cloud Logging, the default), + `pubsub` (Pub/Sub direct publishing), or `combined` (emit both in a + single file, sharing common types, for a gradual migration). + (press q to exit)""" if value: diff --git a/tests/test_go_server_pubsub.py b/tests/test_go_server_pubsub.py index 65e83c0e..f4078f2a 100644 --- a/tests/test_go_server_pubsub.py +++ b/tests/test_go_server_pubsub.py @@ -4,7 +4,8 @@ # http://creativecommons.org/publicdomain/zero/1.0/ """ -Tests for the `go_server_pubsub` outputter and its runtime behavior. +Tests for the `go_server` outputter's Pub/Sub and combined transports +(`-s transport=pubsub` / `-s transport=combined`) and their runtime behavior. """ from pathlib import Path @@ -38,10 +39,12 @@ # Helpers -def _translate(glean_module_path, yaml_filenames): - """Translate the given YAML fixtures with the pubsub outputter.""" +def _translate(glean_module_path, yaml_filenames, transport="pubsub"): + """Translate the given YAML fixtures with the given go_server transport.""" yaml_files = [ROOT / "data" / name for name in yaml_filenames] - translate.translate(yaml_files, "go_server_pubsub", glean_module_path) + translate.translate( + yaml_files, "go_server", glean_module_path, {"transport": transport} + ) def _gofmt_parse_ok(path): @@ -92,8 +95,9 @@ def test_parser_pubsub_ping_no_metrics(tmp_path, capsys): without any metrics.""" translate.translate( ROOT / "data" / "server_pings.yaml", - "go_server_pubsub", + "go_server", tmp_path, + {"transport": "pubsub"}, ) assert all(False for _ in tmp_path.iterdir()) @@ -103,8 +107,9 @@ def test_parser_pubsub_metrics_unsupported_type(tmp_path, capsys): emitted for each unsupported type.""" translate.translate( [ROOT / "data" / "go_server_metrics_unsupported.yaml"], - "go_server_pubsub", + "go_server", tmp_path, + {"transport": "pubsub"}, ) captured = capsys.readouterr() assert "Ignoring unsupported metric type" in captured.out @@ -117,8 +122,9 @@ def test_parser_pubsub_labeled_boolean_without_labels(tmp_path, capsys): """labeled_boolean without static labels is rejected.""" translate.translate( [ROOT / "data" / "go_server_metrics_unsupported.yaml"], - "go_server_pubsub", + "go_server", tmp_path, + {"transport": "pubsub"}, ) captured = capsys.readouterr() assert "Ignoring labeled_boolean metric without static labels" in captured.out @@ -129,8 +135,9 @@ def test_parser_pubsub_labeled_boolean(tmp_path): """labeled_boolean metrics generate proper struct types.""" translate.translate( ROOT / "data" / "go_server_labeled_boolean_metrics.yaml", - "go_server_pubsub", + "go_server", tmp_path, + {"transport": "pubsub"}, ) assert set(x.name for x in tmp_path.iterdir()) == set(["server_events.go"]) @@ -165,8 +172,9 @@ def test_parser_pubsub_generation(tmp_path): we don't re-check it here.""" translate.translate( ROOT / "data" / "go_server_events_only_metrics.yaml", - "go_server_pubsub", + "go_server", tmp_path, + {"transport": "pubsub"}, ) assert set(x.name for x in tmp_path.iterdir()) == set(["server_events.go"]) @@ -342,3 +350,32 @@ def test_run_pubsub_nil_string_list(tmp_path): ) _validate_payload_against_schema(payload_bytes) + + +@pytest.mark.go_dependency +def test_run_combined_transport(tmp_path): + """`transport=combined` emits both the logging Logger and the pubsub + Builder into one package, sharing common types. This is what lets a server + dual-write during a logging->pubsub migration. Assert both APIs coexist + with shared types emitted once, then compile + publish via the builder to + prove the combined package builds and the pubsub path still works.""" + glean_module_path = tmp_path / "glean" + _translate(glean_module_path, YAML_EVENTS_PING, transport="combined") + + with (glean_module_path / "server_events.go").open("r", encoding="utf-8") as fd: + content = fd.read() + + # Both transports' entry types and APIs are present in the single file. + assert "type GleanEventsBuilder struct" in content + assert "type GleanEventsLogger struct" in content + assert "func (g GleanEventsBuilder) BuildEventsPingMessage(" in content + assert "func (g GleanEventsLogger) RecordEventsPing(" in content + # Shared types are emitted once, not duplicated across the two paths. + # gofmt validates syntax but not duplicate declarations, so guard here. + assert content.count("type clientInfo struct") == 1 + assert content.count("type pingInfo struct") == 1 + + # The combined package compiles and the builder publishes a message. + msgs = _run_publisher(tmp_path, PUBSUB_SCENARIOS["events_ping"]["code"]) + assert len(msgs) == 1, f"expected one published message, got {len(msgs)}" + assert msgs[0]["attributes"]["document_type"] == "events"