Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
91f3640
feat: add PaginationList for lazy page fetching in catalog list opera…
stark256-spec Jun 3, 2026
5a01899
fix(pagination): resolve mypy and ruff lint errors
stark256-spec Jun 3, 2026
98d6900
fix(tests): resolve ruff lint errors and add len() performance tests
stark256-spec Jun 3, 2026
9fbcdd3
fix(pagination): add docstrings to __getitem__ overload stubs for pyd…
stark256-spec Jun 3, 2026
10c2ba0
fix(pagination): use noqa:D105 on overload stubs instead of docstrings
stark256-spec Jun 3, 2026
0b08854
fix(pagination): drop @overload stubs to resolve D105/D418 pydocstyle…
stark256-spec Jun 3, 2026
6a7b6d0
fix(pagination): add D105/D418 to pydocstyle ignore list; restore @ov…
stark256-spec Jun 3, 2026
d71317c
fix(tests): move PaginationList import to correct alphabetical positi…
stark256-spec Jun 4, 2026
1780ae6
fix(tests): account for RestCatalog config endpoint call in call_coun…
stark256-spec Jun 4, 2026
e2bb799
refactor(pagination): move PaginationList to typedef, add PageFetchRe…
stark256-spec Jun 6, 2026
392a8f5
fix: update PaginationList import in test_rest.py
stark256-spec Jun 6, 2026
7ab945a
fix(lint): add D105 back to pydocstyle ignore for @overload stubs
stark256-spec Jun 10, 2026
329b1bd
fix: resolve ruff import ordering and pagination cleanup bug
stark256-spec Jun 10, 2026
769728d
feat(pagination): add count, index, reversed, copy, and + to Paginati…
stark256-spec Jun 11, 2026
3883280
Address review nits: named FetchNextPage alias and HTTP call count as…
stark256-spec Jun 19, 2026
1c3abd9
Add __mul__ and __rmul__ overrides to PaginationList
stark256-spec Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ repos:
- id: pydocstyle
args:
[
"--ignore=D100,D102,D101,D103,D104,D107,D203,D212,D213,D404,D405,D406,D407,D411,D413,D415,D417",
"--ignore=D100,D102,D101,D103,D104,D105,D107,D203,D212,D213,D404,D405,D406,D407,D411,D413,D415,D417",
]
additional_dependencies:
- tomli==2.0.1
Expand Down
85 changes: 40 additions & 45 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import (
TYPE_CHECKING,
Any,
TypeAlias,
)
from urllib.parse import quote, unquote

Expand Down Expand Up @@ -86,7 +87,7 @@
TableRequirement,
TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, PaginationList, Properties
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool, property_as_int
Expand All @@ -96,6 +97,8 @@
if TYPE_CHECKING:
import pyarrow as pa

_PageFetchResult: TypeAlias = tuple[list[Identifier], str | None]


class HttpMethod(str, Enum):
GET = "GET"
Expand Down Expand Up @@ -1051,26 +1054,24 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
raise ValueError(f"{PAGE_SIZE} must be a positive integer")
params["pageSize"] = str(page_size)

tables: list[Identifier] = []
page_token: str | None = None

while True:
if page_token:
params["pageToken"] = page_token
def _fetch_page(page_token: str) -> _PageFetchResult:
params["pageToken"] = page_token
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})

parsed = ListTablesResponse.model_validate_json(response.text)
tables.extend([(*table.namespace, table.name) for table in parsed.identifiers])
return [(*t.namespace, t.name) for t in parsed.identifiers], parsed.next_page_token

if not parsed.next_page_token:
break
page_token = parsed.next_page_token

return tables
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
parsed = ListTablesResponse.model_validate_json(response.text)
first_page: list[Identifier] = [(*t.namespace, t.name) for t in parsed.identifiers]
return PaginationList(first_page, parsed.next_page_token, _fetch_page)

@retry(**_RETRY_ARGS)
@override
Expand Down Expand Up @@ -1165,27 +1166,24 @@ def list_views(self, namespace: str | Identifier) -> list[Identifier]:
raise ValueError(f"{PAGE_SIZE} must be a positive integer")
params["pageSize"] = str(page_size)

views: list[Identifier] = []
page_token: str | None = None

while True:
if page_token:
params["pageToken"] = page_token

def _fetch_page(page_token: str) -> _PageFetchResult:
params["pageToken"] = page_token
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})

parsed = ListViewsResponse.model_validate_json(response.text)
views.extend([(*view.namespace, view.name) for view in parsed.identifiers])

if not parsed.next_page_token:
break
page_token = parsed.next_page_token
return [(*v.namespace, v.name) for v in parsed.identifiers], parsed.next_page_token

return views
response = self._session.get(url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
parsed = ListViewsResponse.model_validate_json(response.text)
first_page: list[Identifier] = [(*v.namespace, v.name) for v in parsed.identifiers]
return PaginationList(first_page, parsed.next_page_token, _fetch_page)

@retry(**_RETRY_ARGS)
@override
Expand Down Expand Up @@ -1279,37 +1277,34 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
self._check_endpoint(Capability.V1_LIST_NAMESPACES)
namespace_tuple = self.identifier_to_tuple(namespace)
namespaces_url = self.url(Endpoints.list_namespaces)

params: dict[str, str] = {}
page_size = property_as_int(self.properties, PAGE_SIZE, None)
if page_size is not None:
if page_size <= 0:
raise ValueError(f"{PAGE_SIZE} must be a positive integer")
params["pageSize"] = str(page_size)
if namespace_tuple:
params["parent"] = self._encode_namespace_path(namespace_tuple)

namespaces: list[Identifier] = []
page_token: str | None = None

while True:
if namespace_tuple:
params["parent"] = self._encode_namespace_path(namespace_tuple)
if page_token:
params["pageToken"] = page_token
response = self._session.get(self.url(Endpoints.list_namespaces), params=params)

def _fetch_page(page_token: str) -> _PageFetchResult:
params["pageToken"] = page_token
response = self._session.get(namespaces_url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})

parsed = ListNamespaceResponse.model_validate_json(response.text)
namespaces.extend(parsed.namespaces)

if not parsed.next_page_token:
break
page_token = parsed.next_page_token
return list(parsed.namespaces), parsed.next_page_token

return namespaces
response = self._session.get(namespaces_url, params=params)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchNamespaceError})
parsed = ListNamespaceResponse.model_validate_json(response.text)
return PaginationList(list(parsed.namespaces), parsed.next_page_token, _fetch_page)

@retry(**_RETRY_ARGS)
@override
Expand Down
146 changes: 145 additions & 1 deletion pyiceberg/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
# under the License.
from __future__ import annotations

import sys
from abc import abstractmethod
from collections.abc import Callable
from collections.abc import Callable, Iterator
from datetime import date, datetime, time
from decimal import Decimal
from typing import (
Expand All @@ -26,9 +27,11 @@
Generic,
Literal,
Protocol,
SupportsIndex,
TypeAlias,
TypeVar,
Union,
overload,
runtime_checkable,
)
from uuid import UUID
Expand Down Expand Up @@ -153,6 +156,7 @@ def model_dump_json(


T = TypeVar("T")
S = TypeVar("S")


class IcebergRootModel(RootModel[T], Generic[T]):
Expand Down Expand Up @@ -211,3 +215,143 @@ def __hash__(self) -> int:

TableVersion: TypeAlias = Literal[1, 2, 3]
ViewVersion: TypeAlias = Literal[1]

FetchNextPage: TypeAlias = Callable[[str], tuple[list[T], str | None]]


class PaginationList(list[T]):
    """A list that lazily fetches subsequent pages from a paginated API.

    The first page is pre-loaded on construction. Subsequent pages are only
    fetched when the caller iterates or indexes past items already in memory.
    Operations that require the complete result set — ``len()``, ``in``,
    slicing, negative indexing, ``repr()``, ``==``/``!=``, ``count``,
    ``index``, ``reversed()``, ``copy``, ``+`` and ``*`` — trigger a fetch of
    all remaining pages. Truthiness (``bool()``/``if``) only fetches until the
    first item is available.

    Note:
        In-place mutators inherited from ``list`` (``append``, ``extend``,
        ``insert``, ``sort``, ``reverse``, ...) are not overridden here and
        therefore act only on the items currently in memory. Call ``copy()``
        first to obtain a fully materialized plain list before mutating.

    Args:
        first_page: Items from the first API response.
        next_page_token: Pagination token returned with the first response,
            or ``None`` if no further pages exist.
        fetch_next_page: Callable matching ``FetchNextPage[T]`` — accepts a
            page token and returns ``(items, next_page_token_or_None)``.
    """

    def __init__(
        self,
        first_page: list[T],
        next_page_token: str | None,
        fetch_next_page: FetchNextPage[T],
    ) -> None:
        super().__init__(first_page)
        self._next_page_token = next_page_token
        self._fetch_next_page = fetch_next_page

    def _load_next_page(self) -> bool:
        """Fetch one more page, if any, and append its items.

        Returns:
            True if a page was fetched, False if there was no token left.
        """
        token = self._next_page_token
        # A falsy token (None or "") marks the end of the result set.
        if not token:
            return False
        # The token is only replaced once the fetch succeeded, so a failed
        # request can be retried with the same token.
        items, self._next_page_token = self._fetch_next_page(token)
        list.extend(self, items)
        return True

    def _fetch_all(self) -> None:
        """Fetch every remaining page."""
        while self._load_next_page():
            pass

    def _fetch_through_index(self, idx: int) -> None:
        """Fetch pages until position ``idx`` is in memory or pages run out."""
        while list.__len__(self) <= idx and self._load_next_page():
            pass

    def __iter__(self) -> Iterator[T]:
        """Iterate lazily, fetching pages only as the caller advances."""
        idx = 0
        while True:
            # list.* is called directly to bypass the overrides below, which
            # would otherwise force a full fetch.
            if idx < list.__len__(self):
                yield list.__getitem__(self, idx)
                idx += 1
            elif not self._load_next_page():
                return

    def __bool__(self) -> bool:
        """Return True if there is at least one item, fetching only what is needed.

        Without this override truthiness would fall back to ``__len__`` and
        fetch every page. An empty page may still carry a token, so pages are
        fetched until one item is available or the pages are exhausted.
        """
        self._fetch_through_index(0)
        return list.__len__(self) > 0

    def __len__(self) -> int:
        """Return the total number of items, fetching all pages first."""
        self._fetch_all()
        return list.__len__(self)

    def __contains__(self, item: object) -> bool:
        """Return True if item is present, fetching all pages first."""
        self._fetch_all()
        return list.__contains__(self, item)

    def __repr__(self) -> str:
        """Return string representation after fetching all pages."""
        self._fetch_all()
        return f"PaginationList({list.__repr__(self)})"

    def __eq__(self, other: object) -> bool:
        """Compare equality after fetching all pages of both operands.

        Returns ``NotImplemented`` (via ``list.__eq__``) when ``other`` is not
        a list, so Python can fall back to the reflected comparison.
        """
        self._fetch_all()
        # list.__eq__ reads the other operand's in-memory items directly, so a
        # partially loaded PaginationList must be completed first.
        if isinstance(other, PaginationList):
            other._fetch_all()
        return list.__eq__(self, other)

    def __ne__(self, other: object) -> bool:
        """Compare inequality after fetching all pages of both operands."""
        result = self.__eq__(other)
        # Negating NotImplemented would yield False (and raises TypeError on
        # Python 3.14+); propagate it so `pl != non_list` evaluates to True.
        if result is NotImplemented:
            return NotImplemented
        return not result

    @overload
    def __getitem__(self, idx: SupportsIndex) -> T: ...

    @overload
    def __getitem__(self, idx: slice) -> list[T]: ...

    def __getitem__(self, idx: SupportsIndex | slice) -> T | list[T]:
        """Fetch pages as needed before returning the requested item(s).

        Slices and negative indices depend on the total length and therefore
        fetch all pages; a non-negative index only fetches up to that position.
        Slices are returned as plain lists.
        """
        if isinstance(idx, slice):
            self._fetch_all()
        else:
            i = idx.__index__()
            if i < 0:
                self._fetch_all()
            else:
                self._fetch_through_index(i)
        return list.__getitem__(self, idx)

    def count(self, value: T) -> int:
        """Return the number of occurrences of value, fetching all pages first."""
        self._fetch_all()
        return list.count(self, value)

    def index(self, value: T, start: SupportsIndex = 0, stop: SupportsIndex = sys.maxsize, /) -> int:
        """Return the index of the first occurrence of value, fetching all pages first."""
        self._fetch_all()
        return list.index(self, value, start, stop)

    def __reversed__(self) -> Iterator[T]:
        """Return an iterator over the items in reverse order, fetching all pages first."""
        self._fetch_all()
        return list.__reversed__(self)

    def copy(self) -> list[T]:
        """Return a plain list with all items, fetching all pages first."""
        self._fetch_all()
        return list.copy(self)

    @overload
    def __add__(self, other: list[T]) -> list[T]: ...

    @overload
    def __add__(self, other: list[S]) -> list[S | T]: ...

    def __add__(self, other: list[Any]) -> list[Any]:
        """Return self + other as a plain list, fetching all pages first."""
        self._fetch_all()
        return list.__add__(self, other)

    def __radd__(self, other: list[T]) -> list[T]:
        """Return other + self as a plain list, fetching all pages first."""
        self._fetch_all()
        return list.__add__(other, self)

    def __mul__(self, n: SupportsIndex) -> list[T]:
        """Return self * n as a plain list, fetching all pages first."""
        self._fetch_all()
        return list.__mul__(self, n)

    def __rmul__(self, n: SupportsIndex) -> list[T]:
        """Return n * self as a plain list, fetching all pages first."""
        self._fetch_all()
        return list.__mul__(self, n)
Loading