From 30bab500ffb479591c8e9edc6bfcf8f6b1312fec Mon Sep 17 00:00:00 2001 From: Scotty Pate Date: Tue, 14 Oct 2025 13:52:04 -0500 Subject: [PATCH 1/4] Add support for Bigquery reservations in config --- sqlmesh/core/config/connection.py | 2 ++ sqlmesh/core/engine_adapter/bigquery.py | 6 ++++++ tests/core/test_connection_config.py | 15 +++++++++++++++ 3 files changed, 23 insertions(+) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 8341f8466f..93fdb8516b 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1058,6 +1058,7 @@ class BigQueryConnectionConfig(ConnectionConfig): job_retry_deadline_seconds: t.Optional[int] = None priority: t.Optional[BigQueryPriority] = None maximum_bytes_billed: t.Optional[int] = None + reservation_id: t.Optional[str] = None concurrent_tasks: int = 1 register_comments: bool = True @@ -1167,6 +1168,7 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]: "job_retry_deadline_seconds", "priority", "maximum_bytes_billed", + "reservation_id", } } diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 59a56b6ace..fe94397449 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -1106,7 +1106,13 @@ def _execute( else [] ) + # Create job config with reservation support job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties) + + # Set reservation directly on the job_config object if specified + reservation_id = self._extra_config.get("reservation_id") + if reservation_id: + job_config.reservation = reservation_id self._query_job = self._db_call( self.client.query, query=sql, diff --git a/tests/core/test_connection_config.py b/tests/core/test_connection_config.py index 4e1397b7f1..69749bc8a9 100644 --- a/tests/core/test_connection_config.py +++ b/tests/core/test_connection_config.py @@ -1042,6 +1042,21 @@ def test_bigquery(make_config): assert config.get_catalog() == "project" assert config.is_recommended_for_state_sync is False + # Test reservation_id + config_with_reservation = make_config( + type="bigquery", + project="project", + reservation_id="projects/my-project/locations/us-central1/reservations/my-reservation", + check_import=False, + ) + assert isinstance(config_with_reservation, BigQueryConnectionConfig) + assert config_with_reservation.reservation_id == "projects/my-project/locations/us-central1/reservations/my-reservation" + + # Test that reservation_id is included in _extra_engine_config + extra_config = config_with_reservation._extra_engine_config + assert "reservation_id" in extra_config + assert extra_config["reservation_id"] == "projects/my-project/locations/us-central1/reservations/my-reservation" + with pytest.raises(ConfigError, match="you must also specify the `project` field"): make_config(type="bigquery", execution_project="execution_project", check_import=False) From a8fcc2fed9af2b528709b2db5567ecc5f76c5c5c Mon Sep 17 00:00:00 2001 From: Scotty Pate Date: Tue, 14 Oct 2025 15:25:40 -0500 Subject: [PATCH 2/4] linting and style changes --- sqlmesh/core/engine_adapter/bigquery.py | 2 +- tests/core/test_connection_config.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index fe94397449..8fa543a9f7 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -1108,7 +1108,7 @@ def _execute( # Create job config with reservation support job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties) - + # Set reservation directly on the job_config object if specified reservation_id = self._extra_config.get("reservation_id") if reservation_id: diff --git a/tests/core/test_connection_config.py b/tests/core/test_connection_config.py index 69749bc8a9..7241f0fd7c 100644 --- a/tests/core/test_connection_config.py +++ b/tests/core/test_connection_config.py @@ -1050,12 +1050,18 @@ def test_bigquery(make_config): check_import=False, ) assert isinstance(config_with_reservation, BigQueryConnectionConfig) - assert config_with_reservation.reservation_id == "projects/my-project/locations/us-central1/reservations/my-reservation" - + assert ( + config_with_reservation.reservation_id + == "projects/my-project/locations/us-central1/reservations/my-reservation" + ) + # Test that reservation_id is included in _extra_engine_config extra_config = config_with_reservation._extra_engine_config assert "reservation_id" in extra_config - assert extra_config["reservation_id"] == "projects/my-project/locations/us-central1/reservations/my-reservation" + assert ( + extra_config["reservation_id"] + == "projects/my-project/locations/us-central1/reservations/my-reservation" + ) with pytest.raises(ConfigError, match="you must also specify the `project` field"): make_config(type="bigquery", execution_project="execution_project", check_import=False) From bc75d7afe0ef18028db73a7901140da06d42f95d Mon Sep 17 00:00:00 2001 From: Scotty Pate Date: Wed, 7 Jan 2026 07:28:18 -0600 Subject: [PATCH 3/4] Fix bigquery cli engine config test --- tests/cli/test_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 480d186fa1..4cfb52a370 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -1951,7 +1951,7 @@ def test_init_dbt_template(runner: CliRunner, tmp_path: Path): def test_init_project_engine_configs(tmp_path): engine_type_to_config = { "redshift": "# concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # user: \n # password: \n # database: \n # host: \n # port: \n # source_address: \n # unix_sock: \n # ssl: \n # sslmode: \n # timeout: \n # tcp_keepalive: \n # application_name: \n # preferred_role: \n # principal_arn: \n # credentials_provider: \n # region: \n # cluster_identifier: \n # iam: \n # is_serverless: \n # serverless_acct_id: \n # serverless_work_group: \n # enable_merge: ", - "bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ", + "bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: \n # reservation_id: ", "snowflake": "account: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # user: \n # password: \n # warehouse: \n # database: \n # role: \n # authenticator: \n # token: \n # host: \n # port: \n # application: Tobiko_SQLMesh\n # private_key: \n # private_key_path: \n # private_key_passphrase: \n # session_parameters: ", "databricks": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # server_hostname: \n # http_path: \n # access_token: \n # auth_type: \n # oauth_client_id: \n # oauth_client_secret: \n # catalog: \n # http_headers: \n # session_configuration: \n # databricks_connect_server_hostname: \n # databricks_connect_access_token: \n # databricks_connect_cluster_id: \n # databricks_connect_use_serverless: False\n # force_databricks_connect: False\n # disable_databricks_connect: False\n # disable_spark_session: False", "postgres": "host: \n user: \n password: \n port: \n database: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: True\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # keepalives_idle: \n # connect_timeout: 10\n # role: \n # sslmode: \n # application_name: ", From 04fe4fa96e70f47a7584acb70adba0213182b5f8 Mon Sep 17 00:00:00 2001 From: Scotty Pate Date: Wed, 7 Jan 2026 08:36:17 -0600 Subject: [PATCH 4/4] Update naming conventions and how to set the property --- sqlmesh/core/config/connection.py | 4 ++-- sqlmesh/core/engine_adapter/bigquery.py | 10 ++++------ tests/cli/test_cli.py | 2 +- tests/core/test_connection_config.py | 12 ++++++------ 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 2a3d591957..c8843fcc5e 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1062,7 +1062,7 @@ class BigQueryConnectionConfig(ConnectionConfig): job_retry_deadline_seconds: t.Optional[int] = None priority: t.Optional[BigQueryPriority] = None maximum_bytes_billed: t.Optional[int] = None - reservation_id: t.Optional[str] = None + reservation: t.Optional[str] = None concurrent_tasks: int = 1 register_comments: bool = True @@ -1172,7 +1172,7 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]: "job_retry_deadline_seconds", "priority", "maximum_bytes_billed", - "reservation_id", + "reservation", } } diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 8fa543a9f7..6e5ae11a61 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -140,8 +140,10 @@ def _job_params(self) -> t.Dict[str, t.Any]: "priority", BigQueryPriority.INTERACTIVE.bigquery_constant ), } - if self._extra_config.get("maximum_bytes_billed"): + if self._extra_config.get("maximum_bytes_billed") is not None: params["maximum_bytes_billed"] = self._extra_config.get("maximum_bytes_billed") + if self._extra_config.get("reservation") is not None: + params["reservation"] = self._extra_config.get("reservation") if self.correlation_id: # BigQuery label keys must be lowercase key = self.correlation_id.job_type.value.lower() @@ -1106,13 +1108,9 @@ def _execute( else [] ) - # Create job config with reservation support + # Create job config job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties) - # Set reservation directly on the job_config object if specified - reservation_id = self._extra_config.get("reservation_id") - if reservation_id: - job_config.reservation = reservation_id self._query_job = self._db_call( self.client.query, query=sql, diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 4cfb52a370..2dfb9c6313 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -1951,7 +1951,7 @@ def test_init_dbt_template(runner: CliRunner, tmp_path: Path): def test_init_project_engine_configs(tmp_path): engine_type_to_config = { "redshift": "# concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # user: \n # password: \n # database: \n # host: \n # port: \n # source_address: \n # unix_sock: \n # ssl: \n # sslmode: \n # timeout: \n # tcp_keepalive: \n # application_name: \n # preferred_role: \n # principal_arn: \n # credentials_provider: \n # region: \n # cluster_identifier: \n # iam: \n # is_serverless: \n # serverless_acct_id: \n # serverless_work_group: \n # enable_merge: ", - "bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: \n # reservation_id: ", + "bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: \n # reservation: ", "snowflake": "account: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # user: \n # password: \n # warehouse: \n # database: \n # role: \n # authenticator: \n # token: \n # host: \n # port: \n # application: Tobiko_SQLMesh\n # private_key: \n # private_key_path: \n # private_key_passphrase: \n # session_parameters: ", "databricks": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # server_hostname: \n # http_path: \n # access_token: \n # auth_type: \n # oauth_client_id: \n # oauth_client_secret: \n # catalog: \n # http_headers: \n # session_configuration: \n # databricks_connect_server_hostname: \n # databricks_connect_access_token: \n # databricks_connect_cluster_id: \n # databricks_connect_use_serverless: False\n # force_databricks_connect: False\n # disable_databricks_connect: False\n # disable_spark_session: False", "postgres": "host: \n user: \n password: \n port: \n database: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: True\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # keepalives_idle: \n # connect_timeout: 10\n # role: \n # sslmode: \n # application_name: ", diff --git a/tests/core/test_connection_config.py b/tests/core/test_connection_config.py index f06e37e609..2ff95525f7 100644 --- a/tests/core/test_connection_config.py +++ b/tests/core/test_connection_config.py @@ -1131,24 +1131,24 @@ def test_bigquery(make_config): assert config.get_catalog() == "project" assert config.is_recommended_for_state_sync is False - # Test reservation_id + # Test reservation config_with_reservation = make_config( type="bigquery", project="project", - reservation_id="projects/my-project/locations/us-central1/reservations/my-reservation", + reservation="projects/my-project/locations/us-central1/reservations/my-reservation", check_import=False, ) assert isinstance(config_with_reservation, BigQueryConnectionConfig) assert ( - config_with_reservation.reservation_id + config_with_reservation.reservation == "projects/my-project/locations/us-central1/reservations/my-reservation" ) - # Test that reservation_id is included in _extra_engine_config + # Test that reservation is included in _extra_engine_config extra_config = config_with_reservation._extra_engine_config - assert "reservation_id" in extra_config + assert "reservation" in extra_config assert ( - extra_config["reservation_id"] + extra_config["reservation"] == "projects/my-project/locations/us-central1/reservations/my-reservation" )