bioimage_py.segmentation.label

Block-wise connected-component labeling (multi-stage: label -> merge -> relabel).

The block-wise path runs three ~bioimage_py.runner.base.Runner.run() calls plus one in-process reduction, with the labeled volume persisted in the output source between stages. This mirrors cluster_tools' connected-components workflow.

Inter-stage data visibility (distributed/slurm): stage N+1 is only launched after every stage-N task's .success sentinel is visible, i.e. after all stage-N workers have exited and flushed their writes. Stage N+1 then reads the output's chunks by direct path (zarr/n5 address a chunk by key, with no directory listing), so NFS close-to-open consistency guarantees it sees the fresh data -- the attribute-cache lag that affects sentinel discovery does not apply to reads of an already-named chunk file.

  1"""Block-wise connected-component labeling (multi-stage: label -> merge -> relabel).
  2
  3The block-wise path runs three :meth:`~bioimage_py.runner.base.Runner.run` calls plus one
  4in-process reduction, with the labeled volume persisted in the ``output`` source between
  5stages. This mirrors cluster_tools' connected-components workflow.
  6
  7Inter-stage data visibility (distributed/slurm): stage N+1 is only launched after every stage-N
  8task's ``.success`` sentinel is visible, i.e. after all stage-N workers have exited and flushed
  9their writes. Stage N+1 then reads the output's chunks by direct path (zarr/n5 address a chunk by
 10key, with no directory listing), so NFS close-to-open consistency guarantees it sees the fresh
 11data -- the attribute-cache lag that affects sentinel *discovery* does not apply to reads of an
 12already-named chunk file.
 13"""
 14from __future__ import annotations
 15
 16from typing import Dict, Optional, Sequence, Tuple
 17
 18import bioimage_cpp as bic
 19import numpy as np
 20
 21from ..runner import get_runner
 22from ..runner.config import RunnerConfig
 23from ..sources import Source, SourceLike, as_source
 24from ..util import BlockDescriptor, ComputeFn, full_roi, get_blocking, is_direct, to_roi
 25
 26__all__ = ["label"]
 27
 28
 29def _binarize(data: np.ndarray, threshold: Optional[float]) -> np.ndarray:
 30    """Binarize ``data`` by threshold, or interpret it as a boolean foreground mask."""
 31    if threshold is not None:
 32        return data > threshold
 33    return data if data.dtype == np.dtype(bool) else data.astype(bool)
 34
 35
 36def _resolve_block_shape(src: Source, out: Source,
 37                         block_shape: Optional[Sequence[int]]) -> Tuple[int, ...]:
 38    """Resolve the block shape from the explicit value or the input/output chunks."""
 39    if block_shape is not None:
 40        return tuple(int(b) for b in block_shape)
 41    chunks = src.chunks or out.chunks
 42    if chunks is None:
 43        raise ValueError("block_shape is required for block-wise labeling of an unchunked array.")
 44    return tuple(int(c) for c in chunks)
 45
 46
 47# --- per-block stage functions (built as closures, capturing only picklable values) ----
 48
 49def _make_stage1(shape: Tuple[int, ...], block_shape: Tuple[int, ...], connectivity: int,
 50                 threshold: Optional[float], offset_factor: int) -> ComputeFn:
 51    """Build stage 1: label each block independently and apply a globally-unique offset."""
 52
 53    def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source],
 54                 mask: Optional[Source]) -> Optional[np.ndarray]:
 55        input_, output_ = inputs[0], outputs[0]
 56        roi = to_roi(block)
 57        blocking = get_blocking(shape, block_shape)
 58        block_id = blocking.coordinates_to_block_id([int(c) for c in block.begin])
 59
 60        binary = _binarize(input_[roi], threshold)
 61        if mask is not None:
 62            binary = binary & mask[roi].astype(bool)
 63
 64        if not binary.any():
 65            output_[roi] = np.zeros(binary.shape, dtype="uint64")
 66            return None
 67
 68        comp = bic.segmentation.label(binary, connectivity=connectivity).astype("uint64", copy=False)
 69        offset = np.uint64(int(block_id) * int(offset_factor))
 70        comp[comp != 0] += offset
 71        output_[roi] = comp
 72        # Return the block's actual (globally-unique) labels so stage 3 can relabel only
 73        # over labels that exist, not over the sparse offset space.
 74        return np.unique(comp[comp != 0])
 75
 76    return _compute
 77
 78
 79def _make_stage2(shape: Tuple[int, ...], block_shape: Tuple[int, ...]) -> ComputeFn:
 80    """Build stage 2: collect label equivalences across lower block faces."""
 81
 82    def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source],
 83                 mask: Optional[Source]) -> Optional[np.ndarray]:
 84        output_ = inputs[0]  # the labeled volume, passed as a read-only input
 85        ndim = len(shape)
 86        blocking = get_blocking(shape, block_shape)
 87        block_id = blocking.coordinates_to_block_id([int(c) for c in block.begin])
 88
 89        pairs = []
 90        block_roi = to_roi(block)
 91        for axis in range(ndim):
 92            if blocking.get_neighbor_id(block_id, axis, True) == -1:  # no lower neighbor
 93                continue
 94            b0 = int(block.begin[axis])
 95            slab_roi = list(block_roi)
 96            slab_roi[axis] = slice(b0 - 1, b0 + 1)  # 2-thick slab straddling the boundary
 97            slab = output_[tuple(slab_roi)]
 98            lo = tuple(slice(0, 1) if d == axis else slice(None) for d in range(ndim))
 99            hi = tuple(slice(1, 2) if d == axis else slice(None) for d in range(ndim))
100            labels_b = np.squeeze(slab[lo], axis=axis)  # neighbor side
101            labels_a = np.squeeze(slab[hi], axis=axis)  # this block side
102            keep = (labels_a != 0) & (labels_b != 0)
103            if keep.any():
104                pairs.append(np.stack([labels_a[keep], labels_b[keep]], axis=1).astype("uint64"))
105
106        if not pairs:
107            return None
108        return np.unique(np.concatenate(pairs, axis=0), axis=0)
109
110    return _compute
111
112
113def _make_stage4(mapping: Dict[int, int]) -> ComputeFn:
114    """Build stage 4: apply the final label mapping in place."""
115
116    def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source],
117                 mask: Optional[Source]) -> None:
118        output_ = outputs[0]
119        roi = to_roi(block)
120        output_[roi] = bic.utils.take_dict(mapping, output_[roi])
121        return None
122
123    return _compute
124
125
126def label(
127    input: SourceLike,
128    output: Optional[SourceLike] = None,
129    *,
130    threshold: Optional[float] = None,
131    connectivity: Optional[int] = None,
132    block_shape: Optional[Tuple[int, ...]] = None,
133    job_type: str = "local",
134    job_config: Optional[RunnerConfig] = None,
135    num_workers: int = 1,
136    mask: Optional[SourceLike] = None,
137) -> SourceLike:
138    """Label connected components of (optionally thresholded) data, block-wise.
139
140    Unlike the single-pass operations, ``label`` is multi-stage with a global cross-block merge
141    (per-block labeling, then a union-find over touching components across block faces), so it
142    does **not** accept ``block_ids`` or ``resume_from``: a failed run must be re-run whole (it is
143    idempotent given the same ``output``).
144
145    Args:
146        input: The input data (a numpy/zarr/n5 array or a `Source`).
147        output: The ``uint64`` output array to write the labels into. Optional for local
148            execution — a numpy array is allocated and returned if omitted; **required** for
149            distributed execution.
150        threshold: If given, the input is binarized as ``input > threshold``; otherwise the
151            input is treated as a binary foreground mask.
152        connectivity: Neighbour connectivity in ``[1, ndim]`` (``1`` = orthogonal). Defaults
153            to ``1``; values ``> 1`` are only supported for the direct (single-block) path.
154        block_shape: Shape of the processing blocks. Defaults to the input/output chunk shape;
155            required for unchunked data.
156        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
157        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
158        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
159            backends).
160        mask: Optional binary mask; values outside the mask are excluded from the foreground.
161
162    Returns:
163        The output array (the provided ``output``, or a newly allocated numpy array), labeled
164        with consecutive ids (background stays ``0``).
165    """
166    src = as_source(input)
167    ndim = src.ndim
168    conn = 1 if connectivity is None else int(connectivity)
169    if not 1 <= conn <= ndim:
170        raise ValueError(f"connectivity must be in [1, {ndim}], got {conn}.")
171
172    direct = is_direct(job_type, num_workers, block_shape) and mask is None
173    if conn > 1 and not direct:
174        raise NotImplementedError(
175            "Block-wise labeling only supports connectivity=1 (orthogonal). Use the direct "
176            "path (local, single worker, no block_shape, no mask) for higher connectivity."
177        )
178
179    if output is None:
180        if job_type != "local":
181            raise ValueError(
182                f"'output' is required for distributed execution (job_type={job_type!r}); "
183                "pass a file-backed (zarr/n5) output array."
184            )
185        out_array: SourceLike = np.zeros(tuple(src.shape), dtype="uint64")
186    else:
187        out_array = output
188
189    out = as_source(out_array)
190    if out.dtype != np.dtype("uint64"):
191        raise ValueError(f"output must have dtype uint64, got {out.dtype}.")
192
193    if direct:
194        binary = _binarize(src[full_roi(ndim)], threshold)
195        comp = bic.segmentation.label(binary, connectivity=conn).astype("uint64", copy=False)
196        out[full_roi(ndim)] = comp
197        return out_array
198
199    block_shape = _resolve_block_shape(src, out, block_shape)
200    offset_factor = int(np.prod(block_shape))
201    blocking = get_blocking(src.shape, block_shape)
202    n_blocks = int(blocking.number_of_blocks)
203    if (n_blocks * offset_factor) >= int(np.iinfo(np.uint64).max):
204        raise ValueError(
205            "Label id overflow: number_of_blocks * prod(block_shape) exceeds uint64. "
206            "Reduce the block shape or the volume size."
207        )
208
209    runner = get_runner(job_type, job_config)
210
211    # Stage 1: label each block independently with a globally-unique offset.
212    stage1 = _make_stage1(tuple(src.shape), block_shape, conn, threshold, offset_factor)
213    id_results = runner.run(stage1, [input], outputs=[out_array], block_shape=block_shape,
214                            mask=mask, num_workers=num_workers, has_return_val=True,
215                            name="label-blocks")
216    id_arrays = [a for a in id_results if a is not None and len(a)]
217    real_labels = np.unique(np.concatenate(id_arrays)) if id_arrays else np.zeros((0,), dtype="uint64")
218    max_label = int(real_labels.max()) if real_labels.size else 0
219
220    # Stage 2: collect label equivalences across lower block faces.
221    stage2 = _make_stage2(tuple(src.shape), block_shape)
222    pair_results = runner.run(stage2, [out_array], block_shape=block_shape,
223                              num_workers=num_workers, has_return_val=True, name="merge-faces")
224    pairs = [p for p in pair_results if p is not None]
225    assignments = (np.unique(np.concatenate(pairs, axis=0), axis=0)
226                   if pairs else np.zeros((0, 2), dtype="uint64"))
227
228    # Stage 3 (in process): union-find merge, then relabel only the labels that exist to
229    # consecutive ids (the offset space is sparse, so relabeling over it would not be compact).
230    uf = bic.utils.UnionFind(max_label + 1)
231    if len(assignments):
232        uf.merge(assignments.astype("uint64"))
233    mapping: Dict[int, int] = {0: 0}
234    if real_labels.size:
235        roots = np.asarray(uf.find(real_labels.astype("uint64")))
236        root_to_new = {int(r): i + 1 for i, r in enumerate(np.unique(roots).tolist())}
237        for lab, root in zip(real_labels.tolist(), roots.tolist()):
238            mapping[int(lab)] = root_to_new[int(root)]
239
240    # Stage 4: apply the mapping in place.
241    stage4 = _make_stage4(mapping)
242    runner.run(stage4, [], outputs=[out_array], block_shape=block_shape,
243               num_workers=num_workers, has_return_val=False, name="relabel")
244    return out_array
def label( input: 'SourceLike', output: 'Optional[SourceLike]' = None, *, threshold: Optional[float] = None, connectivity: Optional[int] = None, block_shape: Optional[Tuple[int, ...]] = None, job_type: str = 'local', job_config: Optional[bioimage_py.runner.RunnerConfig] = None, num_workers: int = 1, mask: 'Optional[SourceLike]' = None) -> 'SourceLike':
127def label(
128    input: SourceLike,
129    output: Optional[SourceLike] = None,
130    *,
131    threshold: Optional[float] = None,
132    connectivity: Optional[int] = None,
133    block_shape: Optional[Tuple[int, ...]] = None,
134    job_type: str = "local",
135    job_config: Optional[RunnerConfig] = None,
136    num_workers: int = 1,
137    mask: Optional[SourceLike] = None,
138) -> SourceLike:
139    """Label connected components of (optionally thresholded) data, block-wise.
140
141    Unlike the single-pass operations, ``label`` is multi-stage with a global cross-block merge
142    (per-block labeling, then a union-find over touching components across block faces), so it
143    does **not** accept ``block_ids`` or ``resume_from``: a failed run must be re-run whole (it is
144    idempotent given the same ``output``).
145
146    Args:
147        input: The input data (a numpy/zarr/n5 array or a `Source`).
148        output: The ``uint64`` output array to write the labels into. Optional for local
149            execution — a numpy array is allocated and returned if omitted; **required** for
150            distributed execution.
151        threshold: If given, the input is binarized as ``input > threshold``; otherwise the
152            input is treated as a binary foreground mask.
153        connectivity: Neighbour connectivity in ``[1, ndim]`` (``1`` = orthogonal). Defaults
154            to ``1``; values ``> 1`` are only supported for the direct (single-block) path.
155        block_shape: Shape of the processing blocks. Defaults to the input/output chunk shape;
156            required for unchunked data.
157        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
158        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
159        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
160            backends).
161        mask: Optional binary mask; values outside the mask are excluded from the foreground.
162
163    Returns:
164        The output array (the provided ``output``, or a newly allocated numpy array), labeled
165        with consecutive ids (background stays ``0``).
166    """
167    src = as_source(input)
168    ndim = src.ndim
169    conn = 1 if connectivity is None else int(connectivity)
170    if not 1 <= conn <= ndim:
171        raise ValueError(f"connectivity must be in [1, {ndim}], got {conn}.")
172
173    direct = is_direct(job_type, num_workers, block_shape) and mask is None
174    if conn > 1 and not direct:
175        raise NotImplementedError(
176            "Block-wise labeling only supports connectivity=1 (orthogonal). Use the direct "
177            "path (local, single worker, no block_shape, no mask) for higher connectivity."
178        )
179
180    if output is None:
181        if job_type != "local":
182            raise ValueError(
183                f"'output' is required for distributed execution (job_type={job_type!r}); "
184                "pass a file-backed (zarr/n5) output array."
185            )
186        out_array: SourceLike = np.zeros(tuple(src.shape), dtype="uint64")
187    else:
188        out_array = output
189
190    out = as_source(out_array)
191    if out.dtype != np.dtype("uint64"):
192        raise ValueError(f"output must have dtype uint64, got {out.dtype}.")
193
194    if direct:
195        binary = _binarize(src[full_roi(ndim)], threshold)
196        comp = bic.segmentation.label(binary, connectivity=conn).astype("uint64", copy=False)
197        out[full_roi(ndim)] = comp
198        return out_array
199
200    block_shape = _resolve_block_shape(src, out, block_shape)
201    offset_factor = int(np.prod(block_shape))
202    blocking = get_blocking(src.shape, block_shape)
203    n_blocks = int(blocking.number_of_blocks)
204    if (n_blocks * offset_factor) >= int(np.iinfo(np.uint64).max):
205        raise ValueError(
206            "Label id overflow: number_of_blocks * prod(block_shape) exceeds uint64. "
207            "Reduce the block shape or the volume size."
208        )
209
210    runner = get_runner(job_type, job_config)
211
212    # Stage 1: label each block independently with a globally-unique offset.
213    stage1 = _make_stage1(tuple(src.shape), block_shape, conn, threshold, offset_factor)
214    id_results = runner.run(stage1, [input], outputs=[out_array], block_shape=block_shape,
215                            mask=mask, num_workers=num_workers, has_return_val=True,
216                            name="label-blocks")
217    id_arrays = [a for a in id_results if a is not None and len(a)]
218    real_labels = np.unique(np.concatenate(id_arrays)) if id_arrays else np.zeros((0,), dtype="uint64")
219    max_label = int(real_labels.max()) if real_labels.size else 0
220
221    # Stage 2: collect label equivalences across lower block faces.
222    stage2 = _make_stage2(tuple(src.shape), block_shape)
223    pair_results = runner.run(stage2, [out_array], block_shape=block_shape,
224                              num_workers=num_workers, has_return_val=True, name="merge-faces")
225    pairs = [p for p in pair_results if p is not None]
226    assignments = (np.unique(np.concatenate(pairs, axis=0), axis=0)
227                   if pairs else np.zeros((0, 2), dtype="uint64"))
228
229    # Stage 3 (in process): union-find merge, then relabel only the labels that exist to
230    # consecutive ids (the offset space is sparse, so relabeling over it would not be compact).
231    uf = bic.utils.UnionFind(max_label + 1)
232    if len(assignments):
233        uf.merge(assignments.astype("uint64"))
234    mapping: Dict[int, int] = {0: 0}
235    if real_labels.size:
236        roots = np.asarray(uf.find(real_labels.astype("uint64")))
237        root_to_new = {int(r): i + 1 for i, r in enumerate(np.unique(roots).tolist())}
238        for lab, root in zip(real_labels.tolist(), roots.tolist()):
239            mapping[int(lab)] = root_to_new[int(root)]
240
241    # Stage 4: apply the mapping in place.
242    stage4 = _make_stage4(mapping)
243    runner.run(stage4, [], outputs=[out_array], block_shape=block_shape,
244               num_workers=num_workers, has_return_val=False, name="relabel")
245    return out_array

Label connected components of (optionally thresholded) data, block-wise.

Unlike the single-pass operations, label is multi-stage with a global cross-block merge (per-block labeling, then a union-find over touching components across block faces), so it does not accept block_ids or resume_from: a failed run must be re-run whole (it is idempotent given the same output).

Args: input: The input data (a numpy/zarr/n5 array or a Source). output: The uint64 output array to write the labels into. Optional for local execution — a numpy array is allocated and returned if omitted; required for distributed execution. threshold: If given, the input is binarized as input > threshold; otherwise the input is treated as a binary foreground mask. connectivity: Neighbour connectivity in [1, ndim] (1 = orthogonal). Defaults to 1; values > 1 are only supported for the direct (single-block) path. block_shape: Shape of the processing blocks. Defaults to the input/output chunk shape; required for unchunked data. job_type: Execution backend: one of "local", "subprocess" or "slurm". job_config: Backend configuration (a RunnerConfig / SlurmConfig). num_workers: Number of parallel workers (threads for local, tasks for distributed backends). mask: Optional binary mask; values outside the mask are excluded from the foreground.

Returns: The output array (the provided output, or a newly allocated numpy array), labeled with consecutive ids (background stays 0).