21 commits
9916b0b
adding async for datatrees
aladinor Sep 13, 2025
afa42e9
adding async method to _maybe_create_index
aladinor Sep 13, 2025
d469f2e
using asyncio.as_completed instead of gathering results
aladinor Sep 13, 2025
d53498a
adding tests for open_group, open_dtree and _maybe_create_index using…
aladinor Sep 14, 2025
3b26dd6
Merge branch 'main' into async-dtreec
aladinor Sep 14, 2025
b5ab48a
ensuring _maybe_create_default_indexes_async is compatible with zarr v2
aladinor Sep 14, 2025
94a9efd
resolving the mypy type errors
aladinor Sep 14, 2025
288a818
Merge branch 'async-dtreec' of https://github.com/aladinor/xarray int…
aladinor Sep 14, 2025
573a700
attempt 2: resolving mypy type errors
aladinor Sep 14, 2025
7557261
Merge branch 'main' into async-dtreec
aladinor Sep 15, 2025
3c10a23
Merge branch 'main' into async-dtreec
aladinor Oct 8, 2025
013804c
Merge branch 'main' into async-dtreec
aladinor Dec 9, 2025
f4ca679
Merge branch 'main' into async-dtreec
aladinor Dec 12, 2025
531c589
refactor: consolidate async index creation for DataTree opening
aladinor Dec 12, 2025
0ee2a73
perf: remove unnecessary semaphore from async datatree opening
aladinor Dec 12, 2025
6d6cd1e
fix: add zarr v2 fallback for datatree opening
aladinor Dec 12, 2025
640081b
Merge branch 'main' into async-dtreec
aladinor Dec 13, 2025
0ee154e
updating whats-new.rst file
aladinor Dec 13, 2025
542cad3
Merge branch 'async-dtreec' of https://github.com/aladinor/xarray int…
aladinor Dec 13, 2025
31b50dc
fix: re-add semaphore to async datatree opening to prevent deadlocks
aladinor Dec 13, 2025
b6a3b27
Merge branch 'main' into async-dtreec
aladinor Dec 15, 2025
3 changes: 3 additions & 0 deletions doc/whats-new.rst
@@ -43,6 +43,9 @@ Internal Changes
Performance
~~~~~~~~~~~

- Improve performance of :py:func:`open_datatree` for zarr stores by using async/concurrent
loading of groups and indexes (:pull:`10742`).
By `Alfonso Ladino <https://github.com/aladinor>`_.
- Add a fastpath to the backend plugin system for standard engines (:issue:`10178`, :pull:`10937`).
By `Sam Levang <https://github.com/slevang>`_.

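For context, a minimal usage sketch of the call this entry speeds up, assuming a hypothetical local zarr store with nested groups:

import xarray as xr

# "data/hierarchy.zarr" is a hypothetical path to any zarr v2/v3 store
# containing nested groups; groups and default indexes now load concurrently.
tree = xr.open_datatree("data/hierarchy.zarr", engine="zarr")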
161 changes: 161 additions & 0 deletions xarray/backends/api.py
@@ -22,6 +22,7 @@
T_PathFileOrDataStore,
_find_absolute_paths,
_normalize_path,
datatree_from_dict_with_io_cleanup,
)
from xarray.coders import CFDatetimeCoder, CFTimedeltaCoder
from xarray.core import dtypes, indexing
@@ -385,6 +386,35 @@ def _datatree_from_backend_datatree(
return tree


async def _maybe_create_default_indexes_async(ds):
    """Concurrently load un-indexed dimension coordinates, then build default indexes."""
    import asyncio

# Determine which coords need default indexes
to_index_names = [
name
for name, coord in ds.coords.items()
if coord.dims == (name,) and name not in ds.xindexes
]

if to_index_names:

async def load_var(var):
try:
return await var.load_async()
except NotImplementedError:
return await asyncio.to_thread(var.load)

await asyncio.gather(
*[load_var(ds.coords[name].variable) for name in to_index_names]
)

# Build indexes (now data is in-memory so no remote I/O per coord)
to_index = {name: ds.coords[name].variable for name in to_index_names}
if to_index:
return ds.assign_coords(Coordinates(to_index))
return ds
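A minimal sketch of how this helper is driven, assuming a lazily opened zarr dataset (the store path below is hypothetical); coordinates whose backend implements load_async are loaded concurrently, the rest fall back to a thread:

import asyncio
import xarray as xr
from xarray.backends.api import _maybe_create_default_indexes_async

async def main():
    # Hypothetical store; variables stay lazy until explicitly loaded.
    ds = xr.open_dataset("data/group.zarr", engine="zarr")
    ds = await _maybe_create_default_indexes_async(ds)
    # Every dimension coordinate should now carry a default index.
    dim_coords = [n for n, c in ds.coords.items() if c.dims == (n,)]
    assert all(n in ds.xindexes for n in dim_coords)

asyncio.run(main())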


def open_dataset(
filename_or_obj: T_PathFileOrDataStore,
*,
@@ -1099,6 +1129,137 @@ def open_datatree(
return tree


async def open_datatree_async(
filename_or_obj: T_PathFileOrDataStore,
*,
engine: T_Engine = None,
chunks: T_Chunks = None,
cache: bool | None = None,
decode_cf: bool | None = None,
mask_and_scale: bool | Mapping[str, bool] | None = None,
decode_times: bool
| CFDatetimeCoder
| Mapping[str, bool | CFDatetimeCoder]
| None = None,
decode_timedelta: bool
| CFTimedeltaCoder
| Mapping[str, bool | CFTimedeltaCoder]
| None = None,
use_cftime: bool | Mapping[str, bool] | None = None,
concat_characters: bool | Mapping[str, bool] | None = None,
decode_coords: Literal["coordinates", "all"] | bool | None = None,
drop_variables: str | Iterable[str] | None = None,
create_default_indexes: bool = True,
inline_array: bool = False,
chunked_array_type: str | None = None,
from_array_kwargs: dict[str, Any] | None = None,
backend_kwargs: dict[str, Any] | None = None,
**kwargs,
) -> DataTree:
"""Async version of open_datatree that concurrently builds default indexes.

Supports the "zarr" engine (both Zarr v2 and v3). For other engines, a
ValueError is raised.
"""
import asyncio

if cache is None:
cache = chunks is None

if backend_kwargs is not None:
kwargs.update(backend_kwargs)

if engine is None:
engine = plugins.guess_engine(filename_or_obj)

if from_array_kwargs is None:
from_array_kwargs = {}

# Only zarr supports async lazy loading at present
if engine != "zarr":
raise ValueError(f"Engine {engine!r} does not support asynchronous operations")

backend = plugins.get_backend(engine)

decoders = _resolve_decoders_kwargs(
decode_cf,
open_backend_dataset_parameters=backend.open_dataset_parameters,
mask_and_scale=mask_and_scale,
decode_times=decode_times,
decode_timedelta=decode_timedelta,
concat_characters=concat_characters,
use_cftime=use_cftime,
decode_coords=decode_coords,
)

overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)

# Prefer backend async group opening if available (currently zarr only)
if hasattr(backend, "open_groups_as_dict_async"):
groups_dict = await backend.open_groups_as_dict_async(
filename_or_obj,
drop_variables=drop_variables,
**decoders,
**kwargs,
)
backend_tree = datatree_from_dict_with_io_cleanup(groups_dict)
else:
backend_tree = backend.open_datatree(
filename_or_obj,
drop_variables=drop_variables,
**decoders,
**kwargs,
)

# Protect variables for caching behavior consistency
_protect_datatree_variables_inplace(backend_tree, cache)

# For each dataset in the tree, concurrently create default indexes (if requested)
results: dict[str, Dataset] = {}

async def process_node(path: str, node_ds: Dataset) -> tuple[str, Dataset]:
ds = node_ds
if create_default_indexes:
ds = await _maybe_create_default_indexes_async(ds)
# Optional chunking (synchronous)
if chunks is not None:
ds = _chunk_ds(
ds,
filename_or_obj,
engine,
chunks,
overwrite_encoded_chunks,
inline_array,
chunked_array_type,
from_array_kwargs,
node=path,
**decoders,
**kwargs,
)
return path, ds

# Build tasks
tasks = [
process_node(path, node.dataset)
for path, [node] in group_subtrees(backend_tree)
]

# Execute concurrently and collect
for fut in asyncio.as_completed(tasks):
path, ds = await fut
results[path] = ds

# Build DataTree
tree = DataTree.from_dict(results)

# Carry over close handlers from backend tree when needed (mirrors sync path)
if create_default_indexes or chunks is not None:
for _path, [node] in group_subtrees(backend_tree):
tree[_path].set_close(node._close)

return tree
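A minimal usage sketch; the function must be awaited from an event loop, and the store path below is hypothetical:

import asyncio
from xarray.backends.api import open_datatree_async

async def main():
    # Only engine="zarr" is accepted; other engines raise ValueError.
    return await open_datatree_async("data/hierarchy.zarr", engine="zarr")

tree = asyncio.run(main())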


def open_groups(
filename_or_obj: T_PathFileOrDataStore,
*,