Skip to content

Commit 7b6744e

Browse files
Gayathri Srividya Rajavarapu
authored and committed
Fix: implement lazy pagination iterator for catalog list methods (#3365)
- Changed list_tables, list_namespaces, list_views return types from list[Identifier] to Iterator[Identifier] in the base Catalog abstract class and all implementations
- REST catalog uses true generators with per-page HTTP fetch helpers (decorated with @Retry) to correctly support auth retry logic
- Other catalogs (SQL, DynamoDB, Glue, Hive, BigQuery, NoOp) return iter() of their results
- CLI and output updated to work with iterators
- All test files updated to wrap list_* calls with list() where needed

Closes #3365
1 parent 75cd77f commit 7b6744e

16 files changed

Lines changed: 168 additions & 152 deletions

pyiceberg/catalog/__init__.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from collections.abc import Callable
2626
from dataclasses import dataclass
2727
from enum import Enum
28+
from collections.abc import Iterator
2829
from typing import (
2930
TYPE_CHECKING,
3031
Any,
@@ -607,42 +608,42 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
607608
"""
608609

609610
@abstractmethod
610-
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
611+
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
611612
"""List tables under the given namespace in the catalog.
612613
613614
Args:
614615
namespace (str | Identifier): Namespace identifier to search.
615616
616617
Returns:
617-
List[Identifier]: list of table identifiers.
618+
Iterator[Identifier]: an iterator of table identifiers.
618619
619620
Raises:
620621
NoSuchNamespaceError: If a namespace with the given name does not exist.
621622
"""
622623

623624
@abstractmethod
624-
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
625+
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
625626
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
626627
627628
Args:
628629
namespace (str | Identifier): Namespace identifier to search.
629630
630631
Returns:
631-
List[Identifier]: a List of namespace identifiers.
632+
Iterator[Identifier]: an iterator of namespace identifiers.
632633
633634
Raises:
634635
NoSuchNamespaceError: If a namespace with the given name does not exist.
635636
"""
636637

637638
@abstractmethod
638-
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
639+
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
639640
"""List views under the given namespace in the catalog.
640641
641642
Args:
642643
namespace (str | Identifier): Namespace identifier to search.
643644
644645
Returns:
645-
List[Identifier]: list of table identifiers.
646+
Iterator[Identifier]: an iterator of view identifiers.
646647
647648
Raises:
648649
NoSuchNamespaceError: If a namespace with the given name does not exist.

pyiceberg/catalog/bigquery_metastore.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
import json
20+
from collections.abc import Iterator
2021
from typing import TYPE_CHECKING, Any
2122

2223
from google.api_core.exceptions import NotFound
@@ -252,7 +253,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
252253
raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e
253254

254255
@override
255-
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
256+
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
256257
database_name = self.identifier_to_database(namespace)
257258
iceberg_tables: list[Identifier] = []
258259
try:
@@ -264,18 +265,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
264265
iceberg_tables.append((database_name, bq_table_list_item.table_id))
265266
except NotFound:
266267
raise NoSuchNamespaceError(f"Namespace (dataset) '{database_name}' not found.") from None
267-
return iceberg_tables
268+
return iter(iceberg_tables)
268269

269270
@override
270-
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
271+
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
271272
# Since this catalog only supports one-level namespaces, it always returns an empty list unless
272273
# passed an empty namespace to list all namespaces within the catalog.
273274
if namespace:
274275
raise NoSuchNamespaceError(f"Namespace (dataset) '{namespace}' not found.") from None
275276

276277
# List top-level datasets
277278
datasets_iterator = self.client.list_datasets()
278-
return [(dataset.dataset_id,) for dataset in datasets_iterator]
279+
return iter([(dataset.dataset_id,) for dataset in datasets_iterator])
279280

280281
@override
281282
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
@@ -314,7 +315,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
314315
return self.load_table(identifier=identifier)
315316

316317
@override
317-
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
318+
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
318319
raise NotImplementedError
319320

320321
@override

pyiceberg/catalog/dynamodb.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import uuid
18+
from collections.abc import Iterator
1819
from time import time
1920
from typing import (
2021
TYPE_CHECKING,
@@ -396,8 +397,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
396397
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
397398
table_identifiers = self.list_tables(namespace=database_name)
398399

399-
if len(table_identifiers) > 0:
400+
try:
401+
next(table_identifiers)
400402
raise NamespaceNotEmptyError(f"Database {database_name} is not empty")
403+
except StopIteration:
404+
pass
401405

402406
try:
403407
self._delete_dynamo_item(
@@ -409,14 +413,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
409413
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
410414

411415
@override
412-
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
416+
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
413417
"""List Iceberg tables under the given namespace in the catalog.
414418
415419
Args:
416420
namespace (str | Identifier): Namespace identifier to search.
417421
418422
Returns:
419-
List[Identifier]: list of table identifiers.
423+
Iterator[Identifier]: an iterator of table identifiers.
420424
"""
421425
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
422426

@@ -451,20 +455,20 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
451455

452456
table_identifiers.append(self.identifier_to_tuple(identifier_col))
453457

454-
return table_identifiers
458+
return iter(table_identifiers)
455459

456460
@override
457-
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
461+
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
458462
"""List top-level namespaces from the catalog.
459463
460464
We do not support hierarchical namespace.
461465
462466
Returns:
463-
List[Identifier]: a List of namespace identifiers.
467+
Iterator[Identifier]: an iterator of namespace identifiers.
464468
"""
465469
# Hierarchical namespace is not supported. Return an empty list
466470
if namespace:
467-
return []
471+
return iter([])
468472

469473
paginator = self.dynamodb.get_paginator("query")
470474

@@ -494,7 +498,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
494498
namespace_col = _dict[DYNAMODB_COL_NAMESPACE]
495499
database_identifiers.append(self.identifier_to_tuple(namespace_col))
496500

497-
return database_identifiers
501+
return iter(database_identifiers)
498502

499503
@override
500504
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
@@ -565,7 +569,7 @@ def create_view(
565569
raise NotImplementedError
566570

567571
@override
568-
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
572+
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
569573
raise NotImplementedError
570574

571575
@override

pyiceberg/catalog/glue.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818

1919
import logging
20+
from collections.abc import Iterator
2021
from typing import (
2122
TYPE_CHECKING,
2223
Any,
@@ -860,14 +861,14 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
860861
self.glue.delete_database(Name=database_name)
861862

862863
@override
863-
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
864+
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
864865
"""List Iceberg tables under the given namespace in the catalog.
865866
866867
Args:
867868
namespace (str | Identifier): Namespace identifier to search.
868869
869870
Returns:
870-
List[Identifier]: list of table identifiers.
871+
Iterator[Identifier]: an iterator of table identifiers.
871872
872873
Raises:
873874
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
@@ -889,18 +890,18 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
889890

890891
except self.glue.exceptions.EntityNotFoundException as e:
891892
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
892-
return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]
893+
return iter([(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)])
893894

894895
@override
895-
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
896+
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
896897
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
897898
898899
Returns:
899-
List[Identifier]: a List of namespace identifiers.
900+
Iterator[Identifier]: an iterator of namespace identifiers.
900901
"""
901902
# Hierarchical namespace is not supported. Return an empty list
902903
if namespace:
903-
return []
904+
return iter([])
904905

905906
database_list: list[DatabaseTypeDef] = []
906907
next_token: str | None = None
@@ -912,7 +913,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
912913
if not next_token:
913914
break
914915

915-
return [self.identifier_to_tuple(database["Name"]) for database in database_list]
916+
return iter([self.identifier_to_tuple(database["Name"]) for database in database_list])
916917

917918
@override
918919
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
@@ -982,7 +983,7 @@ def create_view(
982983
raise NotImplementedError
983984

984985
@override
985-
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
986+
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
986987
raise NotImplementedError
987988

988989
@override

pyiceberg/catalog/hive.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import logging
1919
import socket
2020
import time
21+
from collections.abc import Iterator
2122
from types import TracebackType
2223
from typing import (
2324
TYPE_CHECKING,
@@ -479,7 +480,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
479480
return self._convert_hive_into_iceberg(hive_table)
480481

481482
@override
482-
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
483+
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
483484
raise NotImplementedError
484485

485486
@override
@@ -760,7 +761,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
760761
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e
761762

762763
@override
763-
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
764+
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
764765
"""List Iceberg tables under the given namespace in the catalog.
765766
766767
When the database doesn't exist, it will just return an empty list.
@@ -769,34 +770,34 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
769770
namespace: Database to list.
770771
771772
Returns:
772-
List[Identifier]: list of table identifiers.
773+
Iterator[Identifier]: an iterator of table identifiers.
773774
774775
Raises:
775776
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
776777
"""
777778
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
778779
with self._client as open_client:
779-
return [
780+
return iter([
780781
(database_name, table.tableName)
781782
for table in open_client.get_table_objects_by_name(
782783
dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
783784
)
784785
if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
785-
]
786+
])
786787

787788
@override
788-
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
789+
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
789790
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
790791
791792
Returns:
792-
List[Identifier]: a List of namespace identifiers.
793+
Iterator[Identifier]: an iterator of namespace identifiers.
793794
"""
794795
# Hierarchical namespace is not supported. Return an empty list
795796
if namespace:
796-
return []
797+
return iter([])
797798

798799
with self._client as open_client:
799-
return list(map(self.identifier_to_tuple, open_client.get_all_databases()))
800+
return iter(list(map(self.identifier_to_tuple, open_client.get_all_databases())))
800801

801802
@override
802803
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:

pyiceberg/catalog/noop.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from collections.abc import Iterator
1920
from typing import (
2021
TYPE_CHECKING,
2122
)
@@ -124,11 +125,11 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
124125
raise NotImplementedError
125126

126127
@override
127-
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
128+
def list_tables(self, namespace: str | Identifier) -> Iterator[Identifier]:
128129
raise NotImplementedError
129130

130131
@override
131-
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
132+
def list_namespaces(self, namespace: str | Identifier = ()) -> Iterator[Identifier]:
132133
raise NotImplementedError
133134

134135
@override
@@ -142,7 +143,7 @@ def update_namespace_properties(
142143
raise NotImplementedError
143144

144145
@override
145-
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
146+
def list_views(self, namespace: str | Identifier) -> Iterator[Identifier]:
146147
raise NotImplementedError
147148

148149
@override

0 commit comments

Comments
 (0)