From 523c5b9da73ed4d443e3635607514b4177fa4125 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Tue, 11 Nov 2025 06:26:55 +0000 Subject: [PATCH 1/5] fix: fixes issues with setting a custom translator that takes options --- dagster_sqlmesh/config.py | 76 +++++++++++++++------------ dagster_sqlmesh/resource.py | 15 ++++-- sample/dagster_project/definitions.py | 15 +++--- 3 files changed, 61 insertions(+), 45 deletions(-) diff --git a/dagster_sqlmesh/config.py b/dagster_sqlmesh/config.py index 27b5620..75d76eb 100644 --- a/dagster_sqlmesh/config.py +++ b/dagster_sqlmesh/config.py @@ -1,4 +1,3 @@ -import inspect import typing as t from dataclasses import dataclass from pathlib import Path @@ -28,23 +27,54 @@ class SQLMeshContextConfig(Config): running via dagster. The config also manages the translator class used for converting SQLMesh - models to Dagster assets. You can specify a custom translator by setting - the translator_class_name field to the fully qualified class name. + models to Dagster assets and provides a consistent translator to + dagster-sqlmesh. The config must always be provided to the SQLMeshResource + in order for the integration to function correctly. For example, when + setting up the dagster Definitions, you must provide the + SQLMeshContextConfig as a resource along with the SQLMeshResource as + follows: + + ```python + sqlmesh_context_config = SQLMeshContextConfig( + path="/path/to/sqlmesh/project", gateway="local", + ) + + @sqlmesh_assets( + environment="dev", config=sqlmesh_context_config, + enabled_subsetting=True, + ) def sqlmesh_project( + context: AssetExecutionContext, sqlmesh: SQLMeshResource, + sqlmesh_context_config: SQLMeshContextConfig + ) -> t.Iterator[MaterializeResult[t.Any]]: + yield from sqlmesh.run(context, config=sqlmesh_config) + + + defs = Definitions( + assets=[sqlmesh_project], resources={ + "sqlmesh": SQLMeshResource(), "sqlmesh_context_config": + sqlmesh_context_config, + }, + ) + ``` + + In order to provide a custom translator, you will need to subclass this + class and return a different translator. However, due to the way that + dagster assets/jobs/ops are run, you will need to ensure that the custom + translator is _instantiated_ within the get_translator method rather than + simply returning an instance variable. This is because dagster will + serialize/deserialize the config object and any instance variables will + not be preserved. Therefore, any options you'd like to pass to the translator + must be serializable within your custom SQLMeshContextConfig subclass. + + This class provides the minimum configuration required to run dagster-sqlmesh. """ path: str gateway: str config_override: dict[str, t.Any] | None = Field(default_factory=lambda: None) - translator_class_name: str = Field( - default="dagster_sqlmesh.translator.SQLMeshDagsterTranslator", - description="Fully qualified class name of the SQLMesh Dagster translator to use" - ) def get_translator(self) -> "SQLMeshDagsterTranslator": - """Get a translator instance using the configured class name. - - Imports and validates the translator class, then creates a new instance. - The class must inherit from SQLMeshDagsterTranslator. + """Get a translator instance. Override this method to provide a custom translator. Returns: SQLMeshDagsterTranslator: A new instance of the configured translator class @@ -53,29 +83,7 @@ def get_translator(self) -> "SQLMeshDagsterTranslator": ValueError: If the imported object is not a class or does not inherit from SQLMeshDagsterTranslator """ - from importlib import import_module - - from dagster_sqlmesh.translator import SQLMeshDagsterTranslator - - module_name, class_name = self.translator_class_name.rsplit(".", 1) - module = import_module(module_name) - translator_class = getattr(module, class_name) - - # Validate that the imported class inherits from SQLMeshDagsterTranslator - if not inspect.isclass(translator_class): - raise ValueError( - f"'{self.translator_class_name}' is not a class. " - f"Expected a class that inherits from SQLMeshDagsterTranslator." - ) - - if not issubclass(translator_class, SQLMeshDagsterTranslator): - raise ValueError( - f"Translator class '{self.translator_class_name}' must inherit from " - f"SQLMeshDagsterTranslator. Found class that inherits from: " - f"{[base.__name__ for base in translator_class.__bases__]}" - ) - - return translator_class() + return SQLMeshDagsterTranslator() @property def sqlmesh_config(self) -> MeshConfig: diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index 40944e2..e52fb95 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -580,13 +580,13 @@ def errors(self) -> list[Exception]: class SQLMeshResource(dg.ConfigurableResource): - config: SQLMeshContextConfig is_testing: bool = False def run( self, context: dg.AssetExecutionContext, *, + config: SQLMeshContextConfig, context_factory: ContextFactory[ContextCls] = DEFAULT_CONTEXT_FACTORY, environment: str = "dev", start: TimeLike | None = None, @@ -618,7 +618,7 @@ def run( [model.fqn for model, _ in mesh.non_external_models_dag()] ) selected_models_set, models_map, select_models = ( - self._get_selected_models_from_context(context=context, models=models) + self._get_selected_models_from_context(context=context, config=config, models=models) ) if all_available_models == selected_models_set or select_models is None: @@ -632,6 +632,7 @@ def run( event_handler = self.create_event_handler( context=context, + config=config, models_map=models_map, dag=dag, prefix="sqlmesh: ", @@ -686,13 +687,14 @@ def create_event_handler( self, *, context: dg.AssetExecutionContext, + config: SQLMeshContextConfig, dag: DAG[str], models_map: dict[str, Model], prefix: str, is_testing: bool, materializations_enabled: bool, ) -> DagsterSQLMeshEventHandler: - translator = self.config.get_translator() + translator = config.get_translator() return DagsterSQLMeshEventHandler( context=context, dag=dag, @@ -704,7 +706,10 @@ def create_event_handler( ) def _get_selected_models_from_context( - self, context: dg.AssetExecutionContext, models: MappingProxyType[str, Model] + self, + context: dg.AssetExecutionContext, + config: SQLMeshContextConfig, + models: MappingProxyType[str, Model] ) -> tuple[set[str], dict[str, Model], list[str] | None]: models_map = models.copy() try: @@ -718,7 +723,7 @@ def _get_selected_models_from_context( else: raise e - translator = self.config.get_translator() + translator = config.get_translator() select_models: list[str] = [] models_map = {} for key, model in models.items(): diff --git a/sample/dagster_project/definitions.py b/sample/dagster_project/definitions.py index 6e22016..9f51a8c 100644 --- a/sample/dagster_project/definitions.py +++ b/sample/dagster_project/definitions.py @@ -30,7 +30,6 @@ sqlmesh_config = SQLMeshContextConfig( path=SQLMESH_PROJECT_PATH, gateway="local", - translator_class_name="definitions.RewrittenSQLMeshTranslator" ) @@ -41,15 +40,18 @@ class RewrittenSQLMeshTranslator(SQLMeshDagsterTranslator): We include this as a test of the translator functionality. """ + def __init__(self, custom_key: str): + self.custom_key = custom_key + def get_asset_key(self, context: Context, fqn: str) -> AssetKey: table = exp.to_table(fqn) # Ensure fqn is a valid table expression if table.db == "sqlmesh_example": # For the sqlmesh_example project, we use a custom key - return AssetKey(["sqlmesh", table.name]) + return AssetKey([self.custom_key, table.name]) return AssetKey([table.db, table.name]) def get_group_name(self, context, model): - return "sqlmesh" + return self.custom_key @asset(key=["sources", "reset_asset"]) @@ -107,9 +109,9 @@ def post_full_model() -> pl.DataFrame: enabled_subsetting=True, ) def sqlmesh_project( - context: AssetExecutionContext, sqlmesh: SQLMeshResource + context: AssetExecutionContext, sqlmesh: SQLMeshResource, sqlmesh_config: SQLMeshContextConfig ) -> t.Iterator[MaterializeResult[t.Any]]: - yield from sqlmesh.run(context) + yield from sqlmesh.run(context, config=sqlmesh_config) all_assets_job = define_asset_job(name="all_assets_job") @@ -117,11 +119,12 @@ def sqlmesh_project( defs = Definitions( assets=[sqlmesh_project, test_source, reset_asset, post_full_model], resources={ - "sqlmesh": SQLMeshResource(config=sqlmesh_config), + "sqlmesh": SQLMeshResource(), "io_manager": DuckDBPolarsIOManager( database=DUCKDB_PATH, schema="sources", ), + "sqlmesh_config": sqlmesh_config, }, jobs=[all_assets_job], ) From f1773102084902a154b7a60918d1a0f1308402b8 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Tue, 11 Nov 2025 06:27:19 +0000 Subject: [PATCH 2/5] chore: update lock --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index b6a5639..cc40cf0 100644 --- a/uv.lock +++ b/uv.lock @@ -304,7 +304,7 @@ wheels = [ [[package]] name = "dagster-sqlmesh" -version = "0.20.0" +version = "0.21.0" source = { editable = "." } dependencies = [ { name = "dagster" }, From 1354c7b7b251c949533f65ea1318c57779c171c9 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Tue, 11 Nov 2025 06:40:50 +0000 Subject: [PATCH 3/5] fix: use custom translator properly in sample dagster --- sample/dagster_project/definitions.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sample/dagster_project/definitions.py b/sample/dagster_project/definitions.py index 9f51a8c..669d4a7 100644 --- a/sample/dagster_project/definitions.py +++ b/sample/dagster_project/definitions.py @@ -8,6 +8,7 @@ AssetKey, Definitions, MaterializeResult, + ResourceParam, asset, define_asset_job, ) @@ -27,12 +28,18 @@ SQLMESH_CACHE_PATH = os.path.join(SQLMESH_PROJECT_PATH, ".cache") DUCKDB_PATH = os.path.join(CURR_DIR, "../../db.db") -sqlmesh_config = SQLMeshContextConfig( +class CustomSQLMeshContextConfig(SQLMeshContextConfig): + custom_key: str + + def get_translator(self): + return RewrittenSQLMeshTranslator(self.custom_key) + +sqlmesh_config = CustomSQLMeshContextConfig( path=SQLMESH_PROJECT_PATH, gateway="local", + custom_key="custom_sqlmesh_prefix" ) - class RewrittenSQLMeshTranslator(SQLMeshDagsterTranslator): """A contrived SQLMeshDagsterTranslator that flattens the catalog of the sqlmesh project and only uses the table db and name @@ -85,7 +92,7 @@ def test_source() -> pl.DataFrame: ) -@asset(deps=[AssetKey(["sqlmesh", "full_model"])]) +@asset(deps=[AssetKey(["custom_sqlmesh_prefix", "full_model"])]) def post_full_model() -> pl.DataFrame: """An asset that depends on the `full_model` asset from the sqlmesh project. This is used to test that the sqlmesh assets are correctly materialized and @@ -109,7 +116,7 @@ def post_full_model() -> pl.DataFrame: enabled_subsetting=True, ) def sqlmesh_project( - context: AssetExecutionContext, sqlmesh: SQLMeshResource, sqlmesh_config: SQLMeshContextConfig + context: AssetExecutionContext, sqlmesh: SQLMeshResource, sqlmesh_config: ResourceParam[SQLMeshContextConfig] ) -> t.Iterator[MaterializeResult[t.Any]]: yield from sqlmesh.run(context, config=sqlmesh_config) From 2181faa64422e5d41d58ea02739cbc4696d02f1a Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Tue, 11 Nov 2025 06:44:54 +0000 Subject: [PATCH 4/5] fix: remove test_config --- dagster_sqlmesh/test_config.py | 65 ---------------------------------- 1 file changed, 65 deletions(-) delete mode 100644 dagster_sqlmesh/test_config.py diff --git a/dagster_sqlmesh/test_config.py b/dagster_sqlmesh/test_config.py deleted file mode 100644 index d031b8b..0000000 --- a/dagster_sqlmesh/test_config.py +++ /dev/null @@ -1,65 +0,0 @@ -import pytest - -from dagster_sqlmesh.config import SQLMeshContextConfig -from dagster_sqlmesh.translator import SQLMeshDagsterTranslator - - -def test_get_translator_with_valid_class(): - """Test that get_translator works with the default translator class.""" - config = SQLMeshContextConfig(path="/tmp/test", gateway="local") - translator = config.get_translator() - assert isinstance(translator, SQLMeshDagsterTranslator) - - -def test_get_translator_with_non_class(): - """Test that get_translator raises ValueError when pointing to a non-class.""" - config = SQLMeshContextConfig( - path="/tmp/test", - gateway="local", - translator_class_name="sys.version" - ) - - with pytest.raises(ValueError, match="is not a class"): - config.get_translator() - - -def test_get_translator_with_invalid_inheritance(): - """Test that get_translator raises ValueError when class doesn't inherit from SQLMeshDagsterTranslator.""" - config = SQLMeshContextConfig( - path="/tmp/test", - gateway="local", - translator_class_name="builtins.dict" - ) - - with pytest.raises(ValueError, match="must inherit from SQLMeshDagsterTranslator"): - config.get_translator() - - -def test_get_translator_with_nonexistent_class(): - """Test that get_translator raises AttributeError when class doesn't exist.""" - config = SQLMeshContextConfig( - path="/tmp/test", - gateway="local", - translator_class_name="dagster_sqlmesh.translator.NonexistentClass" - ) - - with pytest.raises(AttributeError): - config.get_translator() - - -class MockValidTranslator(SQLMeshDagsterTranslator): - """A mock translator for testing custom inheritance.""" - pass - - -def test_get_translator_with_valid_custom_class(): - """Test that get_translator works with custom classes that inherit from SQLMeshDagsterTranslator.""" - config = SQLMeshContextConfig( - path="/tmp/test", - gateway="local", - translator_class_name=f"{__name__}.MockValidTranslator" - ) - - translator = config.get_translator() - assert isinstance(translator, SQLMeshDagsterTranslator) - assert isinstance(translator, MockValidTranslator) From a464f8ed7aff501ad66c3e846769323d544c57b5 Mon Sep 17 00:00:00 2001 From: "Reuven V. Gonzales" Date: Tue, 11 Nov 2025 07:02:53 +0000 Subject: [PATCH 5/5] fix: address more issues with moving config around --- dagster_sqlmesh/config.py | 5 ++--- dagster_sqlmesh/resource.py | 7 +++++-- dagster_sqlmesh/test_resource.py | 29 ++++++++++++++++++----------- dagster_sqlmesh/testing/context.py | 12 ++++++------ 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/dagster_sqlmesh/config.py b/dagster_sqlmesh/config.py index 75d76eb..a30b511 100644 --- a/dagster_sqlmesh/config.py +++ b/dagster_sqlmesh/config.py @@ -7,8 +7,7 @@ from sqlmesh.core.config import Config as MeshConfig from sqlmesh.core.config.loader import load_configs -if t.TYPE_CHECKING: - from dagster_sqlmesh.translator import SQLMeshDagsterTranslator +from dagster_sqlmesh.translator import SQLMeshDagsterTranslator @dataclass @@ -73,7 +72,7 @@ class and return a different translator. However, due to the way that gateway: str config_override: dict[str, t.Any] | None = Field(default_factory=lambda: None) - def get_translator(self) -> "SQLMeshDagsterTranslator": + def get_translator(self) -> SQLMeshDagsterTranslator: """Get a translator instance. Override this method to provide a custom translator. Returns: diff --git a/dagster_sqlmesh/resource.py b/dagster_sqlmesh/resource.py index e52fb95..50e1cc0 100644 --- a/dagster_sqlmesh/resource.py +++ b/dagster_sqlmesh/resource.py @@ -606,7 +606,9 @@ def run( logger = context.log controller = self.get_controller( - context_factory=context_factory, log_override=logger + config=config, + context_factory=context_factory, + log_override=logger ) with controller.instance(environment) as mesh: @@ -738,11 +740,12 @@ def _get_selected_models_from_context( def get_controller( self, + config: SQLMeshContextConfig, context_factory: ContextFactory[ContextCls], log_override: logging.Logger | None = None, ) -> DagsterSQLMeshController[ContextCls]: return DagsterSQLMeshController.setup_with_config( - config=self.config, + config=config, context_factory=context_factory, log_override=log_override, ) diff --git a/dagster_sqlmesh/test_resource.py b/dagster_sqlmesh/test_resource.py index 3563978..0739b06 100644 --- a/dagster_sqlmesh/test_resource.py +++ b/dagster_sqlmesh/test_resource.py @@ -59,10 +59,11 @@ def test_sqlmesh_resource_should_report_no_errors( ): resource = sample_sqlmesh_resource_initialization.resource dg_context = sample_sqlmesh_resource_initialization.dagster_context + context_config = sample_sqlmesh_resource_initialization.test_context.context_config success = True try: - for result in resource.run(dg_context): + for result in resource.run(context=dg_context, config=context_config): pass except PlanOrRunFailedError as e: success = False @@ -83,10 +84,11 @@ def test_sqlmesh_resource_properly_reports_errors( ) resource = sqlmesh_resource_initialization.resource dg_context = sqlmesh_resource_initialization.dagster_context + context_config = sqlmesh_resource_initialization.test_context.context_config caught_failure = False try: - for result in resource.run(dg_context): + for result in resource.run(context=dg_context, config=context_config): pass except PlanOrRunFailedError as e: caught_failure = True @@ -106,6 +108,7 @@ def test_sqlmesh_resource_properly_reports_errors_not_thrown( ): dg_context = sample_sqlmesh_resource_initialization.dagster_context resource = sample_sqlmesh_resource_initialization.resource + context_config = sample_sqlmesh_resource_initialization.test_context.context_config def event_handler_factory( *args: t.Any, **kwargs: t.Any @@ -120,7 +123,7 @@ def event_handler_factory( caught_failure = False try: - for result in resource.run(dg_context): + for result in resource.run(context=dg_context, config=context_config): pass except PlanOrRunFailedError as e: caught_failure = True @@ -151,17 +154,19 @@ def test_sqlmesh_resource_should_properly_materialize_results_when_no_plan_is_ru resource = sample_sqlmesh_resource_initialization.resource dg_context = sample_sqlmesh_resource_initialization.dagster_context dg_instance = sample_sqlmesh_resource_initialization.dagster_instance + context_config = sample_sqlmesh_resource_initialization.test_context.context_config # First run should materialize all models initial_results: list[dg.MaterializeResult] = [] - for result in resource.run(dg_context): + for result in resource.run(context=dg_context, config=context_config): initial_results.append(result) assert result.asset_key is not None, "Expected asset key to be present." - dg_instance.report_runless_asset_event(dg.AssetMaterialization( - asset_key=result.asset_key, - metadata=result.metadata, - )) - + dg_instance.report_runless_asset_event( + dg.AssetMaterialization( + asset_key=result.asset_key, + metadata=result.metadata, + ) + ) # All metadata times should be set to the same time initial_times: set[float] = set() @@ -180,7 +185,7 @@ def test_sqlmesh_resource_should_properly_materialize_results_when_no_plan_is_ru # Second run should also materialize all models second_results: list[dg.MaterializeResult] = [] - for result in resource.run(dg_context): + for result in resource.run(context=dg_context, config=context_config): second_results.append(result) assert len(second_results) > 0, "Expected second results to be non-empty." @@ -204,7 +209,9 @@ def test_sqlmesh_resource_should_properly_materialize_results_when_no_plan_is_ru # Third run will restate the full model third_results: list[dg.MaterializeResult] = [] for result in resource.run( - dg_context, restate_models=["sqlmesh_example.full_model"] + context=dg_context, + config=context_config, + restate_models=["sqlmesh_example.full_model"], ): third_results.append(result) diff --git a/dagster_sqlmesh/testing/context.py b/dagster_sqlmesh/testing/context.py index 866873b..2137653 100644 --- a/dagster_sqlmesh/testing/context.py +++ b/dagster_sqlmesh/testing/context.py @@ -56,8 +56,8 @@ class TestSQLMeshResource(SQLMeshResource): It allows for easy setup and teardown of the SQLMesh context. """ - def __init__(self, config: SQLMeshContextConfig, is_testing: bool = False): - super().__init__(config=config, is_testing=is_testing) + def __init__(self, is_testing: bool = False): + super().__init__(is_testing=is_testing) def default_event_handler_factory(*args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshEventHandler: """Default event handler factory for the SQLMesh resource.""" return DagsterSQLMeshEventHandler(*args, **kwargs) @@ -82,7 +82,9 @@ def create_event_handler(self, *args: t.Any, **kwargs: t.Any) -> DagsterSQLMeshE DagsterSQLMeshEventHandler: The created event handler. """ # Ensure translator is passed to the event handler factory - kwargs['translator'] = self.config.get_translator() + # FIXME: this is a hack to deal with an older signature that didn't expected the config + config = t.cast(SQLMeshContextConfig, kwargs.pop("config")) + kwargs["translator"] = config.get_translator() return self._event_handler_factory(*args, **kwargs) @@ -99,9 +101,7 @@ def create_controller(self) -> DagsterSQLMeshController[Context]: ) def create_resource(self) -> TestSQLMeshResource: - return TestSQLMeshResource( - config=self.context_config, is_testing=True, - ) + return TestSQLMeshResource(is_testing=True) def query(self, *args: t.Any, **kwargs: t.Any) -> list[t.Any]: conn = duckdb.connect(self.db_path)