Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3987.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Two new fields on `ArrayConfig` control how the sharding codec coalesces partial-shard reads: `sharding_coalesce_max_gap_bytes` (default 1 MiB) and `sharding_coalesce_max_bytes` (default 16 MiB). When reading multiple chunks from the same shard, nearby byte ranges are merged into a single request to the store if separated by no more than `sharding_coalesce_max_gap_bytes` and the merged read stays within `sharding_coalesce_max_bytes`. Defaults are seeded from the matching `array.sharding_coalesce_max_gap_bytes` / `array.sharding_coalesce_max_bytes` keys in [`zarr.config`][] at array-creation time, and can be overridden per array by passing `config={...}` to [`zarr.create_array`][].
1 change: 1 addition & 0 deletions docs/user-guide/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Configuration options include the following:
- Async and threading options, e.g. `async.concurrency` and `threading.max_workers`
- Selections of implementations of codecs, codec pipelines and buffers
- Enabling GPU support with `zarr.config.enable_gpu()`. See GPU support for more.
- Controlling request merging when reading multiple chunks from the same shard, with `array.sharding_coalesce_max_gap_bytes` (default 1 MiB) and `array.sharding_coalesce_max_bytes` (default 16 MiB). Reads of nearby chunks are coalesced into a single request to the store when they are separated by at most `sharding_coalesce_max_gap_bytes` bytes and the resulting merged read is no larger than `sharding_coalesce_max_bytes` bytes.

For selecting custom implementations of codecs, pipelines, buffers and ndbuffers,
first register the implementations in the registry and then select them in the config.
Expand Down
14 changes: 13 additions & 1 deletion src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,8 @@ async def _decode_partial_single(
chunk_spec.prototype,
chunks_per_shard,
all_chunk_coords,
max_gap_bytes=shard_spec.config.sharding_coalesce_max_gap_bytes,
max_coalesced_bytes=shard_spec.config.sharding_coalesce_max_bytes,
)

if shard_dict_maybe is None:
Expand Down Expand Up @@ -846,10 +848,16 @@ async def _load_partial_shard_maybe(
prototype: BufferPrototype,
chunks_per_shard: tuple[int, ...],
all_chunk_coords: set[tuple[int, ...]],
max_gap_bytes: int,
max_coalesced_bytes: int,
) -> ShardMapping | None:
"""
Read chunks from `byte_getter` for the case where the read is less than a full shard.
Returns a mapping of chunk coordinates to bytes or None.

`max_gap_bytes` and `max_coalesced_bytes` are forwarded to
`Store.get_ranges` to control byte-range coalescing across the requested
chunks.
"""
shard_index = await self._load_shard_index_maybe(byte_getter, chunks_per_shard)
if shard_index is None:
Expand All @@ -873,7 +881,11 @@ async def _load_partial_shard_maybe(
byte_ranges = [byte_range for _, byte_range in chunk_coord_byte_ranges]
try:
async for group in byte_getter.store.get_ranges(
byte_getter.path, byte_ranges, prototype=prototype
byte_getter.path,
byte_ranges,
prototype=prototype,
max_gap_bytes=max_gap_bytes,
max_coalesced_bytes=max_coalesced_bytes,
):
for idx, buf in group:
if buf is not None:
Expand Down
30 changes: 28 additions & 2 deletions src/zarr/core/array_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
MemoryOrder,
parse_bool,
parse_fill_value,
parse_int,
parse_order,
parse_shapelike,
)
Expand All @@ -29,6 +30,8 @@ class ArrayConfigParams(TypedDict):
order: NotRequired[MemoryOrder]
write_empty_chunks: NotRequired[bool]
read_missing_chunks: NotRequired[bool]
sharding_coalesce_max_gap_bytes: NotRequired[int]
sharding_coalesce_max_bytes: NotRequired[int]


@dataclass(frozen=True)
Expand All @@ -45,22 +48,42 @@ class ArrayConfig:
read_missing_chunks : bool
If True, missing chunks will be filled with the array's fill value on read.
If False, reading missing chunks will raise a ``ChunkNotFoundError``.
sharding_coalesce_max_gap_bytes : int
When reading multiple chunks from the same shard, nearby byte ranges
separated by no more than this many bytes are coalesced into a single
request to the store.
sharding_coalesce_max_bytes : int
Requests will not be coalesced if doing so would exceed this byte size.
"""

order: MemoryOrder
write_empty_chunks: bool
read_missing_chunks: bool
sharding_coalesce_max_gap_bytes: int
sharding_coalesce_max_bytes: int

def __init__(
self, order: MemoryOrder, write_empty_chunks: bool, *, read_missing_chunks: bool = True
self,
order: MemoryOrder,
write_empty_chunks: bool,
*,
read_missing_chunks: bool = True,
sharding_coalesce_max_gap_bytes: int = 1 << 20, # 1 MiB

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flagging this as an old problem we need to deal with, not a blocker: the default values appear twice, once here, and once in the global config object. that's a latent bug. the defaults should be defined in exactly 1 place.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 For now I added test_array_config_init_defaults_match_global_config which makes sure they don't drift.

sharding_coalesce_max_bytes: int = 16 << 20, # 16 MiB
) -> None:
order_parsed = parse_order(order)
write_empty_chunks_parsed = parse_bool(write_empty_chunks)
read_missing_chunks_parsed = parse_bool(read_missing_chunks)
sharding_coalesce_max_gap_bytes_parsed = parse_int(sharding_coalesce_max_gap_bytes)
sharding_coalesce_max_bytes_parsed = parse_int(sharding_coalesce_max_bytes)

object.__setattr__(self, "order", order_parsed)
object.__setattr__(self, "write_empty_chunks", write_empty_chunks_parsed)
object.__setattr__(self, "read_missing_chunks", read_missing_chunks_parsed)
object.__setattr__(
self, "sharding_coalesce_max_gap_bytes", sharding_coalesce_max_gap_bytes_parsed
)
object.__setattr__(self, "sharding_coalesce_max_bytes", sharding_coalesce_max_bytes_parsed)

@classmethod
def from_dict(cls, data: ArrayConfigParams) -> Self:
Expand All @@ -72,7 +95,8 @@ def from_dict(cls, data: ArrayConfigParams) -> Self:
kwargs_out: ArrayConfigParams = {}
for f in fields(ArrayConfig):
field_name = cast(
"Literal['order', 'write_empty_chunks', 'read_missing_chunks']", f.name
"Literal['order', 'write_empty_chunks', 'read_missing_chunks', 'sharding_coalesce_max_gap_bytes', 'sharding_coalesce_max_bytes']",
f.name,
)
if field_name not in data:
kwargs_out[field_name] = zarr_config.get(f"array.{field_name}")
Expand All @@ -88,6 +112,8 @@ def to_dict(self) -> ArrayConfigParams:
"order": self.order,
"write_empty_chunks": self.write_empty_chunks,
"read_missing_chunks": self.read_missing_chunks,
"sharding_coalesce_max_gap_bytes": self.sharding_coalesce_max_gap_bytes,
"sharding_coalesce_max_bytes": self.sharding_coalesce_max_bytes,
}


Expand Down
6 changes: 6 additions & 0 deletions src/zarr/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ def parse_bool(data: Any) -> bool:
raise ValueError(f"Expected bool, got {data} instead.")


def parse_int(data: Any) -> int:
    """Validate that ``data`` is a plain integer and return it unchanged.

    ``bool`` values are rejected explicitly, even though ``bool`` is a
    subclass of ``int``.

    Parameters
    ----------
    data : Any
        The value to validate.

    Returns
    -------
    int
        ``data`` itself, when it is an ``int`` and not a ``bool``.

    Raises
    ------
    ValueError
        If ``data`` is a ``bool`` or is not an ``int`` instance.
    """
    # Check bool first: isinstance(True, int) is True, so the int check alone
    # would let booleans through.
    if isinstance(data, bool) or not isinstance(data, int):
        raise ValueError(f"Expected int, got {data} instead.")
    return data


def _warn_write_empty_chunks_kwarg() -> None:
# TODO: link to docs page on array configuration in this message
msg = (
Expand Down
2 changes: 2 additions & 0 deletions src/zarr/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def enable_gpu(self) -> ConfigSet:
"read_missing_chunks": True,
"target_shard_size_bytes": None,
"rectilinear_chunks": False,
"sharding_coalesce_max_gap_bytes": 1 << 20, # 1 MiB
"sharding_coalesce_max_bytes": 16 << 20, # 16 MiB
},
"async": {"concurrency": 10, "timeout": None},
"threading": {"max_workers": None},
Expand Down
163 changes: 163 additions & 0 deletions tests/test_codecs/test_sharding_unit.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from __future__ import annotations

from typing import TYPE_CHECKING, cast
from unittest.mock import AsyncMock

import numpy as np
import pytest

Expand All @@ -10,9 +15,14 @@
)
from zarr.core.buffer import default_buffer_prototype
from zarr.core.buffer.cpu import Buffer
from zarr.core.config import config
from zarr.storage._common import StorePath
from zarr.storage._memory import MemoryStore

if TYPE_CHECKING:
from zarr.core.array import ShardsConfigParam
from zarr.core.array_spec import ArrayConfigParams

# ============================================================================
# _ShardIndex tests
# ============================================================================
Expand Down Expand Up @@ -155,6 +165,8 @@ async def test_load_partial_shard_maybe_index_load_fails() -> None:
prototype=default_buffer_prototype(),
chunks_per_shard=(2,),
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is None
Expand Down Expand Up @@ -187,6 +199,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,), (2,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is not None
Expand Down Expand Up @@ -220,6 +234,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,), (2,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result == {}
Expand Down Expand Up @@ -251,6 +267,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is not None
Expand Down Expand Up @@ -292,6 +310,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is None
Expand Down Expand Up @@ -336,6 +356,8 @@ async def boom(*args: object, **kwargs: object) -> Buffer | None:
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)


Expand Down Expand Up @@ -368,6 +390,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,), (1,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result is not None
Expand Down Expand Up @@ -405,6 +429,8 @@ async def mock_load_index(
prototype=default_buffer_prototype(),
chunks_per_shard=chunks_per_shard,
all_chunk_coords={(0,)},
max_gap_bytes=1 << 20,
max_coalesced_bytes=16 << 20,
)

assert result == {}
Expand Down Expand Up @@ -486,3 +512,140 @@ def test_is_total_shard_1d() -> None:
# Partial
partial_coords: set[tuple[int, ...]] = {(0,), (2,)}
assert codec._is_total_shard(partial_coords, chunks_per_shard) is False


# ============================================================================
# Coalescing config option tests
#
# Assert that the `array.sharding_coalesce_max_gap_bytes` and
# `array.sharding_coalesce_max_bytes` global config keys flow through
# `ArrayConfig` to `Store.get_ranges` as `max_gap_bytes` /
# `max_coalesced_bytes` kwargs, and that per-array `config={...}` overrides
# the global default.
# ============================================================================


def _trigger_partial_shard_read(array_config: ArrayConfigParams | None = None) -> AsyncMock:
    """Exercise the partial-shard read path and return the `get_ranges` mock.

    A 1-D sharded `int32` array (8 elements, chunks of 2, a single shard of 8)
    is created on a `MemoryStore` wrapped in an `AsyncMock`, filled with data,
    and then a strict subset of its chunks is read back through the public
    indexing API.

    Parameters
    ----------
    array_config : ArrayConfigParams | None
        Per-array configuration forwarded as `config=` to `zarr.create_array`.

    Returns
    -------
    AsyncMock
        The `get_ranges` attribute of the mocked store. The mock is reset after
        the initial write, so it reflects the subsequent read only.
    """
    import zarr

    values = np.arange(8, dtype="int32")

    # Wrap a real store so that calls are recorded and still delegated to it.
    backing_store = MemoryStore()
    mocked_store = AsyncMock(wraps=backing_store, spec=backing_store.__class__)

    shard_config: ShardsConfigParam = {"shape": (8,), "index_location": "end"}
    arr = zarr.create_array(
        StorePath(mocked_store),
        shape=(8,),
        chunks=(2,),
        shards=shard_config,
        dtype=values.dtype,
        fill_value=-1,
        config=array_config,
    )
    arr[:] = values

    # Forget everything recorded while creating and populating the array.
    mocked_store.reset_mock()

    # Reading only half of the shard forces the partial-shard read path.
    _ = arr[0:4]

    return cast(AsyncMock, mocked_store.get_ranges)


def test_load_partial_shard_forwards_global_config_to_get_ranges() -> None:
    """Global `array.sharding_coalesce_*` settings reach `Store.get_ranges`.

    The array is created while the global config is overridden, so the
    overridden values should appear as the `max_gap_bytes` and
    `max_coalesced_bytes` kwargs of every recorded `get_ranges` call.
    """
    global_overrides = {
        "array.sharding_coalesce_max_gap_bytes": 4242,
        "array.sharding_coalesce_max_bytes": 424242,
    }
    with config.set(global_overrides):
        get_ranges_mock = _trigger_partial_shard_read()

    recorded_calls = get_ranges_mock.call_args_list
    # Guard against the loop below passing vacuously.
    assert len(recorded_calls) >= 1
    for recorded in recorded_calls:
        assert recorded.kwargs["max_gap_bytes"] == 4242
        assert recorded.kwargs["max_coalesced_bytes"] == 424242


def test_load_partial_shard_per_array_config_overrides_global() -> None:
    """A per-array `config={...}` wins over the global config.

    Both the global config and the per-array config set the coalescing
    options; every recorded `get_ranges` call should carry the per-array
    values.
    """
    global_overrides = {
        "array.sharding_coalesce_max_gap_bytes": 4242,
        "array.sharding_coalesce_max_bytes": 424242,
    }
    with config.set(global_overrides):
        get_ranges_mock = _trigger_partial_shard_read(
            array_config={
                "sharding_coalesce_max_gap_bytes": 99,
                "sharding_coalesce_max_bytes": 9999,
            },
        )

    recorded_calls = get_ranges_mock.call_args_list
    # Guard against the loop below passing vacuously.
    assert len(recorded_calls) >= 1
    for recorded in recorded_calls:
        assert recorded.kwargs["max_gap_bytes"] == 99
        assert recorded.kwargs["max_coalesced_bytes"] == 9999


def test_load_partial_shard_uses_config_defaults() -> None:
    """With no overrides, the values currently in `zarr.config` are forwarded."""
    get_ranges_mock = _trigger_partial_shard_read()

    # Look the expected values up once; each recorded call is compared to them.
    expected_gap = config.get("array.sharding_coalesce_max_gap_bytes")
    expected_total = config.get("array.sharding_coalesce_max_bytes")

    recorded_calls = get_ranges_mock.call_args_list
    # Guard against the loop below passing vacuously.
    assert len(recorded_calls) >= 1
    for recorded in recorded_calls:
        assert recorded.kwargs["max_gap_bytes"] == expected_gap
        assert recorded.kwargs["max_coalesced_bytes"] == expected_total


async def test_load_partial_shard_explicit_kwargs_passthrough(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """`_load_partial_shard_maybe` hands its coalescing kwargs to `get_ranges`.

    The shard index loader is replaced with a stub returning a hand-built
    index, so the test controls exactly which byte ranges are requested from
    the mocked store.
    """
    chunks_per_shard = (4,)

    # Hand-built index: two chunks occupying bytes 0-100 and 200-300.
    shard_index = _ShardIndex.create_empty(chunks_per_shard)
    shard_index.set_chunk_slice((0,), slice(0, 100))
    shard_index.set_chunk_slice((2,), slice(200, 300))

    async def _stub_load_index(
        self: ShardingCodec, byte_getter: StorePath, cps: tuple[int, ...]
    ) -> _ShardIndex:
        return shard_index

    monkeypatch.setattr(ShardingCodec, "_load_shard_index_maybe", _stub_load_index)

    # Real store holding 300 bytes under "shard", wrapped so calls are recorded.
    backing_store = MemoryStore()
    await backing_store.set("shard", Buffer.from_bytes(b"x" * 300))
    mocked_store = AsyncMock(wraps=backing_store, spec=backing_store.__class__)

    sharding_codec = ShardingCodec(chunk_shape=(2,))
    await sharding_codec._load_partial_shard_maybe(
        byte_getter=StorePath(mocked_store, "shard"),
        prototype=default_buffer_prototype(),
        chunks_per_shard=chunks_per_shard,
        all_chunk_coords={(0,), (2,)},
        max_gap_bytes=12345,
        max_coalesced_bytes=67890,
    )

    get_ranges_mock = mocked_store.get_ranges
    get_ranges_mock.assert_called_once()
    forwarded = get_ranges_mock.call_args.kwargs
    assert forwarded["max_gap_bytes"] == 12345
    assert forwarded["max_coalesced_bytes"] == 67890
Loading
Loading