bioimage_py.operations
Block-wise element-wise operations (arithmetic, comparison, membership).
1"""Block-wise element-wise operations (arithmetic, comparison, membership).""" 2from .operations import ( 3 add, 4 apply_operation, 5 divide, 6 greater, 7 greater_equal, 8 isin, 9 less, 10 less_equal, 11 maximum, 12 minimum, 13 multiply, 14 subtract, 15) 16 17__all__ = [ 18 "apply_operation", 19 "isin", 20 "add", 21 "subtract", 22 "multiply", 23 "divide", 24 "greater", 25 "greater_equal", 26 "less", 27 "less_equal", 28 "minimum", 29 "maximum", 30]
def apply_operation(
    x: SourceLike,
    y: Union[SourceLike, Number],
    operation: Union[str, Callable],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    """Apply a binary operation to two operands block-wise.

    Args:
        x: The first operand (a numpy/zarr/n5 array or a `Source`).
        y: The second operand: a scalar, or an array/source. An array operand must have the same
            number of dimensions as ``x``; it may either match ``x``'s shape or be broadcastable to it
            (size-1 dimensions). A same-shape array operand is read block-wise (so it must be
            file-backed for distributed execution); a scalar or broadcast operand is captured into the
            worker closure.
        operation: The binary operation: a numpy function name (e.g. ``"add"``) or a picklable callable
            ``operation(x_block, y_block)``.
        output: The output array to write into. Optional for local execution -- a numpy array (dtype
            inferred from the operation) is allocated and returned if omitted; **required** for
            distributed execution. May be ``x`` itself for an in-place computation.
        block_shape: Shape of the processing blocks. Defaults to the input chunk shape; required
            for unchunked data.
        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
            backends).
        mask: Optional binary mask; out-of-mask output voxels are left unchanged. Not supported
            together with a broadcast operand.
        block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
            Mutually exclusive with ``resume_from``.
        resume_from: Distributed only; the preserved temp folder of a failed run to resume (see
            ``runner.run``). Mutually exclusive with ``block_ids``.

    Returns:
        The output array (the provided ``output``, or a newly allocated numpy array).

    Raises:
        ValueError: If ``y`` is an array whose number of dimensions differs from ``x``, or if
            ``output`` is omitted while ``job_type`` is not ``"local"``.
        NotImplementedError: If a broadcast operand is combined with a ``mask``.
    """
    check_rerun_args(job_type, resume_from, block_ids)
    x_src = as_source(x)
    ndim = x_src.ndim
    op = _resolve_op(operation)

    # Exactly one of scalar_y / y_src (same-shape) / broadcast_y is used, depending on `mode`.
    scalar_y: Optional[Number] = None
    broadcast_y: Optional[np.ndarray] = None
    broadcast_axes: Optional[List[bool]] = None
    y_src: Optional[Source] = None
    inputs: List[SourceLike]
    if isinstance(y, Number):
        mode = "scalar"
        scalar_y = y
        inputs = [x]
    else:
        y_src = as_source(y)
        if y_src.ndim != ndim:
            raise ValueError(f"operand dimensionalities differ: {ndim} vs {y_src.ndim}.")
        if tuple(y_src.shape) == tuple(x_src.shape):
            # Same-shape operand: handed to the runner as a second block-wise input.
            mode = "array"
            inputs = [x, y]
        else:
            mode = "broadcast"
            broadcast_axes = _broadcast_axes(x_src.shape, y_src.shape)
            # Reject the unsupported combination *before* loading the operand, so an invalid
            # call does not pay for reading the whole broadcast array into memory first.
            if mask is not None:
                raise NotImplementedError("Combining a broadcast operand with a mask is not supported.")
            # The broadcast operand is read in full and captured by the worker closure.
            broadcast_y = np.asarray(y_src[full_roi(ndim)])
            inputs = [x]

    # Single-shot path: only taken when nothing requires block-wise bookkeeping
    # (no mask, no block restriction, no resume) and `is_direct` agrees.
    direct = (is_direct(job_type, num_workers, block_shape) and mask is None
              and block_ids is None and resume_from is None)

    if output is None:
        if job_type != "local":
            raise ValueError(
                f"'output' is required for distributed execution (job_type={job_type!r}); "
                "pass a file-backed (zarr/n5) output array."
            )
        out_dtype = _result_dtype(op, x_src, mode, scalar_y, broadcast_y, y_src)
        out_array: SourceLike = np.zeros(tuple(x_src.shape), dtype=out_dtype)
    else:
        out_array = output
    out = as_source(out_array)

    if direct:
        x_full = x_src[full_roi(ndim)]
        if mode == "scalar":
            res = op(x_full, scalar_y)
        elif mode == "array":
            res = op(x_full, y_src[full_roi(ndim)])
        else:
            res = op(x_full, broadcast_y)
        out[full_roi(out.ndim)] = res
        return out_array

    runner = get_runner(job_type, job_config)
    # Job name: the numpy function name when given as a string, a generic label for callables.
    name = operation if isinstance(operation, str) else "apply_operation"
    # NOTE(review): the original `operation` spec (not the resolved `op`) is handed to
    # `_make_apply` -- presumably so the worker closure stays picklable; confirm in `_make_apply`.
    runner.run(_make_apply(operation, mode, scalar_y, broadcast_y, broadcast_axes),
               inputs, outputs=[out_array], block_shape=block_shape, mask=mask,
               num_workers=num_workers, block_ids=block_ids, resume_from=resume_from, name=name)
    return out_array
Apply a binary operation to two operands block-wise.
Args:
x: The first operand (a numpy/zarr/n5 array or a Source).
y: The second operand: a scalar, or an array/source. An array operand must have the same
number of dimensions as x; it may either match x's shape or be broadcastable to it
(size-1 dimensions). A same-shape array operand is read block-wise (so it must be
file-backed for distributed execution); a scalar or broadcast operand is captured into the
worker closure.
operation: The binary operation: a numpy function name (e.g. "add") or a picklable callable
operation(x_block, y_block).
output: The output array to write into. Optional for local execution -- a numpy array (dtype
inferred from the operation) is allocated and returned if omitted; required for
distributed execution. May be x itself for an in-place computation.
block_shape: Shape of the processing blocks. Defaults to the input chunk shape; required
for unchunked data.
job_type: Execution backend: one of "local", "subprocess" or "slurm".
job_config: Backend configuration (a RunnerConfig / SlurmConfig).
num_workers: Number of parallel workers (threads for local, tasks for distributed
backends).
mask: Optional binary mask; out-of-mask output voxels are left unchanged. Not supported
together with a broadcast operand.
block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
Mutually exclusive with resume_from.
resume_from: Distributed only; the preserved temp folder of a failed run to resume (see
runner.run). Mutually exclusive with block_ids.
Returns:
The output array (the provided output, or a newly allocated numpy array).
def isin(
    x: SourceLike,
    test_values: Union[ArrayLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    """Compute ``np.isin(x, test_values)`` block-wise (a boolean membership mask).

    Args:
        x: The input data (a numpy/zarr/n5 array or a `Source`).
        test_values: The values to test membership against (a scalar, list, set or array); captured
            into the worker closure. A ``set``/``frozenset`` is converted to a list first, because
            ``np.asarray`` would otherwise wrap the set itself in a 0-d object array and ``np.isin``
            would not compare against the values it contains.
        output: The boolean output array to write into. Optional for local execution -- a boolean
            numpy array is allocated and returned if omitted; **required** for distributed execution.
        block_shape: Shape of the processing blocks. Defaults to the input chunk shape; required
            for unchunked data.
        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
            backends).
        mask: Optional binary mask; out-of-mask output voxels are left unchanged.
        block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
            Mutually exclusive with ``resume_from``.
        resume_from: Distributed only; the preserved temp folder of a failed run to resume (see
            ``runner.run``). Mutually exclusive with ``block_ids``.

    Returns:
        The boolean output array (the provided ``output``, or a newly allocated numpy array).

    Raises:
        ValueError: If ``output`` is omitted while ``job_type`` is not ``"local"``.
    """
    check_rerun_args(job_type, resume_from, block_ids)
    x_src = as_source(x)
    ndim = x_src.ndim

    if isinstance(test_values, Number):
        values = test_values
    else:
        # Sets are not sequences: np.asarray(set) yields a single-element object array rather
        # than an array of the members, which silently breaks np.isin. Materialise them first.
        if isinstance(test_values, (set, frozenset)):
            test_values = list(test_values)
        values = np.asarray(test_values)

    # Single-shot path: only taken when nothing requires block-wise bookkeeping
    # (no mask, no block restriction, no resume) and `is_direct` agrees.
    direct = (is_direct(job_type, num_workers, block_shape) and mask is None
              and block_ids is None and resume_from is None)

    if output is None:
        if job_type != "local":
            raise ValueError(
                f"'output' is required for distributed execution (job_type={job_type!r}); "
                "pass a file-backed (zarr/n5) output array."
            )
        out_array: SourceLike = np.zeros(tuple(x_src.shape), dtype=bool)
    else:
        out_array = output
    out = as_source(out_array)

    if direct:
        out[full_roi(out.ndim)] = np.isin(x_src[full_roi(ndim)], values)
        return out_array

    runner = get_runner(job_type, job_config)
    runner.run(_make_isin(values), [x], outputs=[out_array], block_shape=block_shape, mask=mask,
               num_workers=num_workers, block_ids=block_ids, resume_from=resume_from, name="isin")
    return out_array
Compute np.isin(x, test_values) block-wise (a boolean membership mask).
Args:
x: The input data (a numpy/zarr/n5 array or a Source).
test_values: The values to test membership against (a scalar, list or array); captured into
the worker closure.
output: The boolean output array to write into. Optional for local execution -- a boolean
numpy array is allocated and returned if omitted; required for distributed execution.
block_shape: Shape of the processing blocks. Defaults to the input chunk shape; required
for unchunked data.
job_type: Execution backend: one of "local", "subprocess" or "slurm".
job_config: Backend configuration (a RunnerConfig / SlurmConfig).
num_workers: Number of parallel workers (threads for local, tasks for distributed
backends).
mask: Optional binary mask; out-of-mask output voxels are left unchanged.
block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks).
Mutually exclusive with resume_from.
resume_from: Distributed only; the preserved temp folder of a failed run to resume (see
runner.run). Mutually exclusive with block_ids.
Returns:
The boolean output array (the provided output, or a newly allocated numpy array).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.add to two operands block-wise.
A thin wrapper over apply_operation() with operation='add'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.subtract to two operands block-wise.
A thin wrapper over apply_operation() with operation='subtract'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.multiply to two operands block-wise.
A thin wrapper over apply_operation() with operation='multiply'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.divide to two operands block-wise.
A thin wrapper over apply_operation() with operation='divide'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.greater to two operands block-wise.
A thin wrapper over apply_operation() with operation='greater'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.greater_equal to two operands block-wise.
A thin wrapper over apply_operation() with operation='greater_equal'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.less to two operands block-wise.
A thin wrapper over apply_operation() with operation='less'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.less_equal to two operands block-wise.
A thin wrapper over apply_operation() with operation='less_equal'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.minimum to two operands block-wise.
A thin wrapper over apply_operation() with operation='minimum'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).
def op(
    x: SourceLike,
    y: Union[SourceLike, Number],
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    # Thin forwarding wrapper around apply_operation(). `op_name` is not a parameter: it is
    # resolved from the enclosing scope and selects which operation is applied.
    run_options = {
        "block_shape": block_shape,
        "job_type": job_type,
        "job_config": job_config,
        "num_workers": num_workers,
        "mask": mask,
        "block_ids": block_ids,
        "resume_from": resume_from,
    }
    return apply_operation(x, y, op_name, output, **run_options)
Apply np.maximum to two operands block-wise.
A thin wrapper over apply_operation() with operation='maximum'; see it for the full
parameter documentation (operands, output handling, backends, mask and re-run options).