bioimage_py.copy
Block-wise copy of one source into another.
Copies an input source into a writable output source block-wise, reusing the runner machinery.
This is the minimal array-output operation: there is no halo and no per-block computation, just a
read of each block from the input and a write to the output. Typical uses are converting between
storage formats (e.g. a tiff stack to zarr) and persisting an on-the-fly transformation (a
wrapper source) to file so the result is stored rather than recomputed on every read.
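To make the idea concrete, here is a minimal NumPy-only sketch of a block-wise copy that materializes an on-the-fly transformation. `LazyWrapper` and `blockwise_copy` are hypothetical stand-ins, not the package's `wrapper` source or its runner. The real operation dispatches blocks through a runner backend rather than a plain loop.

```python
import itertools

import numpy as np


class LazyWrapper:
    """Hypothetical on-the-fly source: applies ``fn`` to ``base`` only when indexed."""

    def __init__(self, base, fn, dtype):
        self.base = base
        self.fn = fn
        self.shape = base.shape
        self.dtype = np.dtype(dtype)
        self.reads = 0  # counts how often the transformation is computed

    def __getitem__(self, roi):
        self.reads += 1
        return self.fn(self.base[roi]).astype(self.dtype)


def blockwise_copy(src, out, block_shape):
    """Copy ``src`` into ``out`` one block at a time (no halo, no per-block computation)."""
    starts = [range(0, s, b) for s, b in zip(src.shape, block_shape)]
    for corner in itertools.product(*starts):
        # Edge blocks are clipped to the array shape.
        roi = tuple(slice(c, min(c + b, s)) for c, b, s in zip(corner, block_shape, src.shape))
        out[roi] = src[roi]
    return out


raw = np.arange(5 * 7, dtype="uint8").reshape(5, 7)
lazy = LazyWrapper(raw, lambda x: x * 2, dtype="uint16")
stored = blockwise_copy(lazy, np.zeros(lazy.shape, dtype=lazy.dtype), block_shape=(2, 3))
```

After the copy, `stored` is a plain array. Reading from it no longer triggers the transformation, which was computed exactly once per block.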
```python
"""Block-wise copy of one source into another.

Copies an input source into a writable output source block-wise, reusing the runner machinery.
This is the minimal array-output operation: there is no halo and no per-block computation, just a
read of each block from the input and a write to the output. Typical uses are converting between
storage formats (e.g. a tiff stack to zarr) and persisting an on-the-fly transformation (a
``wrapper`` source) to file so the result is stored rather than recomputed on every read.
"""
from __future__ import annotations

from typing import Optional, Sequence, Tuple

import numpy as np

from .runner import get_runner
from .runner.config import RunnerConfig
from .sources import Source, SourceLike, as_source
from .util import BlockDescriptor, ComputeFn, check_rerun_args, full_roi, is_direct, same_array, to_roi

__all__ = ["copy"]


def _make_compute() -> ComputeFn:
    """Build the per-block copy function (no captured state, so trivially cloudpickle-safe)."""

    def _compute(block: BlockDescriptor, inputs: Sequence[Source], outputs: Sequence[Source],
                 mask: Optional[Source]) -> None:
        input_, output_ = inputs[0], outputs[0]
        roi = to_roi(block)
        if mask is not None:
            m = mask[roi].astype(bool)
            if not m.any():
                return None
            # Keep out-of-mask voxels of the output unchanged.
            output_[roi] = np.where(m, input_[roi], output_[roi])
            return None
        output_[roi] = input_[roi]
        return None

    return _compute


def _copy_source(
    input: SourceLike,
    output: Optional[SourceLike],
    *,
    block_shape: Optional[Tuple[int, ...]],
    job_type: str,
    job_config: Optional[RunnerConfig],
    num_workers: int,
    mask: Optional[SourceLike],
    name: str,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    """Materialize ``input`` into ``output`` block-wise (shared by :func:`copy` and downsample).

    Handles output allocation (a numpy array for local execution, a required file-backed array for
    distributed execution), the direct (whole-array) fast path, and the runner dispatch. The output
    shape and dtype are taken from ``input`` (so a shape-changing wrapper input is handled too).
    """
    check_rerun_args(job_type, resume_from, block_ids)
    src = as_source(input)
    ndim = src.ndim
    # A subset/resume rerun is block-wise, so it cannot use the direct (whole-array) path.
    direct = (is_direct(job_type, num_workers, block_shape) and mask is None
              and block_ids is None and resume_from is None)

    if output is None:
        if job_type != "local":
            raise ValueError(
                f"'output' is required for distributed execution (job_type={job_type!r}); "
                "pass a file-backed (zarr/n5) output array."
            )
        out_array: SourceLike = np.zeros(tuple(src.shape), dtype=src.dtype)
    else:
        out_array = output

    out = as_source(out_array)
    if not direct and same_array(out, src):
        raise ValueError(f"Block-wise {name} needs 'output' to differ from 'input'.")

    if direct:
        out[full_roi(out.ndim)] = src[full_roi(ndim)]
        return out_array

    compute = _make_compute()
    runner = get_runner(job_type, job_config)
    runner.run(compute, [src], outputs=[out], block_shape=block_shape,
               mask=mask, num_workers=num_workers, block_ids=block_ids, resume_from=resume_from,
               name=name)
    return out_array


def copy(
    input: SourceLike,
    output: Optional[SourceLike] = None,
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    job_type: str = "local",
    job_config: Optional[RunnerConfig] = None,
    num_workers: int = 1,
    mask: Optional[SourceLike] = None,
    block_ids: Optional[Sequence[int]] = None,
    resume_from: Optional[str] = None,
) -> SourceLike:
    """Copy a source into an output, block-wise.

    The data is read from ``input`` and written into ``output`` one block at a time. The input may
    be any source, including a read-only ``FileSource`` (e.g. a tiff stack) or a ``wrapper`` source
    whose transformation is computed on read; copying it materializes the transformed data to the
    output. The data is written into the output as-is, so the output array's dtype governs and a
    cast is applied on assignment when it differs from the input dtype.

    Args:
        input: The input data to copy (a numpy/zarr/n5 array or a `Source`).
        output: The output array to write into. Optional for local execution — a numpy array
            matching the input shape and dtype is allocated and returned if omitted; **required**
            for distributed execution, where it must be a writable, file-backed (zarr/n5) array.
        block_shape: Shape of the processing blocks. Defaults to the input chunk shape; required
            for unchunked data.
        job_type: Execution backend: one of ``"local"``, ``"subprocess"`` or ``"slurm"``.
        job_config: Backend configuration (a `RunnerConfig` / `SlurmConfig`).
        num_workers: Number of parallel workers (threads for ``local``, tasks for distributed
            backends).
        mask: Optional binary mask; only voxels within the mask are copied (out-of-mask output
            voxels are left unchanged).
        block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks
            into the existing ``output``). Mutually exclusive with ``resume_from``.
        resume_from: Distributed only; the preserved temp folder of a failed run to resume (see
            ``runner.run``); the missing blocks are written into ``output``. Mutually exclusive
            with ``block_ids``.

    Returns:
        The output array (the provided ``output``, or a newly allocated numpy array).
    """
    return _copy_source(input, output, block_shape=block_shape, job_type=job_type,
                        job_config=job_config, num_workers=num_workers, mask=mask, name="copy",
                        block_ids=block_ids, resume_from=resume_from)
```
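The mask handling in `_compute` can be exercised in isolation. The sketch below re-implements the same per-block logic on plain NumPy arrays, which stand in for `Source` objects (an assumption for illustration). The hypothetical `copy_block` also returns whether it wrote anything, which the original function does not.

```python
import numpy as np


def copy_block(roi, src, out, mask=None):
    """Mimic the per-block copy: skip empty-mask blocks, keep out-of-mask output voxels."""
    if mask is not None:
        m = mask[roi].astype(bool)
        if not m.any():
            return False  # block has no in-mask voxels: nothing is read or written
        out[roi] = np.where(m, src[roi], out[roi])
        return True
    out[roi] = src[roi]
    return True


src = np.full((4, 4), 7, dtype="int32")
out = np.full((4, 4), -1, dtype="int32")
mask = np.zeros((4, 4), dtype="uint8")
mask[0, 0] = 1  # a single in-mask voxel, inside the left block

left = (slice(0, 4), slice(0, 2))
right = (slice(0, 4), slice(2, 4))
wrote_left = copy_block(left, src, out, mask)
wrote_right = copy_block(right, src, out, mask)
```

Only `out[0, 0]` receives the input value; the right block returns early because its mask slice is empty, mirroring the `if not m.any(): return None` check.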
copy(input: SourceLike, output: Optional[SourceLike] = None, *, block_shape: Optional[Tuple[int, ...]] = None, job_type: str = "local", job_config: Optional[RunnerConfig] = None, num_workers: int = 1, mask: Optional[SourceLike] = None, block_ids: Optional[Sequence[int]] = None, resume_from: Optional[str] = None) -> SourceLike
Copy a source into an output, block-wise.
The data is read from input and written into output one block at a time. The input may
be any source, including a read-only FileSource (e.g. a tiff stack) or a wrapper source
whose transformation is computed on read; copying it materializes the transformed data to the
output. Values are written without any rescaling, so the output array's dtype takes precedence:
when it differs from the input dtype, the data is cast on assignment.
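For a NumPy output, the cast is simply NumPy's own item assignment. The sketch below checks this with a float input and a `uint8` output, writing row by row to mimic block-wise writes; it only covers NumPy outputs, and other array backends may apply their own casting on write.

```python
import numpy as np

# Float input; the caller supplies a uint8 output array.
src = np.array([[0.4, 1.7], [2.2, 254.9]], dtype=np.float32)
out = np.zeros(src.shape, dtype=np.uint8)

# Row-by-row assignment: the output keeps its dtype, and in-range floats
# are truncated toward zero. No rescaling to the output's value range happens.
for row in range(src.shape[0]):
    out[row : row + 1] = src[row : row + 1]
```

If rescaling is wanted (e.g. mapping a float range onto `uint8`), it has to happen before the copy, for instance in a `wrapper` source.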
Args:
input: The input data to copy (a numpy/zarr/n5 array or a Source).
output: The output array to write into. Optional for local execution: if omitted, a numpy
array matching the input shape and dtype is allocated and returned. Required for distributed
execution, where it must be a writable, file-backed (zarr/n5) array. For block-wise execution
it must not be the same array as input.
block_shape: Shape of the processing blocks. Defaults to the input chunk shape; required
for unchunked data.
job_type: Execution backend: one of "local", "subprocess" or "slurm".
job_config: Backend configuration (a RunnerConfig / SlurmConfig).
num_workers: Number of parallel workers (threads for local, tasks for distributed
backends).
mask: Optional binary mask; only voxels within the mask are copied. Out-of-mask output
voxels are left unchanged, and blocks without any in-mask voxels are skipped.
block_ids: Restrict processing to these block ids (e.g. to re-run previously failed blocks
into the existing output). Mutually exclusive with resume_from.
resume_from: Distributed only; the preserved temp folder of a failed run to resume (see
runner.run); the missing blocks are written into output. Mutually exclusive
with block_ids.
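Because `block_ids` refers to individual blocks, it helps to see how ids can map to regions. The sketch below assumes blocks are numbered in C (row-major) order over the grid defined by `block_shape`, with edge blocks clipped to the array shape; the package's actual numbering is not documented here, so `block_rois` and `copy_blocks` are hypothetical illustrations of rerunning a failed block into an existing output.

```python
import itertools
import math

import numpy as np


def block_rois(shape, block_shape):
    """Enumerate blocks in C order; a block's id is its index in this list (assumed)."""
    grid = [math.ceil(s / b) for s, b in zip(shape, block_shape)]
    rois = []
    for pos in itertools.product(*(range(g) for g in grid)):
        rois.append(tuple(slice(p * b, min((p + 1) * b, s))
                          for p, b, s in zip(pos, block_shape, shape)))
    return rois


def copy_blocks(src, out, block_shape, block_ids=None):
    """Copy all blocks, or only ``block_ids``, into an existing ``out``."""
    rois = block_rois(src.shape, block_shape)
    ids = range(len(rois)) if block_ids is None else block_ids
    for block_id in ids:
        out[rois[block_id]] = src[rois[block_id]]
    return out


src = np.arange(36).reshape(6, 6)
out = np.zeros_like(src)

# First run: pretend block 3 failed, so only blocks 0-2 were written.
copy_blocks(src, out, (3, 3), block_ids=[0, 1, 2])
partial_ok = not np.array_equal(out, src)

# Rerun only the failed block into the existing output.
copy_blocks(src, out, (3, 3), block_ids=[3])
```

Because the rerun writes into the same `out`, the blocks completed in the first run are not recomputed.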
Returns:
The output array (the provided output, or a newly allocated numpy array).