bioimage_py.stats.unique
Block-wise unique values (and optional counts) via the runner's return channel.
A reduction operation: each block computes the unique values it contains (and, optionally, their
counts) and the main process merges them. Without halo, the blocks partition the volume disjointly,
so the per-value counts are additive across blocks. The merge mirrors stats / contingency_table
-- the per-block results flow through runner.run(..., has_return_val=True) and the merge is pure
numpy -- so it behaves identically across the local / subprocess / slurm backends.
The count merge groups the stacked (value, count) rows with a single argsort + reduceat
(the 1-key variant of contingency_table's merge), rather than scattering into a dense
counts[max_id + 1] array, so it stays memory-safe for sparse, large label ids.
"""Block-wise unique values (and optional counts) via the runner's return channel.

A reduction operation: each block computes the unique values it contains (and, optionally, their
counts) and the main process merges them. Without halo, the blocks partition the volume disjointly,
so the per-value counts are additive across blocks. The merge mirrors ``stats`` / ``contingency_table``
-- the per-block results flow through ``runner.run(..., has_return_val=True)`` and the merge is pure
numpy -- so it behaves identically across the ``local`` / ``subprocess`` / ``slurm`` backends.

The count merge groups the stacked ``(value, count)`` rows with a single ``argsort`` + ``reduceat``
(the 1-key variant of ``contingency_table``'s merge), rather than scattering into a dense
``counts[max_id + 1]`` array, so it stays memory-safe for sparse, large label ids.
"""
from __future__ import annotations

from typing import List, Optional, Sequence, Tuple, Union

import numpy as np

from ..runner import get_runner
from ..runner.config import RunnerConfig
from ..sources import Source, SourceLike, as_source
from ..util import BlockDescriptor, ComputeFn, check_direct, check_rerun_args, full_roi, to_roi

__all__ = ["unique"]


def _make_unique_block(return_counts: bool) -> ComputeFn:
    """Build the per-block unique function (captures only the picklable ``return_counts`` flag).

    Args:
        return_counts: Whether the returned function also reports per-value counts.

    Returns:
        A compute function that returns ``None`` for a block whose mask is entirely empty,
        otherwise the block's sorted unique values, or a ``(values, counts)`` tuple with
        ``int64`` counts when ``return_counts`` is set.
    """

    def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source],
                 mask: Optional[Source]) -> Optional[Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]:
        roi = to_roi(block)
        d = inputs[0][roi]
        if mask is not None:
            block_mask = mask[roi].astype(bool)
            if not block_mask.any():
                # Nothing selected in this block: signal "no contribution" to the merge.
                return None
            # Boolean indexing flattens the block to the selected voxels only.
            d = d[block_mask]
        if return_counts:
            values, counts = np.unique(d, return_counts=True)
            # Normalise the count dtype so the merged result is int64 on every platform.
            return values, counts.astype("int64")
        return np.unique(d)

    return _compute


def _merge_unique(results: List, return_counts: bool,
                  dtype: np.dtype) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Merge per-block unique values (and counts) into the global result.

    Args:
        results: Per-block return values of the compute function. ``None`` entries (blocks
            that contributed nothing) are dropped. The remaining entries are value arrays, or
            ``(values, counts)`` tuples when ``return_counts`` is set.
        return_counts: Whether the per-block results carry counts that must be summed.
        dtype: dtype of the empty values array returned when there is nothing to merge.

    Returns:
        The sorted unique values, or a ``(values, counts)`` tuple with ``int64`` counts
        aligned with ``values`` when ``return_counts`` is set.
    """
    results = [r for r in results if r is not None]
    if not return_counts:
        if not results:
            return np.zeros((0,), dtype=dtype)
        return np.unique(np.concatenate(results))

    if not results:
        return np.zeros((0,), dtype=dtype), np.zeros((0,), dtype="int64")
    values = np.concatenate([r[0] for r in results])
    counts = np.concatenate([r[1] for r in results])
    if values.size == 0:
        # Every contributing block was empty. Indexing / reduceat at start index 0 would
        # raise on a zero-length array, so return the empty result directly.
        return np.zeros((0,), dtype=dtype), np.zeros((0,), dtype="int64")

    # Sort the stacked rows so equal values become adjacent, then sum each run of counts.
    order = np.argsort(values)
    values, counts = values[order], counts[order]
    is_run_start = values[1:] != values[:-1]
    if values.dtype.kind in "fc":
        # NaN != NaN, so each block's NaN row would otherwise start its own group, whereas
        # np.unique (used by the direct path and the count-free merge) reports a single NaN
        # entry on current NumPy. argsort places NaNs last, so adjacent NaNs form one run.
        is_run_start &= ~(np.isnan(values[1:]) & np.isnan(values[:-1]))
    starts = np.flatnonzero(np.concatenate(([True], is_run_start)))
    return values[starts], np.add.reduceat(counts, starts)


def unique(
    input: SourceLike,
    return_counts: bool = False,
    num_workers: int = 1,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Compute the unique values of the data, optionally with their counts.

    Args:
        input: The input data (a numpy/zarr/n5 array or a `Source`).
        return_counts: Whether to also return the number of occurrences of each unique value.
        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
            backends).
        block_shape: Shape of the processing blocks. Defaults to the input chunk shape;
            required for unchunked data.
        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
        mask: Optional binary mask; values outside the mask are excluded from the computation.
        block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
        resume_from: Distributed only; the preserved temp folder of a failed run to resume and
            merge (see ``runner.run``). Mutually exclusive with ``block_ids``.

    Returns:
        The sorted unique values. If ``return_counts`` is set, a ``(values, counts)`` tuple, where
        ``counts`` is an ``int64`` array aligned with ``values``.
    """
    check_rerun_args(job_type, resume_from, block_ids)
    src = as_source(input)
    if check_direct(job_type, num_workers, block_shape, mask, block_ids):
        # Direct path: load the full volume and run a single np.unique, no block runner.
        d = src[full_roi(src.ndim)]
        if return_counts:
            values, counts = np.unique(d, return_counts=True)
            return values, counts.astype("int64")
        return np.unique(d)
    runner = get_runner(job_type, job_config)
    results = runner.run(_make_unique_block(return_counts), [input], num_workers=num_workers,
                         block_shape=block_shape, mask=mask, block_ids=block_ids,
                         resume_from=resume_from, has_return_val=True, name="unique")
    return _merge_unique(results, return_counts, np.dtype(src.dtype))
def unique(
    input: SourceLike,
    return_counts: bool = False,
    num_workers: int = 1,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Return the sorted unique values of ``input``, optionally together with their counts.

    Args:
        input: The data to analyse (a numpy/zarr/n5 array or a `Source`).
        return_counts: If set, also return how often each unique value occurs.
        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
            backends).
        block_shape: Shape of the processing blocks. Defaults to the input chunk shape;
            required for unchunked data.
        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
        mask: Optional binary mask; values outside the mask are excluded from the computation.
        block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
        resume_from: Distributed only; the preserved temp folder of a failed run to resume and
            merge (see ``runner.run``). Mutually exclusive with ``block_ids``.

    Returns:
        The sorted unique values. If ``return_counts`` is set, a ``(values, counts)`` tuple, where
        ``counts`` is an ``int64`` array aligned with ``values``.
    """
    check_rerun_args(job_type, resume_from, block_ids)
    src = as_source(input)

    if not check_direct(job_type, num_workers, block_shape, mask, block_ids):
        # Blocked path: each block reports its own uniques, merged afterwards in this process.
        block_fn = _make_unique_block(return_counts)
        block_runner = get_runner(job_type, job_config)
        per_block = block_runner.run(
            block_fn,
            [input],
            num_workers=num_workers,
            block_shape=block_shape,
            mask=mask,
            block_ids=block_ids,
            resume_from=resume_from,
            has_return_val=True,
            name="unique",
        )
        return _merge_unique(per_block, return_counts, np.dtype(src.dtype))

    # Direct path: read the full volume and run np.unique once.
    data = src[full_roi(src.ndim)]
    if not return_counts:
        return np.unique(data)
    uniques, occurrences = np.unique(data, return_counts=True)
    return uniques, occurrences.astype("int64")
Compute the unique values of the data, optionally with their counts.
Args:
input: The input data (a numpy/zarr/n5 array or a Source).
return_counts: Whether to also return the number of occurrences of each unique value.
num_workers: Number of parallel workers (threads for local, tasks for distributed
backends).
block_shape: Shape of the processing blocks. Defaults to the input chunk shape;
required for unchunked data.
job_type: Execution backend: one of "local", "subprocess" or "slurm".
job_config: Backend configuration (a RunnerConfig / SlurmConfig).
mask: Optional binary mask; values outside the mask are excluded from the computation.
block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
resume_from: Distributed only; the preserved temp folder of a failed run to resume and
merge (see runner.run). Mutually exclusive with block_ids.
Returns:
The sorted unique values. If return_counts is set, a (values, counts) tuple, where
counts is an int64 array aligned with values.