bioimage_py.util

Shared helpers: block-to-roi conversion, blocking construction and filter halos.

  1"""Shared helpers: block-to-roi conversion, blocking construction and filter halos."""
  2from __future__ import annotations
  3
  4import itertools
  5import numbers
  6import warnings
  7from math import ceil
  8from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
  9
 10import bioimage_cpp as bic
 11from bioimage_cpp.utils import Block, BlockWithHalo, Blocking
 12
 13from .sources.base import Source
 14
 15if TYPE_CHECKING:
 16    from .sources import SourceLike
 17
 18# A per-block descriptor handed to compute functions: a plain ``Block`` (no halo) or a
 19# ``BlockWithHalo`` (halo operations).
 20BlockDescriptor = Union[Block, BlockWithHalo]
 21
 22# Signature of a per-block compute function: ``function(block, inputs, outputs, mask)``.
 23ComputeFn = Callable[
 24    [BlockDescriptor, Sequence[Source], Sequence[Source], Optional[Source]], Any
 25]
 26
 27
 28def to_roi(block: BlockDescriptor) -> Tuple[slice, ...]:
 29    """Convert a ``bioimage_cpp.utils`` ``Block`` into a tuple of slices.
 30
 31    Args:
 32        block: A ``Block`` (carrying ``begin``/``end`` coordinate lists). For halo
 33            operations pass one of ``block.outer_block`` / ``block.inner_block`` /
 34            ``block.inner_block_local``.
 35
 36    Returns:
 37        A tuple of slices that indexes a source or array.
 38    """
 39    return tuple(slice(int(b), int(e)) for b, e in zip(block.begin, block.end))
 40
 41
 42def full_roi(ndim: int) -> Tuple[slice, ...]:
 43    """Return a slicing tuple that selects an entire ``ndim``-dimensional array."""
 44    return tuple(slice(None) for _ in range(ndim))
 45
 46
 47def is_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]]) -> bool:
 48    """Return whether a call qualifies for the direct (whole-array, non-blocked) fast path."""
 49    return job_type == "local" and num_workers == 1 and block_shape is None
 50
 51
 52def check_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]],
 53                 mask: "Optional[SourceLike]", block_ids: Optional[Sequence[int]]) -> bool:
 54    """Like :func:`is_direct`, but reject mask/block_ids the direct reduction path cannot honor."""
 55    if is_direct(job_type, num_workers, block_shape):
 56        if mask is not None or block_ids is not None:
 57            raise ValueError("Direct computation does not support 'mask' or 'block_ids'.")
 58        return True
 59    return False
 60
 61
 62def same_array(a: Source, b: Source) -> bool:
 63    """Return whether two sources wrap the same underlying array object."""
 64    return getattr(a, "array", None) is getattr(b, "array", object())
 65
 66
 67def normalize_halo(halo: Union[int, Sequence[int]], ndim: int) -> List[int]:
 68    """Broadcast a halo to a per-axis list of length ``ndim``."""
 69    if isinstance(halo, numbers.Integral):
 70        return [int(halo)] * ndim
 71    halo = [int(h) for h in halo]
 72    if len(halo) != ndim:
 73        raise ValueError(f"Halo {halo} does not match ndim {ndim}.")
 74    return halo
 75
 76
 77def sigma_to_halo(sigma: Union[float, Sequence[float]], order: int) -> Union[int, List[int]]:
 78    """Compute the halo for applying an image filter block-wise.
 79
 80    Mirrors elf's implementation, based on VIGRA's ``multi_blockwise.hxx``.
 81
 82    Args:
 83        sigma: The sigma value(s) of the filter.
 84        order: The derivative order of the filter (0 for smoothing).
 85
 86    Returns:
 87        The halo, as an int for scalar sigma or a per-axis list for sequence sigma.
 88    """
 89    multiplier = 2
 90    if isinstance(sigma, numbers.Number):
 91        return multiplier * int(ceil(3.0 * sigma + 0.5 * order + 0.5))
 92    return [multiplier * int(ceil(3.0 * sig + 0.5 * order + 0.5)) for sig in sigma]
 93
 94
 95def downscale_shape(shape: Sequence[int], scale_factor: Union[int, Sequence[int]],
 96                    ceil_mode: bool = True) -> Tuple[int, ...]:
 97    """Compute the shape resulting from downscaling by an integer factor.
 98
 99    Mirrors elf's ``downscale_shape``.
100
101    Args:
102        shape: The input array shape.
103        scale_factor: The downscaling factor: a single int (isotropic) or a per-axis sequence.
104        ceil_mode: Whether to round the downscaled size up (so no input voxel is dropped) or
105            down (strict integer division).
106
107    Returns:
108        The downscaled shape.
109
110    Raises:
111        ValueError: If a per-axis ``scale_factor`` does not match the dimensionality of ``shape``.
112    """
113    if isinstance(scale_factor, numbers.Integral):
114        factors = [int(scale_factor)] * len(shape)
115    else:
116        factors = [int(f) for f in scale_factor]
117        if len(factors) != len(shape):
118            raise ValueError(
119                f"scale_factor {scale_factor} does not match the dimensionality {len(shape)}."
120            )
121    if ceil_mode:
122        return tuple(int(s) // f + int((int(s) % f) != 0) for s, f in zip(shape, factors))
123    return tuple(int(s) // f for s, f in zip(shape, factors))
124
125
def derive_block_shape(source: Source, block_shape: Optional[Sequence[int]]) -> Tuple[int, ...]:
    """Pick the block shape: the explicit one if given, else the source's chunks.

    Args:
        source: A source exposing ``chunks``; only consulted when ``block_shape`` is ``None``.
        block_shape: The explicit block shape, or ``None`` to derive it from the chunks.

    Returns:
        The resolved block shape as a tuple of ints.

    Raises:
        ValueError: If ``block_shape`` is ``None`` and the source is unchunked.
    """
    # The conditional expression keeps ``source.chunks`` untouched when a shape was passed.
    resolved = block_shape if block_shape is not None else source.chunks
    if resolved is None:
        raise ValueError(
            "block_shape is required for block-wise processing of an unchunked array "
            "(the source has no chunks to derive it from)."
        )
    return tuple(map(int, resolved))
148
149
def get_blocking(shape: Sequence[int], block_shape: Sequence[int],
                 roi: Optional[Tuple[slice, ...]] = None) -> Blocking:
    """Create a ``bioimage_cpp.utils.Blocking`` covering ``shape`` or a sub-roi of it.

    Args:
        shape: The full array shape.
        block_shape: The block shape.
        roi: Optional region of interest to restrict the blocking to. An open slice bound
            falls back to 0 (start) or the axis size from ``shape`` (stop); slice steps are
            not read.

    Returns:
        A ``bioimage_cpp.utils.Blocking`` instance.
    """
    ndim = len(shape)
    if roi is None:
        # No roi is the same as a fully open slice on every axis.
        roi = (slice(None),) * ndim
    begin = [0 if sl.start is None else int(sl.start) for sl in roi]
    end = [int(size) if sl.stop is None else int(sl.stop) for sl, size in zip(roi, shape)]
    block_extent = [int(b) for b in block_shape]
    return bic.utils.Blocking(begin, end, block_extent)
170
171
def check_rerun_args(job_type: str, resume_from: Optional[str],
                     subset: Optional[Sequence[int]], *, subset_name: str = "block_ids") -> None:
    """Check that an operation's rerun arguments (``resume_from`` vs a subset) are compatible.

    Args:
        job_type: The execution backend (``"local"``/``"subprocess"``/``"slurm"``).
        resume_from: The preserved temp folder to resume from, or ``None``.
        subset: The explicit subset (``block_ids``/``item_ids``) to process, or ``None``.
        subset_name: The subset argument's name, for error messages.

    Raises:
        ValueError: If both ``resume_from`` and ``subset`` are given, or if ``resume_from`` is
            used with the local backend (which keeps no temp folder to resume from).
    """
    if resume_from is None:
        # Nothing to validate unless a resume was requested.
        return
    if subset is not None:
        raise ValueError(f"Pass either 'resume_from' or '{subset_name}', not both.")
    if job_type != "local":
        return
    raise ValueError(
        "resume_from is only valid for distributed backends (subprocess/slurm); the "
        "local runner keeps no temp folder. Re-run the operation in-process instead "
        f"(optionally with {subset_name}=err.failed_block_ids for a subset)."
    )
195
196
def group_blocks_by_shard(
    blocking: Blocking,
    outputs: Sequence[Source],
    block_ids: Sequence[int],
) -> Optional[List[List[int]]]:
    """Partition blocks into groups such that each shard is written by one worker only.

    For a sharded zarr v3 array the atomic write unit is the *shard*, not the inner chunk:
    two blocks writing different inner chunks of the same shard concurrently corrupt it. To
    keep the block shape flexible (rather than forcing it to a shard multiple) the runners
    route each group to one worker, which processes its blocks sequentially, so same-shard
    writes never race. This function computes those groups: blocks that touch a common
    shard (of any sharded output) end up in the same group, via a union-find over the
    positions of the block ids.

    The shard grid is anchored at coordinate 0 and only the trailing (spatial) shard axes
    are considered; a leading channel axis on an output is fully written by every block and
    is not a routing axis (mirrors the chunk handling in
    :meth:`Runner._validate_write_safety`).

    Args:
        blocking: The blocking used to map a block id to its (non-halo) write region.
        outputs: The output sources; only those whose ``shards`` is not ``None`` drive the
            grouping.
        block_ids: The block ids to group.

    Returns:
        A list of groups (each a sorted list of block ids), ordered by each group's smallest
        id; ``None`` if no output is sharded (the caller should then use the default
        one-block-per-unit path); an empty list if ``block_ids`` is empty.
    """
    sharded_outputs = [(idx, out) for idx, out in enumerate(outputs) if out.shards is not None]
    if len(sharded_outputs) == 0:
        return None
    ids = [int(b) for b in block_ids]
    if len(ids) == 0:
        return []

    # The blocking's dimensionality is read off the first block.
    ndim = len(blocking.get_block(ids[0]).begin)
    # For each sharded output: the trailing ``ndim`` shard extents, i.e. its spatial cell grid.
    shard_grids = [
        (idx, tuple(int(extent) for extent in out.shards[-ndim:]))
        for idx, out in sharded_outputs
    ]

    # The union-find runs over positions in ``ids`` (dense 0..n-1), not over the ids themselves.
    uf = bic.utils.UnionFind(len(ids))
    # (output index, *shard cell) -> position of the first block seen touching that cell.
    first_writer: Dict[Tuple[int, ...], int] = {}
    for pos, bid in enumerate(ids):
        block = blocking.get_block(bid)
        lo = [int(b) for b in block.begin]
        hi = [int(e) for e in block.end]
        for out_idx, shard in shard_grids:
            # Per axis: the half-open range of shard cells the block [lo, hi) overlaps
            # (floor of the begin, ceil of the end).
            cell_ranges = [
                range(lo[axis] // shard[axis], (hi[axis] + shard[axis] - 1) // shard[axis])
                for axis in range(ndim)
            ]
            for cell in itertools.product(*cell_ranges):
                key = (out_idx,) + cell
                owner = first_writer.get(key)
                if owner is None:
                    first_writer[key] = pos
                elif owner != pos:
                    uf.merge(pos, owner)

    # Collect the ids under their union-find root, then order groups by their smallest id.
    by_root: Dict[int, List[int]] = {}
    for pos, bid in enumerate(ids):
        root = int(uf.find(pos))
        if root not in by_root:
            by_root[root] = []
        by_root[root].append(bid)
    grouped = [sorted(members) for members in by_root.values()]
    grouped.sort(key=lambda members: members[0])
    return grouped
257
258
def maybe_warn_imbalance(loads: Sequence[int], num_workers: int, n_groups: int,
                         name: str) -> None:
    """Emit a warning if shard-exclusive routing idles workers or skews their load.

    At most one warning is issued: the idle-worker warning takes precedence over the
    load-imbalance one. Nothing happens for empty ``loads``.

    Args:
        loads: The per-worker (or per-task) block counts of the assignment.
        num_workers: The requested number of workers.
        n_groups: The number of shard groups (schedulable units) the blocks formed.
        name: A short run name used in the warning message; ``'run'`` is shown if it is empty.
    """
    if not loads:
        return

    label = name or 'run'
    requested = int(num_workers)
    if n_groups < requested:
        idle_message = (
            f"Shard routing for '{label}' produced only {n_groups} shard-group(s) for "
            f"{num_workers} workers, so {requested - n_groups} worker(s) will be idle. "
            "A few shards span the data; use a smaller shard shape or fewer workers to balance. "
            "Results are still correct."
        )
        # Warn from this frame so ``stacklevel=2`` keeps pointing at our caller.
        warnings.warn(idle_message, stacklevel=2)
        return

    heaviest = max(loads)
    lightest = min(loads)
    average = sum(loads) / len(loads)
    # Skewed: loads are not all equal and the busiest worker exceeds 1.5x the mean.
    skewed = heaviest > lightest and heaviest > 1.5 * average
    if skewed:
        skew_message = (
            f"Uneven worker load for '{label}': block counts per worker range "
            f"{lightest}..{heaviest} "
            f"(mean {average:.1f}). Some shards span disproportionately many blocks; results are "
            "still correct but parallelism is reduced."
        )
        warnings.warn(skew_message, stacklevel=2)
BlockDescriptor = typing.Union[bioimage_cpp._core.Block, bioimage_cpp._core.BlockWithHalo]
ComputeFn = typing.Callable[[typing.Union[bioimage_cpp._core.Block, bioimage_cpp._core.BlockWithHalo], typing.Sequence[bioimage_py.sources.Source], typing.Sequence[bioimage_py.sources.Source], typing.Optional[bioimage_py.sources.Source]], typing.Any]
def to_roi( block: Union[bioimage_cpp._core.Block, bioimage_cpp._core.BlockWithHalo]) -> Tuple[slice, ...]:
29def to_roi(block: BlockDescriptor) -> Tuple[slice, ...]:
30    """Convert a ``bioimage_cpp.utils`` ``Block`` into a tuple of slices.
31
32    Args:
33        block: A ``Block`` (carrying ``begin``/``end`` coordinate lists). For halo
34            operations pass one of ``block.outer_block`` / ``block.inner_block`` /
35            ``block.inner_block_local``.
36
37    Returns:
38        A tuple of slices that indexes a source or array.
39    """
40    return tuple(slice(int(b), int(e)) for b, e in zip(block.begin, block.end))

Convert a bioimage_cpp.utils Block into a tuple of slices.

Args: block: A Block (carrying begin/end coordinate lists). For halo operations pass one of block.outer_block / block.inner_block / block.inner_block_local.

Returns: A tuple of slices that indexes a source or array.

def full_roi(ndim: int) -> Tuple[slice, ...]:
43def full_roi(ndim: int) -> Tuple[slice, ...]:
44    """Return a slicing tuple that selects an entire ``ndim``-dimensional array."""
45    return tuple(slice(None) for _ in range(ndim))

Return a slicing tuple that selects an entire ndim-dimensional array.

def is_direct( job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]]) -> bool:
48def is_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]]) -> bool:
49    """Return whether a call qualifies for the direct (whole-array, non-blocked) fast path."""
50    return job_type == "local" and num_workers == 1 and block_shape is None

Return whether a call qualifies for the direct (whole-array, non-blocked) fast path.

def check_direct( job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]], mask: "'Optional[SourceLike]'", block_ids: Optional[Sequence[int]]) -> bool:
53def check_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]],
54                 mask: "Optional[SourceLike]", block_ids: Optional[Sequence[int]]) -> bool:
55    """Like :func:`is_direct`, but reject mask/block_ids the direct reduction path cannot honor."""
56    if is_direct(job_type, num_workers, block_shape):
57        if mask is not None or block_ids is not None:
58            raise ValueError("Direct computation does not support 'mask' or 'block_ids'.")
59        return True
60    return False

Like is_direct(), but reject mask/block_ids the direct reduction path cannot honor.

def same_array( a: bioimage_py.sources.Source, b: bioimage_py.sources.Source) -> bool:
63def same_array(a: Source, b: Source) -> bool:
64    """Return whether two sources wrap the same underlying array object."""
65    return getattr(a, "array", None) is getattr(b, "array", object())

Return whether two sources wrap the same underlying array object.

def normalize_halo(halo: Union[int, Sequence[int]], ndim: int) -> List[int]:
68def normalize_halo(halo: Union[int, Sequence[int]], ndim: int) -> List[int]:
69    """Broadcast a halo to a per-axis list of length ``ndim``."""
70    if isinstance(halo, numbers.Integral):
71        return [int(halo)] * ndim
72    halo = [int(h) for h in halo]
73    if len(halo) != ndim:
74        raise ValueError(f"Halo {halo} does not match ndim {ndim}.")
75    return halo

Broadcast a halo to a per-axis list of length ndim.

def sigma_to_halo( sigma: Union[float, Sequence[float]], order: int) -> Union[int, List[int]]:
78def sigma_to_halo(sigma: Union[float, Sequence[float]], order: int) -> Union[int, List[int]]:
79    """Compute the halo for applying an image filter block-wise.
80
81    Mirrors elf's implementation, based on VIGRA's ``multi_blockwise.hxx``.
82
83    Args:
84        sigma: The sigma value(s) of the filter.
85        order: The derivative order of the filter (0 for smoothing).
86
87    Returns:
88        The halo, as an int for scalar sigma or a per-axis list for sequence sigma.
89    """
90    multiplier = 2
91    if isinstance(sigma, numbers.Number):
92        return multiplier * int(ceil(3.0 * sigma + 0.5 * order + 0.5))
93    return [multiplier * int(ceil(3.0 * sig + 0.5 * order + 0.5)) for sig in sigma]

Compute the halo for applying an image filter block-wise.

Mirrors elf's implementation, based on VIGRA's multi_blockwise.hxx.

Args: sigma: The sigma value(s) of the filter. order: The derivative order of the filter (0 for smoothing).

Returns: The halo, as an int for scalar sigma or a per-axis list for sequence sigma.

def downscale_shape( shape: Sequence[int], scale_factor: Union[int, Sequence[int]], ceil_mode: bool = True) -> Tuple[int, ...]:
 96def downscale_shape(shape: Sequence[int], scale_factor: Union[int, Sequence[int]],
 97                    ceil_mode: bool = True) -> Tuple[int, ...]:
 98    """Compute the shape resulting from downscaling by an integer factor.
 99
100    Mirrors elf's ``downscale_shape``.
101
102    Args:
103        shape: The input array shape.
104        scale_factor: The downscaling factor: a single int (isotropic) or a per-axis sequence.
105        ceil_mode: Whether to round the downscaled size up (so no input voxel is dropped) or
106            down (strict integer division).
107
108    Returns:
109        The downscaled shape.
110
111    Raises:
112        ValueError: If a per-axis ``scale_factor`` does not match the dimensionality of ``shape``.
113    """
114    if isinstance(scale_factor, numbers.Integral):
115        factors = [int(scale_factor)] * len(shape)
116    else:
117        factors = [int(f) for f in scale_factor]
118        if len(factors) != len(shape):
119            raise ValueError(
120                f"scale_factor {scale_factor} does not match the dimensionality {len(shape)}."
121            )
122    if ceil_mode:
123        return tuple(int(s) // f + int((int(s) % f) != 0) for s, f in zip(shape, factors))
124    return tuple(int(s) // f for s, f in zip(shape, factors))

Compute the shape resulting from downscaling by an integer factor.

Mirrors elf's downscale_shape.

Args: shape: The input array shape. scale_factor: The downscaling factor: a single int (isotropic) or a per-axis sequence. ceil_mode: Whether to round the downscaled size up (so no input voxel is dropped) or down (strict integer division).

Returns: The downscaled shape.

Raises: ValueError: If a per-axis scale_factor does not match the dimensionality of shape.

def derive_block_shape( source: bioimage_py.sources.Source, block_shape: Optional[Sequence[int]]) -> Tuple[int, ...]:
127def derive_block_shape(source: Source, block_shape: Optional[Sequence[int]]) -> Tuple[int, ...]:
128    """Resolve the block shape, falling back to the source's chunks.
129
130    Args:
131        source: A source exposing ``shape`` and ``chunks``.
132        block_shape: The explicit block shape, or ``None`` to derive it from chunks.
133
134    Returns:
135        The resolved block shape.
136
137    Raises:
138        ValueError: If ``block_shape`` is ``None`` and the source is unchunked.
139    """
140    if block_shape is not None:
141        return tuple(int(b) for b in block_shape)
142    chunks = source.chunks
143    if chunks is not None:
144        return tuple(int(c) for c in chunks)
145    raise ValueError(
146        "block_shape is required for block-wise processing of an unchunked array "
147        "(the source has no chunks to derive it from)."
148    )

Resolve the block shape, falling back to the source's chunks.

Args: source: A source exposing shape and chunks. block_shape: The explicit block shape, or None to derive it from chunks.

Returns: The resolved block shape.

Raises: ValueError: If block_shape is None and the source is unchunked.

def get_blocking( shape: Sequence[int], block_shape: Sequence[int], roi: Optional[Tuple[slice, ...]] = None) -> bioimage_cpp._core.Blocking:
151def get_blocking(shape: Sequence[int], block_shape: Sequence[int],
152                 roi: Optional[Tuple[slice, ...]] = None) -> Blocking:
153    """Build a ``bioimage_cpp.utils.Blocking`` over ``shape`` (or a sub-roi).
154
155    Args:
156        shape: The full array shape.
157        block_shape: The block shape.
158        roi: Optional region of interest to restrict the blocking to.
159
160    Returns:
161        A ``bioimage_cpp.utils.Blocking`` instance.
162    """
163    ndim = len(shape)
164    if roi is None:
165        roi_begin = [0] * ndim
166        roi_end = [int(s) for s in shape]
167    else:
168        roi_begin = [int(sl.start) if sl.start is not None else 0 for sl in roi]
169        roi_end = [int(sl.stop) if sl.stop is not None else int(s) for sl, s in zip(roi, shape)]
170    return bic.utils.Blocking(roi_begin, roi_end, [int(b) for b in block_shape])

Build a bioimage_cpp.utils.Blocking over shape (or a sub-roi).

Args: shape: The full array shape. block_shape: The block shape. roi: Optional region of interest to restrict the blocking to.

Returns: A bioimage_cpp.utils.Blocking instance.

def check_rerun_args( job_type: str, resume_from: Optional[str], subset: Optional[Sequence[int]], *, subset_name: str = 'block_ids') -> None:
173def check_rerun_args(job_type: str, resume_from: Optional[str],
174                     subset: Optional[Sequence[int]], *, subset_name: str = "block_ids") -> None:
175    """Validate an operation's rerun arguments (``resume_from`` vs a subset).
176
177    Args:
178        job_type: The execution backend (``"local"``/``"subprocess"``/``"slurm"``).
179        resume_from: The preserved temp folder to resume from, or ``None``.
180        subset: The explicit subset (``block_ids``/``item_ids``) to process, or ``None``.
181        subset_name: The subset argument's name, for error messages.
182
183    Raises:
184        ValueError: If both ``resume_from`` and ``subset`` are given, or if ``resume_from`` is
185            used with the local backend (which keeps no temp folder to resume from).
186    """
187    if resume_from is not None:
188        if subset is not None:
189            raise ValueError(f"Pass either 'resume_from' or '{subset_name}', not both.")
190        if job_type == "local":
191            raise ValueError(
192                "resume_from is only valid for distributed backends (subprocess/slurm); the "
193                "local runner keeps no temp folder. Re-run the operation in-process instead "
194                f"(optionally with {subset_name}=err.failed_block_ids for a subset)."
195            )

Validate an operation's rerun arguments (resume_from vs a subset).

Args: job_type: The execution backend ("local"/"subprocess"/"slurm"). resume_from: The preserved temp folder to resume from, or None. subset: The explicit subset (block_ids/item_ids) to process, or None. subset_name: The subset argument's name, for error messages.

Raises: ValueError: If both resume_from and subset are given, or if resume_from is used with the local backend (which keeps no temp folder to resume from).

def group_blocks_by_shard( blocking: bioimage_cpp._core.Blocking, outputs: Sequence[bioimage_py.sources.Source], block_ids: Sequence[int]) -> Optional[List[List[int]]]:
198def group_blocks_by_shard(
199    blocking: Blocking,
200    outputs: Sequence[Source],
201    block_ids: Sequence[int],
202) -> Optional[List[List[int]]]:
203    """Group blocks so that every shard is written by a single worker.
204
205    For a sharded zarr v3 array the atomic write unit is the *shard*, not the inner chunk:
206    two blocks writing different inner chunks of the same shard concurrently corrupt it. To
207    keep the block shape flexible (rather than forcing it to a shard multiple) the runners
208    route each group to one worker, which processes its blocks sequentially — so same-shard
209    writes never race. This computes those groups: blocks that share any shard (for any
210    sharded output) are placed in the same group via a union-find over the block ids.
211
212    The shard grid is anchored at coordinate 0 and considered along the trailing (spatial)
213    shard axes only; a leading channel axis on an output is fully written by every block and
214    is not a routing axis (mirrors the chunk handling in
215    :meth:`Runner._validate_write_safety`).
216
217    Args:
218        blocking: The blocking used to map a block id to its (non-halo) write region.
219        outputs: The output sources; only those with a ``shards`` shape drive the grouping.
220        block_ids: The block ids to group.
221
222    Returns:
223        A list of groups (each a sorted list of block ids), ordered by each group's smallest
224        id; ``None`` if no output is sharded (the caller should then use the default
225        one-block-per-unit path); an empty list if ``block_ids`` is empty.
226    """
227    sharded = [(idx, out) for idx, out in enumerate(outputs) if out.shards is not None]
228    if not sharded:
229        return None
230    block_ids = [int(b) for b in block_ids]
231    if not block_ids:
232        return []
233
234    ndim = len(blocking.get_block(block_ids[0]).begin)
235    # Per sharded output, the spatial (trailing) shard extent that defines its cell grid.
236    shard_spatial = [(idx, tuple(int(s) for s in out.shards[-ndim:])) for idx, out in sharded]
237
238    # Union-find over the positions in block_ids (dense 0..n-1); bic's UnionFind, as used in
239    # segmentation/label.py, instead of a hand-rolled one.
240    uf = bic.utils.UnionFind(len(block_ids))
241    cell_owner: Dict[Tuple[int, ...], int] = {}
242    for pos, bid in enumerate(block_ids):
243        block = blocking.get_block(bid)
244        begin = [int(b) for b in block.begin]
245        end = [int(e) for e in block.end]
246        for out_idx, shard in shard_spatial:
247            ranges = [range(begin[d] // shard[d], (end[d] + shard[d] - 1) // shard[d])
248                      for d in range(ndim)]
249            for cell in itertools.product(*ranges):
250                owner = cell_owner.setdefault((out_idx,) + cell, pos)
251                if owner != pos:
252                    uf.merge(pos, owner)
253
254    groups: Dict[int, List[int]] = {}
255    for pos, bid in enumerate(block_ids):
256        groups.setdefault(int(uf.find(pos)), []).append(bid)
257    return sorted((sorted(g) for g in groups.values()), key=lambda g: g[0])

Group blocks so that every shard is written by a single worker.

For a sharded zarr v3 array the atomic write unit is the shard, not the inner chunk: two blocks writing different inner chunks of the same shard concurrently corrupt it. To keep the block shape flexible (rather than forcing it to a shard multiple) the runners route each group to one worker, which processes its blocks sequentially — so same-shard writes never race. This computes those groups: blocks that share any shard (for any sharded output) are placed in the same group via a union-find over the block ids.

The shard grid is anchored at coordinate 0 and considered along the trailing (spatial) shard axes only; a leading channel axis on an output is fully written by every block and is not a routing axis (mirrors the chunk handling in Runner._validate_write_safety()).

Args: blocking: The blocking used to map a block id to its (non-halo) write region. outputs: The output sources; only those with a shards shape drive the grouping. block_ids: The block ids to group.

Returns: A list of groups (each a sorted list of block ids), ordered by each group's smallest id; None if no output is sharded (the caller should then use the default one-block-per-unit path); an empty list if block_ids is empty.

def maybe_warn_imbalance(loads: Sequence[int], num_workers: int, n_groups: int, name: str) -> None:
260def maybe_warn_imbalance(loads: Sequence[int], num_workers: int, n_groups: int,
261                         name: str) -> None:
262    """Warn when shard-exclusive routing leaves workers idle or badly load-imbalanced.
263
264    Args:
265        loads: The per-worker (or per-task) block counts of the assignment.
266        num_workers: The requested number of workers.
267        n_groups: The number of shard groups (schedulable units) the blocks formed.
268        name: A short run name used in the warning message.
269    """
270    if not loads:
271        return
272    if n_groups < int(num_workers):
273        warnings.warn(
274            f"Shard routing for '{name or 'run'}' produced only {n_groups} shard-group(s) for "
275            f"{num_workers} workers, so {int(num_workers) - n_groups} worker(s) will be idle. "
276            "A few shards span the data; use a smaller shard shape or fewer workers to balance. "
277            "Results are still correct.",
278            stacklevel=2,
279        )
280        return
281    mx, mn = max(loads), min(loads)
282    mean = sum(loads) / len(loads)
283    if mx > mn and mx > 1.5 * mean:
284        warnings.warn(
285            f"Uneven worker load for '{name or 'run'}': block counts per worker range {mn}..{mx} "
286            f"(mean {mean:.1f}). Some shards span disproportionately many blocks; results are "
287            "still correct but parallelism is reduced.",
288            stacklevel=2,
289        )

Warn when shard-exclusive routing leaves workers idle or badly load-imbalanced.

Args: loads: The per-worker (or per-task) block counts of the assignment. num_workers: The requested number of workers. n_groups: The number of shard groups (schedulable units) the blocks formed. name: A short run name used in the warning message.