Skip to content

Commit 4c9d887

Browse files
committed
feat: Allow servers to express supported endpoints with ConfigResponse
1 parent b0a7878 commit 4c9d887

File tree

2 files changed

+394
-14
lines changed

2 files changed

+394
-14
lines changed

pyiceberg/catalog/rest/__init__.py

Lines changed: 166 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
Union,
2222
)
2323

24-
from pydantic import Field, field_validator
24+
from pydantic import ConfigDict, Field, field_validator
2525
from requests import HTTPError, Session
2626
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
2727

@@ -76,6 +76,43 @@
7676
import pyarrow as pa
7777

7878

79+
class HttpMethod(str, Enum):
    """HTTP verbs that an ``Endpoint`` can be declared with.

    The ``str`` mix-in makes each member compare equal to its plain string
    value (``HttpMethod.GET == "GET"``).
    """

    GET = "GET"
    HEAD = "HEAD"
    POST = "POST"
    DELETE = "DELETE"
84+
85+
86+
class Endpoint(IcebergBaseModel):
    """An HTTP method paired with a REST route template.

    The model is frozen so that instances can be stored in sets and compared
    by value.
    """

    model_config = ConfigDict(frozen=True)

    http_method: HttpMethod = Field()
    path: str = Field()

    @field_validator("path", mode="before")
    @classmethod
    def _validate_path(cls, raw_path: str) -> str:
        """Trim surrounding whitespace from the path and reject blank values.

        Raises:
            ValueError: If the path is falsy or contains only whitespace.
        """
        # Falsy input (e.g. None or "") is rejected as-is, without stripping.
        trimmed = raw_path.strip() if raw_path else raw_path
        if not trimmed:
            raise ValueError("Invalid path: empty")
        return trimmed

    def __str__(self) -> str:
        """Return the string representation of the Endpoint class."""
        method = self.http_method.value
        return f"{method} {self.path}"

    @classmethod
    def from_string(cls, endpoint: str | None) -> "Endpoint":
        """Build an Endpoint from text of the form ``METHOD /path``.

        The method name is upper-cased before lookup, so it is matched
        case-insensitively.

        Args:
            endpoint: The textual endpoint, e.g. ``"GET /v1/{prefix}/namespaces"``.

        Returns:
            The parsed Endpoint.

        Raises:
            ValueError: If ``endpoint`` is None, does not split into exactly two
                parts, or names a method that is not an ``HttpMethod`` member.
        """
        if endpoint is None:
            raise ValueError("Invalid endpoint (must consist of 'METHOD /path'): None")
        # Split once on the first run of whitespace; the remainder is the path.
        parts = endpoint.split(maxsplit=1)
        if len(parts) == 2:
            verb, route = parts
            return cls(http_method=HttpMethod(verb.upper()), path=route)
        raise ValueError(f"Invalid endpoint (must consist of two elements separated by a single space): {endpoint}")
114+
115+
79116
class Endpoints:
80117
get_config: str = "config"
81118
list_namespaces: str = "namespaces"
@@ -86,7 +123,7 @@ class Endpoints:
86123
namespace_exists: str = "namespaces/{namespace}"
87124
list_tables: str = "namespaces/{namespace}/tables"
88125
create_table: str = "namespaces/{namespace}/tables"
89-
register_table = "namespaces/{namespace}/register"
126+
register_table: str = "namespaces/{namespace}/register"
90127
load_table: str = "namespaces/{namespace}/tables/{table}"
91128
update_table: str = "namespaces/{namespace}/tables/{table}"
92129
drop_table: str = "namespaces/{namespace}/tables/{table}"
@@ -100,6 +137,66 @@ class Endpoints:
100137
fetch_scan_tasks: str = "namespaces/{namespace}/tables/{table}/tasks"
101138

102139

140+
class Capability:
    """Named ``Endpoint`` constants for the v1 REST routes.

    Each constant pairs an HTTP method with a route template; these are the
    values looked up in a catalog's set of supported endpoints.
    """

    # Namespace routes
    V1_LIST_NAMESPACES = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces")
    V1_LOAD_NAMESPACE = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}")
    V1_NAMESPACE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path="/v1/{prefix}/namespaces/{namespace}")
    V1_UPDATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/properties")
    V1_CREATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces")
    V1_DELETE_NAMESPACE = Endpoint(http_method=HttpMethod.DELETE, path="/v1/{prefix}/namespaces/{namespace}")

    # Table routes
    V1_LIST_TABLES = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/tables")
    V1_LOAD_TABLE = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}")
    V1_TABLE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}")
    V1_CREATE_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables")
    V1_UPDATE_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}")
    V1_DELETE_TABLE = Endpoint(http_method=HttpMethod.DELETE, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}")
    V1_RENAME_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/tables/rename")
    V1_REGISTER_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/register")

    # View routes
    V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/views")
    V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/views/{view}")
    V1_VIEW_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path="/v1/{prefix}/namespaces/{namespace}/views/{view}")
    V1_CREATE_VIEW = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/views")
    V1_UPDATE_VIEW = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/views/{view}")
    V1_DELETE_VIEW = Endpoint(http_method=HttpMethod.DELETE, path="/v1/{prefix}/namespaces/{namespace}/views/{view}")
    V1_RENAME_VIEW = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/views/rename")

    # Scan-planning routes
    V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint(
        http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"
    )
    V1_TABLE_SCAN_PLAN_TASKS = Endpoint(
        http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"
    )
170+
171+
172+
# Fallback set for legacy servers whose ConfigResponse carries no endpoints,
# kept for backwards compatibility. Covers namespace and table endpoints only.
DEFAULT_ENDPOINTS: frozenset[Endpoint] = frozenset(
    {
        Capability.V1_LIST_NAMESPACES,
        Capability.V1_LOAD_NAMESPACE,
        Capability.V1_CREATE_NAMESPACE,
        Capability.V1_UPDATE_NAMESPACE,
        Capability.V1_DELETE_NAMESPACE,
        Capability.V1_LIST_TABLES,
        Capability.V1_LOAD_TABLE,
        Capability.V1_CREATE_TABLE,
        Capability.V1_UPDATE_TABLE,
        Capability.V1_DELETE_TABLE,
        Capability.V1_RENAME_TABLE,
        Capability.V1_REGISTER_TABLE,
    }
)

# View endpoints, added on top of the defaults only when the
# VIEW_ENDPOINTS_SUPPORTED property is enabled.
VIEW_ENDPOINTS: frozenset[Endpoint] = frozenset(
    {
        Capability.V1_LIST_VIEWS,
        Capability.V1_DELETE_VIEW,
    }
)
198+
199+
103200
class IdentifierKind(Enum):
104201
TABLE = "table"
105202
VIEW = "view"
@@ -134,6 +231,8 @@ class IdentifierKind(Enum):
134231
CUSTOM = "custom"
135232
REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"
136233
REST_SCAN_PLANNING_ENABLED_DEFAULT = False
234+
VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported"
235+
VIEW_ENDPOINTS_SUPPORTED_DEFAULT = False
137236

138237
NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)
139238

@@ -180,6 +279,14 @@ class RegisterTableRequest(IcebergBaseModel):
180279
class ConfigResponse(IcebergBaseModel):
    """Catalog configuration response: property defaults/overrides plus supported endpoints."""

    defaults: Properties | None = Field(default_factory=dict)
    overrides: Properties | None = Field(default_factory=dict)
    # Left as None when the response carries no "endpoints" entry.
    endpoints: set[Endpoint] | None = Field(default=None)

    @field_validator("endpoints", mode="before")
    @classmethod
    def _parse_endpoints(cls, v: list[str | Endpoint] | None) -> set[Endpoint] | None:
        """Convert the raw ``endpoints`` value into a set of ``Endpoint`` objects.

        Args:
            v: The raw value: None, or an iterable whose items are either
                ``"METHOD /path"`` strings or already-built ``Endpoint`` objects.

        Returns:
            None when ``v`` is None, otherwise the set of parsed endpoints.

        Raises:
            ValueError: If a string item cannot be parsed by ``Endpoint.from_string``.
        """
        if v is None:
            return None
        # Already-built Endpoint objects (programmatic construction or
        # re-validation) are kept as-is; calling from_string on them would fail
        # with AttributeError because they have no ``split`` method.
        return {item if isinstance(item, Endpoint) else Endpoint.from_string(item) for item in v}
183290

184291

185292
class ListNamespaceResponse(IcebergBaseModel):
@@ -218,6 +325,7 @@ class ListViewsResponse(IcebergBaseModel):
218325
class RestCatalog(Catalog):
219326
uri: str
220327
_session: Session
328+
_supported_endpoints: set[Endpoint]
221329

222330
def __init__(self, name: str, **properties: str):
223331
"""Rest Catalog.
@@ -279,7 +387,9 @@ def is_rest_scan_planning_enabled(self) -> bool:
279387
Returns:
280388
True if enabled, False otherwise.
281389
"""
282-
return property_as_bool(self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT)
390+
return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and property_as_bool(
391+
self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
392+
)
283393

284394
def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
285395
"""Create the LegacyOAuth2AuthManager by fetching required properties.
@@ -327,6 +437,18 @@ def url(self, endpoint: str, prefixed: bool = True, **kwargs: Any) -> str:
327437

328438
return url + endpoint.format(**kwargs)
329439

440+
def _check_endpoint(self, endpoint: Endpoint) -> None:
    """Ensure the server supports ``endpoint`` before it is called.

    Args:
        endpoint: The endpoint to look up in ``self._supported_endpoints``.

    Raises:
        NotImplementedError: If the endpoint is not in the supported set.
    """
    if endpoint in self._supported_endpoints:
        return
    raise NotImplementedError(f"Server does not support endpoint: {endpoint}")
451+
330452
@property
331453
def auth_url(self) -> str:
332454
self._warn_oauth_tokens_deprecation()
@@ -384,6 +506,17 @@ def _fetch_config(self) -> None:
384506
# Update URI based on overrides
385507
self.uri = config[URI]
386508

509+
# Determine supported endpoints
510+
endpoints = config_response.endpoints
511+
if endpoints:
512+
self._supported_endpoints = set(endpoints)
513+
else:
514+
# Use default endpoints for legacy servers that don't return endpoints
515+
self._supported_endpoints = set(DEFAULT_ENDPOINTS)
516+
# Conditionally add view endpoints based on config
517+
if property_as_bool(self.properties, VIEW_ENDPOINTS_SUPPORTED, VIEW_ENDPOINTS_SUPPORTED_DEFAULT):
518+
self._supported_endpoints.update(VIEW_ENDPOINTS)
519+
387520
def _identifier_to_validated_tuple(self, identifier: str | Identifier) -> Identifier:
388521
identifier_tuple = self.identifier_to_tuple(identifier)
389522
if len(identifier_tuple) <= 1:
@@ -503,6 +636,7 @@ def _create_table(
503636
properties: Properties = EMPTY_DICT,
504637
stage_create: bool = False,
505638
) -> TableResponse:
639+
self._check_endpoint(Capability.V1_CREATE_TABLE)
506640
iceberg_schema = self._convert_schema_if_needed(
507641
schema,
508642
int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
@@ -591,6 +725,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
591725
Raises:
592726
TableAlreadyExistsError: If the table already exists
593727
"""
728+
self._check_endpoint(Capability.V1_REGISTER_TABLE)
594729
namespace_and_table = self._split_identifier_for_path(identifier)
595730
request = RegisterTableRequest(
596731
name=namespace_and_table["table"],
@@ -611,6 +746,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
611746

612747
@retry(**_RETRY_ARGS)
613748
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
749+
self._check_endpoint(Capability.V1_LIST_TABLES)
614750
namespace_tuple = self._check_valid_namespace_identifier(namespace)
615751
namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
616752
response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat))
@@ -622,6 +758,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
622758

623759
@retry(**_RETRY_ARGS)
624760
def load_table(self, identifier: str | Identifier) -> Table:
761+
self._check_endpoint(Capability.V1_LOAD_TABLE)
625762
params = {}
626763
if mode := self.properties.get(SNAPSHOT_LOADING_MODE):
627764
if mode in {"all", "refs"}:
@@ -642,6 +779,7 @@ def load_table(self, identifier: str | Identifier) -> Table:
642779

643780
@retry(**_RETRY_ARGS)
644781
def drop_table(self, identifier: str | Identifier, purge_requested: bool = False) -> None:
782+
self._check_endpoint(Capability.V1_DELETE_TABLE)
645783
response = self._session.delete(
646784
self.url(Endpoints.drop_table, prefixed=True, **self._split_identifier_for_path(identifier)),
647785
params={"purgeRequested": purge_requested},
@@ -657,6 +795,7 @@ def purge_table(self, identifier: str | Identifier) -> None:
657795

658796
@retry(**_RETRY_ARGS)
659797
def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table:
798+
self._check_endpoint(Capability.V1_RENAME_TABLE)
660799
payload = {
661800
"source": self._split_identifier_for_json(from_identifier),
662801
"destination": self._split_identifier_for_json(to_identifier),
@@ -692,6 +831,8 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
692831

693832
@retry(**_RETRY_ARGS)
694833
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
834+
if Capability.V1_LIST_VIEWS not in self._supported_endpoints:
835+
return []
695836
namespace_tuple = self._check_valid_namespace_identifier(namespace)
696837
namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
697838
response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat))
@@ -720,6 +861,7 @@ def commit_table(
720861
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
721862
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
722863
"""
864+
self._check_endpoint(Capability.V1_UPDATE_TABLE)
723865
identifier = table.name()
724866
table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1])
725867
table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates)
@@ -749,6 +891,7 @@ def commit_table(
749891

750892
@retry(**_RETRY_ARGS)
751893
def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None:
894+
self._check_endpoint(Capability.V1_CREATE_NAMESPACE)
752895
namespace_tuple = self._check_valid_namespace_identifier(namespace)
753896
payload = {"namespace": namespace_tuple, "properties": properties}
754897
response = self._session.post(self.url(Endpoints.create_namespace), json=payload)
@@ -759,6 +902,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties =
759902

760903
@retry(**_RETRY_ARGS)
761904
def drop_namespace(self, namespace: str | Identifier) -> None:
905+
self._check_endpoint(Capability.V1_DELETE_NAMESPACE)
762906
namespace_tuple = self._check_valid_namespace_identifier(namespace)
763907
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
764908
response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace))
@@ -769,6 +913,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
769913

770914
@retry(**_RETRY_ARGS)
771915
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
916+
self._check_endpoint(Capability.V1_LIST_NAMESPACES)
772917
namespace_tuple = self.identifier_to_tuple(namespace)
773918
response = self._session.get(
774919
self.url(
@@ -786,6 +931,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
786931

787932
@retry(**_RETRY_ARGS)
788933
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
934+
self._check_endpoint(Capability.V1_LOAD_NAMESPACE)
789935
namespace_tuple = self._check_valid_namespace_identifier(namespace)
790936
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
791937
response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace))
@@ -800,6 +946,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
800946
def update_namespace_properties(
801947
self, namespace: str | Identifier, removals: set[str] | None = None, updates: Properties = EMPTY_DICT
802948
) -> PropertiesUpdateSummary:
949+
self._check_endpoint(Capability.V1_UPDATE_NAMESPACE)
803950
namespace_tuple = self._check_valid_namespace_identifier(namespace)
804951
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
805952
payload = {"removals": list(removals or []), "updates": updates}
@@ -819,6 +966,14 @@ def update_namespace_properties(
819966
def namespace_exists(self, namespace: str | Identifier) -> bool:
820967
namespace_tuple = self._check_valid_namespace_identifier(namespace)
821968
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
969+
970+
if Capability.V1_NAMESPACE_EXISTS not in self._supported_endpoints:
971+
try:
972+
self.load_namespace_properties(namespace_tuple)
973+
return True
974+
except NoSuchNamespaceError:
975+
return False
976+
822977
response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace))
823978

824979
if response.status_code == 404:
@@ -843,6 +998,13 @@ def table_exists(self, identifier: str | Identifier) -> bool:
843998
Returns:
844999
bool: True if the table exists, False otherwise.
8451000
"""
1001+
if Capability.V1_TABLE_EXISTS not in self._supported_endpoints:
1002+
try:
1003+
self.load_table(identifier)
1004+
return True
1005+
except NoSuchTableError:
1006+
return False
1007+
8461008
response = self._session.head(
8471009
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier))
8481010
)
@@ -886,6 +1048,7 @@ def view_exists(self, identifier: str | Identifier) -> bool:
8861048

8871049
@retry(**_RETRY_ARGS)
8881050
def drop_view(self, identifier: str) -> None:
1051+
self._check_endpoint(Capability.V1_DELETE_VIEW)
8891052
response = self._session.delete(
8901053
self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)),
8911054
)

0 commit comments

Comments
 (0)