Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Go server: select the transport with the `transport` option on the `go_server` outputter to support gradual logging->pubsub migrations. ([DENG-9533](https://mozilla-hub.atlassian.net/browse/DENG-9533))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably call out that it is effectively a breaking change relative to what shipped in 19.2.0 (though I guess that, because there's only one consumer and this change is meant to help them, we're not actively breaking anyone).


## 19.2.0

- Go server template: add `go_server_pubsub` outputter to support direct
Expand Down
52 changes: 31 additions & 21 deletions glean_parser/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
window and connectivity don't hold.

Generated code takes care of assembling pings with metrics, and serializing to messages
conforming to Glean schema. Two transport modes are supported:
- Cloud Logging (go_server): Logs to stdout in MozLog format for ingestion via GCP log routing
- Pub/Sub (go_server_pubsub): Publishes directly to GCP Pub/Sub topics
conforming to Glean schema. The transport is selected with the `transport` option
on the `go_server` outputter (`-s transport=...`):
- `logging` (default): Logs to stdout in MozLog format for ingestion via GCP log routing
- `pubsub`: Builds messages for direct publishing to GCP Pub/Sub topics
- `combined`: Emits both in a single file (sharing common types) for gradual migration

Warning: this outputter supports a limited set of metrics,
see `SUPPORTED_METRIC_TYPES` below.
Expand Down Expand Up @@ -128,8 +130,9 @@ def output_go(
:param objects: A tree of objects (metrics and pings) as returned from
`parser.parse_objects`.
:param output_dir: Path to an output directory to write to.
:param transport: Transport mode - either "logging" (Cloud Logging) or
"pubsub" (Pub/Sub direct publishing). Default is "logging".
:param transport: Transport mode - one of "logging" (Cloud Logging, the
default), "pubsub" (Pub/Sub direct publishing), or "combined" (both in
a single file, sharing common types).
"""

template = util.get_jinja2_template(
Expand Down Expand Up @@ -211,29 +214,36 @@ def output_go(
)


def output_go_logger(
    objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None
) -> None:
    """
    Emit Go server code that uses the Cloud Logging transport.

    This is a thin wrapper: it forwards all of its arguments to `output_go`
    and pins the transport to "logging".

    :param objs: A tree of objects (metrics and pings) as returned from
        `parser.parse_objects`.
    :param output_dir: Path to an output directory to write to.
    :param options: options dictionary, forwarded to `output_go` as-is.
    """
    # The transport is fixed for this entry point; callers cannot override it.
    logging_transport = "logging"
    output_go(objs, output_dir, options, transport=logging_transport)
def _resolve_transport(default: str, options: Optional[Dict[str, Any]]) -> str:
if not options:
return default
transport = options.get("transport")
if transport is None:
return default
if transport not in ("logging", "pubsub", "combined"):
raise ValueError(
f"Invalid transport '{transport}'."
" Must be one of: logging, pubsub, combined."
)
return transport


def output_go_pubsub(
def output_go_logger(
objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None
) -> None:
"""
Given a tree of objects, output Go code using Pub/Sub transport.
Given a tree of objects, output server Go code.

Defaults to the Cloud Logging transport; pass `-s transport=pubsub` for
direct Pub/Sub publishing, or `-s transport=combined` to emit both in a
single file (sharing common types) for a gradual logging->pubsub migration.

:param objects: A tree of objects (metrics and pings) as returned from
`parser.parse_objects`.
:param output_dir: Path to an output directory to write to.
:param options: options dictionary (currently unused for Go).
:param options: options dictionary. Supports `transport` key with values
`logging` (default), `pubsub`, or `combined`.
"""
output_go(objs, output_dir, options, transport="pubsub")
output_go(
objs, output_dir, options, transport=_resolve_transport("logging", options)
)
60 changes: 39 additions & 21 deletions glean_parser/templates/go_server.jinja2
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{# The final Go code is autogenerated, but this template is not. Please file bugs! #}
{% set include_pubsub = transport in ["pubsub", "combined"] %}
{% set include_logging = transport in ["logging", "combined"] %}
package glean

// This Source Code Form is subject to the terms of the Mozilla Public
Expand All @@ -9,30 +11,31 @@ package glean

// required imports
import (
{% if transport == "pubsub" %}
{% if include_pubsub %}
"bytes"
"compress/gzip"
{% endif %}
"encoding/json"
"fmt"
"io"
"sync"
"time"

pubsub "cloud.google.com/go/pubsub/v2"
"github.com/google/uuid"
{% else %}
"encoding/json"
{% if include_logging %}
"errors"
{% endif %}
"fmt"
"io"
{% if include_logging %}
"strconv"
{% endif %}
{% if include_pubsub %}
"sync"
{% endif %}
"time"

"github.com/google/uuid"
{% if include_pubsub %}
pubsub "cloud.google.com/go/pubsub/v2"
{% endif %}
"github.com/google/uuid"
)

{% if transport == "pubsub" %}
{% if include_pubsub %}
// GleanEventsBuilder constructs Pub/Sub messages carrying Glean pings.
// Builder is stateless beyond its app-identity fields; callers own the
// pubsub.Client, the pubsub.Publisher, batching settings, retries, shutdown sequencing, and any
Expand Down Expand Up @@ -74,7 +77,8 @@ func compressPayload(data []byte) ([]byte, error) {
gzipPool.Put(gz)
return buf.Bytes(), nil
}
{% else %}
{% endif %}
{% if include_logging %}
// log type string used to identify logs to process in the Moz Data Pipeline
var gleanEventMozlogType string = "glean-server-event"

Expand Down Expand Up @@ -120,7 +124,7 @@ type pingInfo struct {
EndTime string `json:"end_time"`
}

{% if transport == "logging" %}
{% if include_logging %}
type ping struct {
DocumentNamespace string `json:"document_namespace"`
DocumentType string `json:"document_type"`
Expand Down Expand Up @@ -148,7 +152,7 @@ type gleanEvent struct {
Extra map[string]string `json:"extra"`
}

{% if transport == "logging" %}
{% if include_logging %}
type logEnvelope struct {
Timestamp string
Logger string
Expand All @@ -157,11 +161,23 @@ type logEnvelope struct {
}
{% endif %}

{% if transport == "pubsub" %}
{% if include_pubsub %}
func (g GleanEventsBuilder) createClientInfo() clientInfo {
{% else %}
func (g GleanEventsLogger) createClientInfo() clientInfo {
// Fields with default values are required in the Glean schema, but not used in server context
return clientInfo{
TelemetrySDKBuild: "glean_parser v{{ parser_version }}",
FirstRunDate: "Unknown",
OS: "Unknown",
OSVersion: "Unknown",
Architecture: "Unknown",
AppBuild: "Unknown",
AppDisplayVersion: g.AppDisplayVersion,
AppChannel: g.AppChannel,
}
}
{% endif %}
{% if include_logging %}
func (g GleanEventsLogger) createClientInfo() clientInfo {
// Fields with default values are required in the Glean schema, but not used in server context
return clientInfo{
TelemetrySDKBuild: "glean_parser v{{ parser_version }}",
Expand All @@ -174,6 +190,7 @@ func (g GleanEventsLogger) createClientInfo() clientInfo {
AppChannel: g.AppChannel,
}
}
{% endif %}

func createPingInfo() pingInfo {
{# times are ISO-8601 strings, e.g. "2023-12-19T22:09:17.440Z" #}
Expand All @@ -185,7 +202,7 @@ func createPingInfo() pingInfo {
}
}

{% if transport == "logging" %}
{% if include_logging %}
func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, payload pingPayload) (ping, error) {
payloadJson, err := json.Marshal(payload)
if err != nil {
Expand Down Expand Up @@ -332,7 +349,7 @@ type {{ ping|ping_type_name }} struct {
{% endif %}
}

{% if transport == "pubsub" %}
{% if include_pubsub %}
// Build{{ ping|ping_type_name }}Message constructs a Pub/Sub message carrying
// the given `{{ ping }}` ping. The caller publishes the returned message
// (e.g., topic.Publish(ctx, msg)) and owns batching, retries, shutdown
Expand Down Expand Up @@ -426,7 +443,8 @@ func (g GleanEventsBuilder) Build{{ ping|ping_type_name }}MessageWithoutUserInfo
) (*pubsub.Message, error) {
return g.Build{{ ping|ping_type_name }}Message(defaultRequestInfo, params)
}
{% else %}
{% endif %}
{% if include_logging %}
// Record and submit `{{ ping }}` ping
func (g GleanEventsLogger) Record{{ ping|ping_type_name }}(
requestInfo RequestInfo,
Expand Down
1 change: 0 additions & 1 deletion glean_parser/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def __init__(

OUTPUTTERS = {
"go_server": Outputter(go_server.output_go_logger, []),
"go_server_pubsub": Outputter(go_server.output_go_pubsub, []),
"javascript": Outputter(javascript.output_javascript, []),
"typescript": Outputter(javascript.output_typescript, []),
"javascript_server": Outputter(javascript_server.output_javascript, []),
Expand Down
5 changes: 5 additions & 0 deletions glean_parser/translation_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def translate_options(ctx, param, value):
Markdown:
- `project_title`: The project's title.

Go server (`go_server`):
- `transport`: Transport mode. One of `logging` (Cloud Logging, the default),
`pubsub` (Pub/Sub direct publishing), or `combined` (emit both in a
single file, sharing common types, for a gradual migration).

(press q to exit)"""

if value:
Expand Down
55 changes: 46 additions & 9 deletions tests/test_go_server_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
# http://creativecommons.org/publicdomain/zero/1.0/

"""
Tests for the `go_server_pubsub` outputter and its runtime behavior.
Tests for the `go_server` outputter's Pub/Sub and combined transports
(`-s transport=pubsub` / `-s transport=combined`) and their runtime behavior.
"""

from pathlib import Path
Expand Down Expand Up @@ -38,10 +39,12 @@
# Helpers


def _translate(glean_module_path, yaml_filenames):
"""Translate the given YAML fixtures with the pubsub outputter."""
def _translate(glean_module_path, yaml_filenames, transport="pubsub"):
"""Translate the given YAML fixtures with the given go_server transport."""
yaml_files = [ROOT / "data" / name for name in yaml_filenames]
translate.translate(yaml_files, "go_server_pubsub", glean_module_path)
translate.translate(
yaml_files, "go_server", glean_module_path, {"transport": transport}
)


def _gofmt_parse_ok(path):
Expand Down Expand Up @@ -92,8 +95,9 @@ def test_parser_pubsub_ping_no_metrics(tmp_path, capsys):
without any metrics."""
translate.translate(
ROOT / "data" / "server_pings.yaml",
"go_server_pubsub",
"go_server",
tmp_path,
{"transport": "pubsub"},
)
assert all(False for _ in tmp_path.iterdir())

Expand All @@ -103,8 +107,9 @@ def test_parser_pubsub_metrics_unsupported_type(tmp_path, capsys):
emitted for each unsupported type."""
translate.translate(
[ROOT / "data" / "go_server_metrics_unsupported.yaml"],
"go_server_pubsub",
"go_server",
tmp_path,
{"transport": "pubsub"},
)
captured = capsys.readouterr()
assert "Ignoring unsupported metric type" in captured.out
Expand All @@ -117,8 +122,9 @@ def test_parser_pubsub_labeled_boolean_without_labels(tmp_path, capsys):
"""labeled_boolean without static labels is rejected."""
translate.translate(
[ROOT / "data" / "go_server_metrics_unsupported.yaml"],
"go_server_pubsub",
"go_server",
tmp_path,
{"transport": "pubsub"},
)
captured = capsys.readouterr()
assert "Ignoring labeled_boolean metric without static labels" in captured.out
Expand All @@ -129,8 +135,9 @@ def test_parser_pubsub_labeled_boolean(tmp_path):
"""labeled_boolean metrics generate proper struct types."""
translate.translate(
ROOT / "data" / "go_server_labeled_boolean_metrics.yaml",
"go_server_pubsub",
"go_server",
tmp_path,
{"transport": "pubsub"},
)

assert set(x.name for x in tmp_path.iterdir()) == set(["server_events.go"])
Expand Down Expand Up @@ -165,8 +172,9 @@ def test_parser_pubsub_generation(tmp_path):
we don't re-check it here."""
translate.translate(
ROOT / "data" / "go_server_events_only_metrics.yaml",
"go_server_pubsub",
"go_server",
tmp_path,
{"transport": "pubsub"},
)

assert set(x.name for x in tmp_path.iterdir()) == set(["server_events.go"])
Expand Down Expand Up @@ -342,3 +350,32 @@ def test_run_pubsub_nil_string_list(tmp_path):
)

_validate_payload_against_schema(payload_bytes)


@pytest.mark.go_dependency
def test_run_combined_transport(tmp_path):
    """With `transport=combined`, the logging Logger and the pubsub Builder
    are generated into a single package that shares its common types, which
    is what allows a server to dual-write while migrating from logging to
    pubsub. Check that both APIs are present, that shared types appear exactly
    once, and that the combined package compiles and can still publish
    through the builder."""
    module_dir = tmp_path / "glean"
    _translate(module_dir, YAML_EVENTS_PING, transport="combined")

    generated = (module_dir / "server_events.go").read_text(encoding="utf-8")

    # Entry types and APIs of both transports must live in the one file.
    expected_snippets = (
        "type GleanEventsBuilder struct",
        "type GleanEventsLogger struct",
        "func (g GleanEventsBuilder) BuildEventsPingMessage(",
        "func (g GleanEventsLogger) RecordEventsPing(",
    )
    for snippet in expected_snippets:
        assert snippet in generated

    # Shared types must be declared once only. gofmt checks syntax but does
    # not catch duplicate declarations, hence the explicit count here.
    for declaration in ("type clientInfo struct", "type pingInfo struct"):
        assert generated.count(declaration) == 1

    # Compile the combined package and publish one message via the builder.
    msgs = _run_publisher(tmp_path, PUBSUB_SCENARIOS["events_ping"]["code"])
    assert len(msgs) == 1, f"expected one published message, got {len(msgs)}"
    assert msgs[0]["attributes"]["document_type"] == "events"