bioimage_py.stats.unique

Block-wise unique values (and optional counts) via the runner's return channel.

A reduction operation: each block computes the unique values it contains (and, optionally, their counts) and the main process merges them. Without halo, the blocks partition the volume disjointly, so the per-value counts are additive across blocks. The merge mirrors stats / contingency_table -- the per-block results flow through runner.run(..., has_return_val=True) and the merge is pure numpy -- so it behaves identically across the local / subprocess / slurm backends.

The count merge groups the stacked (value, count) rows with a single argsort + reduceat (the 1-key variant of contingency_table's merge), rather than scattering into a dense counts[max_id + 1] array, so it stays memory-safe for sparse, large label ids.

  1"""Block-wise unique values (and optional counts) via the runner's return channel.
  2
  3A reduction operation: each block computes the unique values it contains (and, optionally, their
  4counts) and the main process merges them. Without halo, the blocks partition the volume disjointly,
  5so the per-value counts are additive across blocks. The merge mirrors ``stats`` / ``contingency_table``
  6-- the per-block results flow through ``runner.run(..., has_return_val=True)`` and the merge is pure
  7numpy -- so it behaves identically across the ``local`` / ``subprocess`` / ``slurm`` backends.
  8
  9The count merge groups the stacked ``(value, count)`` rows with a single ``argsort`` + ``reduceat``
 10(the 1-key variant of ``contingency_table``'s merge), rather than scattering into a dense
 11``counts[max_id + 1]`` array, so it stays memory-safe for sparse, large label ids.
 12"""
 13from __future__ import annotations
 14
 15from typing import List, Optional, Sequence, Tuple, Union
 16
 17import numpy as np
 18
 19from ..runner import get_runner
 20from ..runner.config import RunnerConfig
 21from ..sources import Source, SourceLike, as_source
 22from ..util import BlockDescriptor, ComputeFn, check_direct, check_rerun_args, full_roi, to_roi
 23
 24__all__ = ["unique"]
 25
 26
 27def _make_unique_block(return_counts: bool) -> ComputeFn:
 28    """Build the per-block unique function (captures only the picklable ``return_counts`` flag)."""
 29
 30    def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source],
 31                 mask: Optional[Source]) -> Optional[Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]:
 32        roi = to_roi(block)
 33        d = inputs[0][roi]
 34        if mask is not None:
 35            block_mask = mask[roi].astype(bool)
 36            if not block_mask.any():
 37                return None
 38            d = d[block_mask]
 39        if return_counts:
 40            values, counts = np.unique(d, return_counts=True)
 41            return values, counts.astype("int64")
 42        return np.unique(d)
 43
 44    return _compute
 45
 46
 47def _merge_unique(results: List, return_counts: bool,
 48                  dtype: np.dtype) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
 49    """Merge per-block unique values (and counts) into the global result."""
 50    results = [r for r in results if r is not None]
 51    if not return_counts:
 52        if not results:
 53            return np.zeros((0,), dtype=dtype)
 54        return np.unique(np.concatenate(results))
 55
 56    if not results:
 57        return np.zeros((0,), dtype=dtype), np.zeros((0,), dtype="int64")
 58    values = np.concatenate([r[0] for r in results])
 59    counts = np.concatenate([r[1] for r in results])
 60    order = np.argsort(values)
 61    values, counts = values[order], counts[order]
 62    starts = np.flatnonzero(np.concatenate(([True], values[1:] != values[:-1])))
 63    return values[starts], np.add.reduceat(counts, starts)
 64
 65
 66def unique(
 67    input: SourceLike,
 68    return_counts: bool = False,
 69    num_workers: int = 1,
 70    block_shape: Optional[Tuple[int, ...]] = None,
 71    job_type: str = "local",
 72    job_config: Optional[RunnerConfig] = None,
 73    mask: Optional[SourceLike] = None,
 74    block_ids: Optional[Sequence[int]] = None,
 75    resume_from: Optional[str] = None,
 76) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
 77    """Compute the unique values of the data, optionally with their counts.
 78
 79    Args:
 80        input: The input data (a numpy/zarr/n5 array or a `Source`).
 81        return_counts: Whether to also return the number of occurrences of each unique value.
 82        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
 83            backends).
 84        block_shape: Shape of the processing blocks. Defaults to the input chunk shape;
 85            required for unchunked data.
 86        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
 87        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
 88        mask: Optional binary mask; values outside the mask are excluded from the computation.
 89        block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
 90        resume_from: Distributed only; the preserved temp folder of a failed run to resume and
 91            merge (see ``runner.run``). Mutually exclusive with ``block_ids``.
 92
 93    Returns:
 94        The sorted unique values. If ``return_counts`` is set, a ``(values, counts)`` tuple, where
 95        ``counts`` is an ``int64`` array aligned with ``values``.
 96    """
 97    check_rerun_args(job_type, resume_from, block_ids)
 98    src = as_source(input)
 99    if check_direct(job_type, num_workers, block_shape, mask, block_ids):
100        d = src[full_roi(src.ndim)]
101        if return_counts:
102            values, counts = np.unique(d, return_counts=True)
103            return values, counts.astype("int64")
104        return np.unique(d)
105    runner = get_runner(job_type, job_config)
106    results = runner.run(_make_unique_block(return_counts), [input], num_workers=num_workers,
107                         block_shape=block_shape, mask=mask, block_ids=block_ids,
108                         resume_from=resume_from, has_return_val=True, name="unique")
109    return _merge_unique(results, return_counts, np.dtype(src.dtype))
def unique( input: 'SourceLike', return_counts: bool = False, num_workers: int = 1, block_shape: Optional[Tuple[int, ...]] = None, job_type: str = 'local', job_config: Optional[bioimage_py.runner.RunnerConfig] = None, mask: 'Optional[SourceLike]' = None, block_ids: Optional[Sequence[int]] = None, resume_from: Optional[str] = None) -> Union[numpy.ndarray, Tuple[numpy.ndarray, numpy.ndarray]]:
 67def unique(
 68    input: SourceLike,
 69    return_counts: bool = False,
 70    num_workers: int = 1,
 71    block_shape: Optional[Tuple[int, ...]] = None,
 72    job_type: str = "local",
 73    job_config: Optional[RunnerConfig] = None,
 74    mask: Optional[SourceLike] = None,
 75    block_ids: Optional[Sequence[int]] = None,
 76    resume_from: Optional[str] = None,
 77) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
 78    """Compute the unique values of the data, optionally with their counts.
 79
 80    Args:
 81        input: The input data (a numpy/zarr/n5 array or a `Source`).
 82        return_counts: Whether to also return the number of occurrences of each unique value.
 83        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
 84            backends).
 85        block_shape: Shape of the processing blocks. Defaults to the input chunk shape;
 86            required for unchunked data.
 87        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
 88        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
 89        mask: Optional binary mask; values outside the mask are excluded from the computation.
 90        block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
 91        resume_from: Distributed only; the preserved temp folder of a failed run to resume and
 92            merge (see ``runner.run``). Mutually exclusive with ``block_ids``.
 93
 94    Returns:
 95        The sorted unique values. If ``return_counts`` is set, a ``(values, counts)`` tuple, where
 96        ``counts`` is an ``int64`` array aligned with ``values``.
 97    """
 98    check_rerun_args(job_type, resume_from, block_ids)
 99    src = as_source(input)
100    if check_direct(job_type, num_workers, block_shape, mask, block_ids):
101        d = src[full_roi(src.ndim)]
102        if return_counts:
103            values, counts = np.unique(d, return_counts=True)
104            return values, counts.astype("int64")
105        return np.unique(d)
106    runner = get_runner(job_type, job_config)
107    results = runner.run(_make_unique_block(return_counts), [input], num_workers=num_workers,
108                         block_shape=block_shape, mask=mask, block_ids=block_ids,
109                         resume_from=resume_from, has_return_val=True, name="unique")
110    return _merge_unique(results, return_counts, np.dtype(src.dtype))

Compute the unique values of the data, optionally with their counts.

Args:
  - input: The input data (a numpy/zarr/n5 array or a Source).
  - return_counts: Whether to also return the number of occurrences of each unique value.
  - num_workers: Number of parallel workers (threads for local, tasks for distributed backends).
  - block_shape: Shape of the processing blocks. Defaults to the input chunk shape; required for unchunked data.
  - job_type: Execution backend: one of "local", "subprocess" or "slurm".
  - job_config: Backend configuration (a RunnerConfig / SlurmConfig).
  - mask: Optional binary mask; values outside the mask are excluded from the computation.
  - block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
  - resume_from: Distributed only; the preserved temp folder of a failed run to resume and merge (see runner.run). Mutually exclusive with block_ids.

Returns: The sorted unique values. If return_counts is set, a (values, counts) tuple is returned instead, where counts is an int64 array aligned with values.