diff --git a/ibmcloudant/__init__.py b/ibmcloudant/__init__.py index 650d39da..1ce2afa9 100644 --- a/ibmcloudant/__init__.py +++ b/ibmcloudant/__init__.py @@ -23,6 +23,7 @@ from .couchdb_session_token_manager import CouchDbSessionTokenManager from .cloudant_v1 import CloudantV1 from .features.changes_follower import ChangesFollower +from .features.pagination import Pager, PagerType, Pagination # sdk-core's __construct_authenticator works with a long switch-case so monkey-patching is required get_authenticator.__construct_authenticator = new_construct_authenticator diff --git a/ibmcloudant/features/pagination.py b/ibmcloudant/features/pagination.py new file mode 100644 index 00000000..ce97cde9 --- /dev/null +++ b/ibmcloudant/features/pagination.py @@ -0,0 +1,435 @@ +# coding: utf-8 +# © Copyright IBM Corporation 2025. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + Feature for paginating requests. + + Import :class:`~ibmcloudant.Pagination` and :class:`~ibmcloudant.PagerType` + from :mod:`ibmcloudant`. + Use :meth:`Pagination.new_pagination` to create a :class:`Pagination` + for the specific :class:`PagerType` operation and options. +""" + +from abc import abstractmethod +from collections.abc import Callable, Iterable, Iterator, Sequence +from enum import auto, Enum +from functools import partial +from types import MappingProxyType +from typing import Generic, Optional, Protocol, TypeVar + +from ibm_cloud_sdk_core import DetailedResponse +from ibmcloudant.cloudant_v1 import CloudantV1,\ + AllDocsResult, DocsResultRow, Document, FindResult, SearchResult, SearchResultRow, ViewResult, ViewResultRow + +# Type variable for the result +R = TypeVar('R', AllDocsResult, FindResult, SearchResult, ViewResult) +# Type variable for the items +I = TypeVar('I', DocsResultRow, Document, SearchResultRow, ViewResultRow) +# Type variable for the key in key based paging +K = TypeVar('K') + +_MAX_LIMIT = 200 +_MIN_LIMIT = 1 + +class PagerType(Enum): + """ + Enumeration of the available Pager types + """ + + POST_ALL_DOCS = auto() + POST_DESIGN_DOCS = auto() + POST_FIND = auto() + POST_PARTITION_ALL_DOCS = auto() + POST_PARTITION_FIND = auto() + POST_PARTITION_SEARCH = auto() + POST_PARTITION_VIEW = auto() + POST_SEARCH = auto() + POST_VIEW = auto() + +class Pager(Protocol[I]): + """ + Protocol for pagination of Cloudant operations. + + Use Pager.new_pager to create a new pager for one of + the operation types in PagerType. + """ + + @abstractmethod + def has_next(self) -> bool: + """ + returns False if there are no more pages + """ + + raise NotImplementedError() + + @abstractmethod + def get_next(self) -> Sequence[I]: + """ + returns the next page of results + """ + + raise NotImplementedError() + + @abstractmethod + def get_all(self) -> Sequence[I]: + """ + returns all the pages of results in single list + """ + + raise NotImplementedError() + +class Pagination: + """ + Entry point for the pagination features. + + Use :meth:`Pagination.new_pagination` to create a :class:`Pagination` + instance for the specific :class:`PagerType` operation and options. + + Then create a Pager or Iterable using one of the functions: + * :meth:`pager` - for an IBM Cloud SDK style Pager + * :meth:`pages` - for a page Iterable + * :meth:`rows` - for a row Iterable + """ + + def __init__(self, client: CloudantV1, type: PagerType, opts: dict): + self._client = client + self._operation_type = type + self._initial_opts = dict(opts) + + def pager(self) -> Pager[I]: + """ + Create a new IBM Cloud SDK style Pager. + This type is useful for retrieving one page at a time from a function call. + """ + + return _IteratorPager(self.pages) + + def pages(self) -> Iterable[Sequence[I]]: + """ + Create a new Iterable for all the pages. + This type is useful for handling pages in a for loop. + + for page in Pagination.new_pagination(client, **opts).pages(): + ... + """ + + return self._operation_type(self._client, self._initial_opts) + + def rows(self) -> Iterable[I]: + """ + Create a new Iterable for all the rows from all the pages. + This type is useful for handling rows in a for loop. + + for row in Pagination.new_pagination(client, **opts).rows(): + ... + """ + + for page in self.pages(): + yield from page + + @classmethod + def _validate_limit(cls, opts: dict): + limit: int | None = opts.get('limit') + # For None case the valid default limit of _MAX_LIMIT will be applied later + if limit is not None: + if limit > _MAX_LIMIT: + raise ValueError(f'The provided limit {limit} exceeds the maximum page size value of {_MAX_LIMIT}.') + if limit < _MIN_LIMIT: + raise ValueError(f'The provided limit {limit} is lower than the minimum page size value of {_MIN_LIMIT}.') + + @classmethod + def _validate_options_absent(cls, invalid_opts: Sequence[str], opts: dict): + # for each invalid_opts entry check if it is present in opts dict + for invalid_opt in invalid_opts: + if invalid_opt in opts: + raise ValueError(f"The option '{invalid_opt}' is invalid when using pagination.") + + @classmethod + def new_pagination(cls, client:CloudantV1, type: PagerType, **kwargs): + """ + Create a new Pagination. + client: CloudantV1 - the Cloudant service client + type: PagerType - the operation type to paginate + kwargs: dict - the options for the operation + """ + + # Validate the limit + cls._validate_limit(kwargs) + if type == PagerType.POST_ALL_DOCS: + cls._validate_options_absent(('keys',), kwargs) + return Pagination(client, _AllDocsPageIterator, kwargs) + if type == PagerType.POST_DESIGN_DOCS: + cls._validate_options_absent(('keys',), kwargs) + return Pagination(client, _DesignDocsPageIterator, kwargs) + if type == PagerType.POST_FIND: + return Pagination(client, _FindPageIterator, kwargs) + if type == PagerType.POST_PARTITION_ALL_DOCS: + cls._validate_options_absent(('keys',), kwargs) + return Pagination(client, _AllDocsPartitionPageIterator, kwargs) + if type == PagerType.POST_PARTITION_FIND: + return Pagination(client, _FindPartitionPageIterator, kwargs) + if type == PagerType.POST_PARTITION_SEARCH: + return Pagination(client, _SearchPartitionPageIterator, kwargs) + if type == PagerType.POST_PARTITION_VIEW: + cls._validate_options_absent(('keys',), kwargs) + return Pagination(client, _ViewPartitionPageIterator, kwargs) + if type == PagerType.POST_SEARCH: + cls._validate_options_absent(('counts', 'group_field', 'group_limit', 'group_sort', 'ranges',), kwargs) + return Pagination(client, _SearchPageIterator, kwargs) + if type == PagerType.POST_VIEW: + cls._validate_options_absent(('keys',), kwargs) + return Pagination(client, _ViewPageIterator, kwargs) + +class _IteratorPagerState(Enum): + NEW = auto() + GET_NEXT = auto() + GET_ALL = auto() + CONSUMED = auto() + +class _IteratorPager(Pager[I]): + + _state_mixed_msg = 'This pager has been consumed, use a new Pager.' + _state_consumed_msg = 'Cannot mix get_all() and get_next() use only one method or make a new Pager.' + + def __init__(self, iterable_func: Callable[[], Iterator[Sequence[I]]]): + self._iterable_func: Callable[[], Iterator[Sequence[I]]] = iterable_func + self._iterator: Iterator[Sequence[I]] = iter(self._iterable_func()) + self._state: _IteratorPagerState = _IteratorPagerState.NEW + + def has_next(self) -> bool: + """ + returns False if there are no more pages + """ + + return self._iterator._has_next + + def get_next(self) -> Sequence[I]: + """ + returns the next page of results + """ + self._check_state(mode=_IteratorPagerState.GET_NEXT) + page: Sequence[I] = next(self._iterator) + if not self._iterator._has_next: + self._state = _IteratorPagerState.CONSUMED + return page + + def get_all(self) -> Sequence[I]: + """ + returns all the pages of results in single list + """ + + self._check_state(mode=_IteratorPagerState.GET_ALL) + all_items: list[I] = [] + for page in self._iterable_func(): + all_items.extend(page) + self._state = _IteratorPagerState.CONSUMED + return (*all_items,) + + def _check_state(self, mode: _IteratorPagerState): + if self._state == mode: + return + if self._state == _IteratorPagerState.NEW: + self._state = mode + return + if self._state == _IteratorPagerState.CONSUMED: + raise Exception(_IteratorPager._state_consumed_msg) + raise Exception(_IteratorPager._state_mixed_msg) + +class _BasePageIterator(Iterator[Sequence[I]]): + + def __init__(self, + client: CloudantV1, + operation: Callable[..., DetailedResponse], + page_opts: list[str], + opts: dict): + self._client: CloudantV1 = client + self._has_next: bool = True + # split the opts into fixed and page parts based on page_opts + self._next_page_opts: dict = {} + fixed_opts: dict = dict(opts) + # Get the page size and set the limit acoordingly + self._page_size: int = self._page_size_from_opts_limit(fixed_opts) + fixed_opts['limit'] = self._page_size + # Remove the options that change per page + for k in page_opts: + if v := fixed_opts.pop(k, None): + self._next_page_opts[k] = v + fixed_opts = MappingProxyType(fixed_opts) + # Partial method with the fixed ops + self._next_request_function: Callable[..., DetailedResponse] = partial(operation, **fixed_opts) + + def __iter__(self) -> Iterator[Sequence[I]]: + return self + + def __next__(self) -> Sequence[I]: + if self._has_next: + return (*self._next_request(),) + raise StopIteration() + + def _next_request(self) -> list[I]: + response: DetailedResponse = self._next_request_function(**self._next_page_opts) + result: dict = response.get_result() + typed_result: R = self._result_converter()(result) + items: list[I] = self._items(typed_result) + if len(items) < self._page_size: + self._has_next = False + else: + self._next_page_opts = self._get_next_page_options(typed_result) + return items + + def _page_size_from_opts_limit(self, opts:dict) -> int: + return opts.get('limit', _MAX_LIMIT) + + @abstractmethod + def _result_converter(self) -> Callable[[dict], R]: + raise NotImplementedError() + + @abstractmethod + def _items(self, result: R) -> list[I]: + raise NotImplementedError() + + @abstractmethod + def _get_next_page_options(self, result: R) -> dict: + raise NotImplementedError() + +class _KeyPageIterator(_BasePageIterator, Generic[K]): + + def __init__(self, client: CloudantV1, operation: Callable[..., DetailedResponse], opts: dict): + super().__init__(client, operation, ['start_key', 'start_key_doc_id'], opts) + self._boundary_failure: Optional[str] = None + + def _next_request(self) -> list[I]: + if self._boundary_failure is not None: + raise Exception(self._boundary_failure) + items: list[I] = super()._next_request() + if self._has_next: + last_item: I = items.pop() + if len(items) > 0: + # Get, but don't remove the last item from the list + penultimate_item: I = items[-1] + self._boundary_failure: Optional[str] = self.check_boundary(penultimate_item, last_item) + return items + + def _get_next_page_options(self, result: R) -> dict: + # last item is used for next page options + last_item = self._items(result)[-1] + return { + 'start_key': last_item.key, + 'start_key_doc_id': last_item.id, + } + + def _items(self, result: R) -> list[I]: + return result.rows + + def _page_size_from_opts_limit(self, opts:dict) -> int: + return super()._page_size_from_opts_limit(opts) + 1 + + @abstractmethod + def check_boundary(self, penultimate_item: I, last_item: I) -> Optional[str]: + raise NotImplementedError() + +class _BookmarkPageIterator(_BasePageIterator): + + def __init__(self, client: CloudantV1, operation: Callable[..., DetailedResponse], opts: dict): + super().__init__(client, operation, ['bookmark'], opts) + + def _get_next_page_options(self, result: R) -> dict: + return {'bookmark': result.bookmark} + +class _AllDocsBasePageIterator(_KeyPageIterator[str]): + + def _result_converter(self) -> Callable[[dict], AllDocsResult]: + return AllDocsResult.from_dict + + def _get_next_page_options(self, result: R) -> dict: + # Remove start_key_doc_id for all_docs paging + opts: dict = super()._get_next_page_options(result) + del opts['start_key_doc_id'] + return opts + + def check_boundary(self, penultimate_item: I, last_item: I) -> Optional[str]: + # IDs are always unique in _all_docs pagers so return None + return None + +class _AllDocsPageIterator(_AllDocsBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_all_docs, opts) + +class _AllDocsPartitionPageIterator(_AllDocsBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_partition_all_docs, opts) + +class _DesignDocsPageIterator(_AllDocsBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_design_docs, opts) + +class _FindBasePageIterator(_BookmarkPageIterator): + + def _items(self, result: FindResult): + return result.docs + + def _result_converter(self): + return FindResult.from_dict + +class _FindPageIterator(_FindBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_find, opts) + +class _FindPartitionPageIterator(_FindBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_partition_find, opts) + +class _SearchBasePageIterator(_BookmarkPageIterator): + + def _items(self, result: SearchResult): + return result.rows + + def _result_converter(self): + return SearchResult.from_dict + +class _SearchPageIterator(_SearchBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_search, opts) + +class _SearchPartitionPageIterator(_SearchBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_partition_search, opts) + +class _ViewBasePageIterator(_KeyPageIterator[any]): + + def _result_converter(self): + return ViewResult.from_dict + + def check_boundary(self, penultimate_item: I, last_item: I) -> Optional[str]: + if penultimate_item.id == (boundary_id := last_item.id) \ + and penultimate_item.key == (boundary_key := last_item.key): + return f'Cannot paginate on a boundary containing identical keys {boundary_key} and document IDs {boundary_id}' + return None + +class _ViewPageIterator(_ViewBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_view, opts) + +class _ViewPartitionPageIterator(_ViewBasePageIterator): + + def __init__(self, client: CloudantV1, opts: dict): + super().__init__(client, client.post_partition_view, opts) diff --git a/test/examples/src/features/pagination/all_docs_pagination.py b/test/examples/src/features/pagination/all_docs_pagination.py new file mode 100644 index 00000000..d1765b98 --- /dev/null +++ b/test/examples/src/features/pagination/all_docs_pagination.py @@ -0,0 +1,60 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'orders', # example database name + 'limit': 50, # limit option sets the page size, + 'start_key': 'abc' # start from example doc ID abc +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_ALL_DOCS, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# Option: use pager next page +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +# Option: use pager all results +# For retrieving all result rows in a single list +# Note: all result rows may be very large! +# Preferably use iterables instead of get_all for memory efficiency with large result sets. +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/design_docs_pagination.py b/test/examples/src/features/pagination/design_docs_pagination.py new file mode 100644 index 00000000..244f03ac --- /dev/null +++ b/test/examples/src/features/pagination/design_docs_pagination.py @@ -0,0 +1,59 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'shoppers', # example database name + 'limit': 50 # limit option sets the page size +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_DESIGN_DOCS, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# Option: use pager next page +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +# Option: use pager all results +# For retrieving all result rows in a single list +# Note: all result rows may be very large! +# Preferably use iterables instead of get_all for memory efficiency with large result sets. +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/find_pagination.py b/test/examples/src/features/pagination/find_pagination.py new file mode 100644 index 00000000..a7b1dae4 --- /dev/null +++ b/test/examples/src/features/pagination/find_pagination.py @@ -0,0 +1,62 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'shoppers', # example database name + 'limit': 50, # limit option sets the page size + 'fields': ['_id', 'type', 'name', 'email'], # return these fields + 'selector': {'email_verified': True}, # select docs with verified emails + 'sort': [{'email': 'desc'}] # sort descending by email +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_FIND, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# Option: use pager next page +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +# Option: use pager all results +# For retrieving all result rows in a single list +# Note: all result rows may be very large! +# Preferably use iterables instead of get_all for memory efficiency with large result sets. +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/partition_all_docs_pagination.py b/test/examples/src/features/pagination/partition_all_docs_pagination.py new file mode 100644 index 00000000..662b95c5 --- /dev/null +++ b/test/examples/src/features/pagination/partition_all_docs_pagination.py @@ -0,0 +1,60 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'events', # example database name + 'limit': 50, # limit option sets the page size + 'partition_key': 'ns1HJS13AMkK', # query only this partition +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_PARTITION_ALL_DOCS, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# Option: use pager next page +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +# Option: use pager all results +# For retrieving all result rows in a single list +# Note: all result rows may be very large! +# Preferably use iterables instead of get_all for memory efficiency with large result sets. +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/partition_find_pagination.py b/test/examples/src/features/pagination/partition_find_pagination.py new file mode 100644 index 00000000..035d680b --- /dev/null +++ b/test/examples/src/features/pagination/partition_find_pagination.py @@ -0,0 +1,62 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'events', # example database name + 'limit': 50, # limit option sets the page size + 'partition_key': 'ns1HJS13AMkK', # query only this partition + 'fields': ['productId', 'eventType', 'date'], # return these fields + 'selector': {'userId': 'abc123'} # select documents with "userId" field equal to "abc123" +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_PARTITION_FIND, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# Option: use pager next page +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +# Option: use pager all results +# For retrieving all result rows in a single list +# Note: all result rows may be very large! +# Preferably use iterables instead of get_all for memory efficiency with large result sets. +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/partition_search_pagination.py b/test/examples/src/features/pagination/partition_search_pagination.py new file mode 100644 index 00000000..134ab4dd --- /dev/null +++ b/test/examples/src/features/pagination/partition_search_pagination.py @@ -0,0 +1,58 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'events', # example database name + 'limit': 50, # limit option sets the page size + 'partition_key': 'ns1HJS13AMkK', # query only this partition + 'ddoc': 'checkout', # use the allUsers design document + 'index': 'findByDate', # search in this index + 'query': 'date:[2019-01-01T12:00:00.000Z TO 2019-01-31T12:00:00.000Z]' # Lucene search query +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_PARTITION_SEARCH, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/partition_view_pagination.py b/test/examples/src/features/pagination/partition_view_pagination.py new file mode 100644 index 00000000..26e70dd1 --- /dev/null +++ b/test/examples/src/features/pagination/partition_view_pagination.py @@ -0,0 +1,62 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'events', # example database name + 'limit': 50, # limit option sets the page size + 'partition_key': 'ns1HJS13AMkK', # query only this partition + 'ddoc': 'checkout', # use the checkout design document + 'view': 'byProductId' # the view to use +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_PARTITION_VIEW, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# Option: use pager next page +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +# Option: use pager all results +# For retrieving all result rows in a single list +# Note: all result rows may be very large! +# Preferably use iterables instead of get_all for memory efficiency with large result sets. +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/search_pagination.py b/test/examples/src/features/pagination/search_pagination.py new file mode 100644 index 00000000..efc3b0e0 --- /dev/null +++ b/test/examples/src/features/pagination/search_pagination.py @@ -0,0 +1,57 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'shoppers', # example database name + 'limit': 50, # limit option sets the page size + 'ddoc': 'allUsers', # use the allUsers design document + 'index': 'activeUsers', # search in this index + 'query': 'name:Jane* AND active:True' # Lucene search query +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_SEARCH, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/examples/src/features/pagination/view_pagination.py b/test/examples/src/features/pagination/view_pagination.py new file mode 100644 index 00000000..a0d21495 --- /dev/null +++ b/test/examples/src/features/pagination/view_pagination.py @@ -0,0 +1,56 @@ +# © Copyright IBM Corporation 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +from ibmcloudant import Pager, Pagination, PagerType +from ibmcloudant.cloudant_v1 import CloudantV1 + +# Initialize service +service = CloudantV1.new_instance() + +# Setup options +opts = { + 'db': 'shoppers', # example database name + 'limit': 50, # limit option sets the page size + 'ddoc': 'allUsers', # use the allUsers design document + 'view': 'getVerifiedEmails' # the view to use +} + +# Create pagination +pagination = Pagination.new_pagination( + service, PagerType.POST_VIEW, **opts) +# pagination can be reused without side-effects as a factory for iterables or pagers +# options are fixed at pagination creation time + +# Option: iterate pages +# Ideal for using a for loop with each page. +# Each call to pages() returns a fresh iterator that can be traversed once. +for page in pagination.pages(): + # Do something with page + pass + +# Option: iterate rows +# Ideal for using a for loop with each row. +# Each call to rows() returns a fresh iterator that can be traversed once. +for row in pagination.rows(): + # Do something with row + pass + +# For retrieving one page at a time with a method call. +pager: Pager = pagination.pager() +if pager.get_next(): + page = pager.get_next() + # Do something with page + +all_pager: Pager = pagination.pager() +all_rows = all_pager.get_all() +for page in all_rows: + # Do something with row + pass diff --git a/test/unit/features/conftest.py b/test/unit/features/conftest.py index d591e3de..dab13b99 100644 --- a/test/unit/features/conftest.py +++ b/test/unit/features/conftest.py @@ -18,6 +18,7 @@ Shared tests' fixtures """ +from typing import Sequence import unittest import pytest import os @@ -27,15 +28,22 @@ from threading import Timer import itertools +from itertools import islice from requests import codes from requests.exceptions import ConnectionError +from ibm_cloud_sdk_core import ApiException, DetailedResponse + from ibmcloudant.cloudant_v1 import ( + AllDocsResult, CloudantV1, + FindResult, PostChangesEnums, ChangesResult, ChangesResultItem, + SearchResult, + ViewResult, ) from ibmcloudant.features.changes_follower import ( @@ -43,6 +51,7 @@ _BATCH_SIZE, _Mode, ) +from ibmcloudant.features.pagination import PagerType @pytest.fixture(scope='class') @@ -106,8 +115,7 @@ def limits(request): _BATCH_SIZE + 123, ] - -class ChangesFollowerBaseCase(unittest.TestCase): +class MockClientBaseCase(unittest.TestCase): @classmethod def setUpClass(cls): # Setup client env config @@ -117,6 +125,40 @@ def setUpClass(cls): service_name='TEST_SERVICE', ) + def make_error_tuple(self, error: str) -> tuple[any]: + if error == 'bad_io': + return (200, {}, ConnectionError('peer reset')) + elif error == 'bad_json': + return (200, {}, '{') + else: + return ( + codes[error], + {}, + json.dumps({'error': error}), + ) + + def make_error(self, error: str) -> dict[str:any]: + error_tuple = self.make_error_tuple(error) + if error == 'bad_io': + return {'body': error_tuple[2]} + else: + return { + 'status': error_tuple[0], + 'content_type': 'application/json', + 'body': error_tuple[2], + } + + def make_error_exception(self, error: str) -> Exception: + error_dict = self.make_error(error) + if error == 'bad_io': + return error_dict['body'] + elif error == 'bad_json': + return ApiException(code=error_dict['status'], + message='Error processing the HTTP response',) + return ApiException(error_dict['status']) + +class ChangesFollowerBaseCase(MockClientBaseCase): + def prepare_mock_changes( self, batches=0, @@ -134,16 +176,7 @@ def __call__(self, request): if self._return_error: self._return_error = False error = next(self._errors) - if error == 'bad_io': - return (200, {}, ConnectionError('peer reset')) - elif error == 'bad_json': - return (200, {}, '{') - else: - return ( - codes[error], - {}, - json.dumps({'error': error}), - ) + return self.make_error_tuple(error) # this stands for "large" seq in empty result case last_seq = f'{batches * _BATCH_SIZE}-abcdef' pending = 0 @@ -199,22 +232,7 @@ def __call__(self, request): def prepare_mock_with_error(self, error: str): _base_url = os.environ.get('TEST_SERVER_URL', 'http://localhost:5984') url = _base_url + '/db/_changes' - if error == 'bad_io': - return responses.post(url, body=ConnectionError('peer reset')) - elif error == 'bad_json': - return responses.post( - url, - status=200, - content_type='application/json', - body='{', - ) - else: - return responses.post( - url, - status=codes[error], - content_type='application/json', - json={'error': error}, - ) + return responses.post(url, **self.make_error(error)) def runner(self, follower, mode, timeout=1, stop_after=0): """ @@ -240,3 +258,134 @@ def main(): return counter return main() + +class PaginationMockSupport: + all_docs_pagers: Sequence[PagerType] = ( + PagerType.POST_ALL_DOCS, + PagerType.POST_DESIGN_DOCS, + PagerType.POST_PARTITION_ALL_DOCS + ) + view_pagers: Sequence[PagerType] = ( + PagerType.POST_VIEW, + PagerType.POST_PARTITION_VIEW + ) + # the key pager types (n+1 paging) + key_pagers: Sequence[PagerType] = all_docs_pagers + view_pagers + find_pagers: Sequence[PagerType] = ( + PagerType.POST_FIND, + PagerType.POST_PARTITION_FIND) + search_pagers: Sequence[PagerType] = ( + PagerType.POST_SEARCH, + PagerType.POST_PARTITION_SEARCH) + + # Map of pager type to a tuple of patch function name, result wrapper lambda, result row lambda + operation_map: dict[PagerType:str] = { + PagerType.POST_ALL_DOCS: 'ibmcloudant.cloudant_v1.CloudantV1.post_all_docs', + PagerType.POST_DESIGN_DOCS: 'ibmcloudant.cloudant_v1.CloudantV1.post_design_docs', + PagerType.POST_FIND: 'ibmcloudant.cloudant_v1.CloudantV1.post_find', + PagerType.POST_PARTITION_ALL_DOCS: 'ibmcloudant.cloudant_v1.CloudantV1.post_partition_all_docs', + PagerType.POST_PARTITION_FIND: 'ibmcloudant.cloudant_v1.CloudantV1.post_partition_find', + PagerType.POST_PARTITION_SEARCH: 'ibmcloudant.cloudant_v1.CloudantV1.post_partition_search', + PagerType.POST_PARTITION_VIEW: 'ibmcloudant.cloudant_v1.CloudantV1.post_partition_view', + PagerType.POST_SEARCH: 'ibmcloudant.cloudant_v1.CloudantV1.post_search', + PagerType.POST_VIEW: 'ibmcloudant.cloudant_v1.CloudantV1.post_view' + } + + def make_wrapper(pager_type: PagerType, total: int, rows: Sequence) -> dict[str:any]: + if pager_type in PaginationMockSupport.key_pagers: + return {'total_rows': total, 'rows': rows} + else: + bkmk = 'emptypagebookmark' + last_row = rows[-1] if len(rows) > 0 else None + if pager_type in PaginationMockSupport.find_pagers: + return {'bookmark': last_row['_id'] if last_row else bkmk, 'docs': rows} + elif pager_type in PaginationMockSupport.search_pagers: + return {'bookmark': last_row['id'] if last_row else bkmk, 'total_rows': total, 'rows': rows} + else: + raise Exception('Unknown pager type, fail test.') + + def make_row(pager_type: PagerType, i: int) -> dict[str:any]: + id = f'testdoc{i}' + rev = f'1-abc{i}' + if pager_type in PaginationMockSupport.key_pagers: + if pager_type in (PagerType.POST_VIEW, PagerType.POST_PARTITION_VIEW): + key = i + value = 1 + else: + key = id + value = {'rev': rev} + return {'id': id, 'key': key, 'value': value} + elif pager_type in PaginationMockSupport.find_pagers: + return {'_id':id, '_rev': rev, 'testfield': i} + elif pager_type in PaginationMockSupport.search_pagers: + return {'fields':{}, 'id': id} + else: + raise Exception('Unknown pager type, fail test.') + +class PaginationMockResponse: + """ + Test class for mocking page responses. + """ + def __init__(self, + total_items: int, + page_size: int, + pager_type: PagerType + ): + self.total_items: int = total_items + self.page_size: int = page_size + self.pages = self.generator() + self.pager_type: PagerType = pager_type + self.plus_one_paging: bool = self.pager_type in PaginationMockSupport.key_pagers + self.expected_pages: list[list] = [] + + # for compatibility with python <= 3.12 + def batched(self, iterable, n): + """Batch data into tuples of length n. The last batch may be shorter.""" + it = iter(iterable) + while True: + batch = tuple(islice(it, n)) + if not batch: + break + yield batch + + def generator(self): + for page in self.batched(range(0, self.total_items), self.page_size): + rows = [PaginationMockSupport.make_row(self.pager_type, i) for i in page] + if self.plus_one_paging: + # Add an n+1 row for key based paging if more pages + if (n_plus_one := page[-1] + 1) < self.total_items: + rows.append(PaginationMockSupport.make_row(self.pager_type, n_plus_one)) + yield DetailedResponse(response=PaginationMockSupport.make_wrapper(self.pager_type, self.total_items, rows)) + yield DetailedResponse(response=PaginationMockSupport.make_wrapper(self.pager_type, self.total_items, [])) + + def convert_result(self, result: dict) -> Sequence: + if self.pager_type in PaginationMockSupport.all_docs_pagers: + return AllDocsResult.from_dict(result).rows + elif self.pager_type in PaginationMockSupport.find_pagers: + return FindResult.from_dict(result).docs + elif self.pager_type in PaginationMockSupport.search_pagers: + return SearchResult.from_dict(result).rows + elif self.pager_type in PaginationMockSupport.view_pagers: + return ViewResult.from_dict(result).rows + + def get_next_page(self, **kwargs): + # return next(self.pages) + # ignore kwargs + # get next page + page = next(self.pages) + # convert to an expected page + rows = self.convert_result(page.get_result()) + if len(rows) > self.page_size and self.plus_one_paging: + self.expected_pages.append(rows[:-1]) + else: + self.expected_pages.append(rows) + return page + + def get_expected_page(self, page: int) -> list: + return self.expected_pages[page - 1] + + def all_expected_items(self) -> list: + all_items: list = [] + for page in self.expected_pages: + all_items.extend(page) + return all_items diff --git a/test/unit/features/test_pagination_base.py b/test/unit/features/test_pagination_base.py new file mode 100644 index 00000000..9a3caa29 --- /dev/null +++ b/test/unit/features/test_pagination_base.py @@ -0,0 +1,405 @@ +# coding: utf-8 + +# © Copyright IBM Corporation 2025. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable +from typing import Iterable +from unittest.mock import Mock, patch +from ibmcloudant.cloudant_v1 import ViewResult, ViewResultRow +from ibmcloudant.features.pagination import _BasePageIterator, _IteratorPager, Pager, PagerType, Pagination +from conftest import MockClientBaseCase, PaginationMockResponse + +class BasePageMockResponses(PaginationMockResponse): + """ + Test class for mocking page responses. + """ + def __init__(self, total_items: int, page_size: int): + super().__init__(total_items, page_size, PagerType.POST_VIEW) + # This test uses View structures, but doesn't do n+1 like a view/key pager + # Override plus_one_paging to accommodate this weird hybrid + self.plus_one_paging = False + +class TestPageIterator(_BasePageIterator): + """ + A test subclass of the _BasePager under test. + """ + operation: Callable = None + page_keys: list[str] = [] + + def __init__(self, client, opts): + super().__init__(client, TestPageIterator.operation or client.post_view, TestPageIterator.page_keys, opts) + + def _result_converter(self) -> Callable[[dict], ViewResult]: + return lambda d: ViewResult.from_dict(d) + + def _items(self, result: ViewResult) -> tuple[ViewResultRow]: + return result.rows + + def _get_next_page_options(self, result: ViewResult) -> dict: + if len(result.rows ) == 0: + self.assertFail("Test failure: tried to setNextPageOptions on empty page.") + else: + return {'start_key': result.rows[-1].key} + +class TestBasePageIterator(MockClientBaseCase): + def test_init(self): + operation = self.client.post_view + opts = {'db': 'test', 'limit': 20} + page_iterator: Iterable[ViewResultRow] = TestPageIterator(self.client, opts) + # Assert client is set + self.assertEqual(page_iterator._client, self.client, 'The supplied client should be set.') + # Assert operation is set + self.assertIsNotNone(page_iterator._next_request_function, 'The operation function should be set.') + # Assert partial function parts are as expected + self.assertEqual(page_iterator._next_request_function.func, operation, 'The partial function should be the operation.') + self.assertEqual(page_iterator._next_request_function.keywords, opts, 'The partial function kwargs should be the options.') + + def test_partial_options(self): + static_opts = {'db': 'test', 'limit': 20, 'baz': 'faz'} + page_opts = {'foo': 'boo', 'bar': 'far'} + opts = {**static_opts, **page_opts} + # Use page_opts.keys() to pass the list of names for page options + with patch('test_pagination_base.TestPageIterator.page_keys', page_opts.keys()): + page_iterator: Iterable[ViewResultRow] = TestPageIterator(self.client, opts) + # Assert partial function has only static opts + self.assertEqual(page_iterator._next_request_function.keywords, static_opts, 'The partial function kwargs should be only the static options.') + # Assert next page options + self.assertEqual(page_iterator._next_page_opts, page_opts, 'The next page options should match the expected.') + + def test_default_page_size(self): + opts = {'db': 'test'} + page_iterator: Iterable[ViewResultRow] = TestPageIterator(self.client, opts) + # Assert the default page size + expected_page_size = 200 + self.assertEqual(page_iterator._page_size, expected_page_size, 'The default page size should be set.') + self.assertEqual(page_iterator._next_request_function.keywords, opts | {'limit': expected_page_size}, 'The default page size should be present in the options.') + + def test_limit_page_size(self): + opts = {'db': 'test', 'limit': 42} + page_iterator: Iterable[ViewResultRow] = TestPageIterator(self.client, opts) + # Assert the provided page size + expected_page_size = 42 + self.assertEqual(page_iterator._page_size, expected_page_size, 'The default page size should be set.') + self.assertEqual(page_iterator._next_request_function.keywords, opts | {'limit': expected_page_size}, 'The default page size should be present in the options.') + + def test_has_next_initially_true(self): + opts = {'limit': 1} + page_iterator: Iterable[ViewResultRow] = TestPageIterator(self.client, opts) + # Assert _has_next + self.assertTrue(page_iterator._has_next, '_has_next should initially return True.') + + def test_has_next_true_for_result_equal_to_limit(self): + page_size = 1 + # Init with mock that returns only a single row + with patch('test_pagination_base.TestPageIterator.operation', BasePageMockResponses(1, page_size).get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + # Get first page with 1 result + next(page_iterator) + # Assert _has_next + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + + def test_has_next_false_for_result_less_than_limit(self): + page_size = 1 + # Init with mock that returns zero rows + with patch('test_pagination_base.TestPageIterator.operation', BasePageMockResponses(0, page_size).get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + # Get first page with 0 result + next(page_iterator) + # Assert _has_next + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + + def test_next_first_page(self): + page_size = 25 + # Mock that returns one page of 25 items + mock = BasePageMockResponses(page_size, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + # Get first page + actual_page: list[ViewResultRow] = next(page_iterator) + # Assert first page + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), "The actual page should match the expected page") + + def test_next_two_pages(self): + page_size = 3 + # Mock that returns two pages of 3 items + mock = BasePageMockResponses(2*page_size, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + # Get first page + actual_page_1: list[ViewResultRow] = next(page_iterator) + # Assert first page + self.assertSequenceEqual(actual_page_1, mock.get_expected_page(1), "The actual page should match the expected page") + # Assert has_next + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + # Get second page + actual_page_2: list[ViewResultRow] = next(page_iterator) + # Assert first page + self.assertSequenceEqual(actual_page_2, mock.get_expected_page(2), "The actual page should match the expected page") + # Assert has_next, True since page is not smaller than limit + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + + def test_next_until_empty(self): + page_size = 3 + # Mock that returns 3 pages of 3 items + mock = BasePageMockResponses(3*page_size, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + page_count = 0 + actual_items = [] + while page_iterator._has_next: + page_count += 1 + page = next(page_iterator) + # Assert each page is the same or smaller than the limit to confirm all results not in one page + self.assertTrue(len(page) <= page_size, "The actual page size should be the expected page size.") + actual_items.extend(page) + self.assertSequenceEqual(actual_items, mock.all_expected_items(), "The results should match all the pages.") + self.assertEqual(page_count, len(mock.expected_pages), "There should be the correct number of pages.") + + def test_next_until_smaller(self): + page_size = 3 + # Mock that returns 3 pages of 3 items, then 1 more page with 1 item + mock = BasePageMockResponses(3*page_size + 1, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + page_count = 0 + actual_items = [] + while page_iterator._has_next: + page_count += 1 + page = next(page_iterator) + # Assert each page is the same or smaller than the limit to confirm all results not in one page + self.assertTrue(len(page) <= page_size, "The actual page size should be the expected page size.") + actual_items.extend(page) + self.assertSequenceEqual(actual_items, mock.all_expected_items(), "The results should match all the pages.") + self.assertEqual(page_count, len(mock.expected_pages), "There should be the correct number of pages.") + + def test_next_exception(self): + page_size = 2 + # Mock that returns one page of one item + mock = BasePageMockResponses(page_size - 1, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + # Get first and only page + actual_page: list[ViewResultRow] = next(page_iterator) + # Assert page + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), "The actual page should match the expected page") + # Assert _has_next now False + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + # Assert StopIteraton on get_next() + with self.assertRaises(StopIteration): + next(page_iterator) + + def test_pages_immutable(self): + page_size = 1 + mock = BasePageMockResponses(page_size, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + # Get page + actual_page: list[ViewResultRow] = next(page_iterator) + # Assert immutable tuple type + self.assertIsInstance(actual_page, tuple) + + def test_set_next_page_options(self): + page_size = 1 + mock = BasePageMockResponses(5*page_size, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + self.assertIsNone(page_iterator._next_page_opts.get('start_key'), "The start key should intially be None.") + # Since we use a page size of 1, each next page options key, is the same as the element from the page and the page count + page_count = 0 + while page_iterator._has_next: + page = next(page_iterator) + if page_iterator._has_next: + self.assertEqual(page_count, page_iterator._next_page_opts.get('start_key'), "The key should increment per page.") + else: + self.assertEqual(page_count - 1, page_iterator._next_page_opts.get('start_key'), "The options should not be set for the final page.") + page_count += 1 + + def test_next_resumes_after_error(self): + page_size = 1 + mock = BasePageMockResponses(3*page_size, page_size) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + page_iterator: Iterable[ViewResultRow] = TestPageIterator( + self.client, + {'limit': page_size}) + self.assertIsNone(page_iterator._next_page_opts.get('start_key'), "The start key should intially be None.") + actual_page = next(page_iterator) + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), "The actual page should match the expected page") + self.assertEqual(0, page_iterator._next_page_opts.get('start_key'), "The start_key should be 0 for the second page.") + with patch('ibmcloudant.features.pagination._BasePageIterator._next_request', Exception('test exception')): + with self.assertRaises(Exception): + next(page_iterator) + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + self.assertEqual(0, page_iterator._next_page_opts.get('start_key'), "The start_key should still be 0 for the second page.") + second_page = next(page_iterator) + self.assertSequenceEqual(second_page, mock.get_expected_page(2), "The actual page should match the expected page") + self.assertTrue(page_iterator._has_next, '_has_next should return False.') + + + def test_pages_iterable(self): + page_size = 23 + mock = BasePageMockResponses(3*page_size-1, page_size) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + # Check pages are iterable + page_number = 0 + for page in pagination.pages(): + page_number += 1 + self.assertSequenceEqual(page, mock.get_expected_page(page_number), "The actual page should match the expected page") + # Asser the correct number of pages + self.assertEqual(page_number, 3, 'There should have been 3 pages.') + + def test_rows_iterable(self): + page_size = 23 + mock = BasePageMockResponses(3*page_size-1, page_size) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + actual_items = [] + # Check rows are iterable + for row in pagination.rows(): + actual_items.append(row) + self.assertSequenceEqual(actual_items, mock.all_expected_items(), "The actual rows should match the expected rows.") + + def test_as_pager_get_next_first_page(self): + page_size = 7 + # Mock that returns two pages of 7 items + mock = BasePageMockResponses(2*page_size, page_size) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + pager = pagination.pager() + # Get first page + actual_page: list[ViewResultRow] = pager.get_next() + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), "The actual page should match the expected page") + + def test_as_pager_get_all(self): + page_size = 11 + # Mock that returns 6 pages of 11 items, then 1 more page with 5 items + mock = BasePageMockResponses(71, page_size) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + pager: Pager[ViewResultRow] = pagination.pager() + actual_items = pager.get_all() + self.assertSequenceEqual(actual_items, mock.all_expected_items(), "The results should match all the pages.") + # Assert consumed state prevents calling again + with self.assertRaises(Exception, msg=_IteratorPager._state_consumed_msg): + pager.get_all() + + def test_as_pager_get_all_restarts_after_error(self): + page_size = 1 + mock = BasePageMockResponses(2*page_size, page_size) + first_page = mock.get_next_page() + # mock response order + # first page, error, first page replay, second page + mockmock = Mock(side_effect=[ + first_page, + Exception('test exception'), + first_page, + mock.get_next_page() + ]) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mockmock): + pager = pagination.pager() + with self.assertRaises(Exception): + pager.get_all() + self.assertSequenceEqual(pager.get_all(), mock.all_expected_items(), "The results should match all the pages.") + + def test_as_pager_get_next_get_all_throws(self): + page_size = 11 + # Mock that returns 6 pages of 11 items, then 1 more page with 5 items + mock = BasePageMockResponses(71, page_size) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + pager: Pager[ViewResultRow] = pagination.pager() + first_page = pager.get_next() + self.assertSequenceEqual(first_page, mock.get_expected_page(1), "The actual page should match the expected page") + # Assert throws + with self.assertRaises(Exception, msg=_IteratorPager._state_mixed_msg): + pager.get_all() + # Assert second page ok + self.assertSequenceEqual(pager.get_next(), mock.get_expected_page(2), "The actual page should match the expected page") + + def test_as_pager_get_all_get_next_throws(self): + page_size = 1 + mock = BasePageMockResponses(2*page_size, page_size) + first_page = mock.get_next_page() + # mock response order + # first page, error, first page replay, second page + mockmock = Mock(side_effect=[ + first_page, + Exception('test exception') + ]) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mockmock): + pager = pagination.pager() + # Stop get all part way through so it isn't consumed when we call get Next + with self.assertRaises(Exception): + pager.get_all() + # Assert calling get_next() throws + with self.assertRaises(Exception, msg=_IteratorPager._state_mixed_msg): + pager.get_next() + + def test_as_pager_get_next_resumes_after_error(self): + page_size = 1 + mock = BasePageMockResponses(2*page_size, page_size) + # mock response order + # first page, error, second page + mockmock = Mock(side_effect=[ + mock.get_next_page(), + Exception('test exception'), + mock.get_next_page() + ]) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mockmock): + pager = pagination.pager() + # Assert first page + self.assertSequenceEqual(pager.get_next(), mock.get_expected_page(1), "The actual page should match the expected page") + with self.assertRaises(Exception): + pager.get_next() + # Assert second page after error + self.assertSequenceEqual(pager.get_next(), mock.get_expected_page(2), "The actual page should match the expected page") + + def test_as_pager_get_next_until_consumed(self): + page_size = 7 + # Mock that returns two pages of 7 items + mock = BasePageMockResponses(2*page_size, page_size) + pagination = Pagination(self.client, TestPageIterator, {'limit': page_size}) + with patch('test_pagination_base.TestPageIterator.operation', mock.get_next_page): + pager = pagination.pager() + page_count = 0 + while pager.has_next(): + page_count += 1 + self.assertSequenceEqual(pager.get_next(), mock.get_expected_page(page_count), "The actual page should match the expected page") + # Note 3 because third page is empty + self.assertEqual(page_count, 3, 'There should be the expected number of pages.') + # Assert consumed state prevents calling again + with self.assertRaises(Exception, msg=_IteratorPager._state_consumed_msg): + pager.get_next() diff --git a/test/unit/features/test_pagination_bookmark.py b/test/unit/features/test_pagination_bookmark.py new file mode 100644 index 00000000..7ad4c952 --- /dev/null +++ b/test/unit/features/test_pagination_bookmark.py @@ -0,0 +1,124 @@ +# coding: utf-8 + +# © Copyright IBM Corporation 2025. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable, Iterator +from unittest.mock import patch +from ibmcloudant.cloudant_v1 import SearchResult, SearchResultRow +from ibmcloudant.features.pagination import _BookmarkPageIterator, PagerType, Pagination +from conftest import MockClientBaseCase, PaginationMockResponse + +class BookmarkTestPageIterator(_BookmarkPageIterator): + """ + A test subclass of the _BookmarkPager under test. + """ + operation: Callable = None + boundary_func: Callable = lambda p,l: None + + def __init__(self, client, opts): + super().__init__(client, BookmarkTestPageIterator.operation or client.post_view, opts) + + def _result_converter(self) -> Callable[[dict], SearchResult]: + return lambda d: SearchResult.from_dict(d) + + def _items(self, result: SearchResult) -> tuple[SearchResultRow]: + return result.rows + +class BookmarkPaginationMockResponses(PaginationMockResponse): + """ + Test class for mocking page responses. + """ + def __init__(self, total_items: int, page_size: int): + super().__init__(total_items, page_size, PagerType.POST_SEARCH) + +class TestBookmarkPageIterator(MockClientBaseCase): + + # Test page size default + def test_default_page_size(self): + page_iterator: Iterator[tuple[SearchResultRow]] = BookmarkTestPageIterator(self.client, {}) + # Assert the limit default as page size + self.assertEqual(page_iterator._page_size, 200, 'The page size should be one more than the default limit.') + + # Test page size limit + def test_limit_page_size(self): + page_iterator: Iterator[tuple[SearchResultRow]] = BookmarkTestPageIterator(self.client, {'limit': 42}) + # Assert the limit provided as page size + self.assertEqual(page_iterator._page_size, 42, 'The page size should be one more than the default limit.') + + # Test all items on page when no more pages + def test_get_next_page_less_than_limit(self): + page_size = 21 + mock = BookmarkPaginationMockResponses(page_size - 1, page_size) + with patch('test_pagination_bookmark.BookmarkTestPageIterator.operation', mock.get_next_page): + page_iterator = BookmarkTestPageIterator(self.client, {'limit': page_size}) + # Get and assert first page + actual_page = next(page_iterator) + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), 'The actual page should match the expected page') + # Assert page size + self.assertEqual(len(actual_page), page_size - 1, 'The actual page size should match the expected page size.') + # Assert has_next False because n+1 limit is 1 more than user page size + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + + # Test correct items on page when limit + def test_get_next_page_equal_to_limit(self): + page_size = 14 + mock = BookmarkPaginationMockResponses(page_size, page_size) + with patch('test_pagination_bookmark.BookmarkTestPageIterator.operation', mock.get_next_page): + page_iterator = BookmarkTestPageIterator(self.client, {'limit': page_size}) + # Get and assert first page + actual_page = next(page_iterator) + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), 'The actual page should match the expected page.') + # Assert page size + self.assertEqual(len(actual_page), page_size, 'The actual page size should match the expected page size.') + # Assert has_next True + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + # Assert bookmark + self.assertEqual(page_iterator._next_page_opts['bookmark'], f'testdoc{page_size - 1}', 'The bookmark should be one less than the page size.') + # Get and assert second page + second_page = next(page_iterator) + # Note row keys are zero indexed so page size - 1 + self.assertEqual(len(second_page), 0, "The second page should be empty.") + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + + # Test correct items on page when n+more + def test_get_next_page_greater_than_limit(self): + page_size = 7 + mock = BookmarkPaginationMockResponses(page_size+2, page_size) + with patch('test_pagination_bookmark.BookmarkTestPageIterator.operation', mock.get_next_page): + page_iterator = BookmarkTestPageIterator(self.client, {'limit': page_size}) + # Get and assert first page + actual_page = next(page_iterator) + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), 'The actual page should match the expected page.') + # Assert page size + self.assertEqual(len(actual_page), page_size, 'The actual page size should match the expected page size.') + # Assert has_next True + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + # Get and assert second page + second_page = next(page_iterator) + self.assertEqual(len(second_page), 2 , 'The second page should have two items.') + # Note row keys are zero indexed so n+1 element that is first item on second page matches page size + self.assertEqual(second_page[0].id, f'testdoc{page_size}', 'The first item key on the second page should match the page size number.') + self.assertSequenceEqual(second_page, mock.get_expected_page(2), "The actual page should match the expected page") + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + + # Test getting all items + def test_get_all(self): + page_size = 3 + mock = BookmarkPaginationMockResponses(page_size*12, page_size) + pagination = Pagination(self.client, BookmarkTestPageIterator, {'limit': page_size}) + with patch('test_pagination_bookmark.BookmarkTestPageIterator.operation', mock.get_next_page): + pager = pagination.pager() + # Get and assert all items + self.assertSequenceEqual(pager.get_all(), mock.all_expected_items(), 'The results should match all the pages.') diff --git a/test/unit/features/test_pagination_key.py b/test/unit/features/test_pagination_key.py new file mode 100644 index 00000000..16faf35f --- /dev/null +++ b/test/unit/features/test_pagination_key.py @@ -0,0 +1,181 @@ +# coding: utf-8 + +# © Copyright IBM Corporation 2025. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable, Iterator +from unittest.mock import Mock, patch +from ibm_cloud_sdk_core import DetailedResponse +from ibmcloudant.cloudant_v1 import ViewResult, ViewResultRow +from ibmcloudant.features.pagination import _KeyPageIterator, PagerType, Pagination +from conftest import MockClientBaseCase, PaginationMockResponse + +class KeyTestPageIterator(_KeyPageIterator): + """ + A test subclass of the _KeyPager under test. + """ + operation: Callable = None + boundary_func: Callable = lambda p,l: None + + def __init__(self, client, opts): + super().__init__(client, KeyTestPageIterator.operation or client.post_view, opts) + + def _result_converter(self) -> Callable[[dict], ViewResult]: + return lambda d: ViewResult.from_dict(d) + + def _items(self, result: ViewResult) -> tuple[ViewResultRow]: + return result.rows + + def _get_next_page_options(self, result: ViewResult) -> dict: + if len(result.rows ) == 0: + self.assertFail("Test failure: tried to setNextPageOptions on empty page.") + else: + return {'start_key': result.rows[-1].key} + + def check_boundary(self, penultimate_item, last_item): + return KeyTestPageIterator.boundary_func(penultimate_item, last_item) + +class KeyPaginationMockResponses(PaginationMockResponse): + """ + Test class for mocking page responses. + """ + def __init__(self, total_items: int, page_size: int): + super().__init__(total_items, page_size, PagerType.POST_VIEW) + +class TestKeyPageIterator(MockClientBaseCase): + + # Test page size default (+1) + def test_default_page_size(self): + page_iterator: Iterator[tuple[ViewResultRow]] = KeyTestPageIterator(self.client, {}) + # Assert the limit default as page size + self.assertEqual(page_iterator._page_size, 201, 'The page size should be one more than the default limit.') + + # Test page size limit (+1) + def test_limit_page_size(self): + page_iterator: Iterator[tuple[ViewResultRow]] = KeyTestPageIterator(self.client, {'limit': 42}) + # Assert the limit provided as page size + self.assertEqual(page_iterator._page_size, 43, 'The page size should be one more than the default limit.') + + # Test all items on page when no more pages + def test_get_next_page_less_than_limit(self): + page_size = 21 + mock = KeyPaginationMockResponses(page_size, page_size) + with patch('test_pagination_key.KeyTestPageIterator.operation', mock.get_next_page): + page_iterator = KeyTestPageIterator(self.client, {'limit': page_size}) + # Get and assert first page + actual_page = next(page_iterator) + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), 'The actual page should match the expected page') + # Assert page size + self.assertEqual(len(actual_page), page_size, 'The actual page size should match the expected page size.') + # Assert has_next False because n+1 limit is 1 more than user page size + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + + # Test correct items on page when n+1 + def test_get_next_page_equal_to_limit(self): + page_size = 14 + mock = KeyPaginationMockResponses(page_size+1, page_size) + with patch('test_pagination_key.KeyTestPageIterator.operation', mock.get_next_page): + page_iterator = KeyTestPageIterator(self.client, {'limit': page_size}) + # Get and assert first page + actual_page = next(page_iterator) + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), 'The actual page should match the expected page.') + # Assert page size + self.assertEqual(len(actual_page), page_size, 'The actual page size should match the expected page size.') + # Assert has_next True + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + # Get and assert second page + second_page = next(page_iterator) + self.assertEqual(len(second_page), 1 , 'The second page should have one item.') + # Note row keys are zero indexed so n+1 element that is first item on second page matches page size + self.assertEqual(second_page[0].key, page_size, 'The first item key on the second page should match the page size number.') + self.assertSequenceEqual(second_page, mock.get_expected_page(2), "The actual page should match the expected page") + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + + # Test correct items on page when n+more + def test_get_next_page_greater_than_limit(self): + page_size = 7 + mock = KeyPaginationMockResponses(page_size+2, page_size) + with patch('test_pagination_key.KeyTestPageIterator.operation', mock.get_next_page): + page_iterator = KeyTestPageIterator(self.client, {'limit': page_size}) + # Get and assert first page + actual_page = next(page_iterator) + self.assertSequenceEqual(actual_page, mock.get_expected_page(1), 'The actual page should match the expected page.') + # Assert page size + self.assertEqual(len(actual_page), page_size, 'The actual page size should match the expected page size.') + # Assert has_next True + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + # Get and assert second page + second_page = next(page_iterator) + self.assertEqual(len(second_page), 2 , 'The second page should have two items.') + # Note row keys are zero indexed so n+1 element that is first item on second page matches page size + self.assertEqual(second_page[0].key, page_size, 'The first item key on the second page should match the page size number.') + self.assertSequenceEqual(second_page, mock.get_expected_page(2), "The actual page should match the expected page") + self.assertFalse(page_iterator._has_next, '_has_next should return False.') + + # Test getting all items + def test_get_all(self): + page_size = 3 + mock = KeyPaginationMockResponses(page_size*12, page_size) + pagination = Pagination(self.client, KeyTestPageIterator, {'limit': page_size}) + with patch('test_pagination_key.KeyTestPageIterator.operation', mock.get_next_page): + pager = pagination.pager() + # Get and assert all items + self.assertSequenceEqual(pager.get_all(), mock.all_expected_items(), 'The results should match all the pages.') + + def test_no_boundary_check_by_default(self): + mock_rows = [ + {'id': '1', 'key': 1, 'value': 1}, + {'id': '1', 'key': 1, 'value': 1} + ] + expected_rows = ViewResult.from_dict({'rows': mock_rows}).rows + mockmock = Mock(return_value=DetailedResponse(response={'rows': mock_rows})) + with patch('test_pagination_key.KeyTestPageIterator.operation', mockmock): + page_iterator = KeyTestPageIterator(self.client, {'limit': 1}) + # Get and assert page + self.assertSequenceEqual(next(page_iterator), (expected_rows[0],)) + + def test_boundary_failure_throws_on_get_next(self): + mock_rows = [ + {'id': '1', 'key': 1, 'value': 1}, + {'id': '1', 'key': 1, 'value': 1} + ] + expected_rows = ViewResult.from_dict({'rows': mock_rows}).rows + mockmock = Mock(return_value=DetailedResponse(response={'rows': mock_rows})) + with patch('test_pagination_key.KeyTestPageIterator.operation', mockmock): + page_iterator = KeyTestPageIterator(self.client, {'limit': 1}) + with patch( + 'test_pagination_key.KeyTestPageIterator.boundary_func', + lambda p,l: 'test error' if p.id == l.id and p.key == l.key else None): + # Get and assert page + self.assertSequenceEqual(next(page_iterator), (expected_rows[0],)) + # Assert has_next True + self.assertTrue(page_iterator._has_next, '_has_next should return True.') + with self.assertRaises(Exception): + next(page_iterator) + + def test_no_boundary_check_when_no_items_left(self): + mock_rows = [ + {'id': '1', 'key': 1, 'value': 1} + ] + expected_rows = ViewResult.from_dict({'rows': mock_rows}).rows + mockmock = Mock(return_value=DetailedResponse(response={'rows': mock_rows})) + with patch('test_pagination_key.KeyTestPageIterator.operation', mockmock): + page_iterator = KeyTestPageIterator(self.client, {'limit': 1}) + with patch( + 'test_pagination_key.KeyTestPageIterator.boundary_func', + Exception('Check boundary should not be called.')): + # Get and assert page if boundary is checked, will raise exception + self.assertSequenceEqual(next(page_iterator), (expected_rows[0],)) + # Assert has_next False + self.assertFalse(page_iterator._has_next, '_has_next should return True.') diff --git a/test/unit/features/test_pagination_operations.py b/test/unit/features/test_pagination_operations.py new file mode 100644 index 00000000..16383431 --- /dev/null +++ b/test/unit/features/test_pagination_operations.py @@ -0,0 +1,193 @@ +# coding: utf-8 + +# © Copyright IBM Corporation 2025. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import patch + +import pytest +from conftest import MockClientBaseCase, PaginationMockSupport, PaginationMockResponse +from ibmcloudant.features.pagination import _MAX_LIMIT, Pager, PagerType, Pagination + +@pytest.mark.usefixtures("errors") +class TestPaginationOperations(MockClientBaseCase): + + test_page_size = 10 + # tuples of test parameters + # total items, page size + page_sets = ( + (0, test_page_size), + (1, test_page_size), + (test_page_size - 1, test_page_size), + (test_page_size, test_page_size), + (test_page_size + 1, test_page_size), + (3 * test_page_size, test_page_size), + (3 * test_page_size + 1, test_page_size), + (4 * test_page_size - 1, test_page_size), + ) + + def get_expected_pages(self, total: int, page_size: int, pager_type: PagerType) -> int: + full_pages = total // page_size + partial_pages = 0 if (total % page_size == 0) else 1 + expected_pages = full_pages + partial_pages + # Need at least 1 empty page to know there are no more results + # if not ending on a partial page, except if the first page or + # using n+1 paging (because an exact user page is a partial real page). + if partial_pages == 0 and (expected_pages == 0 or pager_type not in PaginationMockSupport.key_pagers): + expected_pages += 1; # Will get at least 1 empty page + return expected_pages + + def test_pager(self): + for pager_type in PagerType: + with self.subTest(pager_type): + for page_set in self.page_sets: + with self.subTest(page_set): + actual_items: set = set() + actual_item_count: int = 0 + actual_page_count: int = 0 + expected_items_count: int = page_set[0] + page_size: int = page_set[1] + expected_page_count: int = self.get_expected_pages(expected_items_count, page_size, pager_type) + with patch(PaginationMockSupport.operation_map[pager_type], PaginationMockResponse(expected_items_count, page_size, pager_type).get_next_page): + pager: Pager = Pagination.new_pagination(self.client, pager_type, limit=page_size).pager() + while(pager.has_next()): + page = pager.get_next() + actual_page_count += 1 + actual_item_count += len(page) + for row in page: + if pager_type in (PagerType.POST_FIND, PagerType.POST_PARTITION_FIND): + id = row._id + else: + id = row.id + actual_items.add(id) + self.assertEqual(actual_page_count, expected_page_count, 'There should be the expected number of pages.') + self.assertEqual(actual_item_count, expected_items_count, 'There should be the expected number of items.') + self.assertEqual(len(actual_items), expected_items_count, 'The items should be unique.') + + def test_pages(self): + for pager_type in PagerType: + with self.subTest(pager_type): + for page_set in self.page_sets: + with self.subTest(page_set): + actual_items: set = set() + actual_item_count: int = 0 + actual_page_count: int = 0 + expected_items_count: int = page_set[0] + page_size: int = page_set[1] + expected_page_count: int = self.get_expected_pages(expected_items_count, page_size, pager_type) + with patch(PaginationMockSupport.operation_map[pager_type], PaginationMockResponse(expected_items_count, page_size, pager_type).get_next_page): + for page in Pagination.new_pagination(self.client, pager_type, limit=page_size).pages(): + actual_page_count += 1 + actual_item_count += len(page) + for row in page: + if pager_type in (PagerType.POST_FIND, PagerType.POST_PARTITION_FIND): + id = row._id + else: + id = row.id + actual_items.add(id) + self.assertEqual(actual_page_count, expected_page_count, 'There should be the expected number of pages.') + self.assertEqual(actual_item_count, expected_items_count, 'There should be the expected number of items.') + self.assertEqual(len(actual_items), expected_items_count, 'The items should be unique.') + + def test_rows(self): + for pager_type in PagerType: + with self.subTest(pager_type): + for page_set in self.page_sets: + with self.subTest(page_set): + actual_items: set = set() + actual_item_count: int = 0 + expected_items_count: int = page_set[0] + page_size: int = page_set[1] + with patch(PaginationMockSupport.operation_map[pager_type], PaginationMockResponse(expected_items_count, page_size, pager_type).get_next_page): + for row in Pagination.new_pagination(self.client, pager_type, limit=page_size).rows(): + actual_item_count += 1 + if pager_type in (PagerType.POST_FIND, PagerType.POST_PARTITION_FIND): + id = row._id + else: + id = row.id + actual_items.add(id) + self.assertEqual(actual_item_count, expected_items_count, 'There should be the expected number of items.') + self.assertEqual(len(actual_items), expected_items_count, 'The items should be unique.') + + def test_pager_errors(self): + page_size = _MAX_LIMIT + for pager_type in PagerType: + with self.subTest(pager_type): + for error in (*self.terminal_errors, *self.transient_errors): + expected_exception = self.make_error_exception(error) + with self.subTest(error): + # mock responses + mock_pages = PaginationMockResponse(2*page_size, page_size, pager_type) + mock_first_page = mock_pages.get_next_page() + mock_second_page = mock_pages.get_next_page() + mock_third_page = mock_pages.get_next_page() # empty + for responses in ( + # first sub-test, error on first page + (expected_exception, mock_first_page, mock_second_page, mock_third_page), + # second sub-test error on second page + (mock_first_page, expected_exception, mock_second_page, mock_third_page), + ): + with patch(PaginationMockSupport.operation_map[pager_type], side_effect=iter(responses)): + pager: Pager = Pagination.new_pagination(self.client, pager_type, limit=page_size).pager() + expect_exception_on_page: int = responses.index(expected_exception) + 1 + actual_page_count: int = 0 + while (pager.has_next()): + actual_page_count += 1 + if actual_page_count == expect_exception_on_page: + with self.assertRaises(type(expected_exception), msg='There should be an exception while paging.'): + pager.get_next() + else: + pager.get_next() + + def test_pages_errors(self): + page_size = _MAX_LIMIT + for pager_type in PagerType: + with self.subTest(pager_type): + for error in (*self.terminal_errors, *self.transient_errors): + expected_exception = self.make_error_exception(error) + with self.subTest(error): + for responses in ( + # first sub-test, error on first page + (expected_exception,), + # second sub-test error on second page + (PaginationMockResponse(2*page_size, page_size, pager_type).get_next_page(), expected_exception), + ): + with patch(PaginationMockSupport.operation_map[pager_type], side_effect=iter(responses)): + actual_page_count: int = 0 + expected_page_count: int = len(responses) - 1 + with self.assertRaises(type(expected_exception), msg='There should be an exception while paging.'): + for page in Pagination.new_pagination(self.client, pager_type, limit=page_size).pages(): + actual_page_count += 1 + self.assertEqual(actual_page_count, expected_page_count, 'Should have got the correct number of pages before error.') + + def test_rows_errors(self): + page_size = _MAX_LIMIT + for pager_type in PagerType: + with self.subTest(pager_type): + for error in (*self.terminal_errors, *self.transient_errors): + expected_exception = self.make_error_exception(error) + with self.subTest(error): + for responses in ( + # first sub-test, error on first page + (expected_exception,), + # second sub-test error on second page + (PaginationMockResponse(2*page_size, page_size, pager_type).get_next_page(), expected_exception), + ): + with patch(PaginationMockSupport.operation_map[pager_type], side_effect=iter(responses)): + actual_item_count: int = 0 + expected_item_count: int = page_size * (len(responses) - 1) + with self.assertRaises(type(expected_exception), msg='There should be an exception while paging.'): + for row in Pagination.new_pagination(self.client, pager_type, limit=page_size).rows(): + actual_item_count += 1 + self.assertEqual(actual_item_count, expected_item_count, 'Should have got the correct number of items before error.') diff --git a/test/unit/features/test_pagination_option_validation.py b/test/unit/features/test_pagination_option_validation.py new file mode 100644 index 00000000..c694262d --- /dev/null +++ b/test/unit/features/test_pagination_option_validation.py @@ -0,0 +1,57 @@ +# coding: utf-8 + +# © Copyright IBM Corporation 2025. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase +from ibmcloudant.features.pagination import _MIN_LIMIT, _MAX_LIMIT, PagerType, Pagination + +class TestPaginationOptionValidation(TestCase): + + all_doc_paginations = (PagerType.POST_ALL_DOCS, PagerType.POST_PARTITION_ALL_DOCS, PagerType.POST_DESIGN_DOCS) + find_paginations = (PagerType.POST_PARTITION_FIND, PagerType.POST_FIND) + search_paginations = (PagerType.POST_SEARCH, PagerType.POST_PARTITION_SEARCH) + view_paginations = (PagerType.POST_PARTITION_VIEW, PagerType.POST_VIEW) + view_like_paginations = (*all_doc_paginations, *view_paginations) + all_paginations = (*find_paginations, *search_paginations, *view_like_paginations) + + def test_valid_limits(self): + test_limits = (_MIN_LIMIT, _MAX_LIMIT - 1, _MAX_LIMIT, None) + for limit in test_limits: + for pager_type in self.all_paginations: + with self.subTest(pager_type): + try: + Pagination.new_pagination(None, pager_type, limit=limit) + except ValueError: + self.fail('There should be no ValueError for valid limits.') + + def test_invalid_limits(self): + test_limits = (_MIN_LIMIT - 1, _MAX_LIMIT + 1) + for limit in test_limits: + for pager_type in self.all_paginations: + with self.subTest(pager_type): + with self.assertRaises(ValueError, msg='There should be a ValueError for invalid limits.'): + Pagination.new_pagination(None, pager_type, limit=limit) + + def test_keys_value_error_for_view_like(self): + for pager_type in self.view_like_paginations: + with self.subTest(pager_type): + with self.assertRaises(ValueError, msg=f'There should be a ValueError for {pager_type} with keys.'): + Pagination.new_pagination(None, pager_type, keys=['a','b','c']) + + def test_facet_value_errors_for_search(self): + for invalid_opt in ('counts', 'group_field', 'group_limit', 'group_sort', 'ranges',): + with self.subTest(invalid_opt): + with self.assertRaises(ValueError, msg=f'There should be a ValueError for search with option {invalid_opt}.'): + Pagination.new_pagination(None, PagerType.POST_SEARCH, **{invalid_opt: 'test value'})