Skip to content

Commit b8717f1

Browse files
committed
Add create_sql_view method to Catalog
1 parent 75cd77f commit b8717f1

4 files changed

Lines changed: 90 additions & 18 deletions

File tree

pyiceberg/catalog/__init__.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import importlib
2121
import logging
2222
import re
23+
import time
2324
import uuid
2425
from abc import ABC, abstractmethod
2526
from collections.abc import Callable
@@ -71,7 +72,7 @@
7172
from pyiceberg.utils.config import Config, merge_config
7273
from pyiceberg.utils.properties import property_as_bool
7374
from pyiceberg.view import View
74-
from pyiceberg.view.metadata import ViewVersion
75+
from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion
7576

7677
if TYPE_CHECKING:
7778
import pyarrow as pa
@@ -744,6 +745,49 @@ def create_view(
744745
ViewAlreadyExistsError: If a view with the name already exists.
745746
"""
746747

748+
def create_sql_view(
749+
self,
750+
identifier: str | Identifier,
751+
schema: Schema | pa.Schema,
752+
dialect: str,
753+
sql: str,
754+
default_namespace: str | Identifier,
755+
location: str | None = None,
756+
properties: Properties = EMPTY_DICT,
757+
default_catalog: str | None = None,
758+
) -> View:
759+
"""Create a view.
760+
761+
Args:
762+
identifier (str | Identifier): View identifier.
763+
schema (Schema): View's schema.
764+
dialect (str): SQL dialect for the view.
765+
sql (str): SQL for the view.
766+
default_namespace (str | Identifier): Default namespace name.
767+
location (str | None): Location for the view. Optional Argument.
768+
properties (Properties): View properties that can be a string based dictionary.
769+
default_catalog (str | None): Default catalog name. Optional Argument.
770+
771+
Returns:
772+
View: the created view instance.
773+
774+
Raises:
775+
ViewAlreadyExistsError: If a view with the name already exists.
776+
"""
777+
iceberg_schema = self._convert_schema_if_needed(schema)
778+
namespace_tuple = Catalog.identifier_to_tuple(default_namespace)
779+
780+
view_version = ViewVersion(
781+
version_id=1,
782+
schema_id=iceberg_schema.schema_id,
783+
timestamp_ms=int(time.time() * 1000),
784+
summary={}, # TODO Set summary field like EnvironmentContext of Iceberg Java
785+
representations=[SQLViewRepresentation(type="sql", dialect=dialect, sql=sql)],
786+
default_catalog=default_catalog,
787+
default_namespace=namespace_tuple,
788+
)
789+
return self.create_view(identifier, iceberg_schema, view_version, location, properties)
790+
747791
@staticmethod
748792
def identifier_to_tuple(identifier: str | Identifier) -> Identifier:
749793
"""Parse an identifier to a tuple.

tests/integration/test_catalog.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,23 @@ def test_rest_create_view(
653653
assert rest_catalog.load_view(identifier).schema() == view.schema()
654654

655655

656+
@pytest.mark.integration
657+
def test_rest_create_sql_view(
658+
rest_catalog: RestCatalog, example_view_metadata_v1: dict[str, Any], database_name: str, view_name: str
659+
) -> None:
660+
identifier = (database_name, view_name)
661+
662+
rest_catalog.create_namespace_if_not_exists(database_name)
663+
view = View(identifier, ViewMetadata.model_validate(example_view_metadata_v1))
664+
665+
assert not rest_catalog.view_exists(identifier)
666+
667+
rest_catalog.create_sql_view(identifier, view.schema(), "spark", "SELECT * FROM prod.db.table", "default")
668+
669+
assert rest_catalog.view_exists(identifier)
670+
assert rest_catalog.load_view(identifier).schema() == view.schema()
671+
672+
656673
@pytest.mark.integration
657674
def test_rest_drop_view(
658675
rest_catalog: RestCatalog, example_view_metadata_v1: dict[str, Any], database_name: str, view_name: str

tests/integration/test_rest_catalog.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616
# under the License.
1717
# pylint:disable=redefined-outer-name
1818

19-
import time
2019

2120
import pytest
2221
from pytest_lazy_fixtures import lf
2322

2423
from pyiceberg.catalog.rest import RestCatalog
2524
from pyiceberg.exceptions import NoSuchViewError
2625
from pyiceberg.schema import Schema
27-
from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion
2826

2927
TEST_NAMESPACE_IDENTIFIER = "TEST NS"
3028

@@ -76,21 +74,7 @@ def test_load_view(catalog: RestCatalog, table_schema_nested: Schema, database_n
7674
if not catalog.namespace_exists(database_name):
7775
catalog.create_namespace(database_name)
7876

79-
view_version = ViewVersion(
80-
version_id=1,
81-
schema_id=1,
82-
timestamp_ms=int(time.time() * 1000),
83-
summary={},
84-
representations=[
85-
SQLViewRepresentation(
86-
type="sql",
87-
sql="SELECT 1 as some_col",
88-
dialect="spark",
89-
)
90-
],
91-
default_namespace=["default"],
92-
)
93-
view = catalog.create_view(identifier, table_schema_nested, view_version=view_version)
77+
view = catalog.create_sql_view(identifier, table_schema_nested, "spark", "SELECT 1 as some_col", "default")
9478
loaded_view = catalog.load_view(identifier)
9579
assert view == loaded_view
9680

tests/integration/test_writes/test_writes.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,6 +1799,33 @@ def test_create_view(
17991799
session_catalog.drop_view(identifier) # clean up
18001800

18011801

1802+
@pytest.mark.integration
1803+
def test_create_sql_view(
1804+
spark: SparkSession,
1805+
session_catalog: Catalog,
1806+
) -> None:
1807+
# Create a view using the REST Catalog.
1808+
identifier = "default.some_view"
1809+
schema = pa.schema([pa.field("some_col", pa.int32())])
1810+
session_catalog.create_sql_view(
1811+
identifier=identifier,
1812+
schema=schema,
1813+
dialect="spark",
1814+
sql="SELECT 1 as some_col",
1815+
default_namespace="default",
1816+
)
1817+
1818+
# Ensure the view exists.
1819+
assert session_catalog.view_exists(identifier)
1820+
1821+
# Query the view in spark to ensure it was properly created.
1822+
df = spark.table(identifier)
1823+
assert df.count() == 1
1824+
assert df.collect()[0].some_col == 1
1825+
1826+
session_catalog.drop_view(identifier) # clean up
1827+
1828+
18021829
@pytest.mark.integration
18031830
def test_view_exists(
18041831
spark: SparkSession,

0 commit comments

Comments
 (0)