bioimage_py.util
Shared helpers: block-to-roi conversion, blocking construction and filter halos.
"""Shared helpers: block-to-roi conversion, blocking construction and filter halos."""
from __future__ import annotations

import itertools
import numbers
import warnings
from math import ceil
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import bioimage_cpp as bic
from bioimage_cpp.utils import Block, BlockWithHalo, Blocking

from .sources.base import Source

if TYPE_CHECKING:
    from .sources import SourceLike

# A per-block descriptor handed to compute functions: a plain ``Block`` (no halo) or a
# ``BlockWithHalo`` (halo operations).
BlockDescriptor = Union[Block, BlockWithHalo]

# Signature of a per-block compute function: ``function(block, inputs, outputs, mask)``.
ComputeFn = Callable[
    [BlockDescriptor, Sequence[Source], Sequence[Source], Optional[Source]], Any
]


def to_roi(block: BlockDescriptor) -> Tuple[slice, ...]:
    """Convert a ``bioimage_cpp.utils`` ``Block`` into a tuple of slices.

    Args:
        block: A ``Block`` (carrying ``begin``/``end`` coordinate lists). For halo
            operations pass one of ``block.outer_block`` / ``block.inner_block`` /
            ``block.inner_block_local``.

    Returns:
        A tuple of slices that indexes a source or array.
    """
    return tuple(slice(int(b), int(e)) for b, e in zip(block.begin, block.end))


def full_roi(ndim: int) -> Tuple[slice, ...]:
    """Return a slicing tuple that selects an entire ``ndim``-dimensional array."""
    return tuple(slice(None) for _ in range(ndim))


def is_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]]) -> bool:
    """Return whether a call qualifies for the direct (whole-array, non-blocked) fast path."""
    return job_type == "local" and num_workers == 1 and block_shape is None


def check_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]],
                 mask: "Optional[SourceLike]", block_ids: Optional[Sequence[int]]) -> bool:
    """Like :func:`is_direct`, but reject mask/block_ids, which the direct reduction path cannot honor.

    Raises:
        ValueError: If the direct path applies and ``mask`` or ``block_ids`` is given.
    """
    if is_direct(job_type, num_workers, block_shape):
        if mask is not None or block_ids is not None:
            raise ValueError("Direct computation does not support 'mask' or 'block_ids'.")
        return True
    return False


def same_array(a: Source, b: Source) -> bool:
    """Return whether two sources wrap the same underlying array object.

    A source without an ``array`` attribute, or whose ``array`` is ``None``, never matches:
    ``None`` is a placeholder, not a shared array, so two such sources are not "the same".
    """
    array_a = getattr(a, "array", None)
    if array_a is None:
        return False
    return array_a is getattr(b, "array", None)


def normalize_halo(halo: Union[int, Sequence[int]], ndim: int) -> List[int]:
    """Broadcast a halo to a per-axis list of length ``ndim``.

    Raises:
        ValueError: If a per-axis ``halo`` does not have ``ndim`` entries.
    """
    if isinstance(halo, numbers.Integral):
        return [int(halo)] * ndim
    halo = [int(h) for h in halo]
    if len(halo) != ndim:
        raise ValueError(f"Halo {halo} does not match ndim {ndim}.")
    return halo


def sigma_to_halo(sigma: Union[float, Sequence[float]], order: int) -> Union[int, List[int]]:
    """Compute the halo for applying an image filter block-wise.

    Mirrors elf's implementation, based on VIGRA's ``multi_blockwise.hxx``.

    Args:
        sigma: The sigma value(s) of the filter.
        order: The derivative order of the filter (0 for smoothing).

    Returns:
        The halo, as an int for scalar sigma or a per-axis list for sequence sigma.
    """
    multiplier = 2
    if isinstance(sigma, numbers.Number):
        return multiplier * int(ceil(3.0 * sigma + 0.5 * order + 0.5))
    return [multiplier * int(ceil(3.0 * sig + 0.5 * order + 0.5)) for sig in sigma]


def downscale_shape(shape: Sequence[int], scale_factor: Union[int, Sequence[int]],
                    ceil_mode: bool = True) -> Tuple[int, ...]:
    """Compute the shape resulting from downscaling by an integer factor.

    Mirrors elf's ``downscale_shape``.

    Args:
        shape: The input array shape.
        scale_factor: The downscaling factor: a single int (isotropic) or a per-axis sequence.
            Every factor must be a positive integer.
        ceil_mode: Whether to round the downscaled size up (so no input voxel is dropped) or
            down (strict integer division).

    Returns:
        The downscaled shape.

    Raises:
        ValueError: If a per-axis ``scale_factor`` does not match the dimensionality of
            ``shape``, or if any factor is smaller than 1.
    """
    if isinstance(scale_factor, numbers.Integral):
        factors = [int(scale_factor)] * len(shape)
    else:
        factors = [int(f) for f in scale_factor]
        if len(factors) != len(shape):
            raise ValueError(
                f"scale_factor {scale_factor} does not match the dimensionality {len(shape)}."
            )
    # A zero factor would divide by zero and a negative one yields negative sizes.
    if any(f < 1 for f in factors):
        raise ValueError(f"scale_factor {scale_factor} must contain only positive integers.")
    if ceil_mode:
        return tuple(int(s) // f + int((int(s) % f) != 0) for s, f in zip(shape, factors))
    return tuple(int(s) // f for s, f in zip(shape, factors))


def derive_block_shape(source: Source, block_shape: Optional[Sequence[int]]) -> Tuple[int, ...]:
    """Resolve the block shape, falling back to the source's chunks.

    Args:
        source: A source exposing ``shape`` and ``chunks``.
        block_shape: The explicit block shape, or ``None`` to derive it from chunks.

    Returns:
        The resolved block shape.

    Raises:
        ValueError: If ``block_shape`` is ``None`` and the source is unchunked.
    """
    if block_shape is not None:
        return tuple(int(b) for b in block_shape)
    chunks = source.chunks
    if chunks is not None:
        return tuple(int(c) for c in chunks)
    raise ValueError(
        "block_shape is required for block-wise processing of an unchunked array "
        "(the source has no chunks to derive it from)."
    )


def _roi_bounds(shape: Sequence[int],
                roi: Optional[Tuple[slice, ...]]) -> Tuple[List[int], List[int]]:
    """Resolve ``roi`` against ``shape`` into per-axis ``begin``/``end`` coordinate lists."""
    ndim = len(shape)
    roi = () if roi is None else tuple(roi)
    if len(roi) > ndim:
        raise ValueError(
            f"roi {roi} has more entries than the {ndim} axes of shape {tuple(shape)}."
        )
    # Axes not covered by the roi are taken in full (numpy-style trailing-axis padding).
    roi = roi + (slice(None),) * (ndim - len(roi))
    begin: List[int] = []
    end: List[int] = []
    for sl, size in zip(roi, shape):
        # Cast explicit bounds to int first, then let ``slice.indices`` resolve ``None`` and
        # negative bounds and clamp them to the axis size, exactly like Python slicing.
        start = None if sl.start is None else int(sl.start)
        stop = None if sl.stop is None else int(sl.stop)
        step = None if sl.step is None else int(sl.step)
        start, stop, step = slice(start, stop, step).indices(int(size))
        if step != 1:
            raise ValueError(f"roi slice {sl} has step {step}; only unit steps are supported.")
        begin.append(start)
        end.append(stop)
    return begin, end


def get_blocking(shape: Sequence[int], block_shape: Sequence[int],
                 roi: Optional[Tuple[slice, ...]] = None) -> Blocking:
    """Build a ``bioimage_cpp.utils.Blocking`` over ``shape`` (or a sub-roi).

    Args:
        shape: The full array shape.
        block_shape: The block shape.
        roi: Optional region of interest to restrict the blocking to. ``None`` and negative
            slice bounds resolve like Python slicing and are clamped to ``shape``; axes not
            covered by ``roi`` are taken in full.

    Returns:
        A ``bioimage_cpp.utils.Blocking`` instance.

    Raises:
        ValueError: If ``roi`` has more entries than ``shape`` has axes, or one of its slices
            has a step other than 1.
    """
    roi_begin, roi_end = _roi_bounds(shape, roi)
    return bic.utils.Blocking(roi_begin, roi_end, [int(b) for b in block_shape])


def check_rerun_args(job_type: str, resume_from: Optional[str],
                     subset: Optional[Sequence[int]], *, subset_name: str = "block_ids") -> None:
    """Validate an operation's rerun arguments (``resume_from`` vs a subset).

    Args:
        job_type: The execution backend (``"local"``/``"subprocess"``/``"slurm"``).
        resume_from: The preserved temp folder to resume from, or ``None``.
        subset: The explicit subset (``block_ids``/``item_ids``) to process, or ``None``.
        subset_name: The subset argument's name, for error messages.

    Raises:
        ValueError: If both ``resume_from`` and ``subset`` are given, or if ``resume_from`` is
            used with the local backend (which keeps no temp folder to resume from).
    """
    if resume_from is not None:
        if subset is not None:
            raise ValueError(f"Pass either 'resume_from' or '{subset_name}', not both.")
        if job_type == "local":
            raise ValueError(
                "resume_from is only valid for distributed backends (subprocess/slurm); the "
                "local runner keeps no temp folder. Re-run the operation in-process instead "
                f"(optionally with {subset_name}=err.failed_block_ids for a subset)."
            )


def group_blocks_by_shard(
    blocking: Blocking,
    outputs: Sequence[Source],
    block_ids: Sequence[int],
) -> Optional[List[List[int]]]:
    """Group blocks so that every shard is written by a single worker.

    For a sharded zarr v3 array the atomic write unit is the *shard*, not the inner chunk:
    two blocks writing different inner chunks of the same shard concurrently corrupt it. To
    keep the block shape flexible (rather than forcing it to a shard multiple) the runners
    route each group to one worker, which processes its blocks sequentially — so same-shard
    writes never race. This computes those groups: blocks that share any shard (for any
    sharded output) are placed in the same group via a union-find over the block ids.

    The shard grid is anchored at coordinate 0 and considered along the trailing (spatial)
    shard axes only; a leading channel axis on an output is fully written by every block and
    is not a routing axis (mirrors the chunk handling in
    :meth:`Runner._validate_write_safety`).

    Args:
        blocking: The blocking used to map a block id to its (non-halo) write region.
        outputs: The output sources; only those with a ``shards`` shape drive the grouping.
        block_ids: The block ids to group.

    Returns:
        A list of groups (each a sorted list of block ids), ordered by each group's smallest
        id; ``None`` if no output is sharded (the caller should then use the default
        one-block-per-unit path); an empty list if ``block_ids`` is empty.
    """
    sharded = [(idx, out) for idx, out in enumerate(outputs) if out.shards is not None]
    if not sharded:
        return None
    block_ids = [int(b) for b in block_ids]
    if not block_ids:
        return []

    ndim = len(blocking.get_block(block_ids[0]).begin)
    # Per sharded output, the spatial (trailing) shard extent that defines its cell grid.
    shard_spatial = [(idx, tuple(int(s) for s in out.shards[-ndim:])) for idx, out in sharded]

    # Union-find over the positions in block_ids (dense 0..n-1); bic's UnionFind, as used in
    # segmentation/label.py, instead of a hand-rolled one.
    uf = bic.utils.UnionFind(len(block_ids))
    cell_owner: Dict[Tuple[int, ...], int] = {}
    for pos, bid in enumerate(block_ids):
        block = blocking.get_block(bid)
        begin = [int(b) for b in block.begin]
        end = [int(e) for e in block.end]
        for out_idx, shard in shard_spatial:
            # Shard cells touched by [begin, end): floor of begin up to ceil of end.
            ranges = [range(begin[d] // shard[d], (end[d] + shard[d] - 1) // shard[d])
                      for d in range(ndim)]
            for cell in itertools.product(*ranges):
                owner = cell_owner.setdefault((out_idx,) + cell, pos)
                if owner != pos:
                    uf.merge(pos, owner)

    groups: Dict[int, List[int]] = {}
    for pos, bid in enumerate(block_ids):
        groups.setdefault(int(uf.find(pos)), []).append(bid)
    return sorted((sorted(g) for g in groups.values()), key=lambda g: g[0])


def maybe_warn_imbalance(loads: Sequence[int], num_workers: int, n_groups: int,
                         name: str) -> None:
    """Warn when shard-exclusive routing leaves workers idle or badly load-imbalanced.

    Args:
        loads: The per-worker (or per-task) block counts of the assignment.
        num_workers: The requested number of workers.
        n_groups: The number of shard groups (schedulable units) the blocks formed.
        name: A short run name used in the warning message.
    """
    if not loads:
        return
    if n_groups < int(num_workers):
        warnings.warn(
            f"Shard routing for '{name or 'run'}' produced only {n_groups} shard-group(s) for "
            f"{num_workers} workers, so {int(num_workers) - n_groups} worker(s) will be idle. "
            "A few shards span the data; use a smaller shard shape or fewer workers to balance. "
            "Results are still correct.",
            stacklevel=2,
        )
        return
    mx, mn = max(loads), min(loads)
    mean = sum(loads) / len(loads)
    if mx > mn and mx > 1.5 * mean:
        warnings.warn(
            f"Uneven worker load for '{name or 'run'}': block counts per worker range {mn}..{mx} "
            f"(mean {mean:.1f}). Some shards span disproportionately many blocks; results are "
            "still correct but parallelism is reduced.",
            stacklevel=2,
        )
def to_roi(block: BlockDescriptor) -> Tuple[slice, ...]:
    """Translate a block's ``begin``/``end`` coordinates into a slicing tuple.

    Args:
        block: A ``bioimage_cpp.utils`` ``Block`` exposing ``begin`` and ``end``
            coordinate sequences. For halo operations, pass ``block.outer_block``,
            ``block.inner_block`` or ``block.inner_block_local``.

    Returns:
        One ``slice(begin, end)`` per axis, usable to index a source or array.
    """
    starts = map(int, block.begin)
    stops = map(int, block.end)
    return tuple(map(slice, starts, stops))
Convert a bioimage_cpp.utils Block into a tuple of slices.
Args:
block: A Block (carrying begin/end coordinate lists). For halo
operations pass one of block.outer_block / block.inner_block /
block.inner_block_local.
Returns: A tuple of slices that indexes a source or array.
def full_roi(ndim: int) -> Tuple[slice, ...]:
    """Build a slicing tuple that covers every element of an ``ndim``-dimensional array."""
    whole_axis = slice(None)
    return tuple(whole_axis for _axis in range(ndim))
Return a slicing tuple that selects an entire ndim-dimensional array.
def is_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]]) -> bool:
    """Tell whether a call can take the direct (whole-array, non-blocked) fast path.

    That is the case only for the local backend, with a single worker and no explicit
    block shape.
    """
    if job_type != "local" or num_workers != 1:
        return False
    return block_shape is None
Return whether a call qualifies for the direct (whole-array, non-blocked) fast path.
def check_direct(job_type: str, num_workers: int, block_shape: Optional[Tuple[int, ...]],
                 mask: "Optional[SourceLike]", block_ids: Optional[Sequence[int]]) -> bool:
    """Decide on the direct path, refusing arguments that path cannot apply.

    Uses the same criterion as :func:`is_direct`. When the direct path is selected, a
    ``mask`` or ``block_ids`` is an error, because the direct reduction path cannot honor
    either of them.

    Returns:
        ``True`` for the direct path, ``False`` for block-wise processing.

    Raises:
        ValueError: If the direct path applies and ``mask`` or ``block_ids`` is given.
    """
    if not is_direct(job_type, num_workers, block_shape):
        return False
    if mask is None and block_ids is None:
        return True
    raise ValueError("Direct computation does not support 'mask' or 'block_ids'.")
Like is_direct(), but reject mask/block_ids, which the direct reduction path cannot honor.
def same_array(a: Source, b: Source) -> bool:
    """Return whether two sources wrap the same underlying array object.

    A source without an ``array`` attribute, or whose ``array`` is ``None``, never matches:
    ``None`` is a placeholder, not a shared array, so two such sources are not "the same".
    """
    array_a = getattr(a, "array", None)
    # The original asymmetric-sentinel trick still matched two ``array=None`` sources.
    if array_a is None:
        return False
    return array_a is getattr(b, "array", None)
Return whether two sources wrap the same underlying array object.
def normalize_halo(halo: Union[int, Sequence[int]], ndim: int) -> List[int]:
    """Expand ``halo`` into one integer entry per axis.

    Args:
        halo: A scalar halo applied to every axis, or a per-axis sequence.
        ndim: The number of axes.

    Returns:
        A list of ``ndim`` integer halo widths.

    Raises:
        ValueError: If a per-axis ``halo`` does not have ``ndim`` entries.
    """
    if not isinstance(halo, numbers.Integral):
        per_axis = list(map(int, halo))
        if len(per_axis) != ndim:
            raise ValueError(f"Halo {per_axis} does not match ndim {ndim}.")
        return per_axis
    return [int(halo)] * ndim
Broadcast a halo to a per-axis list of length ndim.
def sigma_to_halo(sigma: Union[float, Sequence[float]], order: int) -> Union[int, List[int]]:
    """Derive the block-wise filtering halo from the filter's sigma.

    Follows elf's implementation, which in turn is based on VIGRA's ``multi_blockwise.hxx``.

    Args:
        sigma: A scalar sigma, or one sigma per axis.
        order: The filter's derivative order (0 for plain smoothing).

    Returns:
        An int halo for a scalar ``sigma``, otherwise a list with one halo per axis.
    """
    def _halo_for(sig):
        # elf's formula for a single axis, scaled by a factor of 2.
        return 2 * int(ceil(3.0 * sig + 0.5 * order + 0.5))

    if isinstance(sigma, numbers.Number):
        return _halo_for(sigma)
    return [_halo_for(sig) for sig in sigma]
Compute the halo for applying an image filter block-wise.
Mirrors elf's implementation, based on VIGRA's multi_blockwise.hxx.
Args: sigma: The sigma value(s) of the filter. order: The derivative order of the filter (0 for smoothing).
Returns: The halo, as an int for scalar sigma or a per-axis list for sequence sigma.
def downscale_shape(shape: Sequence[int], scale_factor: Union[int, Sequence[int]],
                    ceil_mode: bool = True) -> Tuple[int, ...]:
    """Compute the shape resulting from downscaling by an integer factor.

    Mirrors elf's ``downscale_shape``.

    Args:
        shape: The input array shape.
        scale_factor: The downscaling factor: a single int (isotropic) or a per-axis sequence.
            Every factor must be a positive integer.
        ceil_mode: Whether to round the downscaled size up (so no input voxel is dropped) or
            down (strict integer division).

    Returns:
        The downscaled shape.

    Raises:
        ValueError: If a per-axis ``scale_factor`` does not match the dimensionality of
            ``shape``, or if any factor is smaller than 1.
    """
    if isinstance(scale_factor, numbers.Integral):
        factors = [int(scale_factor)] * len(shape)
    else:
        factors = [int(f) for f in scale_factor]
        if len(factors) != len(shape):
            raise ValueError(
                f"scale_factor {scale_factor} does not match the dimensionality {len(shape)}."
            )
    # A zero factor would divide by zero and a negative one yields negative sizes.
    if any(f < 1 for f in factors):
        raise ValueError(f"scale_factor {scale_factor} must contain only positive integers.")
    if ceil_mode:
        return tuple(int(s) // f + int((int(s) % f) != 0) for s, f in zip(shape, factors))
    return tuple(int(s) // f for s, f in zip(shape, factors))
Compute the shape resulting from downscaling by an integer factor.
Mirrors elf's downscale_shape.
Args: shape: The input array shape. scale_factor: The downscaling factor: a single int (isotropic) or a per-axis sequence. ceil_mode: Whether to round the downscaled size up (so no input voxel is dropped) or down (strict integer division).
Returns: The downscaled shape.
Raises:
ValueError: If a per-axis scale_factor does not match the dimensionality of shape.
def derive_block_shape(source: Source, block_shape: Optional[Sequence[int]]) -> Tuple[int, ...]:
    """Pick the block shape: the explicit one if given, else the source's chunk shape.

    Args:
        source: A source exposing ``shape`` and ``chunks``.
        block_shape: The block shape to use, or ``None`` to fall back to ``source.chunks``.

    Returns:
        The block shape as a tuple of ints.

    Raises:
        ValueError: If no ``block_shape`` was given and the source has no chunks.
    """
    if block_shape is None:
        chunks = source.chunks
        if chunks is None:
            raise ValueError(
                "block_shape is required for block-wise processing of an unchunked array "
                "(the source has no chunks to derive it from)."
            )
        return tuple(map(int, chunks))
    return tuple(map(int, block_shape))
Resolve the block shape, falling back to the source's chunks.
Args:
source: A source exposing shape and chunks.
block_shape: The explicit block shape, or None to derive it from chunks.
Returns: The resolved block shape.
Raises:
ValueError: If block_shape is None and the source is unchunked.
def _roi_bounds(shape: Sequence[int],
                roi: Optional[Tuple[slice, ...]]) -> Tuple[List[int], List[int]]:
    """Resolve ``roi`` against ``shape`` into per-axis ``begin``/``end`` coordinate lists."""
    ndim = len(shape)
    roi = () if roi is None else tuple(roi)
    if len(roi) > ndim:
        raise ValueError(
            f"roi {roi} has more entries than the {ndim} axes of shape {tuple(shape)}."
        )
    # Axes not covered by the roi are taken in full (numpy-style trailing-axis padding).
    roi = roi + (slice(None),) * (ndim - len(roi))
    begin: List[int] = []
    end: List[int] = []
    for sl, size in zip(roi, shape):
        # Cast explicit bounds to int first, then let ``slice.indices`` resolve ``None`` and
        # negative bounds and clamp them to the axis size, exactly like Python slicing.
        start = None if sl.start is None else int(sl.start)
        stop = None if sl.stop is None else int(sl.stop)
        step = None if sl.step is None else int(sl.step)
        start, stop, step = slice(start, stop, step).indices(int(size))
        if step != 1:
            raise ValueError(f"roi slice {sl} has step {step}; only unit steps are supported.")
        begin.append(start)
        end.append(stop)
    return begin, end


def get_blocking(shape: Sequence[int], block_shape: Sequence[int],
                 roi: Optional[Tuple[slice, ...]] = None) -> Blocking:
    """Build a ``bioimage_cpp.utils.Blocking`` over ``shape`` (or a sub-roi).

    Args:
        shape: The full array shape.
        block_shape: The block shape.
        roi: Optional region of interest to restrict the blocking to. ``None`` and negative
            slice bounds resolve like Python slicing and are clamped to ``shape``; axes not
            covered by ``roi`` are taken in full.

    Returns:
        A ``bioimage_cpp.utils.Blocking`` instance.

    Raises:
        ValueError: If ``roi`` has more entries than ``shape`` has axes, or one of its slices
            has a step other than 1.
    """
    roi_begin, roi_end = _roi_bounds(shape, roi)
    return bic.utils.Blocking(roi_begin, roi_end, [int(b) for b in block_shape])
Build a bioimage_cpp.utils.Blocking over shape (or a sub-roi).
Args: shape: The full array shape. block_shape: The block shape. roi: Optional region of interest to restrict the blocking to.
Returns:
A bioimage_cpp.utils.Blocking instance.
def check_rerun_args(job_type: str, resume_from: Optional[str],
                     subset: Optional[Sequence[int]], *, subset_name: str = "block_ids") -> None:
    """Reject incompatible combinations of an operation's rerun arguments.

    Args:
        job_type: The execution backend (``"local"``/``"subprocess"``/``"slurm"``).
        resume_from: A preserved temp folder to resume from, or ``None``.
        subset: An explicit subset (``block_ids``/``item_ids``) to process, or ``None``.
        subset_name: The name of the subset argument, used in error messages.

    Raises:
        ValueError: When ``resume_from`` is combined with ``subset``, or when ``resume_from``
            is requested for the local backend, which keeps no temp folder.
    """
    if resume_from is None:
        return
    if subset is not None:
        raise ValueError(f"Pass either 'resume_from' or '{subset_name}', not both.")
    if job_type != "local":
        return
    raise ValueError(
        "resume_from is only valid for distributed backends (subprocess/slurm); the "
        "local runner keeps no temp folder. Re-run the operation in-process instead "
        f"(optionally with {subset_name}=err.failed_block_ids for a subset)."
    )
Validate an operation's rerun arguments (resume_from vs a subset).
Args:
job_type: The execution backend ("local"/"subprocess"/"slurm").
resume_from: The preserved temp folder to resume from, or None.
subset: The explicit subset (block_ids/item_ids) to process, or None.
subset_name: The subset argument's name, for error messages.
Raises:
ValueError: If both resume_from and subset are given, or if resume_from is
used with the local backend (which keeps no temp folder to resume from).
def group_blocks_by_shard(
    blocking: Blocking,
    outputs: Sequence[Source],
    block_ids: Sequence[int],
) -> Optional[List[List[int]]]:
    """Partition blocks into groups such that each shard is written by exactly one group.

    In a sharded zarr v3 array the shard, not the inner chunk, is the atomic write unit, so
    two blocks concurrently writing different inner chunks of one shard corrupt it. Instead
    of forcing the block shape to a shard multiple, the runners hand each group to a single
    worker that processes its blocks one after another, so same-shard writes cannot race.
    Blocks that touch a common shard of any sharded output are joined into one group with a
    union-find over the block positions.

    Shard cells are laid out from coordinate 0 along the trailing (spatial) shard axes only;
    a leading channel axis of an output is written completely by every block and therefore
    takes no part in routing (the same treatment as the chunk handling in
    :meth:`Runner._validate_write_safety`).

    Args:
        blocking: Maps a block id to its (non-halo) write region.
        outputs: The output sources; only outputs whose ``shards`` is set influence grouping.
        block_ids: The ids of the blocks to partition.

    Returns:
        ``None`` when no output is sharded (callers then fall back to one block per unit);
        ``[]`` when ``block_ids`` is empty; otherwise the groups, each a sorted list of block
        ids, ordered by their smallest id.
    """
    sharded_outputs = [
        (out_idx, out) for out_idx, out in enumerate(outputs) if out.shards is not None
    ]
    if not sharded_outputs:
        return None
    ids = list(map(int, block_ids))
    if not ids:
        return []

    ndim = len(blocking.get_block(ids[0]).begin)
    # For every sharded output: its index plus the shard extent along the spatial axes.
    shard_grids = [
        (out_idx, tuple(map(int, out.shards[-ndim:]))) for out_idx, out in sharded_outputs
    ]

    # bic's UnionFind over the dense positions 0..len(ids)-1 (also used by
    # segmentation/label.py).
    uf = bic.utils.UnionFind(len(ids))
    first_writer: Dict[Tuple[int, ...], int] = {}
    for pos, bid in enumerate(ids):
        block = blocking.get_block(bid)
        lo = list(map(int, block.begin))
        hi = list(map(int, block.end))
        for out_idx, shard in shard_grids:
            # Touched cells per axis: floor(lo / shard) up to (exclusive) ceil(hi / shard).
            cell_ranges = [
                range(lo[axis] // shard[axis], (hi[axis] + shard[axis] - 1) // shard[axis])
                for axis in range(ndim)
            ]
            for cell in itertools.product(*cell_ranges):
                claimed_by = first_writer.setdefault((out_idx, *cell), pos)
                if claimed_by != pos:
                    uf.merge(pos, claimed_by)

    by_root: Dict[int, List[int]] = {}
    for pos, bid in enumerate(ids):
        root = int(uf.find(pos))
        by_root.setdefault(root, []).append(bid)
    ordered = [sorted(members) for members in by_root.values()]
    ordered.sort(key=lambda members: members[0])
    return ordered
Group blocks so that every shard is written by a single worker.
For a sharded zarr v3 array the atomic write unit is the shard, not the inner chunk: two blocks writing different inner chunks of the same shard concurrently corrupt it. To keep the block shape flexible (rather than forcing it to a shard multiple) the runners route each group to one worker, which processes its blocks sequentially — so same-shard writes never race. This computes those groups: blocks that share any shard (for any sharded output) are placed in the same group via a union-find over the block ids.
The shard grid is anchored at coordinate 0 and considered along the trailing (spatial)
shard axes only; a leading channel axis on an output is fully written by every block and
is not a routing axis (mirrors the chunk handling in
Runner._validate_write_safety()).
Args:
blocking: The blocking used to map a block id to its (non-halo) write region.
outputs: The output sources; only those with a shards shape drive the grouping.
block_ids: The block ids to group.
Returns:
A list of groups (each a sorted list of block ids), ordered by each group's smallest
id; None if no output is sharded (the caller should then use the default
one-block-per-unit path); an empty list if block_ids is empty.
def maybe_warn_imbalance(loads: Sequence[int], num_workers: int, n_groups: int,
                         name: str) -> None:
    """Emit a warning if shard-exclusive routing idles workers or skews their load.

    Args:
        loads: Block counts of the assignment, one per worker (or per task).
        num_workers: How many workers were requested.
        n_groups: How many shard groups (schedulable units) the blocks formed.
        name: Short run name shown in the warning; ``'run'`` is used when it is empty.
    """
    if not loads:
        return
    label = name or 'run'
    workers = int(num_workers)
    if n_groups < workers:
        idle = workers - n_groups
        warnings.warn(
            f"Shard routing for '{label}' produced only {n_groups} shard-group(s) for "
            f"{num_workers} workers, so {idle} worker(s) will be idle. "
            "A few shards span the data; use a smaller shard shape or fewer workers to balance. "
            "Results are still correct.",
            stacklevel=2,
        )
        return
    highest = max(loads)
    lowest = min(loads)
    average = sum(loads) / len(loads)
    # Quiet unless the busiest worker is both above the minimum and above 1.5x the mean.
    if highest <= lowest or highest <= 1.5 * average:
        return
    warnings.warn(
        f"Uneven worker load for '{label}': block counts per worker range {lowest}..{highest} "
        f"(mean {average:.1f}). Some shards span disproportionately many blocks; results are "
        "still correct but parallelism is reduced.",
        stacklevel=2,
    )
Warn when shard-exclusive routing leaves workers idle or badly load-imbalanced.
Args: loads: The per-worker (or per-task) block counts of the assignment. num_workers: The requested number of workers. n_groups: The number of shard groups (schedulable units) the blocks formed. name: A short run name used in the warning message.