From 6066b3f1f82d1c010dbb0ac210ce9a8897480c5e Mon Sep 17 00:00:00 2001 From: Achilleas Athanasiou Fragkoulis Date: Sat, 7 Feb 2026 18:50:46 +0000 Subject: [PATCH 1/4] feat: adopt Alembic for database schema migrations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adopt **Alembic** as the migration framework for `DatabaseSessionService`, replacing manual `ALTER TABLE` operations. This enables automatic schema tracking, upgrade/downgrade support, and Kubernetes Helm hook integration for production deployments. Addresses #3343 --- - **AlembicMigrationRunner** - Wraps Alembic’s command API with programmatic config (instead of `alembic.ini`) - Ships migrations inside the package - **V1 baseline migration** - Creates all current schema tables - **V0 → V1 in-place bootstrap** - Migrates pickle-based databases to JSON - No separate destination database required - `env.py` and `script.py.mako` - Support both online and offline migration modes --- - `upgrade` — Apply pending migrations with auto-bootstrap for existing DBs - `downgrade` — Roll back by revision spec - `check` — Exit `0` if up-to-date, `1` if pending (CI/CD gate) - `stamp` — Bootstrap Alembic tracking for existing databases - `generate` — Create new migration scripts (autogenerate or template) --- - `_ensure_alembic_tracking()` stamps baseline after table creation - V0 databases auto-migrate to V1 on first access - `ADK_AUTO_MIGRATE_DB=true` - Runs pending migrations on startup - `ADK_AUTO_MIGRATE_DB=false` (default) - Raises `RuntimeError` if schema is behind --- - `get_alembic_revision()` - `is_alembic_managed()` - `is_alembic_managed_from_url()` --- - unit tests for `AlembicMigrationRunner` (SQLite) - CLI command tests covering all subcommands and edge cases - integration tests across SQLite / PostgreSQL / MySQL - Updated `test_database_schema.py` for V0 auto-migration behavior - GHA workflow: - PostgreSQL 15–17 - MySQL 8.0 / 8.4 / 9.2 - Python 3.10–3.14 - `docker-compose.yml` for local integration testing / development --- - `migration_guide.md` — User-facing upgrade and CLI reference - `helm_migration_guide.md` — Kubernetes Job and Cloud SQL Proxy examples - `migration/README.md` — Contributor workflow for adding schema versions --- .github/workflows/test-migrations.yml | 144 +++++ docs/helm_migration_guide.md | 128 ++++ docs/migration_guide.md | 97 +++ pyproject.toml | 1 + src/google/adk/cli/cli_tools_click.py | 215 +++++++ src/google/adk/sessions/alembic_runner.py | 474 +++++++++++++++ .../adk/sessions/database_session_service.py | 74 +++ src/google/adk/sessions/migration/README.md | 166 +++++- .../sessions/migration/_schema_check_utils.py | 79 ++- src/google/adk/sessions/migration/env.py | 79 +++ .../adk/sessions/migration/script.py.mako | 45 ++ .../versions/001_baseline_v1_schema.py | 135 +++++ .../sessions/migration/versions/__init__.py | 13 + tests/integration/sessions/__init__.py | 13 + tests/integration/sessions/docker-compose.yml | 29 + .../test_database_migration_integration.py | 353 +++++++++++ .../sessions/migration/test_alembic_runner.py | 381 ++++++++++++ .../migration/test_cli_migrate_commands.py | 560 ++++++++++++++++++ .../migration/test_database_schema.py | 15 +- 19 files changed, 2977 insertions(+), 24 deletions(-) create mode 100644 .github/workflows/test-migrations.yml create mode 100644 docs/helm_migration_guide.md create mode 100644 docs/migration_guide.md create mode 100644 src/google/adk/sessions/alembic_runner.py create mode 100644 
src/google/adk/sessions/migration/env.py create mode 100644 src/google/adk/sessions/migration/script.py.mako create mode 100644 src/google/adk/sessions/migration/versions/001_baseline_v1_schema.py create mode 100644 src/google/adk/sessions/migration/versions/__init__.py create mode 100644 tests/integration/sessions/__init__.py create mode 100644 tests/integration/sessions/docker-compose.yml create mode 100644 tests/integration/sessions/test_database_migration_integration.py create mode 100644 tests/unittests/sessions/migration/test_alembic_runner.py create mode 100644 tests/unittests/sessions/migration/test_cli_migrate_commands.py diff --git a/.github/workflows/test-migrations.yml b/.github/workflows/test-migrations.yml new file mode 100644 index 0000000000..954b6231bd --- /dev/null +++ b/.github/workflows/test-migrations.yml @@ -0,0 +1,144 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Database Migration Tests + +on: + push: + branches: [main] + paths: + - 'src/google/adk/sessions/**' + - 'src/google/adk/cli/cli_tools_click.py' + - 'tests/**/sessions/**' + - '.github/workflows/test-migrations.yml' + pull_request: + branches: [main] + paths: + - 'src/google/adk/sessions/**' + - 'src/google/adk/cli/cli_tools_click.py' + - 'tests/**/sessions/**' + - '.github/workflows/test-migrations.yml' + +jobs: + unit-tests: + name: Unit Tests (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Setup uv and Python + uses: astral-sh/setup-uv@v7 + with: + enable-cache: true + cache-dependency-glob: "uv.lock" + + - name: Install dependencies + run: uv sync --python ${{ matrix.python-version }} --extra test + + - name: Run migration unit tests + run: uv run pytest tests/unittests/sessions/migration/ -v + + integration-tests-postgres: + name: Integration (PostgreSQL ${{ matrix.postgres-version }}, Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + postgres-version: ["15", "16", "17"] + python-version: ["3.11", "3.12", "3.13", "3.14"] + + services: + postgres: + image: postgres:${{ matrix.postgres-version }} + env: + POSTGRES_USER: testuser + POSTGRES_PASSWORD: testpass + POSTGRES_DB: test_adk + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Setup uv and Python + uses: astral-sh/setup-uv@v7 + with: + enable-cache: true + cache-dependency-glob: "uv.lock" + + - name: Install dependencies + run: | + uv sync --python ${{ matrix.python-version }} --extra test + uv pip install psycopg2-binary + + - name: Run integration tests (PostgreSQL) + env: + TEST_POSTGRES_URL: postgresql://testuser:testpass@localhost:5432/test_adk + run: uv run pytest tests/integration/sessions/test_database_migration_integration.py -v + 
+ integration-tests-mysql: + name: Integration (MySQL ${{ matrix.mysql-version }}, Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + mysql-version: ["8.0", "8.4", "9.2"] + python-version: ["3.11", "3.12", "3.13", "3.14"] + + services: + mysql: + image: mysql:${{ matrix.mysql-version }} + env: + MYSQL_ROOT_PASSWORD: testpass + MYSQL_DATABASE: test_adk + MYSQL_USER: testuser + MYSQL_PASSWORD: testpass + options: >- + --health-cmd "mysqladmin ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 3306:3306 + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Setup uv and Python + uses: astral-sh/setup-uv@v7 + with: + enable-cache: true + cache-dependency-glob: "uv.lock" + + - name: Install dependencies + run: | + uv sync --python ${{ matrix.python-version }} --extra test + uv pip install mysqlclient + + - name: Run integration tests (MySQL) + env: + TEST_MYSQL_URL: mysql://testuser:testpass@localhost:3306/test_adk + run: uv run pytest tests/integration/sessions/test_database_migration_integration.py -v \ No newline at end of file diff --git a/docs/helm_migration_guide.md b/docs/helm_migration_guide.md new file mode 100644 index 0000000000..ab81525cd2 --- /dev/null +++ b/docs/helm_migration_guide.md @@ -0,0 +1,128 @@ +# Database Migrations on Kubernetes + +In Kubernetes deployments, run database migrations before application pods start +using Helm `pre-install` and `pre-upgrade` hooks. This ensures all pods see the +same schema and avoids race conditions from concurrent migration attempts. + +Disable `ADK_AUTO_MIGRATE_DB` in your application pods when using this approach. + +## Migration Job + +Add a Job template to your Helm chart that runs `adk migrate upgrade` as a +pre-install and pre-upgrade hook: + +```yaml +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ .Release.Name }}-db-migration + annotations: + "helm.sh/hook": pre-install,pre-upgrade + "helm.sh/hook-weight": "-5" + "helm.sh/hook-delete-policy": before-hook-creation +spec: + ttlSecondsAfterFinished: 86400 # Auto-cleanup after 24h + backoffLimit: 3 + activeDeadlineSeconds: 300 # Overall job timeout (5 min) + template: + spec: + restartPolicy: Never + serviceAccountName: {{ .Values.serviceAccountName }} + containers: + - name: migration + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + command: ["adk", "migrate", "upgrade", "--db_url", "$(DATABASE_URL)"] + env: + - name: DATABASE_URL + valueFrom: + secretKeyRef: + name: {{ .Values.database.secretName }} + key: url + securityContext: + runAsNonRoot: true + allowPrivilegeEscalation: false + readOnlyRootFilesystem: false + capabilities: + drop: + - ALL +``` + +Key fields: + +- **`activeDeadlineSeconds`**: Overall timeout for the Job. If the migration + hasn't finished within this window the Job is terminated. Adjust based on + your expected migration duration. +- **`ttlSecondsAfterFinished`**: Automatically deletes completed Job resources + after 24 hours to avoid clutter. +- **`backoffLimit`**: Number of retries before the Job is marked as failed. +- **`securityContext`**: Follows least-privilege: non-root user, no privilege + escalation, all Linux capabilities dropped. + +The `pre-install,pre-upgrade` annotations ensure the Job runs before any +application pods are created or updated. 
`helm.sh/hook-delete-policy:
+before-hook-creation` cleans up the previous Job before creating a new one on
+subsequent upgrades.
+
+The `adk migrate upgrade` command auto-bootstraps databases that predate Alembic
+support, so this Job handles both fresh deployments and upgrades from earlier ADK
+versions.
+
+## Application Configuration
+
+In your application Deployment, disable auto-migration since the Helm hook
+handles it. `false` is the default, so this is optional, but you may set it
+explicitly:
+
+```yaml
+env:
+- name: ADK_AUTO_MIGRATE_DB
+  value: "false" # default
+```
+
+## Cloud SQL Proxy
+
+If your database is a Cloud SQL instance on GKE, add the
+[Cloud SQL Auth Proxy](https://cloud.google.com/sql/docs/postgres/connect-kubernetes-engine)
+as a
+[native sidecar](https://kubernetes.io/docs/concepts/workloads/pods/sidecar-containers/)
+init container (`restartPolicy: Always`). Kubernetes starts it before the
+migration container, keeps it running alongside, and terminates it automatically
+when the migration exits:
+
+```yaml
+initContainers:
+- name: cloud-sql-proxy
+  image: gcr.io/cloud-sql-connectors/cloud-sql-proxy:latest
+  restartPolicy: Always # Native sidecar (K8s 1.28+)
+  args:
+  - "--structured-logs"
+  - "--port=5432"
+  - "{{ .Values.database.instanceConnectionName }}"
+  securityContext:
+    runAsNonRoot: true
+containers:
+- name: migration
+  image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
+  command: ["adk", "migrate", "upgrade", "--db_url",
+    "postgresql://$(DB_USER):$(DB_PASS)@127.0.0.1:5432/$(DB_NAME)"]
+  env:
+  - name: DB_USER
+    valueFrom:
+      secretKeyRef:
+        name: {{ .Values.database.secretName }}
+        key: username
+  - name: DB_PASS
+    valueFrom:
+      secretKeyRef:
+        name: {{ .Values.database.secretName }}
+        key: password
+  - name: DB_NAME
+    valueFrom:
+      secretKeyRef:
+        name: {{ .Values.database.secretName }}
+        key: database
+```
+
+Refer to the
+[GKE Cloud SQL connectivity documentation](https://cloud.google.com/sql/docs/postgres/connect-kubernetes-engine)
+for Workload Identity and IAM setup.
\ No newline at end of file
diff --git a/docs/migration_guide.md b/docs/migration_guide.md
new file mode 100644
index 0000000000..555ab1fb19
--- /dev/null
+++ b/docs/migration_guide.md
@@ -0,0 +1,97 @@
+# Database Migration Guide
+
+ADK uses [Alembic](https://alembic.sqlalchemy.org/) to manage database schema
+changes for `DatabaseSessionService`. When you upgrade ADK to a version that
+includes schema changes, Alembic applies the necessary migrations to bring your
+database up to date.
+
+Migrations can run automatically on application startup or manually via the ADK
+CLI.
+
+## Configuration
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `ADK_AUTO_MIGRATE_DB` | `false` | Enable automatic migration on startup |
+
+### Auto-migration (development and simple deployments)
+
+Set the environment variable before starting your application:
+
+```bash
+export ADK_AUTO_MIGRATE_DB=true
+```
+
+When enabled, `DatabaseSessionService` automatically detects the database schema
+version and applies any pending migrations during initialization. This includes
+bootstrapping Alembic tracking for databases that predate Alembic support.
+
+### Manual migration (production)
+
+Run migrations explicitly using the CLI before deploying your application:
+
+```bash
+adk migrate upgrade --db_url "postgresql://user:pass@host/db"
+```
+
+### Kubernetes deployments
+
+For Kubernetes, use Helm hooks to run migrations before application pods start.
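+
+If your rollout is driven by a CI/CD pipeline rather than a Helm hook, the exit
+code of `adk migrate check` can gate the deploy step. A minimal sketch
+(illustrative; substitute your real database URL for `$DATABASE_URL`):
+
+```bash
+# Exit code 0 = schema up-to-date, 1 = migrations pending (see CLI Reference).
+if ! adk migrate check --db_url "$DATABASE_URL"; then
+  adk migrate upgrade --db_url "$DATABASE_URL"
+fi
+```
+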
+See the [Helm Migration Guide](helm_migration_guide.md). + +## Upgrading from Earlier ADK Versions + +### Existing databases without Alembic tracking + +ADK versions up to and including 1.24.0 do not use Alembic for schema tracking. +If you are upgrading from any of these versions, the migration system +automatically detects your database schema version and bootstraps Alembic +tracking. + +**Using the CLI:** + +```bash +adk migrate upgrade --db_url "postgresql://user:pass@host/db" +``` + +This command auto-bootstraps: it detects the current schema version, performs any +necessary data migration (e.g., V0 pickle-to-JSON conversion), stamps the +Alembic baseline revision, and applies any pending migrations. + +**Using auto-migration:** + +When `ADK_AUTO_MIGRATE_DB=true`, `DatabaseSessionService` handles bootstrapping +transparently on startup, including V0-to-V1 migration. + +### New databases + +No action needed. `DatabaseSessionService` creates tables using the latest +schema and stamps the Alembic baseline automatically. + +### Legacy copy-based migration + +The existing copy-based migration command remains available: + +```bash +adk migrate session \ + --source_db_url "postgresql://localhost:5432/v0" \ + --dest_db_url "postgresql://localhost:5432/v1" +``` + +This copies data from a source database to a destination database, converting +the schema in the process. It is an alternative to the in-place migration +performed by `adk migrate upgrade`. + +## CLI Reference + +| Command | Description | +|---------|-------------| +| `adk migrate upgrade --db_url URL` | Apply pending migrations (auto-bootstraps existing databases) | +| `adk migrate downgrade --db_url URL --revision "-1"` | Rollback one migration step | +| `adk migrate check --db_url URL` | Check if migrations are pending (exit 0 = up-to-date, exit 1 = pending) | +| `adk migrate stamp --db_url URL` | Bootstrap Alembic tracking for an existing database | +| `adk migrate generate --db_url URL --message MSG` | Generate a new migration script (contributors) | +| `adk migrate session --source_db_url URL --dest_db_url URL` | Legacy copy-based migration | + +All commands accept an optional `--log_level` flag (`DEBUG`, `INFO`, `WARNING`, +`ERROR`, `CRITICAL`). The default is `INFO`. diff --git a/pyproject.toml b/pyproject.toml index da05cfcee9..79b9ddf746 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ # go/keep-sorted start "PyYAML>=6.0.2, <7.0.0", # For APIHubToolset. "aiosqlite>=0.21.0", # For SQLite database + "alembic>=1.18.3, <2.0.0", # For database migrations "anyio>=4.9.0, <5.0.0", # For MCP Session Manager "authlib>=1.6.6, <2.0.0", # For RestAPI Tool "click>=8.1.8, <9.0.0", # For CLI tools diff --git a/src/google/adk/cli/cli_tools_click.py b/src/google/adk/cli/cli_tools_click.py index e4fb70c70c..762d1746f7 100644 --- a/src/google/adk/cli/cli_tools_click.py +++ b/src/google/adk/cli/cli_tools_click.py @@ -1728,6 +1728,221 @@ def cli_migrate_session( click.secho(f"Migration failed: {e}", fg="red", err=True) +@migrate.command("upgrade", cls=HelpfulCommand) +@click.option( + "--db_url", + required=True, + help=( + "SQLAlchemy URL of the database to migrate, e.g." + " postgresql://user:pass@host/db." + ), +) +@click.option( + "--log_level", + type=LOG_LEVELS, + default="INFO", + help="Optional. 
Set the logging level.", +) +def cli_migrate_upgrade(*, db_url: str, log_level: str): + """Runs pending Alembic migrations to bring the database to the latest schema.""" + logs.setup_adk_logger(getattr(logging, log_level.upper())) + try: + from sqlalchemy import create_engine + from sqlalchemy import inspect as sa_inspect + + from ..sessions.alembic_runner import AlembicMigrationRunner + from ..sessions.migration._schema_check_utils import is_alembic_managed + from ..sessions.migration._schema_check_utils import to_sync_url + + runner = AlembicMigrationRunner(db_url) + + # Handle existing databases that have ADK tables but no + # Alembic tracking (alembic_version table). + engine = create_engine(to_sync_url(db_url)) + try: + with engine.connect() as conn: + inspector = sa_inspect(conn) + has_adk_tables = inspector.has_table("sessions") + alembic_managed = is_alembic_managed(inspector, conn) + finally: + engine.dispose() + + if has_adk_tables and not alembic_managed: + click.secho( + "Existing ADK database detected without Alembic" + " tracking. Bootstrapping...", + fg="yellow", + ) + runner.bootstrap_existing_database() + + if runner.check_needs_migration(): + runner.run_migrations() + click.secho("Migration completed successfully.", fg="green") + else: + click.secho("Database is already up-to-date.", fg="green") + except Exception as e: + click.secho(f"Migration failed: {e}", fg="red", err=True) + raise SystemExit(1) + + +@migrate.command("downgrade", cls=HelpfulCommand) +@click.option( + "--db_url", + required=True, + help=( + "SQLAlchemy URL of the database to downgrade, e.g." + " postgresql://user:pass@host/db." + ), +) +@click.option( + "--revision", + default="-1", + help="Alembic revision target. Use '-1' to roll back one step.", +) +@click.option( + "--log_level", + type=LOG_LEVELS, + default="INFO", + help="Optional. Set the logging level.", +) +def cli_migrate_downgrade(*, db_url: str, revision: str, log_level: str): + """Rolls back Alembic migrations by the given revision spec.""" + logs.setup_adk_logger(getattr(logging, log_level.upper())) + try: + from ..sessions.alembic_runner import AlembicMigrationRunner + + runner = AlembicMigrationRunner(db_url) + runner.downgrade(revision) + click.secho( + f"Downgrade to '{revision}' completed successfully.", + fg="green", + ) + except Exception as e: + click.secho(f"Downgrade failed: {e}", fg="red", err=True) + raise SystemExit(1) + + +@migrate.command("check", cls=HelpfulCommand) +@click.option( + "--db_url", + required=True, + help=( + "SQLAlchemy URL of the database to check, e.g." + " postgresql://user:pass@host/db." + ), +) +@click.option( + "--log_level", + type=LOG_LEVELS, + default="INFO", + help="Optional. Set the logging level.", +) +def cli_migrate_check(*, db_url: str, log_level: str): + """Checks if the database has pending migrations. + + Exit code 0 means up-to-date, exit code 1 means migrations are pending. 
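+
+  Example (illustrative):
+
+    adk migrate check --db_url "postgresql://user:pass@host/db"
+    echo $?  # 0 if up-to-date, 1 if migrations are pending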
+ """ + logs.setup_adk_logger(getattr(logging, log_level.upper())) + try: + from ..sessions.alembic_runner import AlembicMigrationRunner + + runner = AlembicMigrationRunner(db_url) + if runner.check_needs_migration(): + current = runner.get_current_revision() + click.secho( + f"Migrations pending (current revision: {current}).", + fg="yellow", + ) + raise SystemExit(1) + else: + click.secho("Database is up-to-date.", fg="green") + except SystemExit: + raise + except Exception as e: + click.secho(f"Check failed: {e}", fg="red", err=True) + raise SystemExit(1) + + +@migrate.command("stamp", cls=HelpfulCommand) +@click.option( + "--db_url", + required=True, + help=( + "SQLAlchemy URL of the database to stamp, e.g." + " postgresql://user:pass@host/db." + ), +) +@click.option( + "--log_level", + type=LOG_LEVELS, + default="INFO", + help="Optional. Set the logging level.", +) +def cli_migrate_stamp(*, db_url: str, log_level: str): + """Bootstraps Alembic tracking for an existing database. + + Detects the current ADK schema version and stamps the appropriate + Alembic baseline revision without running any migrations. + """ + logs.setup_adk_logger(getattr(logging, log_level.upper())) + try: + from ..sessions.alembic_runner import AlembicMigrationRunner + + runner = AlembicMigrationRunner(db_url) + runner.bootstrap_existing_database() + click.secho("Alembic tracking bootstrapped successfully.", fg="green") + except Exception as e: + click.secho(f"Stamp failed: {e}", fg="red", err=True) + raise SystemExit(1) + + +@migrate.command("generate", cls=HelpfulCommand) +@click.option( + "--db_url", + required=True, + help=( + "SQLAlchemy URL of the database to compare against, e.g." + " postgresql://user:pass@host/db." + ), +) +@click.option( + "--message", + required=True, + help="Short description for the migration (used in filename).", +) +@click.option( + "--no_autogenerate", + is_flag=True, + default=False, + help="Create an empty template instead of auto-detecting changes.", +) +@click.option( + "--log_level", + type=LOG_LEVELS, + default="INFO", + help="Optional. Set the logging level.", +) +def cli_migrate_generate( + *, db_url: str, message: str, no_autogenerate: bool, log_level: str +): + """Generates a new Alembic migration script. + + Compares the current SQLAlchemy models against the database and + produces an upgrade/downgrade migration script in + sessions/migration/versions/. + """ + logs.setup_adk_logger(getattr(logging, log_level.upper())) + try: + from ..sessions.alembic_runner import AlembicMigrationRunner + + runner = AlembicMigrationRunner(db_url) + path = runner.generate_revision(message, autogenerate=not no_autogenerate) + click.secho(f"Generated migration: {path}", fg="green") + except Exception as e: + click.secho(f"Generate failed: {e}", fg="red", err=True) + raise SystemExit(1) + + @deploy.command("agent_engine") @click.option( "--api_key", diff --git a/src/google/adk/sessions/alembic_runner.py b/src/google/adk/sessions/alembic_runner.py new file mode 100644 index 0000000000..c41586fe08 --- /dev/null +++ b/src/google/adk/sessions/alembic_runner.py @@ -0,0 +1,474 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Alembic migration runner for ADK database schema management. + +Provides programmatic access to Alembic migrations without requiring +an alembic.ini file. Migrations are bundled inside the package and +located relative to the source tree. +""" + +from __future__ import annotations + +from datetime import timezone +import json +import logging +import os +import pathlib +import pickle +from typing import Optional + +from alembic import command as alembic_command +from alembic.config import Config as AlembicConfig +from alembic.runtime.migration import MigrationContext +from alembic.script import ScriptDirectory +from sqlalchemy import create_engine +from sqlalchemy import inspect as sa_inspect +from sqlalchemy import text + +from .migration._schema_check_utils import get_db_schema_version_from_connection +from .migration._schema_check_utils import LATEST_SCHEMA_VERSION +from .migration._schema_check_utils import SCHEMA_VERSION_0_PICKLE +from .migration._schema_check_utils import SCHEMA_VERSION_1_JSON +from .migration._schema_check_utils import to_sync_url + +logger = logging.getLogger("google_adk." + __name__) + +_MIGRATION_DIR = pathlib.Path(__file__).parent / "migration" +_VERSIONS_DIR = _MIGRATION_DIR / "versions" + +# Revision IDs matching the migration scripts in versions/ +_BASELINE_V1_REVISION = "001_baseline_v1" + + +class AlembicMigrationRunner: + """Runs Alembic migrations programmatically. + + This class wraps Alembic's command API to provide a simple interface + for checking, running, and rolling back database migrations. It uses + synchronous SQLAlchemy engines because Alembic's migration machinery + is synchronous. + + Attributes: + db_url: The database URL (async drivers are converted to sync). + """ + + def __init__( + self, + db_url: str, + log: Optional[logging.Logger] = None, + ) -> None: + self._db_url = to_sync_url(db_url) + self._log = log or logger + + def _make_alembic_config(self) -> AlembicConfig: + """Build a programmatic Alembic Config pointing at our migrations.""" + cfg = AlembicConfig() + cfg.set_main_option("script_location", str(_MIGRATION_DIR)) + cfg.set_main_option("sqlalchemy.url", self._db_url) + # version_locations lets Alembic find our migration scripts + cfg.set_main_option("version_locations", str(_VERSIONS_DIR)) + # Use OS path separator to avoid deprecation warning + cfg.set_main_option("path_separator", "os") + return cfg + + def _get_script_directory(self) -> ScriptDirectory: + """Return the Alembic ScriptDirectory for our migrations.""" + return ScriptDirectory.from_config(self._make_alembic_config()) + + def _get_head_revision(self) -> Optional[str]: + """Return the head revision ID from the migration scripts.""" + script = self._get_script_directory() + head = script.get_current_head() + return head + + def get_current_revision(self) -> Optional[str]: + """Return the current Alembic revision stamped in the database. + + Returns None if alembic_version table does not exist or is empty. 
+ """ + engine = create_engine(self._db_url) + try: + with engine.connect() as conn: + context = MigrationContext.configure(conn) + return context.get_current_revision() + finally: + engine.dispose() + + def check_needs_migration(self) -> bool: + """Check whether the database needs migration. + + Returns True if the current revision differs from head, or if + the database has no alembic_version table yet. + """ + head = self._get_head_revision() + current = self.get_current_revision() + self._log.debug("Migration check: current=%s, head=%s", current, head) + return current != head + + def run_migrations(self) -> None: + """Run all pending migrations up to head. + + Raises: + Exception: If migration execution fails. + """ + cfg = self._make_alembic_config() + engine = create_engine(self._db_url) + try: + with engine.begin() as conn: + cfg.attributes["connection"] = conn + alembic_command.upgrade(cfg, "head") + self._log.info("Migrations completed successfully.") + finally: + engine.dispose() + + def downgrade(self, revision: str = "-1") -> None: + """Downgrade the database by the given revision spec. + + Args: + revision: Alembic revision target. Use "-1" to roll back one + step, or a specific revision ID. + + Raises: + Exception: If downgrade execution fails. + """ + cfg = self._make_alembic_config() + engine = create_engine(self._db_url) + try: + with engine.begin() as conn: + cfg.attributes["connection"] = conn + alembic_command.downgrade(cfg, revision) + self._log.info("Downgrade to '%s' completed successfully.", revision) + finally: + engine.dispose() + + def stamp(self, revision: str) -> None: + """Stamp the database with a specific revision without running migrations. + + This is used to bootstrap Alembic for existing databases that + already have the schema but no alembic_version tracking. + + Args: + revision: The revision ID to stamp. + """ + cfg = self._make_alembic_config() + engine = create_engine(self._db_url) + try: + with engine.begin() as conn: + cfg.attributes["connection"] = conn + alembic_command.stamp(cfg, revision) + self._log.info("Database stamped with revision '%s'.", revision) + finally: + engine.dispose() + + def generate_revision( + self, + message: str, + *, + autogenerate: bool = True, + output_dir: Optional[str] = None, + ) -> str: + """Generate a new Alembic migration script. + + Compares the current SQLAlchemy models (target metadata) against + the live database and produces an ``upgrade()``/``downgrade()`` + migration script. + + Args: + message: A short description used in the filename and docstring + (e.g. ``"add_session_tags"``). + autogenerate: If True (default), Alembic inspects the database + and auto-generates migration operations. If False, an empty + migration template is created. + output_dir: Optional directory to write the generated script + into. Defaults to ``sessions/migration/versions/``. + + Returns: + The path to the generated migration script. + + Raises: + Exception: If revision generation fails. + """ + cfg = self._make_alembic_config() + revision_kwargs = { + "message": message, + "autogenerate": autogenerate, + } + if output_dir is not None: + revision_kwargs["version_path"] = output_dir + # Register the output dir so Alembic accepts it as valid. 
+ sep = os.pathsep + cfg.set_main_option( + "version_locations", + f"{output_dir}{sep}{_VERSIONS_DIR}", + ) + if autogenerate: + engine = create_engine(self._db_url) + try: + with engine.begin() as conn: + cfg.attributes["connection"] = conn + script = alembic_command.revision(cfg, **revision_kwargs) + finally: + engine.dispose() + else: + script = alembic_command.revision(cfg, **revision_kwargs) + path = script.path + self._log.info("Generated migration script: %s", path) + return path + + def bootstrap_existing_database(self) -> None: + """Bootstrap Alembic tracking for an existing database. + + Detects the current ADK schema version and handles it: + + - **V1 databases**: stamps with ``001_baseline_v1``. + - **V0 databases**: runs an in-place V0→V1 transformation + (pickle → JSON events, schema restructure), then stamps. + + The V0→V1 transformation is performed outside of Alembic's + migration chain so the chain stays linear. The existing + copy-based ``adk migrate session`` command remains available + as an alternative for users who prefer a two-database approach. + + Raises: + RuntimeError: If the database has an unrecognized schema + version. + """ + engine = create_engine(self._db_url) + try: + with engine.connect() as conn: + schema_version = get_db_schema_version_from_connection(conn) + finally: + engine.dispose() + + if schema_version == SCHEMA_VERSION_1_JSON: + self._log.info("Detected V1 schema. Stamping Alembic baseline.") + self.stamp(_BASELINE_V1_REVISION) + elif schema_version == SCHEMA_VERSION_0_PICKLE: + self._log.info( + "Detected V0 (pickle) schema. Running in-place V0→V1 migration." + ) + self._migrate_v0_to_v1_in_place() + self.stamp(_BASELINE_V1_REVISION) + self._log.info("V0→V1 migration complete. Alembic baseline stamped.") + elif schema_version == LATEST_SCHEMA_VERSION: + self._log.info("Database at latest schema version. Stamping baseline.") + self.stamp(_BASELINE_V1_REVISION) + else: + raise RuntimeError(f"Unrecognized schema version: {schema_version}") + + # ------------------------------------------------------------------ + # V0 → V1 in-place migration helpers + # ------------------------------------------------------------------ + + def _migrate_v0_to_v1_in_place(self) -> None: + """Transform a V0 database to V1 schema in-place. + + Steps: + 1. Create ``adk_internal_metadata`` table. + 2. Add ``event_data`` column to ``events``. + 3. Convert each V0 event row into a V1 JSON blob. + 4. Drop the V0-only columns. + 5. Set ``schema_version = '1'`` in metadata. + """ + engine = create_engine(self._db_url) + try: + with engine.begin() as conn: + dialect = engine.dialect.name + inspector = sa_inspect(conn) + + # 1. Create metadata table + if not inspector.has_table("adk_internal_metadata"): + conn.execute( + text( + "CREATE TABLE adk_internal_metadata (" + " key VARCHAR(128) NOT NULL PRIMARY KEY," + " value VARCHAR(256) NOT NULL" + ")" + ) + ) + + # 2. Add event_data column + event_cols = {c["name"] for c in inspector.get_columns("events")} + if "event_data" not in event_cols: + if dialect == "postgresql": + conn.execute(text("ALTER TABLE events ADD COLUMN event_data JSONB")) + elif dialect == "mysql": + conn.execute( + text("ALTER TABLE events ADD COLUMN event_data LONGTEXT") + ) + else: + conn.execute(text("ALTER TABLE events ADD COLUMN event_data TEXT")) + + # 3. 
Migrate each event row + rows = conn.execute(text("SELECT * FROM events")) + for row in rows: + event_data = self._v0_row_to_event_data(row._mapping) + conn.execute( + text( + "UPDATE events SET event_data = :data" + " WHERE id = :id AND app_name = :app" + " AND user_id = :user AND session_id = :sid" + ), + { + "data": json.dumps(event_data), + "id": row._mapping["id"], + "app": row._mapping["app_name"], + "user": row._mapping["user_id"], + "sid": row._mapping["session_id"], + }, + ) + + # 4. Drop V0-only columns + v0_only = [ + "author", + "actions", + "long_running_tool_ids_json", + "branch", + "content", + "grounding_metadata", + "custom_metadata", + "usage_metadata", + "citation_metadata", + "partial", + "turn_complete", + "error_code", + "error_message", + "interrupted", + "input_transcription", + "output_transcription", + ] + if dialect == "sqlite": + # SQLite requires table recreation for column drops. + # Build a new table with only V1 columns. + keep_cols = [c for c in event_cols if c not in v0_only] + keep_cols_with_data = sorted(set(keep_cols) | {"event_data"}) + cols_str = ", ".join(keep_cols_with_data) + conn.execute( + text( + f"CREATE TABLE events_v1_tmp AS SELECT {cols_str} FROM events" + ) + ) + conn.execute(text("DROP TABLE events")) + conn.execute(text("ALTER TABLE events_v1_tmp RENAME TO events")) + else: + for col in v0_only: + if col in event_cols: + conn.execute(text(f"ALTER TABLE events DROP COLUMN {col}")) + + # 5. Set schema version + conn.execute( + text( + "INSERT INTO adk_internal_metadata (key, value)" + " VALUES ('schema_version', '1')" + ) + ) + + self._log.info("In-place V0→V1 migration completed successfully.") + finally: + engine.dispose() + + @staticmethod + def _v0_row_to_event_data(row) -> dict: + """Convert a V0 event row mapping to a V1 event_data dict.""" + data = {} + data["id"] = row["id"] + data["invocation_id"] = row.get("invocation_id") or "" + data["author"] = row.get("author") or "agent" + + branch = row.get("branch") + if branch: + data["branch"] = branch + + # Unpickle actions + actions_raw = row.get("actions") + if actions_raw is not None: + try: + if isinstance(actions_raw, bytes): + obj = pickle.loads(actions_raw) # noqa: S301 + else: + obj = actions_raw + if hasattr(obj, "model_dump"): + actions_dict = obj.model_dump(mode="json", exclude_none=True) + elif isinstance(obj, dict): + actions_dict = obj + else: + actions_dict = {} + if actions_dict: + data["actions"] = actions_dict + except Exception as e: + logger.warning("Failed to unpickle actions: %s", e) + + # Timestamp + ts = row.get("timestamp") + if ts is not None: + if isinstance(ts, str): + from datetime import datetime + + try: + ts = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f") + except ValueError: + try: + ts = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") + except ValueError: + ts = None + if ts is not None and hasattr(ts, "replace"): + ts = ts.replace(tzinfo=timezone.utc) + data["timestamp"] = ts.timestamp() + + # long_running_tool_ids + lrt_json = row.get("long_running_tool_ids_json") + if lrt_json: + try: + ids = json.loads(lrt_json) + if ids: + data["long_running_tool_ids"] = ids + except json.JSONDecodeError: + pass + + # Scalar fields + for field in ("partial", "turn_complete", "interrupted"): + val = row.get(field) + if val is not None: + data[field] = val + + for field in ("error_code", "error_message"): + val = row.get(field) + if val: + data[field] = val + + # JSON fields + for field in ( + "content", + "grounding_metadata", + "custom_metadata", + "usage_metadata", + 
"citation_metadata", + "input_transcription", + "output_transcription", + ): + val = row.get(field) + if val is None: + continue + if isinstance(val, str): + try: + parsed = json.loads(val) + if parsed: + data[field] = parsed + except json.JSONDecodeError: + pass + elif isinstance(val, dict) and val: + data[field] = val + + return data diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index 18dd999a94..3c604eed77 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -19,6 +19,7 @@ from datetime import datetime from datetime import timezone import logging +import os from typing import Any from typing import AsyncIterator from typing import Optional @@ -178,6 +179,12 @@ async def _prepare_tables(self): This method is called lazily before each database operation. It checks the DB schema version to use and creates the tables (including setting the schema version metadata) if needed. + + Alembic integration (opt-in via ``ADK_AUTO_MIGRATE_DB=true``): + When enabled, pending Alembic migrations are applied automatically + before the application reads or writes data. When disabled (the + default), a ``RuntimeError`` is raised if the database is behind + to prevent silent schema mismatches. """ # Check the database schema version and set the _db_schema_version if # needed @@ -236,6 +243,73 @@ async def _prepare_tables(self): sql_session.add(metadata) await sql_session.commit() + # Stamp Alembic baseline for newly created databases so that + # future migrations can be tracked without a separate bootstrap + # step. + await self._ensure_alembic_tracking() + + async def _ensure_alembic_tracking(self) -> None: + """Stamp or run Alembic migrations as needed. + + This is called after table creation to ensure the database has + Alembic tracking. Behavior depends on the database state: + + - **V0 or V1 databases without Alembic tracking**: bootstrapped + automatically. V0 databases are migrated to V1 in-place + (pickle → JSON events) and then stamped with the Alembic + baseline revision. + - **Databases with pending Alembic migrations**: + + - If ``ADK_AUTO_MIGRATE_DB=true``: runs migrations automatically. + - Otherwise: raises ``RuntimeError`` directing the user to run + ``adk migrate upgrade``. + + - **Fresh or up-to-date databases**: stamps the baseline revision + (no-op if already tracked). + """ + try: + from .alembic_runner import AlembicMigrationRunner + except ImportError: + logger.debug("Alembic not installed; skipping migration tracking.") + return + + db_url = str(self.db_engine.url) + runner = AlembicMigrationRunner(db_url) + + if not _schema_check_utils.is_alembic_managed_from_url(db_url): + # No alembic_version table — bootstrap it. + # For V0 databases this performs an in-place migration to V1. + was_v0 = ( + self._db_schema_version == _schema_check_utils.SCHEMA_VERSION_0_PICKLE + ) + runner.bootstrap_existing_database() + + if was_v0: + # The database has been migrated from V0 to V1 in-place. + # Refresh the cached schema version so the ORM uses V1 + # models for subsequent operations. + self._db_schema_version = _schema_check_utils.LATEST_SCHEMA_VERSION + logger.info( + "V0 database migrated to V1 and Alembic tracking established." + ) + return + + if not runner.check_needs_migration(): + return + + auto_migrate = os.getenv("ADK_AUTO_MIGRATE_DB", "false").lower() == "true" + + if auto_migrate: + logger.info("ADK_AUTO_MIGRATE_DB is set. 
Running migrations...") + runner.run_migrations() + else: + raise RuntimeError( + "Database schema is behind the current ADK version." + " Run 'adk migrate upgrade --db_url ' to apply" + " pending migrations, or set the ADK_AUTO_MIGRATE_DB=true" + " environment variable to enable automatic migration." + ) + @override async def create_session( self, diff --git a/src/google/adk/sessions/migration/README.md b/src/google/adk/sessions/migration/README.md index 77fb5fbebd..fefc7357f9 100644 --- a/src/google/adk/sessions/migration/README.md +++ b/src/google/adk/sessions/migration/README.md @@ -1,10 +1,154 @@ -# Process for Adding a New Schema Version +# Database Schema Migrations for Contributors -This document outlines the steps required to introduce a new database schema -version for `DatabaseSessionService`. Let's assume you are introducing schema -version `2.0`, migrating from `1.0`. +This document describes how to add new database schema versions for +`DatabaseSessionService` using Alembic. -## 1. Update SQLAlchemy Models +For user-facing migration documentation, see the +[Migration Guide](../../../../docs/migration_guide.md). For design rationale, +see the [Alembic Adoption RFC](../../../../docs/rfcs/alembic-adoption.md). + +## Overview + +ADK uses [Alembic](https://alembic.sqlalchemy.org/) with a strictly linear +revision chain. The chain starts at `001_baseline_v1` and each new migration +appends to it. Migration scripts live in `sessions/migration/versions/` and +ship with the `google-adk` package. + +ADK maintains two version tracking mechanisms that migration scripts must keep +in sync: + +- `alembic_version` table — Alembic's revision tracking. +- `adk_internal_metadata.schema_version` — ADK's higher-level compatibility + layer used by `DatabaseSessionService` for schema detection. + +## Adding a New Schema Version + +The following steps assume you are adding schema version `2`, migrating from +`1`. (Existing versions use bare integers: `"0"`, `"1"`.) + +### 1. Update SQLAlchemy Models + +Fork from the latest schema version in `google/adk/sessions/schemas/` and +modify the SQLAlchemy model classes (`StorageSession`, `StorageEvent`, +`StorageAppState`, `StorageUserState`, `StorageMetadata`) to reflect the new +schema. Call the new file `v2.py`. + +Changes might include adding new `mapped_column` definitions, changing types, +or adding new classes for new tables. + +### 2. Generate the Migration + +```bash +adk migrate generate --db_url "sqlite:///dev.db" --message "add_session_tags" +``` + +This uses Alembic's autogenerate to compare the SQLAlchemy models against the +database and produce `upgrade()` and `downgrade()` functions. The output file +is placed in `sessions/migration/versions/`. + +To create an empty template instead of auto-detecting changes, add +`--no_autogenerate`. + +**Autogenerate limitations**: Alembic cannot detect column renames (appears as +drop + add), data type changes requiring transformation, or some index/constraint +changes. Always review the generated code. + +### 3. Review and Customize + +- Verify both `upgrade()` and `downgrade()` functions are correct. +- Add data migration logic if needed (e.g., backfilling new columns). 
+- Update `adk_internal_metadata.schema_version` in `upgrade()`: + + ```python + op.execute( + "UPDATE adk_internal_metadata SET value = '2' " + "WHERE key = 'schema_version'" + ) + ``` + +- Add metadata to the migration file header: + - Database schema version (e.g., `2`) + - Compatible ADK versions (e.g., `>=1.26.0`) + - Rollback notes (any downgrade limitations) + +### 4. Update Schema Version Constants + +Add the new version and update `LATEST_SCHEMA_VERSION` in +`google/adk/sessions/migration/_schema_check_utils.py`: + +```python +SCHEMA_VERSION_2 = "2" +LATEST_SCHEMA_VERSION = SCHEMA_VERSION_2 +``` + +### 5. Update `DatabaseSessionService` Business Logic + +If the schema change affects how data is read or written during normal +operation (e.g., a new column that needs to be populated on session creation), +update the methods in `DatabaseSessionService` (`create_session`, +`get_session`, `append_event`, etc.) accordingly. + +`DatabaseSessionService` is designed to be backward-compatible with the +previous schema for at least 2 releases. It detects the current database schema +and branches based on `_db_schema_version`. Modify `_prepare_tables` and the +CRUD methods to support both the old and new schema. + +### 6. Test + +Run unit tests: + +```bash +pytest tests/unittests/sessions/migration/ +``` + +Run integration tests against real databases: + +```bash +docker compose -f tests/integration/sessions/docker-compose.yml up -d +TEST_POSTGRES_URL="postgresql://testuser:testpass@localhost:5432/test_adk" \ + pytest tests/integration/sessions/ +``` + +Test the full cycle: upgrade, downgrade, upgrade again. + +### 7. Commit + +Use conventional commit format: + +``` +feat(migration): add session tags column +``` + +No CLI changes are needed — the `adk migrate upgrade` command automatically +discovers and applies new migrations. + +### 8. Deprecate the Previous Schema + +After at least 2 releases, remove backward-compatibility logic for the +previous schema. Only use the latest schema in `DatabaseSessionService` and +raise an exception if detecting legacy schema versions. Keep old schema files +(`schemas/v1.py`) and migration scripts for reference. + +## Migration Reversibility Policy + +- All migrations must implement `downgrade()`, even if limited. +- Non-reversible migrations (dropping columns, destructive data + transformations) must be tied to a **MAJOR** release of the ADK SDK. +- Document any downgrade limitations in the migration file header. + +--- + +## Legacy Manual Workflow (Deprecated) + +> **Deprecated**: The manual copy-based migration workflow below is deprecated +> and will be removed in a future major release. Use the Alembic workflow above +> for all new schema versions. + +The following steps describe the legacy process for adding a new schema version +without Alembic. This workflow uses the `adk migrate session` copy-based +migration command. + +### 1. Update SQLAlchemy Models Fork from the latest schema version in `google/adk/sessions/schemas/` folder and modify the SQLAlchemy model classes (`StorageSession`, `StorageEvent`, @@ -12,7 +156,7 @@ modify the SQLAlchemy model classes (`StorageSession`, `StorageEvent`, `2.0` schema, call it `v2.py`. Changes might be adding new `mapped_column` definitions, changing types, or adding new classes for new tables. -## 2. Create a New Migration Script +### 2. Create a New Migration Script You need to create a script that migrates data from schema `1.0` to `2.0`. 
@@ -57,7 +201,7 @@ You need to create a script that migrates data from schema `1.0` to `2.0`. dest_session.commit() ``` -## 3. Update Schema Version Constant +### 3. Update Schema Version Constant You need to add the new version and update `LATEST_SCHEMA_VERSION` in `google/adk/sessions/migration/_schema_check_utils.py` to reflect the new version: @@ -70,7 +214,7 @@ LATEST_SCHEMA_VERSION = SCHEMA_VERSION_2_0 This will also update `LATEST_VERSION` in `migration_runner.py`, as it uses this constant. -## 4. Register the New Migration Script in Migration Runner +### 4. Register the New Migration Script in Migration Runner In `google/adk/sessions/migration/migration_runner.py`, import your new migration script and add it to the `MIGRATIONS` dictionary. This tells the @@ -95,7 +239,7 @@ MIGRATIONS = { } ``` -## 5. Update `DatabaseSessionService` Business Logic +### 5. Update `DatabaseSessionService` Business Logic If your schema change affects how data should be read or written during normal operation (e.g., you added a new column that needs to be populated on session @@ -112,7 +256,7 @@ DatabaseSessionService's methods (`create_session`, `get_session`, `append_event`, etc.) to branch based on the `_db_schema_version` variable accordingly. -## 6. CLI Command Changes +### 6. CLI Command Changes No changes are needed for the Click command definition in `cli_tools_click.py`. The `adk migrate session` command calls `migration_runner.upgrade()`, which will @@ -120,7 +264,7 @@ now automatically detect the source database version and apply the necessary migration steps (e.g., `0.1 -> 1.0 -> 2.0`, or `1.0 -> 2.0`) to reach `LATEST_VERSION`. -## 7. Deprecate the Previous Schema +### 7. Deprecate the Previous Schema After a few releases (at least 2), remove the logic for the previous schema. Only use the latest schema in the `DatabaseSessionService`, and raise an diff --git a/src/google/adk/sessions/migration/_schema_check_utils.py b/src/google/adk/sessions/migration/_schema_check_utils.py index a6bc8a546a..fd746c6d7c 100644 --- a/src/google/adk/sessions/migration/_schema_check_utils.py +++ b/src/google/adk/sessions/migration/_schema_check_utils.py @@ -16,6 +16,7 @@ from __future__ import annotations import logging +from typing import Optional from sqlalchemy import create_engine as create_sync_engine from sqlalchemy import inspect @@ -29,6 +30,42 @@ LATEST_SCHEMA_VERSION = SCHEMA_VERSION_1_JSON +def get_alembic_revision(inspector, connection) -> Optional[str]: + """Return the current Alembic revision from the database, or None. + + Args: + inspector: A SQLAlchemy inspector bound to the connection. + connection: An active SQLAlchemy connection. + + Returns: + The revision string if ``alembic_version`` exists and has a row, + ``None`` otherwise. + """ + if not inspector.has_table("alembic_version"): + return None + try: + row = connection.execute( + text("SELECT version_num FROM alembic_version") + ).fetchone() + return row[0] if row else None + except Exception as e: + logger.debug("Could not read alembic_version: %s", e) + return None + + +def is_alembic_managed(inspector, connection) -> bool: + """Return True if the database has Alembic tracking. + + A database is Alembic-managed when the ``alembic_version`` table + exists **and** contains a revision. + + Args: + inspector: A SQLAlchemy inspector bound to the connection. + connection: An active SQLAlchemy connection. 
+ """ + return get_alembic_revision(inspector, connection) is not None + + def _get_schema_version_impl(inspector, connection) -> str: """Gets DB schema version using inspector and connection.""" if inspector.has_table("adk_internal_metadata"): @@ -61,12 +98,12 @@ def _get_schema_version_impl(inspector, connection) -> str: cols = {c["name"] for c in inspector.get_columns("events")} if "actions" in cols and "event_data" not in cols: logger.warning( - "The database is using the legacy v0 schema, which uses Pickle to" - " serialize event actions. The v0 schema will not be supported" - " going forward and will be deprecated in a few rollouts. Please" - " migrate to the v1 schema which uses JSON serialization for event" - " data. You can use `adk migrate session` command to migrate your" - " database." + "The database is using the legacy v0 schema, which uses" + " Pickle to serialize event actions. The v0 schema is" + " deprecated. Upgrade to the latest ADK version and run" + " 'adk migrate upgrade' to migrate automatically, or set" + " ADK_AUTO_MIGRATE_DB=true for automatic migration on" + " startup." ) return SCHEMA_VERSION_0_PICKLE except Exception as e: @@ -82,6 +119,36 @@ def get_db_schema_version_from_connection(connection) -> str: return _get_schema_version_impl(inspector, connection) +def is_alembic_managed_from_url(db_url: str) -> bool: + """Check whether the database at *db_url* is Alembic-managed. + + Opens a short-lived synchronous connection, checks for the + ``alembic_version`` table, and disposes the engine. + + Args: + db_url: The database URL (async drivers are converted to sync). + + Returns: + ``True`` if the database has an ``alembic_version`` table with a + revision, ``False`` otherwise. + """ + engine = None + try: + engine = create_sync_engine(to_sync_url(db_url)) + with engine.connect() as conn: + inspector = inspect(conn) + return is_alembic_managed(inspector, conn) + except Exception: + logger.debug( + "Could not check Alembic status for %s; assuming unmanaged.", + db_url, + ) + return False + finally: + if engine: + engine.dispose() + + def to_sync_url(db_url: str) -> str: """Removes '+driver' from SQLAlchemy URL. diff --git a/src/google/adk/sessions/migration/env.py b/src/google/adk/sessions/migration/env.py new file mode 100644 index 0000000000..179d0d1b5c --- /dev/null +++ b/src/google/adk/sessions/migration/env.py @@ -0,0 +1,79 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Alembic environment configuration for ADK database migrations. + +This module is invoked by Alembic's migration machinery. It configures +the migration context with the target metadata (V1 schema) and handles +both offline (SQL generation) and online (live connection) modes. + +The connection is always provided programmatically by +AlembicMigrationRunner via config.attributes['connection']. 
+""" + +from __future__ import annotations + +from alembic import context +from google.adk.sessions.schemas.v1 import Base as BaseV1 + +config = context.config + +target_metadata = BaseV1.metadata + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode (SQL script generation). + + In this mode, Alembic generates SQL statements without connecting + to a database. The URL must be set in the Alembic config. + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + Connection is provided by AlembicMigrationRunner via + config.attributes['connection']. + """ + connectable = config.attributes.get("connection") + + if connectable is None: + raise RuntimeError( + "No connection provided. Use AlembicMigrationRunner " + "to run migrations programmatically." + ) + + context.configure( + connection=connectable, + target_metadata=target_metadata, + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/src/google/adk/sessions/migration/script.py.mako b/src/google/adk/sessions/migration/script.py.mako new file mode 100644 index 0000000000..f7d5249912 --- /dev/null +++ b/src/google/adk/sessions/migration/script.py.mako @@ -0,0 +1,45 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +Database Schema Version: [FILL IN, e.g. v2] +Compatible ADK Versions: [FILL IN, e.g. >=1.25.0] + +Rollback Notes: + [FILL IN: Any special considerations for downgrade, or "Standard rollback"] +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +${imports if imports else ""} + +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} \ No newline at end of file diff --git a/src/google/adk/sessions/migration/versions/001_baseline_v1_schema.py b/src/google/adk/sessions/migration/versions/001_baseline_v1_schema.py new file mode 100644 index 0000000000..da57779162 --- /dev/null +++ b/src/google/adk/sessions/migration/versions/001_baseline_v1_schema.py @@ -0,0 +1,135 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Baseline V1 schema (JSON-based). + +Revision ID: 001_baseline_v1 +Revises: None +Create Date: 2026-02-05 + +Database Schema Version: v1 +Compatible ADK Versions: >=1.22.0 + +This migration creates the V1 database schema used by +DatabaseSessionService. It is the baseline for new deployments +and for existing V1 databases being bootstrapped into Alembic. +""" + +from __future__ import annotations + +from alembic import op +from google.adk.sessions.schemas.shared import DynamicJSON +from google.adk.sessions.schemas.shared import PreciseTimestamp +import sqlalchemy as sa + +revision = "001_baseline_v1" +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Create V1 schema tables.""" + op.create_table( + "adk_internal_metadata", + sa.Column("key", sa.String(128), nullable=False), + sa.Column("value", sa.String(256), nullable=False), + sa.PrimaryKeyConstraint("key"), + ) + + op.create_table( + "sessions", + sa.Column("app_name", sa.String(128), nullable=False), + sa.Column("user_id", sa.String(128), nullable=False), + sa.Column("id", sa.String(128), nullable=False), + sa.Column("state", DynamicJSON(), nullable=True), + sa.Column( + "create_time", + PreciseTimestamp(), + nullable=False, + server_default=sa.func.now(), + ), + sa.Column( + "update_time", + PreciseTimestamp(), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint("app_name", "user_id", "id"), + ) + + op.create_table( + "events", + sa.Column("id", sa.String(128), nullable=False), + sa.Column("app_name", sa.String(128), nullable=False), + sa.Column("user_id", sa.String(128), nullable=False), + sa.Column("session_id", sa.String(128), nullable=False), + sa.Column("invocation_id", sa.String(256), nullable=False), + sa.Column("event_data", DynamicJSON(), nullable=True), + sa.Column( + "timestamp", + PreciseTimestamp(), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint("id", "app_name", "user_id", "session_id"), + sa.ForeignKeyConstraint( + ["app_name", "user_id", "session_id"], + ["sessions.app_name", "sessions.user_id", "sessions.id"], + ondelete="CASCADE", + ), + ) + + op.create_table( + "app_states", + sa.Column("app_name", sa.String(128), nullable=False), + sa.Column("state", DynamicJSON(), nullable=True), + sa.Column( + "update_time", + PreciseTimestamp(), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint("app_name"), + ) + + op.create_table( + "user_states", + sa.Column("app_name", sa.String(128), nullable=False), + sa.Column("user_id", sa.String(128), nullable=False), + sa.Column("state", DynamicJSON(), nullable=True), + sa.Column( + "update_time", + PreciseTimestamp(), + nullable=False, + server_default=sa.func.now(), + ), + sa.PrimaryKeyConstraint("app_name", "user_id"), + ) + + op.execute( + sa.text( + "INSERT INTO adk_internal_metadata (key, value)" + " VALUES ('schema_version', '1')" + ) + ) + + +def downgrade() -> None: + """Drop all V1 schema tables.""" + op.drop_table("user_states") + op.drop_table("app_states") + op.drop_table("events") + op.drop_table("sessions") + 
op.drop_table("adk_internal_metadata") diff --git a/src/google/adk/sessions/migration/versions/__init__.py b/src/google/adk/sessions/migration/versions/__init__.py new file mode 100644 index 0000000000..58d482ea38 --- /dev/null +++ b/src/google/adk/sessions/migration/versions/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/integration/sessions/__init__.py b/tests/integration/sessions/__init__.py new file mode 100644 index 0000000000..58d482ea38 --- /dev/null +++ b/tests/integration/sessions/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/integration/sessions/docker-compose.yml b/tests/integration/sessions/docker-compose.yml new file mode 100644 index 0000000000..11059875fb --- /dev/null +++ b/tests/integration/sessions/docker-compose.yml @@ -0,0 +1,29 @@ +services: + postgres: + image: postgres:17 + environment: + POSTGRES_USER: testuser + POSTGRES_PASSWORD: testpass + POSTGRES_DB: test_adk + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U testuser -d test_adk"] + interval: 5s + timeout: 5s + retries: 5 + + mysql: + image: mysql:8.4 + environment: + MYSQL_ROOT_PASSWORD: testpass + MYSQL_DATABASE: test_adk + MYSQL_USER: testuser + MYSQL_PASSWORD: testpass + ports: + - "3306:3306" + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost"] + interval: 5s + timeout: 5s + retries: 5 \ No newline at end of file diff --git a/tests/integration/sessions/test_database_migration_integration.py b/tests/integration/sessions/test_database_migration_integration.py new file mode 100644 index 0000000000..196a8dc3ac --- /dev/null +++ b/tests/integration/sessions/test_database_migration_integration.py @@ -0,0 +1,353 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for Alembic database migrations. 
+ +These tests run against real database engines (PostgreSQL, MySQL, SQLite) +to verify that migrations work correctly across dialects. + +Usage: + # SQLite only (no external dependencies) + pytest tests/integration/sessions/test_database_migration_integration.py + + # With PostgreSQL and MySQL (start containers first) + docker compose -f tests/integration/sessions/docker-compose.yml up -d + TEST_POSTGRES_URL=postgresql://testuser:testpass@localhost:5432/test_adk \ + TEST_MYSQL_URL=mysql://testuser:testpass@localhost:3306/test_adk \ + pytest tests/integration/sessions/test_database_migration_integration.py +""" + +import os + +from google.adk.sessions.alembic_runner import AlembicMigrationRunner +from google.adk.sessions.migration._schema_check_utils import SCHEMA_VERSION_1_JSON +import pytest +from sqlalchemy import create_engine +from sqlalchemy import inspect +from sqlalchemy import text + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +_POSTGRES_URL = os.environ.get("TEST_POSTGRES_URL") +_MYSQL_URL = os.environ.get("TEST_MYSQL_URL") + + +def _sqlite_url(tmp_path, name="test.db"): + return f"sqlite:///{tmp_path / name}" + + +def _clean_database(db_url): + """Drop all tables so tests start from a clean state.""" + engine = create_engine(db_url) + try: + with engine.begin() as conn: + inspector = inspect(conn) + table_names = inspector.get_table_names() + if table_names: + # Disable FK checks for MySQL during cleanup + dialect = engine.dialect.name + if dialect == "mysql": + conn.execute(text("SET FOREIGN_KEY_CHECKS = 0")) + for table in table_names: + conn.execute(text(f"DROP TABLE IF EXISTS {table}")) + if dialect == "mysql": + conn.execute(text("SET FOREIGN_KEY_CHECKS = 1")) + finally: + engine.dispose() + + +@pytest.fixture( + params=[ + pytest.param("sqlite", id="sqlite"), + pytest.param( + "postgres", + id="postgres", + marks=pytest.mark.skipif( + _POSTGRES_URL is None, + reason="TEST_POSTGRES_URL not set", + ), + ), + pytest.param( + "mysql", + id="mysql", + marks=pytest.mark.skipif( + _MYSQL_URL is None, + reason="TEST_MYSQL_URL not set", + ), + ), + ] +) +def db_url(request, tmp_path): + """Provide a clean database URL for each test.""" + if request.param == "sqlite": + url = _sqlite_url(tmp_path) + elif request.param == "postgres": + url = _POSTGRES_URL + elif request.param == "mysql": + url = _MYSQL_URL + else: + raise ValueError(f"Unknown db param: {request.param}") + + # Clean before test + if request.param != "sqlite": + _clean_database(url) + + yield url + + # Clean after test + if request.param != "sqlite": + _clean_database(url) + + +@pytest.fixture +def runner(db_url): + return AlembicMigrationRunner(db_url) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestUpgradeDowngradeUpgradeCycle: + """Verify the full upgrade -> downgrade -> upgrade cycle.""" + + def test_full_cycle(self, runner, db_url): + """Database survives upgrade -> downgrade -> upgrade.""" + # 1. Upgrade to head + runner.run_migrations() + assert runner.check_needs_migration() is False + assert runner.get_current_revision() == "001_baseline_v1" + + # 2. Downgrade to base + runner.downgrade("base") + assert runner.check_needs_migration() is True + assert runner.get_current_revision() is None + + # 3. 
Upgrade again + runner.run_migrations() + assert runner.check_needs_migration() is False + assert runner.get_current_revision() == "001_baseline_v1" + + def test_tables_correct_after_cycle(self, runner, db_url): + """All V1 tables exist and are correct after a full cycle.""" + runner.run_migrations() + runner.downgrade("base") + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + tables = set(inspector.get_table_names()) + + assert "adk_internal_metadata" in tables + assert "sessions" in tables + assert "events" in tables + assert "app_states" in tables + assert "user_states" in tables + + # Verify events has V1 columns + event_cols = {c["name"] for c in inspector.get_columns("events")} + assert "event_data" in event_cols + assert "actions" not in event_cols + finally: + engine.dispose() + + +class TestUpgradeIdempotency: + """Verify running upgrade multiple times is safe.""" + + def test_double_upgrade(self, runner): + """Running upgrade twice should not fail or change state.""" + runner.run_migrations() + rev_after_first = runner.get_current_revision() + + runner.run_migrations() + rev_after_second = runner.get_current_revision() + + assert rev_after_first == rev_after_second + assert runner.check_needs_migration() is False + + def test_triple_upgrade(self, runner): + """Running upgrade three times should be equally safe.""" + for _ in range(3): + runner.run_migrations() + assert runner.check_needs_migration() is False + + +class TestSchemaVersionMetadata: + """Verify ADK schema_version metadata is set correctly.""" + + def test_schema_version_set_after_upgrade(self, runner, db_url): + """schema_version should be '1' after V1 migration.""" + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + result = conn.execute( + text( + "SELECT value FROM adk_internal_metadata" + " WHERE key = 'schema_version'" + ) + ).scalar_one() + finally: + engine.dispose() + + assert result == SCHEMA_VERSION_1_JSON + + def test_schema_version_restored_after_cycle(self, runner, db_url): + """schema_version should be correct after upgrade/downgrade/upgrade.""" + runner.run_migrations() + runner.downgrade("base") + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + result = conn.execute( + text( + "SELECT value FROM adk_internal_metadata" + " WHERE key = 'schema_version'" + ) + ).scalar_one() + finally: + engine.dispose() + + assert result == SCHEMA_VERSION_1_JSON + + +class TestDataPreservation: + """Verify data is preserved through migrations.""" + + def test_data_survives_idempotent_upgrade(self, runner, db_url): + """Data inserted after migration should survive a second upgrade.""" + runner.run_migrations() + + # Insert test data + engine = create_engine(db_url) + try: + with engine.begin() as conn: + conn.execute( + text( + "INSERT INTO sessions (app_name, user_id, id, create_time," + " update_time) VALUES (:app, :user, :sid, CURRENT_TIMESTAMP," + " CURRENT_TIMESTAMP)" + ), + {"app": "test_app", "user": "test_user", "sid": "session_1"}, + ) + finally: + engine.dispose() + + # Run upgrade again (should be no-op) + runner.run_migrations() + + # Verify data still exists + engine = create_engine(db_url) + try: + with engine.connect() as conn: + count = conn.execute(text("SELECT COUNT(*) FROM sessions")).scalar_one() + finally: + engine.dispose() + + assert count == 1 + + +class TestForeignKeyConstraints: + """Verify FK constraints work 
correctly across dialects.""" + + def test_cascade_delete(self, runner, db_url): + """Deleting a session should cascade-delete its events.""" + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.begin() as conn: + # Enable FK enforcement for SQLite + if engine.dialect.name == "sqlite": + conn.execute(text("PRAGMA foreign_keys = ON")) + + conn.execute( + text( + "INSERT INTO sessions (app_name, user_id, id, create_time," + " update_time) VALUES (:app, :user, :sid," + " CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)" + ), + {"app": "app1", "user": "user1", "sid": "s1"}, + ) + conn.execute( + text( + "INSERT INTO events (id, app_name, user_id, session_id," + " invocation_id, timestamp) VALUES (:eid, :app, :user," + " :sid, :inv, CURRENT_TIMESTAMP)" + ), + { + "eid": "e1", + "app": "app1", + "user": "user1", + "sid": "s1", + "inv": "inv1", + }, + ) + + # Delete session — event should cascade + conn.execute( + text( + "DELETE FROM sessions WHERE app_name = :app" + " AND user_id = :user AND id = :sid" + ), + {"app": "app1", "user": "user1", "sid": "s1"}, + ) + + event_count = conn.execute( + text("SELECT COUNT(*) FROM events") + ).scalar_one() + + finally: + engine.dispose() + + assert event_count == 0 + + +class TestBootstrapExistingDatabase: + """Verify bootstrapping works across dialects.""" + + def test_bootstrap_then_upgrade_is_noop(self, runner, db_url): + """After bootstrap, upgrade should detect no pending migrations.""" + # Create schema manually (simulating existing deployment) + from google.adk.sessions.schemas.v1 import Base as BaseV1 + + engine = create_engine(db_url) + try: + BaseV1.metadata.create_all(engine) + with engine.begin() as conn: + conn.execute( + text( + "INSERT INTO adk_internal_metadata (key, value)" + " VALUES ('schema_version', '1')" + ) + ) + finally: + engine.dispose() + + # Bootstrap Alembic + runner.bootstrap_existing_database() + assert runner.check_needs_migration() is False + + # Upgrade should be a no-op + runner.run_migrations() + assert runner.check_needs_migration() is False diff --git a/tests/unittests/sessions/migration/test_alembic_runner.py b/tests/unittests/sessions/migration/test_alembic_runner.py new file mode 100644 index 0000000000..8f7362a1f0 --- /dev/null +++ b/tests/unittests/sessions/migration/test_alembic_runner.py @@ -0,0 +1,381 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Unit tests for AlembicMigrationRunner.""" + +from google.adk.sessions.alembic_runner import AlembicMigrationRunner +from google.adk.sessions.migration import _schema_check_utils +from google.adk.sessions.schemas.v1 import Base as BaseV1 +import pytest +from sqlalchemy import create_engine +from sqlalchemy import inspect +from sqlalchemy import text + + +@pytest.fixture +def db_url(tmp_path): + """Provide a fresh SQLite database URL for each test.""" + db_path = tmp_path / "test.db" + return f"sqlite:///{db_path}" + + +@pytest.fixture +def runner(db_url): + """Provide an AlembicMigrationRunner for a fresh database.""" + return AlembicMigrationRunner(db_url) + + +class TestCheckNeedsMigration: + + def test_new_database_needs_migration(self, runner): + """A fresh database with no tables should need migration.""" + assert runner.check_needs_migration() is True + + def test_migrated_database_does_not_need_migration(self, runner): + """After running migrations, database should not need migration.""" + runner.run_migrations() + assert runner.check_needs_migration() is False + + def test_downgraded_database_needs_migration(self, runner): + """After downgrading, database should need migration again.""" + runner.run_migrations() + runner.downgrade("base") + assert runner.check_needs_migration() is True + + +class TestRunMigrations: + + def test_creates_all_v1_tables(self, runner, db_url): + """Migration should create all V1 schema tables.""" + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + tables = set(inspector.get_table_names()) + finally: + engine.dispose() + + expected = { + "adk_internal_metadata", + "sessions", + "events", + "app_states", + "user_states", + "alembic_version", + } + assert expected.issubset(tables) + + def test_sets_schema_version_metadata(self, runner, db_url): + """Migration should insert schema_version = '1' into metadata.""" + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + result = conn.execute( + text( + "SELECT value FROM adk_internal_metadata" + " WHERE key = 'schema_version'" + ) + ).scalar_one() + finally: + engine.dispose() + + assert result == _schema_check_utils.SCHEMA_VERSION_1_JSON + + def test_sessions_table_has_correct_columns(self, runner, db_url): + """Sessions table should have the expected V1 columns.""" + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + columns = {c["name"] for c in inspector.get_columns("sessions")} + finally: + engine.dispose() + + expected = { + "app_name", + "user_id", + "id", + "state", + "create_time", + "update_time", + } + assert expected == columns + + def test_events_table_has_correct_columns(self, runner, db_url): + """Events table should have V1 columns (event_data, not actions).""" + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + columns = {c["name"] for c in inspector.get_columns("events")} + finally: + engine.dispose() + + expected = { + "id", + "app_name", + "user_id", + "session_id", + "invocation_id", + "event_data", + "timestamp", + } + assert expected == columns + assert "actions" not in columns + + def test_events_foreign_key_to_sessions(self, runner, db_url): + """Events table should have a FK to sessions with CASCADE delete.""" + runner.run_migrations() + + engine = create_engine(db_url) + try: + with engine.connect() 
as conn: + inspector = inspect(conn) + fks = inspector.get_foreign_keys("events") + finally: + engine.dispose() + + assert len(fks) == 1 + fk = fks[0] + assert fk["referred_table"] == "sessions" + assert set(fk["constrained_columns"]) == { + "app_name", + "user_id", + "session_id", + } + assert set(fk["referred_columns"]) == {"app_name", "user_id", "id"} + + +class TestIdempotency: + + def test_upgrade_twice_is_safe(self, runner): + """Running upgrade when already at head should be a no-op.""" + runner.run_migrations() + assert runner.check_needs_migration() is False + + # Second upgrade should not raise + runner.run_migrations() + assert runner.check_needs_migration() is False + + def test_upgrade_downgrade_upgrade_cycle(self, runner, db_url): + """Database should survive a full upgrade→downgrade→upgrade cycle.""" + runner.run_migrations() + assert runner.check_needs_migration() is False + + runner.downgrade("base") + assert runner.check_needs_migration() is True + + runner.run_migrations() + assert runner.check_needs_migration() is False + + # Verify tables still exist and are correct + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + tables = set(inspector.get_table_names()) + finally: + engine.dispose() + + assert "sessions" in tables + assert "events" in tables + assert "adk_internal_metadata" in tables + + +class TestGetCurrentRevision: + + def test_returns_none_for_new_database(self, runner): + """New database should have no current revision.""" + assert runner.get_current_revision() is None + + def test_returns_revision_after_migration(self, runner): + """After migration, current revision should match head.""" + runner.run_migrations() + current = runner.get_current_revision() + assert current is not None + assert current == "001_baseline_v1" + + +class TestDowngrade: + + def test_downgrade_removes_tables(self, runner, db_url): + """Downgrading to base should remove all ADK tables.""" + runner.run_migrations() + runner.downgrade("base") + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + tables = set(inspector.get_table_names()) + finally: + engine.dispose() + + assert "sessions" not in tables + assert "events" not in tables + assert "adk_internal_metadata" not in tables + + def test_downgrade_minus_one(self, runner): + """Downgrade -1 from head should go to base (only one migration).""" + runner.run_migrations() + assert runner.get_current_revision() == "001_baseline_v1" + + runner.downgrade("-1") + assert runner.get_current_revision() is None + + +class TestStamp: + + def test_stamp_sets_revision_without_running_migration(self, db_url): + """Stamp should set alembic_version without creating tables.""" + runner = AlembicMigrationRunner(db_url) + runner.stamp("001_baseline_v1") + + assert runner.get_current_revision() == "001_baseline_v1" + + # Tables should NOT exist (stamp doesn't run migrations) + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + assert not inspector.has_table("sessions") + finally: + engine.dispose() + + +class TestBootstrapExistingDatabase: + + def test_bootstrap_v1_database(self, db_url): + """Bootstrapping a V1 database should stamp the baseline revision.""" + # Manually create V1 schema (simulating existing deployment) + engine = create_engine(db_url) + try: + BaseV1.metadata.create_all(engine) + with engine.begin() as conn: + conn.execute( + text( + "INSERT INTO adk_internal_metadata (key, value)" + " 
VALUES ('schema_version', '1')" + ) + ) + finally: + engine.dispose() + + runner = AlembicMigrationRunner(db_url) + runner.bootstrap_existing_database() + + assert runner.get_current_revision() == "001_baseline_v1" + assert runner.check_needs_migration() is False + + def test_bootstrap_v0_database_migrates_in_place(self, tmp_path): + """Bootstrapping a V0 database should migrate it to V1 in-place.""" + import pickle + + from google.adk.sessions.schemas import v0 + + db_path = tmp_path / "v0.db" + db_url = f"sqlite:///{db_path}" + + # Create V0 schema and insert test data + engine = create_engine(db_url) + try: + v0.Base.metadata.create_all(engine) + with engine.begin() as conn: + conn.execute( + text( + "INSERT INTO sessions (app_name, user_id, id," + " state, create_time, update_time)" + " VALUES ('app1', 'user1', 's1', '{}'," + " CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)" + ) + ) + conn.execute( + text( + "INSERT INTO events (id, app_name, user_id," + " session_id, invocation_id, author," + " actions, timestamp)" + " VALUES (:id, :app, :user, :sid," + " :inv, :author, :actions," + " CURRENT_TIMESTAMP)" + ), + { + "id": "e1", + "app": "app1", + "user": "user1", + "sid": "s1", + "inv": "inv1", + "author": "agent", + "actions": pickle.dumps({}), + }, + ) + finally: + engine.dispose() + + runner = AlembicMigrationRunner(db_url) + runner.bootstrap_existing_database() + + assert runner.get_current_revision() == "001_baseline_v1" + assert runner.check_needs_migration() is False + + # Verify V1 schema: event_data column exists, actions does not + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + event_cols = {c["name"] for c in inspector.get_columns("events")} + assert "event_data" in event_cols + assert "actions" not in event_cols + + # Verify metadata table was created + assert inspector.has_table("adk_internal_metadata") + + # Verify schema version + version = conn.execute( + text( + "SELECT value FROM adk_internal_metadata" + " WHERE key = 'schema_version'" + ) + ).scalar_one() + assert version == "1" + + # Verify event data was migrated + event_data_raw = conn.execute( + text("SELECT event_data FROM events WHERE id = 'e1'") + ).scalar_one() + assert event_data_raw is not None + finally: + engine.dispose() + + +class TestAsyncUrlConversion: + + def test_async_url_is_converted_to_sync(self, tmp_path): + """Runner should handle async SQLAlchemy URLs transparently.""" + db_path = tmp_path / "async_test.db" + async_url = f"sqlite+aiosqlite:///{db_path}" + + runner = AlembicMigrationRunner(async_url) + # Should not raise — async driver stripped internally + assert runner.check_needs_migration() is True + + runner.run_migrations() + assert runner.check_needs_migration() is False diff --git a/tests/unittests/sessions/migration/test_cli_migrate_commands.py b/tests/unittests/sessions/migration/test_cli_migrate_commands.py new file mode 100644 index 0000000000..b3c2c9f8c2 --- /dev/null +++ b/tests/unittests/sessions/migration/test_cli_migrate_commands.py @@ -0,0 +1,560 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for ``adk migrate`` CLI commands (upgrade, downgrade, check, stamp).""" + +import os +import pickle + +from click.testing import CliRunner +from google.adk.cli import cli_tools_click +from google.adk.sessions.alembic_runner import AlembicMigrationRunner +import pytest +from sqlalchemy import create_engine +from sqlalchemy import inspect +from sqlalchemy import text + + +@pytest.fixture +def db_url(tmp_path): + """Provide a fresh SQLite database URL for each test.""" + db_path = tmp_path / "test_cli.db" + return f"sqlite:///{db_path}" + + +@pytest.fixture +def cli(): + """Provide a Click CliRunner.""" + return CliRunner() + + +def _create_v1_tables(db_url): + """Create V1 schema tables without Alembic tracking.""" + engine = create_engine(db_url) + try: + with engine.begin() as conn: + conn.execute( + text( + "CREATE TABLE adk_internal_metadata (" + " key VARCHAR(128) NOT NULL PRIMARY KEY," + " value VARCHAR(256) NOT NULL" + ")" + ) + ) + conn.execute( + text( + "INSERT INTO adk_internal_metadata (key, value)" + " VALUES ('schema_version', '1')" + ) + ) + conn.execute( + text( + "CREATE TABLE sessions (" + " app_name VARCHAR NOT NULL," + " user_id VARCHAR NOT NULL," + " id VARCHAR NOT NULL," + " state TEXT," + " create_time DATETIME," + " update_time DATETIME," + " PRIMARY KEY (app_name, user_id, id)" + ")" + ) + ) + conn.execute( + text( + "CREATE TABLE events (" + " id VARCHAR NOT NULL," + " app_name VARCHAR NOT NULL," + " user_id VARCHAR NOT NULL," + " session_id VARCHAR NOT NULL," + " invocation_id VARCHAR," + " event_data TEXT," + " timestamp DATETIME," + " PRIMARY KEY (id, app_name, user_id, session_id)," + " FOREIGN KEY (app_name, user_id, session_id)" + " REFERENCES sessions (app_name, user_id, id)" + " ON DELETE CASCADE" + ")" + ) + ) + finally: + engine.dispose() + + +def _create_v0_tables(db_url): + """Create V0 schema tables (pickle-based) without Alembic tracking.""" + engine = create_engine(db_url) + try: + with engine.begin() as conn: + conn.execute( + text( + "CREATE TABLE sessions (" + " app_name VARCHAR NOT NULL," + " user_id VARCHAR NOT NULL," + " id VARCHAR NOT NULL," + " state TEXT NOT NULL," + " create_time DATETIME," + " update_time DATETIME," + " PRIMARY KEY (app_name, user_id, id)" + ")" + ) + ) + conn.execute( + text( + "CREATE TABLE events (" + " id VARCHAR NOT NULL," + " app_name VARCHAR NOT NULL," + " user_id VARCHAR NOT NULL," + " session_id VARCHAR NOT NULL," + " invocation_id VARCHAR," + " author VARCHAR," + " branch VARCHAR," + " actions BLOB NOT NULL," + " long_running_tool_ids_json TEXT," + " content TEXT," + " grounding_metadata TEXT," + " custom_metadata TEXT," + " usage_metadata TEXT," + " citation_metadata TEXT," + " partial BOOLEAN," + " turn_complete BOOLEAN," + " error_code TEXT," + " error_message TEXT," + " interrupted BOOLEAN," + " input_transcription TEXT," + " output_transcription TEXT," + " timestamp DATETIME," + " PRIMARY KEY (id, app_name, user_id, session_id)," + " FOREIGN KEY (app_name, user_id, session_id)" + " REFERENCES sessions (app_name, user_id, id)" + " ON DELETE CASCADE" + ")" + ) + ) + finally: + engine.dispose() + + +def _has_alembic_version(db_url): + """Check if alembic_version table exists and has a revision.""" + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + if not inspector.has_table("alembic_version"): + return False + row = conn.execute( + text("SELECT 
version_num FROM alembic_version") + ).fetchone() + return row is not None + finally: + engine.dispose() + + +# ── upgrade command ────────────────────────────────────────────── + + +class TestUpgrade: + + def test_fresh_database(self, cli, db_url): + """upgrade on empty DB creates tables and stamps Alembic.""" + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "completed successfully" in result.output + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + assert inspector.has_table("sessions") + assert inspector.has_table("events") + assert inspector.has_table("alembic_version") + finally: + engine.dispose() + + def test_existing_v1_database_bootstraps(self, cli, db_url): + """upgrade on existing V1 DB (no Alembic) bootstraps then is up-to-date.""" + _create_v1_tables(db_url) + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "Bootstrapping" in result.output + assert _has_alembic_version(db_url) + + def test_already_up_to_date(self, cli, db_url): + """upgrade on already-migrated DB is a no-op.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "already up-to-date" in result.output + + def test_existing_v0_database_auto_migrates(self, cli, db_url): + """upgrade on V0 DB bootstraps (V0→V1 in-place) then is up-to-date.""" + _create_v0_tables(db_url) + + # Insert a V0 event so bootstrap has something to migrate. + engine = create_engine(db_url) + try: + with engine.begin() as conn: + conn.execute( + text( + "INSERT INTO sessions (app_name, user_id, id, state)" + " VALUES ('app', 'user', 's1', '{}')" + ) + ) + conn.execute( + text( + "INSERT INTO events" + " (id, app_name, user_id, session_id, actions)" + " VALUES ('e1', 'app', 'user', 's1', :actions)" + ), + {"actions": pickle.dumps({})}, + ) + finally: + engine.dispose() + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "Bootstrapping" in result.output + assert _has_alembic_version(db_url) + + # Verify V0 columns are gone + engine = create_engine(db_url) + try: + with engine.connect() as conn: + cols = {c["name"] for c in inspect(conn).get_columns("events")} + assert "event_data" in cols + assert "actions" not in cols + finally: + engine.dispose() + + def test_invalid_url_exits_1(self, cli): + """upgrade with invalid URL exits with code 1.""" + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", "invalid://not-a-db"], + ) + assert result.exit_code == 1 + + def test_idempotent_double_upgrade(self, cli, db_url): + """Running upgrade twice is safe.""" + for _ in range(2): + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "already up-to-date" in result.output + + +# ── downgrade command ──────────────────────────────────────────── + + +class TestDowngrade: + + def test_downgrade_one_step(self, cli, db_url): + """downgrade -1 after upgrade removes tables.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + + result = cli.invoke( + cli_tools_click.main, + ["migrate", 
"downgrade", "--db_url", db_url, "--revision", "-1"], + ) + assert result.exit_code == 0, result.output + assert "completed successfully" in result.output + + engine = create_engine(db_url) + try: + with engine.connect() as conn: + inspector = inspect(conn) + assert not inspector.has_table("sessions") + assert not inspector.has_table("events") + finally: + engine.dispose() + + def test_downgrade_to_base(self, cli, db_url): + """downgrade to 'base' removes all migration state.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "downgrade", "--db_url", db_url, "--revision", "base"], + ) + assert result.exit_code == 0, result.output + + def test_downgrade_default_revision(self, cli, db_url): + """downgrade without --revision defaults to -1.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "downgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "'-1'" in result.output + + def test_downgrade_empty_db_exits_1(self, cli, db_url): + """downgrade on empty DB fails.""" + result = cli.invoke( + cli_tools_click.main, + ["migrate", "downgrade", "--db_url", db_url], + ) + assert result.exit_code == 1 + + +# ── check command ──────────────────────────────────────────────── + + +class TestCheck: + + def test_up_to_date_exits_0(self, cli, db_url): + """check on migrated DB exits 0.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "check", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "up-to-date" in result.output + + def test_pending_migrations_exits_1(self, cli, db_url): + """check on fresh DB exits 1 (migrations pending).""" + result = cli.invoke( + cli_tools_click.main, + ["migrate", "check", "--db_url", db_url], + ) + assert result.exit_code == 1 + assert "pending" in result.output.lower() + + def test_after_downgrade_exits_1(self, cli, db_url): + """check after downgrade exits 1.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + runner.downgrade("base") + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "check", "--db_url", db_url], + ) + assert result.exit_code == 1 + + +# ── stamp command ──────────────────────────────────────────────── + + +class TestStamp: + + def test_stamp_v1_database(self, cli, db_url): + """stamp on V1 DB sets Alembic tracking.""" + _create_v1_tables(db_url) + + result = cli.invoke( + cli_tools_click.main, + ["migrate", "stamp", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "bootstrapped" in result.output.lower() + assert _has_alembic_version(db_url) + + # After stamp, check should be up-to-date + runner = AlembicMigrationRunner(db_url) + assert runner.check_needs_migration() is False + + def test_stamp_v0_database_auto_migrates(self, cli, db_url): + """stamp on V0 DB performs in-place migration and stamps.""" + _create_v0_tables(db_url) + engine = create_engine(db_url) + try: + with engine.begin() as conn: + conn.execute( + text( + "INSERT INTO sessions (app_name, user_id, id, state)" + " VALUES ('app', 'user', 's1', '{}')" + ) + ) + conn.execute( + text( + "INSERT INTO events" + " (id, app_name, user_id, session_id, actions)" + " VALUES ('e1', 'app', 'user', 's1', :actions)" + ), + {"actions": pickle.dumps({})}, + ) + finally: + engine.dispose() + + result = cli.invoke( + 
cli_tools_click.main, + ["migrate", "stamp", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert _has_alembic_version(db_url) + + # Verify V1 schema + engine = create_engine(db_url) + try: + with engine.connect() as conn: + cols = {c["name"] for c in inspect(conn).get_columns("events")} + assert "event_data" in cols + assert "actions" not in cols + finally: + engine.dispose() + + def test_stamp_fresh_database(self, cli, db_url): + """stamp on empty DB stamps the baseline.""" + result = cli.invoke( + cli_tools_click.main, + ["migrate", "stamp", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert _has_alembic_version(db_url) + + +# ── upgrade → check → downgrade → check cycle ─────────────────── + + +class TestUpgradeDowngradeCycle: + + def test_full_cycle(self, cli, db_url): + """upgrade → check (0) → downgrade → check (1) → upgrade → check (0).""" + # Step 1: upgrade + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + + # Step 2: check → up-to-date + result = cli.invoke( + cli_tools_click.main, + ["migrate", "check", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + + # Step 3: downgrade + result = cli.invoke( + cli_tools_click.main, + ["migrate", "downgrade", "--db_url", db_url, "--revision", "base"], + ) + assert result.exit_code == 0, result.output + + # Step 4: check → pending + result = cli.invoke( + cli_tools_click.main, + ["migrate", "check", "--db_url", db_url], + ) + assert result.exit_code == 1 + + # Step 5: upgrade again + result = cli.invoke( + cli_tools_click.main, + ["migrate", "upgrade", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + assert "completed successfully" in result.output + + # Step 6: check → up-to-date + result = cli.invoke( + cli_tools_click.main, + ["migrate", "check", "--db_url", db_url], + ) + assert result.exit_code == 0, result.output + + +# ── generate command ────────────────────────────────────────────── + + +class TestGenerate: + + def test_generate_empty_template(self, db_url, tmp_path): + """generate without autogenerate produces an empty migration script.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + + output_dir = str(tmp_path / "versions") + os.makedirs(output_dir) + path = runner.generate_revision( + "empty_template", autogenerate=False, output_dir=output_dir + ) + assert os.path.isfile(path) + content = open(path).read() + assert "empty_template" in content + assert "def upgrade" in content + assert "def downgrade" in content + + def test_generate_autogenerate(self, db_url, tmp_path): + """generate with autogenerate produces a migration script.""" + runner = AlembicMigrationRunner(db_url) + runner.run_migrations() + + output_dir = str(tmp_path / "versions") + os.makedirs(output_dir) + path = runner.generate_revision( + "auto_test", autogenerate=True, output_dir=output_dir + ) + assert os.path.isfile(path) + content = open(path).read() + assert "auto_test" in content + + def test_cli_generate_missing_message_exits_nonzero(self, cli, db_url): + """CLI generate requires --message.""" + result = cli.invoke( + cli_tools_click.main, + ["migrate", "generate", "--db_url", db_url], + ) + assert result.exit_code != 0 + + +# ── missing --db_url ───────────────────────────────────────────── + + +class TestMissingRequiredArgs: + + @pytest.mark.parametrize( + "subcommand", + [ + "upgrade", + "downgrade", + "check", + "stamp", + "generate", 
+ ], + ) + def test_missing_db_url_exits_nonzero(self, cli, subcommand): + """All migrate subcommands require --db_url.""" + result = cli.invoke( + cli_tools_click.main, + ["migrate", subcommand], + ) + assert result.exit_code != 0 + assert "db_url" in result.output.lower() or "error" in result.output.lower() diff --git a/tests/unittests/sessions/migration/test_database_schema.py b/tests/unittests/sessions/migration/test_database_schema.py index ceb5420b14..0780ce4be9 100644 --- a/tests/unittests/sessions/migration/test_database_schema.py +++ b/tests/unittests/sessions/migration/test_database_schema.py @@ -76,7 +76,7 @@ def get_schema_version(sync_conn): @pytest.mark.asyncio -async def test_existing_v0_db_uses_v0_schema(tmp_path): +async def test_existing_v0_db_is_auto_migrated_to_v1(tmp_path): db_path = tmp_path / 'v0_db.db' await create_v0_db(db_path) db_url = f'sqlite+aiosqlite:///{db_path}' @@ -85,9 +85,10 @@ async def test_existing_v0_db_uses_v0_schema(tmp_path): await session_service.create_session( app_name='my_app', user_id='test_user', session_id='s1' ) + # V0 databases are auto-migrated to V1 by Alembic bootstrap assert ( session_service._db_schema_version - == _schema_check_utils.SCHEMA_VERSION_0_PICKLE + == _schema_check_utils.LATEST_SCHEMA_VERSION ) session = await session_service.get_session( @@ -95,21 +96,21 @@ async def test_existing_v0_db_uses_v0_schema(tmp_path): ) assert session.id == 's1' - # Verify schema tables + # Verify schema is now V1 engine = create_async_engine(db_url) async with engine.connect() as conn: has_metadata_table = await conn.run_sync( lambda sync_conn: inspect(sync_conn).has_table('adk_internal_metadata') ) - assert not has_metadata_table + assert has_metadata_table - # Verify events table columns for v0 + # Verify events table has V1 columns event_cols = await conn.run_sync( lambda sync_conn: inspect(sync_conn).get_columns('events') ) event_col_names = {c['name'] for c in event_cols} - assert 'event_data' not in event_col_names - assert 'actions' in event_col_names + assert 'event_data' in event_col_names + assert 'actions' not in event_col_names await engine.dispose() From 0ba5074d03d9ea2404a7fd5a852b2b705d81052c Mon Sep 17 00:00:00 2001 From: Achilleas Athanasiou Fragkoulis Date: Sat, 7 Feb 2026 19:01:06 +0000 Subject: [PATCH 2/4] Update docs/helm_migration_guide.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/helm_migration_guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/helm_migration_guide.md b/docs/helm_migration_guide.md index ab81525cd2..1488386cee 100644 --- a/docs/helm_migration_guide.md +++ b/docs/helm_migration_guide.md @@ -71,7 +71,7 @@ versions. ## Application Configuration In your application Deployment, disable auto-migration since the Helm hook -handles it. `false` is the default, so this is optional, but you may to set it for explicitness: +handles it. 
`false` is the default, so this is optional, but you may want to set it for explicitness: ```yaml env: From 0f4bf44b250ae56cd8efe412dbc1d22f8f9c2553 Mon Sep 17 00:00:00 2001 From: Achilleas Athanasiou Fragkoulis Date: Sat, 7 Feb 2026 19:01:24 +0000 Subject: [PATCH 3/4] Update docs/helm_migration_guide.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/helm_migration_guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/helm_migration_guide.md b/docs/helm_migration_guide.md index 1488386cee..09fff324ed 100644 --- a/docs/helm_migration_guide.md +++ b/docs/helm_migration_guide.md @@ -84,8 +84,8 @@ env: If your database is a Cloud SQL instance on GKE, add the [Cloud SQL Auth Proxy](https://cloud.google.com/sql/docs/postgres/connect-kubernetes-engine) as a -[native sidecar](https://kubernetes.io/docs/concepts/workloads/pods/sidecar-containers/) -init container (`restartPolicy: Always`). Kubernetes starts it before the +[native sidecar container](https://kubernetes.io/docs/concepts/workloads/pods/sidecar-containers/) +by defining it as an `initContainer` with `restartPolicy: Always`. Kubernetes starts it before the migration container, keeps it running alongside, and terminates it automatically when the migration exits: From 45d77e03a89ad2835abe39951686ce270e758368 Mon Sep 17 00:00:00 2001 From: Achilleas Athanasiou Fragkoulis Date: Sat, 7 Feb 2026 19:07:24 +0000 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20use=20restricted=20unpickler=20for?= =?UTF-8?q?=20V0=E2=86=92V1=20migration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/google/adk/sessions/alembic_runner.py | 43 ++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/src/google/adk/sessions/alembic_runner.py b/src/google/adk/sessions/alembic_runner.py index c41586fe08..c64adf8cfe 100644 --- a/src/google/adk/sessions/alembic_runner.py +++ b/src/google/adk/sessions/alembic_runner.py @@ -22,6 +22,7 @@ from __future__ import annotations from datetime import timezone +import io import json import logging import os @@ -45,6 +46,46 @@ logger = logging.getLogger("google_adk." + __name__) +# Modules that must never be unpickled during V0→V1 migration. +_PICKLE_BLOCKED_MODULES = frozenset({ + "os", + "posix", + "nt", + "subprocess", + "sys", + "builtins", + "shutil", + "importlib", + "signal", + "socket", + "http", + "ctypes", + "webbrowser", + "code", + "codeop", + "compile", + "compileall", +}) + + +class _RestrictedUnpickler(pickle.Unpickler): + """Unpickler that rejects obviously dangerous modules.""" + + def find_class(self, module: str, name: str): + top_level = module.split(".")[0] + if top_level in _PICKLE_BLOCKED_MODULES: + raise pickle.UnpicklingError( + f"Refusing to unpickle {module}.{name}: " + f"module '{top_level}' is blocked." + ) + return super().find_class(module, name) + + +def _restricted_loads(data: bytes) -> object: + """Deserialize bytes using a restricted unpickler.""" + return _RestrictedUnpickler(io.BytesIO(data)).load() + + _MIGRATION_DIR = pathlib.Path(__file__).parent / "migration" _VERSIONS_DIR = _MIGRATION_DIR / "versions" @@ -396,7 +437,7 @@ def _v0_row_to_event_data(row) -> dict: if actions_raw is not None: try: if isinstance(actions_raw, bytes): - obj = pickle.loads(actions_raw) # noqa: S301 + obj = _restricted_loads(actions_raw) else: obj = actions_raw if hasattr(obj, "model_dump"):
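
A short, illustrative sketch (not part of the patches above) of the behaviour the restricted unpickler in patch 4/4 is meant to provide during the V0→V1 bootstrap: ordinary pickled action payloads still deserialize, while a payload that references a blocked module such as `os` is rejected before any callable is resolved. The names below (`RestrictedUnpickler`, `restricted_loads`, `_BLOCKED`) are local to this sketch and are not ADK APIs; the real implementation lives in `src/google/adk/sessions/alembic_runner.py`.

```python
import io
import pickle

# Small stand-in for the blocklist defined in alembic_runner.py.
_BLOCKED = frozenset({"os", "posix", "nt", "subprocess", "sys", "builtins"})


class RestrictedUnpickler(pickle.Unpickler):
  """Refuses to resolve globals from blocked top-level modules."""

  def find_class(self, module: str, name: str):
    if module.split(".")[0] in _BLOCKED:
      raise pickle.UnpicklingError(f"blocked: {module}.{name}")
    return super().find_class(module, name)


def restricted_loads(data: bytes) -> object:
  """Deserialize bytes, rejecting dangerous globals at the GLOBAL opcode."""
  return RestrictedUnpickler(io.BytesIO(data)).load()


# A plain dict (standing in for a V0 actions payload) round-trips fine.
assert restricted_loads(pickle.dumps({"skip_summarization": True})) == {
    "skip_summarization": True
}

# A protocol-0 payload that tries to resolve os.system is rejected before
# the REDUCE opcode ever runs.
malicious = b"cos\nsystem\n(S'echo pwned'\ntR."
try:
  restricted_loads(malicious)
except pickle.UnpicklingError as e:
  print(f"blocked as expected: {e}")
```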