bioimage_py.segmentation.label
Block-wise connected-component labeling (multi-stage: label -> merge -> relabel).
The block-wise path runs three ~bioimage_py.runner.base.Runner.run() calls plus one
in-process reduction, with the labeled volume persisted in the output source between
stages. This mirrors cluster_tools' connected-components workflow.
Inter-stage data visibility (distributed/slurm): stage N+1 is only launched after every stage-N
task's .success sentinel is visible, i.e. after all stage-N workers have exited and flushed
their writes. Stage N+1 then reads the output's chunks by direct path (zarr/n5 address a chunk by
key, with no directory listing), so NFS close-to-open consistency guarantees it sees the fresh
data -- the attribute-cache lag that affects sentinel discovery does not apply to reads of an
already-named chunk file.
1"""Block-wise connected-component labeling (multi-stage: label -> merge -> relabel). 2 3The block-wise path runs three :meth:`~bioimage_py.runner.base.Runner.run` calls plus one 4in-process reduction, with the labeled volume persisted in the ``output`` source between 5stages. This mirrors cluster_tools' connected-components workflow. 6 7Inter-stage data visibility (distributed/slurm): stage N+1 is only launched after every stage-N 8task's ``.success`` sentinel is visible, i.e. after all stage-N workers have exited and flushed 9their writes. Stage N+1 then reads the output's chunks by direct path (zarr/n5 address a chunk by 10key, with no directory listing), so NFS close-to-open consistency guarantees it sees the fresh 11data -- the attribute-cache lag that affects sentinel *discovery* does not apply to reads of an 12already-named chunk file. 13""" 14from __future__ import annotations 15 16from typing import Dict, Optional, Sequence, Tuple 17 18import bioimage_cpp as bic 19import numpy as np 20 21from ..runner import get_runner 22from ..runner.config import RunnerConfig 23from ..sources import Source, SourceLike, as_source 24from ..util import BlockDescriptor, ComputeFn, full_roi, get_blocking, is_direct, to_roi 25 26__all__ = ["label"] 27 28 29def _binarize(data: np.ndarray, threshold: Optional[float]) -> np.ndarray: 30 """Binarize ``data`` by threshold, or interpret it as a boolean foreground mask.""" 31 if threshold is not None: 32 return data > threshold 33 return data if data.dtype == np.dtype(bool) else data.astype(bool) 34 35 36def _resolve_block_shape(src: Source, out: Source, 37 block_shape: Optional[Sequence[int]]) -> Tuple[int, ...]: 38 """Resolve the block shape from the explicit value or the input/output chunks.""" 39 if block_shape is not None: 40 return tuple(int(b) for b in block_shape) 41 chunks = src.chunks or out.chunks 42 if chunks is None: 43 raise ValueError("block_shape is required for block-wise labeling of an unchunked array.") 44 return tuple(int(c) for c in chunks) 45 46 47# --- per-block stage functions (built as closures, capturing only picklable values) ---- 48 49def _make_stage1(shape: Tuple[int, ...], block_shape: Tuple[int, ...], connectivity: int, 50 threshold: Optional[float], offset_factor: int) -> ComputeFn: 51 """Build stage 1: label each block independently and apply a globally-unique offset.""" 52 53 def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source], 54 mask: Optional[Source]) -> Optional[np.ndarray]: 55 input_, output_ = inputs[0], outputs[0] 56 roi = to_roi(block) 57 blocking = get_blocking(shape, block_shape) 58 block_id = blocking.coordinates_to_block_id([int(c) for c in block.begin]) 59 60 binary = _binarize(input_[roi], threshold) 61 if mask is not None: 62 binary = binary & mask[roi].astype(bool) 63 64 if not binary.any(): 65 output_[roi] = np.zeros(binary.shape, dtype="uint64") 66 return None 67 68 comp = bic.segmentation.label(binary, connectivity=connectivity).astype("uint64", copy=False) 69 offset = np.uint64(int(block_id) * int(offset_factor)) 70 comp[comp != 0] += offset 71 output_[roi] = comp 72 # Return the block's actual (globally-unique) labels so stage 3 can relabel only 73 # over labels that exist, not over the sparse offset space. 74 return np.unique(comp[comp != 0]) 75 76 return _compute 77 78 79def _make_stage2(shape: Tuple[int, ...], block_shape: Tuple[int, ...]) -> ComputeFn: 80 """Build stage 2: collect label equivalences across lower block faces.""" 81 82 def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source], 83 mask: Optional[Source]) -> Optional[np.ndarray]: 84 output_ = inputs[0] # the labeled volume, passed as a read-only input 85 ndim = len(shape) 86 blocking = get_blocking(shape, block_shape) 87 block_id = blocking.coordinates_to_block_id([int(c) for c in block.begin]) 88 89 pairs = [] 90 block_roi = to_roi(block) 91 for axis in range(ndim): 92 if blocking.get_neighbor_id(block_id, axis, True) == -1: # no lower neighbor 93 continue 94 b0 = int(block.begin[axis]) 95 slab_roi = list(block_roi) 96 slab_roi[axis] = slice(b0 - 1, b0 + 1) # 2-thick slab straddling the boundary 97 slab = output_[tuple(slab_roi)] 98 lo = tuple(slice(0, 1) if d == axis else slice(None) for d in range(ndim)) 99 hi = tuple(slice(1, 2) if d == axis else slice(None) for d in range(ndim)) 100 labels_b = np.squeeze(slab[lo], axis=axis) # neighbor side 101 labels_a = np.squeeze(slab[hi], axis=axis) # this block side 102 keep = (labels_a != 0) & (labels_b != 0) 103 if keep.any(): 104 pairs.append(np.stack([labels_a[keep], labels_b[keep]], axis=1).astype("uint64")) 105 106 if not pairs: 107 return None 108 return np.unique(np.concatenate(pairs, axis=0), axis=0) 109 110 return _compute 111 112 113def _make_stage4(mapping: Dict[int, int]) -> ComputeFn: 114 """Build stage 4: apply the final label mapping in place.""" 115 116 def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source], 117 mask: Optional[Source]) -> None: 118 output_ = outputs[0] 119 roi = to_roi(block) 120 output_[roi] = bic.utils.take_dict(mapping, output_[roi]) 121 return None 122 123 return _compute 124 125 126def label( 127 input: SourceLike, 128 output: Optional[SourceLike] = None, 129 *, 130 threshold: Optional[float] = None, 131 connectivity: Optional[int] = None, 132 block_shape: Optional[Tuple[int, ...]] = None, 133 job_type: str = "local", 134 job_config: Optional[RunnerConfig] = None, 135 num_workers: int = 1, 136 mask: Optional[SourceLike] = None, 137) -> SourceLike: 138 """Label connected components of (optionally thresholded) data, block-wise. 139 140 Unlike the single-pass operations, ``label`` is multi-stage with a global cross-block merge 141 (per-block labeling, then a union-find over touching components across block faces), so it 142 does **not** accept ``block_ids`` or ``resume_from``: a failed run must be re-run whole (it is 143 idempotent given the same ``output``). 144 145 Args: 146 input: The input data (a numpy/zarr/n5 array or a `Source`). 147 output: The ``uint64`` output array to write the labels into. Optional for local 148 execution — a numpy array is allocated and returned if omitted; **required** for 149 distributed execution. 150 threshold: If given, the input is binarized as ``input > threshold``; otherwise the 151 input is treated as a binary foreground mask. 152 connectivity: Neighbour connectivity in ``[1, ndim]`` (``1`` = orthogonal). Defaults 153 to ``1``; values ``> 1`` are only supported for the direct (single-block) path. 154 block_shape: Shape of the processing blocks. Defaults to the input/output chunk shape; 155 required for unchunked data. 156 job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``. 157 job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`). 158 num_workers: Number of parallel workers (threads for ``local``, tasks for distributed 159 backends). 160 mask: Optional binary mask; values outside the mask are excluded from the foreground. 161 162 Returns: 163 The output array (the provided ``output``, or a newly allocated numpy array), labeled 164 with consecutive ids (background stays ``0``). 165 """ 166 src = as_source(input) 167 ndim = src.ndim 168 conn = 1 if connectivity is None else int(connectivity) 169 if not 1 <= conn <= ndim: 170 raise ValueError(f"connectivity must be in [1, {ndim}], got {conn}.") 171 172 direct = is_direct(job_type, num_workers, block_shape) and mask is None 173 if conn > 1 and not direct: 174 raise NotImplementedError( 175 "Block-wise labeling only supports connectivity=1 (orthogonal). Use the direct " 176 "path (local, single worker, no block_shape, no mask) for higher connectivity." 177 ) 178 179 if output is None: 180 if job_type != "local": 181 raise ValueError( 182 f"'output' is required for distributed execution (job_type={job_type!r}); " 183 "pass a file-backed (zarr/n5) output array." 184 ) 185 out_array: SourceLike = np.zeros(tuple(src.shape), dtype="uint64") 186 else: 187 out_array = output 188 189 out = as_source(out_array) 190 if out.dtype != np.dtype("uint64"): 191 raise ValueError(f"output must have dtype uint64, got {out.dtype}.") 192 193 if direct: 194 binary = _binarize(src[full_roi(ndim)], threshold) 195 comp = bic.segmentation.label(binary, connectivity=conn).astype("uint64", copy=False) 196 out[full_roi(ndim)] = comp 197 return out_array 198 199 block_shape = _resolve_block_shape(src, out, block_shape) 200 offset_factor = int(np.prod(block_shape)) 201 blocking = get_blocking(src.shape, block_shape) 202 n_blocks = int(blocking.number_of_blocks) 203 if (n_blocks * offset_factor) >= int(np.iinfo(np.uint64).max): 204 raise ValueError( 205 "Label id overflow: number_of_blocks * prod(block_shape) exceeds uint64. " 206 "Reduce the block shape or the volume size." 207 ) 208 209 runner = get_runner(job_type, job_config) 210 211 # Stage 1: label each block independently with a globally-unique offset. 212 stage1 = _make_stage1(tuple(src.shape), block_shape, conn, threshold, offset_factor) 213 id_results = runner.run(stage1, [input], outputs=[out_array], block_shape=block_shape, 214 mask=mask, num_workers=num_workers, has_return_val=True, 215 name="label-blocks") 216 id_arrays = [a for a in id_results if a is not None and len(a)] 217 real_labels = np.unique(np.concatenate(id_arrays)) if id_arrays else np.zeros((0,), dtype="uint64") 218 max_label = int(real_labels.max()) if real_labels.size else 0 219 220 # Stage 2: collect label equivalences across lower block faces. 221 stage2 = _make_stage2(tuple(src.shape), block_shape) 222 pair_results = runner.run(stage2, [out_array], block_shape=block_shape, 223 num_workers=num_workers, has_return_val=True, name="merge-faces") 224 pairs = [p for p in pair_results if p is not None] 225 assignments = (np.unique(np.concatenate(pairs, axis=0), axis=0) 226 if pairs else np.zeros((0, 2), dtype="uint64")) 227 228 # Stage 3 (in process): union-find merge, then relabel only the labels that exist to 229 # consecutive ids (the offset space is sparse, so relabeling over it would not be compact). 230 uf = bic.utils.UnionFind(max_label + 1) 231 if len(assignments): 232 uf.merge(assignments.astype("uint64")) 233 mapping: Dict[int, int] = {0: 0} 234 if real_labels.size: 235 roots = np.asarray(uf.find(real_labels.astype("uint64"))) 236 root_to_new = {int(r): i + 1 for i, r in enumerate(np.unique(roots).tolist())} 237 for lab, root in zip(real_labels.tolist(), roots.tolist()): 238 mapping[int(lab)] = root_to_new[int(root)] 239 240 # Stage 4: apply the mapping in place. 241 stage4 = _make_stage4(mapping) 242 runner.run(stage4, [], outputs=[out_array], block_shape=block_shape, 243 num_workers=num_workers, has_return_val=False, name="relabel") 244 return out_array
127def label( 128 input: SourceLike, 129 output: Optional[SourceLike] = None, 130 *, 131 threshold: Optional[float] = None, 132 connectivity: Optional[int] = None, 133 block_shape: Optional[Tuple[int, ...]] = None, 134 job_type: str = "local", 135 job_config: Optional[RunnerConfig] = None, 136 num_workers: int = 1, 137 mask: Optional[SourceLike] = None, 138) -> SourceLike: 139 """Label connected components of (optionally thresholded) data, block-wise. 140 141 Unlike the single-pass operations, ``label`` is multi-stage with a global cross-block merge 142 (per-block labeling, then a union-find over touching components across block faces), so it 143 does **not** accept ``block_ids`` or ``resume_from``: a failed run must be re-run whole (it is 144 idempotent given the same ``output``). 145 146 Args: 147 input: The input data (a numpy/zarr/n5 array or a `Source`). 148 output: The ``uint64`` output array to write the labels into. Optional for local 149 execution — a numpy array is allocated and returned if omitted; **required** for 150 distributed execution. 151 threshold: If given, the input is binarized as ``input > threshold``; otherwise the 152 input is treated as a binary foreground mask. 153 connectivity: Neighbour connectivity in ``[1, ndim]`` (``1`` = orthogonal). Defaults 154 to ``1``; values ``> 1`` are only supported for the direct (single-block) path. 155 block_shape: Shape of the processing blocks. Defaults to the input/output chunk shape; 156 required for unchunked data. 157 job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``. 158 job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`). 159 num_workers: Number of parallel workers (threads for ``local``, tasks for distributed 160 backends). 161 mask: Optional binary mask; values outside the mask are excluded from the foreground. 162 163 Returns: 164 The output array (the provided ``output``, or a newly allocated numpy array), labeled 165 with consecutive ids (background stays ``0``). 166 """ 167 src = as_source(input) 168 ndim = src.ndim 169 conn = 1 if connectivity is None else int(connectivity) 170 if not 1 <= conn <= ndim: 171 raise ValueError(f"connectivity must be in [1, {ndim}], got {conn}.") 172 173 direct = is_direct(job_type, num_workers, block_shape) and mask is None 174 if conn > 1 and not direct: 175 raise NotImplementedError( 176 "Block-wise labeling only supports connectivity=1 (orthogonal). Use the direct " 177 "path (local, single worker, no block_shape, no mask) for higher connectivity." 178 ) 179 180 if output is None: 181 if job_type != "local": 182 raise ValueError( 183 f"'output' is required for distributed execution (job_type={job_type!r}); " 184 "pass a file-backed (zarr/n5) output array." 185 ) 186 out_array: SourceLike = np.zeros(tuple(src.shape), dtype="uint64") 187 else: 188 out_array = output 189 190 out = as_source(out_array) 191 if out.dtype != np.dtype("uint64"): 192 raise ValueError(f"output must have dtype uint64, got {out.dtype}.") 193 194 if direct: 195 binary = _binarize(src[full_roi(ndim)], threshold) 196 comp = bic.segmentation.label(binary, connectivity=conn).astype("uint64", copy=False) 197 out[full_roi(ndim)] = comp 198 return out_array 199 200 block_shape = _resolve_block_shape(src, out, block_shape) 201 offset_factor = int(np.prod(block_shape)) 202 blocking = get_blocking(src.shape, block_shape) 203 n_blocks = int(blocking.number_of_blocks) 204 if (n_blocks * offset_factor) >= int(np.iinfo(np.uint64).max): 205 raise ValueError( 206 "Label id overflow: number_of_blocks * prod(block_shape) exceeds uint64. " 207 "Reduce the block shape or the volume size." 208 ) 209 210 runner = get_runner(job_type, job_config) 211 212 # Stage 1: label each block independently with a globally-unique offset. 213 stage1 = _make_stage1(tuple(src.shape), block_shape, conn, threshold, offset_factor) 214 id_results = runner.run(stage1, [input], outputs=[out_array], block_shape=block_shape, 215 mask=mask, num_workers=num_workers, has_return_val=True, 216 name="label-blocks") 217 id_arrays = [a for a in id_results if a is not None and len(a)] 218 real_labels = np.unique(np.concatenate(id_arrays)) if id_arrays else np.zeros((0,), dtype="uint64") 219 max_label = int(real_labels.max()) if real_labels.size else 0 220 221 # Stage 2: collect label equivalences across lower block faces. 222 stage2 = _make_stage2(tuple(src.shape), block_shape) 223 pair_results = runner.run(stage2, [out_array], block_shape=block_shape, 224 num_workers=num_workers, has_return_val=True, name="merge-faces") 225 pairs = [p for p in pair_results if p is not None] 226 assignments = (np.unique(np.concatenate(pairs, axis=0), axis=0) 227 if pairs else np.zeros((0, 2), dtype="uint64")) 228 229 # Stage 3 (in process): union-find merge, then relabel only the labels that exist to 230 # consecutive ids (the offset space is sparse, so relabeling over it would not be compact). 231 uf = bic.utils.UnionFind(max_label + 1) 232 if len(assignments): 233 uf.merge(assignments.astype("uint64")) 234 mapping: Dict[int, int] = {0: 0} 235 if real_labels.size: 236 roots = np.asarray(uf.find(real_labels.astype("uint64"))) 237 root_to_new = {int(r): i + 1 for i, r in enumerate(np.unique(roots).tolist())} 238 for lab, root in zip(real_labels.tolist(), roots.tolist()): 239 mapping[int(lab)] = root_to_new[int(root)] 240 241 # Stage 4: apply the mapping in place. 242 stage4 = _make_stage4(mapping) 243 runner.run(stage4, [], outputs=[out_array], block_shape=block_shape, 244 num_workers=num_workers, has_return_val=False, name="relabel") 245 return out_array
Label connected components of (optionally thresholded) data, block-wise.
Unlike the single-pass operations, label is multi-stage with a global cross-block merge
(per-block labeling, then a union-find over touching components across block faces), so it
does not accept block_ids or resume_from: a failed run must be re-run whole (it is
idempotent given the same output).
Args:
input: The input data (a numpy/zarr/n5 array or a Source).
output: The uint64 output array to write the labels into. Optional for local
execution — a numpy array is allocated and returned if omitted; required for
distributed execution.
threshold: If given, the input is binarized as input > threshold; otherwise the
input is treated as a binary foreground mask.
connectivity: Neighbour connectivity in [1, ndim] (1 = orthogonal). Defaults
to 1; values > 1 are only supported for the direct (single-block) path.
block_shape: Shape of the processing blocks. Defaults to the input/output chunk shape;
required for unchunked data.
job_type: Execution backend: one of "local", "subprocess" or "slurm".
job_config: Backend configuration (a RunnerConfig / SlurmConfig).
num_workers: Number of parallel workers (threads for local, tasks for distributed
backends).
mask: Optional binary mask; values outside the mask are excluded from the foreground.
Returns:
The output array (the provided output, or a newly allocated numpy array), labeled
with consecutive ids (background stays 0).