30 changes: 24 additions & 6 deletions docs/02_concepts/08_pagination.mdx
@@ -12,9 +12,10 @@ import ApiLink from '@site/src/components/ApiLink';

import PaginationAsyncExample from '!!raw-loader!./code/08_pagination_async.py';
import PaginationSyncExample from '!!raw-loader!./code/08_pagination_sync.py';

import IterateItemsAsyncExample from '!!raw-loader!./code/08_iterate_items_async.py';
import IterateItemsSyncExample from '!!raw-loader!./code/08_iterate_items_sync.py';
import IterateCollectionAsyncExample from '!!raw-loader!./code/08_iterate_collection_async.py';
import IterateCollectionSyncExample from '!!raw-loader!./code/08_iterate_collection_sync.py';

Most methods named `list` or `list_something` in the Apify client return a <ApiLink to="class/ListPage">`ListPage`</ApiLink> object. This object provides a consistent interface for working with paginated data and includes the following properties:

@@ -45,21 +46,38 @@ The <ApiLink to="class/ListPage">`ListPage`</ApiLink> interface offers several k

## Generator-based iteration

For most use cases, `iterate_items()` is the recommended way to process all items in a dataset. It handles pagination automatically using a Python generator, fetching items in batches behind the scenes so you don't need to manage offsets or limits yourself.
For collection clients, the `iterate` method returns an iterator that lazily fetches as many pages as needed
to retrieve every item matching the filters. For dataset, key-value store and request queue clients, the
matching helpers are `iterate_items`, `iterate_keys` and `iterate_requests`. They handle pagination
automatically, so you don't need to manage offsets, limits or cursors yourself.

The example below iterates over every Actor owned by the current user using a collection client's `iterate`
method:

<Tabs>
<TabItem value="AsyncExample" label="Async client" default>
<CodeBlock className="language-python">
{IterateItemsAsyncExample}
{IterateCollectionAsyncExample}
</CodeBlock>
</TabItem>
<TabItem value="SyncExample" label="Sync client">
<CodeBlock className="language-python">
{IterateItemsSyncExample}
{IterateCollectionSyncExample}
</CodeBlock>
</TabItem>
</Tabs>

`iterate_items()` accepts the same filtering parameters as `list_items()` (`clean`, `fields`, `omit`, `unwind`, `skip_empty`, `skip_hidden`), so you can combine automatic pagination with data filtering.
The next example uses `iterate_items` on a dataset client to stream items past a given offset:

Similarly, `KeyValueStoreClient` provides an `iterate_keys()` method for iterating over all keys in a key-value store without manual pagination.
<Tabs>
<TabItem value="AsyncExample" label="Async client" default>
<CodeBlock className="language-python">
{IterateItemsAsyncExample}
</CodeBlock>
</TabItem>
<TabItem value="SyncExample" label="Sync client">
<CodeBlock className="language-python">
{IterateItemsSyncExample}
</CodeBlock>
</TabItem>
</Tabs>
12 changes: 12 additions & 0 deletions docs/02_concepts/code/08_iterate_collection_async.py
@@ -0,0 +1,12 @@
from apify_client import ApifyClientAsync

TOKEN = 'MY-APIFY-TOKEN'


async def main() -> None:
apify_client = ApifyClientAsync(TOKEN)

# Iterate over all Actors owned by the current user, lazily fetching
# as many pages as needed under the hood.
async for actor in apify_client.actors().iterate(my=True):
print(actor.id)
16 changes: 16 additions & 0 deletions docs/02_concepts/code/08_iterate_collection_sync.py
@@ -0,0 +1,16 @@
from apify_client import ApifyClient

TOKEN = 'MY-APIFY-TOKEN'


def main() -> None:
apify_client = ApifyClient(TOKEN)

# Iterate over all Actors owned by the current user, lazily fetching
# as many pages as needed under the hood.
for actor in apify_client.actors().iterate(my=True):
print(actor.id)


if __name__ == '__main__':
main()
11 changes: 8 additions & 3 deletions docs/02_concepts/code/08_iterate_items_async.py
@@ -7,6 +7,11 @@ async def main() -> None:
apify_client = ApifyClientAsync(TOKEN)
dataset_client = apify_client.dataset('dataset-id')

# Iterate through all items automatically.
async for item in dataset_client.iterate_items():
print(item)
# Define the pagination parameters
limit = 1500  # Maximum number of items to fetch in total
offset = 100 # Starting offset

# Iterate through items automatically, lazily sending as many API calls
# as needed and receiving items in chunks.
async for item in dataset_client.iterate_items(limit=limit, offset=offset):
print(item) # Process the item as needed
11 changes: 8 additions & 3 deletions docs/02_concepts/code/08_iterate_items_sync.py
@@ -7,9 +7,14 @@ def main() -> None:
apify_client = ApifyClient(TOKEN)
dataset_client = apify_client.dataset('dataset-id')

# Iterate through all items automatically.
for item in dataset_client.iterate_items():
print(item)
# Define the pagination parameters
limit = 1500  # Maximum number of items to fetch in total
offset = 100 # Starting offset

# Iterate through items automatically, lazily sending as many API calls
# as needed and receiving items in chunks.
for item in dataset_client.iterate_items(limit=limit, offset=offset):
print(item) # Process the item as needed


if __name__ == '__main__':
27 changes: 8 additions & 19 deletions docs/02_concepts/code/08_pagination_async.py
@@ -10,26 +10,15 @@ async def main() -> None:
dataset_client = apify_client.dataset('dataset-id')

# Define the pagination parameters
limit = 1000 # Number of items per page
limit = 1000  # Number of items to request from the API
offset = 0 # Starting offset
all_items = [] # List to store all fetched items

while True:
# Fetch a page of items
response = await dataset_client.list_items(limit=limit, offset=offset)
items = response.items
total = response.total
# Send a single API call to fetch a page of items.
# (The number of items per call can be limited by the API.)
paginated_items = await dataset_client.list_items(limit=limit, offset=offset)

print(f'Fetched {len(items)} items')
# Inspect the pagination metadata returned by the API
print(paginated_items.total)

# Add the fetched items to the complete list
all_items.extend(items)

# Exit the loop if there are no more items to fetch
if offset + limit >= total:
break

# Increment the offset for the next page
offset += limit

print(f'Overall fetched {len(all_items)} items')
for item in paginated_items.items:
print(item) # Process the item as needed
27 changes: 8 additions & 19 deletions docs/02_concepts/code/08_pagination_sync.py
@@ -10,26 +10,15 @@ def main() -> None:
dataset_client = apify_client.dataset('dataset-id')

# Define the pagination parameters
limit = 1000 # Number of items per page
limit = 1000  # Number of items to request from the API
offset = 0 # Starting offset
all_items = [] # List to store all fetched items

while True:
# Fetch a page of items
response = dataset_client.list_items(limit=limit, offset=offset)
items = response.items
total = response.total
# Send a single API call to fetch a page of items.
# (The number of items per call can be limited by the API.)
paginated_items = dataset_client.list_items(limit=limit, offset=offset)

print(f'Fetched {len(items)} items')
# Inspect the pagination metadata returned by the API
print(paginated_items.total)

# Add the fetched items to the complete list
all_items.extend(items)

# Exit the loop if there are no more items to fetch
if offset + limit >= total:
break

# Increment the offset for the next page
offset += limit

print(f'Overall fetched {len(all_items)} items')
for item in paginated_items.items:
print(item) # Process the item as needed
19 changes: 19 additions & 0 deletions docs/04_upgrading/upgrading_to_v3.mdx
@@ -320,3 +320,22 @@ from apify_client._literals import WebhookEventType

events: list[WebhookEventType] = ['ACTOR.RUN.SUCCEEDED', 'ACTOR.RUN.FAILED']
```

## Async `iterate_*` methods are plain functions, not async generators

Async iteration helpers — <ApiLink to="class/DatasetClientAsync#iterate_items">`DatasetClientAsync.iterate_items()`</ApiLink> and <ApiLink to="class/KeyValueStoreClientAsync#iterate_keys">`KeyValueStoreClientAsync.iterate_keys()`</ApiLink> — were previously declared as `async def` (async generator functions). They are now plain `def` functions that return an `AsyncIterator` produced by a shared pagination helper.

Consumer-side iteration is unchanged — `async for item in client.iterate_items(...)` works the same in both versions:

```python
# Works in both v2 and v3
async for item in client.dataset('my-dataset').iterate_items():
print(item)
```

The difference matters only if your code inspects the function itself:

- The method is no longer an async generator function: `inspect.isasyncgenfunction(client.iterate_items)` now returns `False` (it is a regular function whose return value is an async iterator). `inspect.iscoroutinefunction(client.iterate_items)` also returns `False`, as it already did for the async generator version, because async generator functions are not coroutine functions either (see the sketch after this list).
- Type checkers see `def (...) -> AsyncIterator[T]` instead of `async def (...) -> AsyncIterator[T]`. Annotations on variables that hold the call's result may need to change from `AsyncGenerator[T, None]` to `AsyncIterator[T]`.
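
A minimal sketch of the inspection difference (the token and the dataset id `'my-dataset'` are placeholders):

```python
import inspect

from apify_client import ApifyClientAsync

client = ApifyClientAsync('MY-APIFY-TOKEN')
iterate_items = client.dataset('my-dataset').iterate_items

# v3: a plain function, so this is False (it was True in v2).
print(inspect.isasyncgenfunction(iterate_items))

# False in both v2 and v3, since async generator functions
# are not coroutine functions either.
print(inspect.iscoroutinefunction(iterate_items))
```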

A new <ApiLink to="class/RequestQueueClientAsync#iterate_requests">`RequestQueueClientAsync.iterate_requests()`</ApiLink> helper is also introduced and follows the same `def ... -> AsyncIterator[T]` shape.
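
A minimal usage sketch (the request queue id `'my-queue'` is a placeholder):

```python
from apify_client import ApifyClientAsync


async def main() -> None:
    apify_client = ApifyClientAsync('MY-APIFY-TOKEN')
    queue_client = apify_client.request_queue('my-queue')

    # The call itself is not awaited: it returns an AsyncIterator
    # that is consumed with `async for`.
    async for request in queue_client.iterate_requests():
        print(request)
```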
177 changes: 177 additions & 0 deletions src/apify_client/_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol, TypeVar

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator

T = TypeVar('T')

DEFAULT_CHUNK_SIZE = 1000
"""Default per-page size used by the iterate helpers when the caller does not specify one.

The value of 1000 keeps backwards compatibility with the previous fixed cache size.
"""


class HasItems(Protocol[T]):
"""Structural contract for a single page of results from a paginated API endpoint.

Implementations must expose `items`. They may optionally expose `count` — the number of items scanned by the API for
this page, which can exceed `len(items)` when filters drop items from the response. The iterator helpers consult
`count` opportunistically via `getattr` for offset bookkeeping and fall back to `len(items)` when it is absent.
"""

items: list[T]


def get_items_iterator(
callback: Callable[..., HasItems[T]],
*,
limit: int | None = None,
offset: int | None = None,
chunk_size: int | None = None,
) -> Iterator[T]:
"""Yield individual items from offset-based paginated API responses.

The `callback` is invoked lazily to fetch each page from the API. It must accept `limit` and `offset` keyword
arguments and return an object whose `items` attribute is a list. If the object also exposes a `count` attribute, it
is used for offset bookkeeping (the Apify API's `count` reflects items scanned, which can exceed items returned when
filters are applied).

Iteration stops when a page returns no items or when the user-requested `limit` is reached. The `total` field is
intentionally not consulted, because it can change between calls.

Args:
callback: Function returning a single page of items.
limit: Maximum total number of items to yield across all pages. `None` or `0` means no limit.
offset: Starting offset for the first page.
chunk_size: Maximum number of items requested per API call. `None` or `0` lets the API decide.
"""
effective_chunk = chunk_size or 0
initial_offset = offset or 0
initial_limit = limit or 0
fetched_items = 0

while True:
current_page = callback(
limit=_next_page_limit(initial_limit, fetched_items, effective_chunk),
offset=initial_offset + fetched_items,
)
yield from current_page.items

fetched_items += max(getattr(current_page, 'count', 0), len(current_page.items))

if not current_page.items or (initial_limit and fetched_items >= initial_limit):
break


async def get_items_iterator_async(
callback: Callable[..., Awaitable[HasItems[T]]],
*,
limit: int | None = None,
offset: int | None = None,
chunk_size: int | None = None,
) -> AsyncIterator[T]:
"""Async variant of :func:`get_items_iterator`.

The `callback` must be an awaitable returning a single page of items.
"""
effective_chunk = chunk_size or 0
initial_offset = offset or 0
initial_limit = limit or 0
fetched_items = 0

while True:
current_page = await callback(
limit=_next_page_limit(initial_limit, fetched_items, effective_chunk),
offset=initial_offset + fetched_items,
)
for item in current_page.items:
yield item

fetched_items += max(getattr(current_page, 'count', 0), len(current_page.items))

if not current_page.items or (initial_limit and fetched_items >= initial_limit):
break


def get_cursor_iterator(
callback: Callable[..., HasItems[T]],
*,
next_cursor: Callable[[Any], str | None],
cursor: str | None = None,
limit: int | None = None,
chunk_size: int | None = None,
) -> Iterator[T]:
"""Yield individual items from cursor-paginated API responses.

Each page is expected to expose `items`; iteration ends when a page returns no items, the cursor extracted by
`next_cursor` is `None`, or the user-requested `limit` is reached.

Args:
callback: Function returning a single page of items. Receives `cursor` and `limit` kwargs.
next_cursor: Callable that extracts the next-page cursor from the returned page (e.g. `lambda p: p.next_cursor`)
and returns `None` when there are no more pages.
cursor: Value of the cursor for the first request, or `None` to start from the beginning.
limit: Maximum total number of items to yield across all pages.
chunk_size: Maximum number of items requested per API call.
"""
effective_chunk = chunk_size or 0
initial_limit = limit or 0
fetched_items = 0

while True:
current_page = callback(
limit=_next_page_limit(initial_limit, fetched_items, effective_chunk),
cursor=cursor,
)
yield from current_page.items

fetched_items += max(getattr(current_page, 'count', 0), len(current_page.items))
cursor = next_cursor(current_page)

if not current_page.items or cursor is None or (initial_limit and fetched_items >= initial_limit):
break


async def get_cursor_iterator_async(
callback: Callable[..., Awaitable[HasItems[T]]],
*,
next_cursor: Callable[[Any], str | None],
cursor: str | None = None,
limit: int | None = None,
chunk_size: int | None = None,
) -> AsyncIterator[T]:
"""Async variant of :func:`get_cursor_iterator`."""
effective_chunk = chunk_size or 0
initial_limit = limit or 0
fetched_items = 0

while True:
current_page = await callback(
limit=_next_page_limit(initial_limit, fetched_items, effective_chunk),
cursor=cursor,
)
for item in current_page.items:
yield item

fetched_items += max(getattr(current_page, 'count', 0), len(current_page.items))
cursor = next_cursor(current_page)

if not current_page.items or cursor is None or (initial_limit and fetched_items >= initial_limit):
break


def _next_page_limit(initial_limit: int, fetched_items: int, effective_chunk: int) -> int:
"""Compute the `limit` value for the next API call.

`0` means no limit on the wire (matches the Apify API contract). When both an overall `initial_limit` and a per-page
`effective_chunk` are set, the call is clamped to whichever is smaller; if either is unset (`0`), the other wins.
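
For example, with `initial_limit=1500`, `fetched_items=1000` and `effective_chunk=1000`, the next call requests
`min(500, 1000) == 500` items.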
"""
if not initial_limit:
return effective_chunk
remaining = initial_limit - fetched_items
if not effective_chunk:
return remaining
return min(remaining, effective_chunk)