bioimage_py.runner

Runner implementations: local, subprocess (distributed protocol), and slurm (stub).

 1"""Runner implementations: local, subprocess (distributed protocol), and slurm (stub)."""
 2from .base import LocalRunner, Runner, RunnerError, run_block
 3from .config import RunnerConfig, SlurmConfig, config_file_path, write_slurm_config
 4from .distributed import SlurmRunner, SubprocessRunner
 5from .factory import get_runner
 6
 7__all__ = [
 8    "Runner",
 9    "LocalRunner",
10    "SubprocessRunner",
11    "SlurmRunner",
12    "RunnerError",
13    "RunnerConfig",
14    "SlurmConfig",
15    "config_file_path",
16    "write_slurm_config",
17    "get_runner",
18    "run_block",
19]
class Runner(abc.ABC):
 62class Runner(ABC):
 63    """Abstract runner. Subclasses implement :meth:`_execute` for a specific backend."""
 64
 65    def __init__(self, config: Optional[RunnerConfig] = None):
 66        self.config = config or RunnerConfig()
 67
 68    def run(
 69        self,
 70        function: ComputeFn,
 71        inputs: Sequence[SourceLike],
 72        outputs: Sequence[SourceLike] = (),
 73        *,
 74        block_shape: Optional[Tuple[int, ...]] = None,
 75        halo: Optional[Sequence[int]] = None,
 76        mask: Optional[SourceLike] = None,
 77        num_workers: int = 1,
 78        block_ids: Optional[Sequence[int]] = None,
 79        has_return_val: bool = False,
 80        name: str = "",
 81        roi: Optional[Tuple[slice, ...]] = None,
 82        pre_cleanup: Optional[Callable[[str], None]] = None,
 83        resume_from: Optional[str] = None,
 84    ) -> Optional[list]:
 85        """Run ``function`` block-wise over the inputs/outputs.
 86
 87        Args:
 88            function: Per-block function ``function(block, inputs, outputs, mask)``.
 89            inputs: Input source-like objects (read).
 90            outputs: Output source-like objects (written in place).
 91            block_shape: Block shape; defaults to the domain source's chunks.
 92            halo: Per-axis halo; if given, ``function`` receives a ``BlockWithHalo``.
 93            mask: Optional binary mask source.
 94            num_workers: Number of parallel workers / tasks.
 95            block_ids: Restrict processing to these blocks (for re-running failures).
 96            has_return_val: Whether ``function`` returns a value to collect.
 97            name: A short name for progress display.
 98            roi: Region of interest to restrict the blocking to.
 99            pre_cleanup: Optional callback ``pre_cleanup(tmp_folder)`` invoked on the
100                orchestrating process with the job temp folder right before it is deleted
101                (distributed backends only, success path only). Use it to read out anything
102                worth keeping from the temp folder (e.g. the per-task timing files under
103                ``tmp_folder/timings/``) before cleanup. Ignored by the local runner, which
104                has no temp folder.
105            resume_from: Distributed backends only. Path to the preserved temp folder of a
106                failed run (``RunnerError.tmp_folder``). Re-runs only the blocks that did not
107                complete and merges them with the already-completed ones, so the result is
108                correct and complete. The run is resumed from the serialized payload, so
109                ``function``/``inputs``/``outputs``/``block_shape``/... from this call are
110                **ignored** -- pass ``resume_from`` to *finish the same call*, not to start a
111                new one. Mutually exclusive with ``block_ids``.
112
113        Returns:
114            The list of per-block return values (in ``block_ids`` order) if
115            ``has_return_val``, else ``None``.
116        """
117        if resume_from is not None:
118            if block_ids is not None:
119                raise ValueError("resume_from and block_ids are mutually exclusive; resume_from "
120                                 "re-runs the original partition's un-done blocks.")
121            return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
122
123        inputs = [as_source(i) for i in inputs]
124        outputs = [as_source(o) for o in outputs]
125        mask_source = as_source(mask) if mask is not None else None
126
127        domain = inputs[0] if inputs else (outputs[0] if outputs else None)
128        if domain is None:
129            raise ValueError("run() requires at least one input or output source.")
130
131        # Shape consistency: all inputs and the mask must match the domain shape.
132        dom_shape = tuple(domain.shape)
133        for src in inputs + ([mask_source] if mask_source is not None else []):
134            if tuple(src.shape) != dom_shape:
135                raise ValueError(
136                    f"Shape mismatch: source with shape {src.shape} does not match the "
137                    f"domain shape {domain.shape}."
138                )
139        # Outputs may carry a leading channel axis, but their trailing spatial dims must match
140        # the domain so the per-block roi indexes them consistently.
141        for out in outputs:
142            out_shape = tuple(out.shape)
143            if out_shape[-len(dom_shape):] != dom_shape:
144                raise ValueError(
145                    f"Output shape {out_shape} is incompatible with the domain shape "
146                    f"{dom_shape}: its trailing dimensions must match the domain."
147                )
148
149        block_shape = derive_block_shape(domain, block_shape)
150        halo_n = normalize_halo(halo, domain.ndim) if halo is not None else None
151        self._validate_write_safety(outputs, block_shape)
152
153        blocking = get_blocking(domain.shape, block_shape, roi)
154        if block_ids is None:
155            block_ids = list(range(int(blocking.number_of_blocks)))
156        else:
157            block_ids = [int(b) for b in block_ids]
158
159        results = self._execute(
160            function=function, inputs=inputs, outputs=outputs, mask=mask_source,
161            blocking=blocking, block_ids=block_ids, halo=halo_n,
162            has_return_val=has_return_val, num_workers=num_workers, name=name,
163            shape=tuple(domain.shape), block_shape=block_shape, roi=roi,
164            pre_cleanup=pre_cleanup,
165        )
166        return results if has_return_val else None
167
168    def map(
169        self,
170        function: Callable[[int], Any],
171        n_items: Optional[int] = None,
172        *,
173        item_ids: Optional[Sequence[int]] = None,
174        num_workers: int = 1,
175        has_return_val: bool = True,
176        name: str = "",
177        pre_cleanup: Optional[Callable[[str], None]] = None,
178        resume_from: Optional[str] = None,
179    ) -> Optional[list]:
180        """Map ``function(index)`` over item indices in parallel, across any backend.
181
182        Unlike :meth:`run`, this is not block-wise: there is no domain, blocking, sources or
183        mask. ``function`` takes a single integer index and returns its result; it must carry
184        whatever data it needs in its (cloudpickled) closure — e.g. a `SourceSpec` it reopens
185        and a file path it reads. This is the per-item counterpart used by per-object
186        workflows.
187
188        Args:
189            function: The per-item function ``function(index) -> result``.
190            n_items: The number of items; indices ``0 .. n_items - 1`` are processed. Ignored
191                if ``item_ids`` is given.
192            item_ids: Explicit item indices to process (e.g. to re-run failures). Defaults to
193                ``range(n_items)``.
194            num_workers: Number of parallel workers / tasks.
195            has_return_val: Whether ``function`` returns a value to collect.
196            name: A short name for progress display.
197            pre_cleanup: Optional ``pre_cleanup(tmp_folder)`` callback (distributed backends
198                only); see :meth:`run`.
199            resume_from: Distributed backends only; the preserved temp folder of a failed run
200                (see :meth:`run`). Re-runs only the incomplete items and merges with those
201                already done. Mutually exclusive with ``item_ids``.
202
203        Returns:
204            The list of per-item return values (in ``item_ids`` order) if ``has_return_val``,
205            else ``None``.
206
207        Raises:
208            ValueError: If neither ``n_items`` nor ``item_ids`` is given.
209        """
210        if resume_from is not None:
211            if item_ids is not None:
212                raise ValueError("resume_from and item_ids are mutually exclusive; resume_from "
213                                 "re-runs the original partition's un-done items.")
214            return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
215
216        if item_ids is None:
217            if n_items is None:
218                raise ValueError("map() requires either n_items or item_ids.")
219            item_ids = list(range(int(n_items)))
220        else:
221            item_ids = [int(i) for i in item_ids]
222
223        results = self._execute_map(
224            function=function, item_ids=item_ids, has_return_val=has_return_val,
225            num_workers=num_workers, name=name, pre_cleanup=pre_cleanup,
226        )
227        return results if has_return_val else None
228
229    def _resume_entry(self, tmp_folder: str, *, name: str,
230                      pre_cleanup: Optional[Callable[[str], None]]) -> Optional[list]:
231        """Resume a failed run from its temp folder; overridden by distributed runners.
232
233        The local runner keeps no temp folder, so resuming is not possible here.
234        """
235        raise ValueError(
236            "resume_from is only valid for distributed backends (subprocess/slurm); the local "
237            "runner keeps no temp folder. Re-run the operation to recompute in-process "
238            "(optionally with block_ids=err.failed_block_ids for a subset)."
239        )
240
241    @staticmethod
242    def _validate_write_safety(outputs: Sequence[Source], block_shape: Sequence[int]) -> None:
243        """Conservative guard: chunked output write-blocks must be a multiple of chunks.
244
245        This prevents two blocks from concurrently writing the same chunk (which would
246        corrupt it). Auto-derivation of a safe block shape is a flagged TODO.
247
248        Sharded outputs (``out.shards is not None``) are exempt: for them the atomic write
249        unit is the shard, and they are made safe by shard-exclusive routing (each shard's
250        blocks go to one worker, run sequentially) rather than by constraining the block
251        shape — see :func:`bioimage_py.util.group_blocks_by_shard`.
252
253        ``block_shape`` is spatial-only, but an output may carry a leading channel axis (its
254        ``chunks`` then have one extra leading entry); the block shape is aligned against the
255        trailing (spatial) chunk axes, since the channel axis is fully written by every block.
256        """
257        for out in outputs:
258            if out.shards is not None:
259                continue
260            chunks = out.chunks
261            if chunks is None or len(chunks) < len(block_shape):
262                continue
263            spatial_chunks = tuple(chunks[-len(block_shape):])
264            for bs, ch in zip(block_shape, spatial_chunks):
265                if bs % ch != 0:
266                    raise ValueError(
267                        f"Unsafe block shape for writing: {tuple(block_shape)} is not a multiple "
268                        f"of the output (spatial) chunk shape {spatial_chunks}. Concurrent writes "
269                        "could corrupt shared chunks; use a block shape that is a chunk multiple."
270                    )
271
272    @abstractmethod
273    def _execute(
274        self,
275        *,
276        function: ComputeFn,
277        inputs: Sequence[Source],
278        outputs: Sequence[Source],
279        mask: Optional[Source],
280        blocking: Blocking,
281        block_ids: Sequence[int],
282        halo: Optional[Sequence[int]],
283        has_return_val: bool,
284        num_workers: int,
285        name: str,
286        shape: Tuple[int, ...],
287        block_shape: Tuple[int, ...],
288        roi: Optional[Tuple[slice, ...]],
289        pre_cleanup: Optional[Callable[[str], None]] = None,
290    ) -> List[Any]:
291        """Execute the per-block function over ``block_ids`` and return ordered results."""
292        ...
293
294    @abstractmethod
295    def _execute_map(
296        self,
297        *,
298        function: Callable[[int], Any],
299        item_ids: Sequence[int],
300        has_return_val: bool,
301        num_workers: int,
302        name: str,
303        pre_cleanup: Optional[Callable[[str], None]] = None,
304    ) -> List[Any]:
305        """Execute ``function(index)`` over ``item_ids`` and return ordered results."""
306        ...

Abstract runner. Subclasses implement _execute() (block-wise run()) and _execute_map() (per-item map()) for a specific backend.

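config — the runner's RunnerConfig; a default RunnerConfig() is used when none is passed to the constructor.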
def run( self, function: Callable[[Union[bioimage_cpp._core.Block, bioimage_cpp._core.BlockWithHalo], Sequence[bioimage_py.sources.Source], Sequence[bioimage_py.sources.Source], Optional[bioimage_py.sources.Source]], Any], inputs: 'Sequence[SourceLike]', outputs: 'Sequence[SourceLike]' = (), *, block_shape: Optional[Tuple[int, ...]] = None, halo: Optional[Sequence[int]] = None, mask: 'Optional[SourceLike]' = None, num_workers: int = 1, block_ids: Optional[Sequence[int]] = None, has_return_val: bool = False, name: str = '', roi: Optional[Tuple[slice, ...]] = None, pre_cleanup: Optional[Callable[[str], NoneType]] = None, resume_from: Optional[str] = None) -> Optional[list]:
 68    def run(
 69        self,
 70        function: ComputeFn,
 71        inputs: Sequence[SourceLike],
 72        outputs: Sequence[SourceLike] = (),
 73        *,
 74        block_shape: Optional[Tuple[int, ...]] = None,
 75        halo: Optional[Sequence[int]] = None,
 76        mask: Optional[SourceLike] = None,
 77        num_workers: int = 1,
 78        block_ids: Optional[Sequence[int]] = None,
 79        has_return_val: bool = False,
 80        name: str = "",
 81        roi: Optional[Tuple[slice, ...]] = None,
 82        pre_cleanup: Optional[Callable[[str], None]] = None,
 83        resume_from: Optional[str] = None,
 84    ) -> Optional[list]:
 85        """Run ``function`` block-wise over the inputs/outputs.
 86
 87        Args:
 88            function: Per-block function ``function(block, inputs, outputs, mask)``.
 89            inputs: Input source-like objects (read).
 90            outputs: Output source-like objects (written in place).
 91            block_shape: Block shape; defaults to the domain source's chunks.
 92            halo: Per-axis halo; if given, ``function`` receives a ``BlockWithHalo``.
 93            mask: Optional binary mask source.
 94            num_workers: Number of parallel workers / tasks.
 95            block_ids: Restrict processing to these blocks (for re-running failures).
 96            has_return_val: Whether ``function`` returns a value to collect.
 97            name: A short name for progress display.
 98            roi: Region of interest to restrict the blocking to.
 99            pre_cleanup: Optional callback ``pre_cleanup(tmp_folder)`` invoked on the
100                orchestrating process with the job temp folder right before it is deleted
101                (distributed backends only, success path only). Use it to read out anything
102                worth keeping from the temp folder (e.g. the per-task timing files under
103                ``tmp_folder/timings/``) before cleanup. Ignored by the local runner, which
104                has no temp folder.
105            resume_from: Distributed backends only. Path to the preserved temp folder of a
106                failed run (``RunnerError.tmp_folder``). Re-runs only the blocks that did not
107                complete and merges them with the already-completed ones, so the result is
108                correct and complete. The run is resumed from the serialized payload, so
109                ``function``/``inputs``/``outputs``/``block_shape``/... from this call are
110                **ignored** -- pass ``resume_from`` to *finish the same call*, not to start a
111                new one. Mutually exclusive with ``block_ids``.
112
113        Returns:
114            The list of per-block return values (in ``block_ids`` order) if
115            ``has_return_val``, else ``None``.
116        """
117        if resume_from is not None:
118            if block_ids is not None:
119                raise ValueError("resume_from and block_ids are mutually exclusive; resume_from "
120                                 "re-runs the original partition's un-done blocks.")
121            return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
122
123        inputs = [as_source(i) for i in inputs]
124        outputs = [as_source(o) for o in outputs]
125        mask_source = as_source(mask) if mask is not None else None
126
127        domain = inputs[0] if inputs else (outputs[0] if outputs else None)
128        if domain is None:
129            raise ValueError("run() requires at least one input or output source.")
130
131        # Shape consistency: all inputs and the mask must match the domain shape.
132        dom_shape = tuple(domain.shape)
133        for src in inputs + ([mask_source] if mask_source is not None else []):
134            if tuple(src.shape) != dom_shape:
135                raise ValueError(
136                    f"Shape mismatch: source with shape {src.shape} does not match the "
137                    f"domain shape {domain.shape}."
138                )
139        # Outputs may carry a leading channel axis, but their trailing spatial dims must match
140        # the domain so the per-block roi indexes them consistently.
141        for out in outputs:
142            out_shape = tuple(out.shape)
143            if out_shape[-len(dom_shape):] != dom_shape:
144                raise ValueError(
145                    f"Output shape {out_shape} is incompatible with the domain shape "
146                    f"{dom_shape}: its trailing dimensions must match the domain."
147                )
148
149        block_shape = derive_block_shape(domain, block_shape)
150        halo_n = normalize_halo(halo, domain.ndim) if halo is not None else None
151        self._validate_write_safety(outputs, block_shape)
152
153        blocking = get_blocking(domain.shape, block_shape, roi)
154        if block_ids is None:
155            block_ids = list(range(int(blocking.number_of_blocks)))
156        else:
157            block_ids = [int(b) for b in block_ids]
158
159        results = self._execute(
160            function=function, inputs=inputs, outputs=outputs, mask=mask_source,
161            blocking=blocking, block_ids=block_ids, halo=halo_n,
162            has_return_val=has_return_val, num_workers=num_workers, name=name,
163            shape=tuple(domain.shape), block_shape=block_shape, roi=roi,
164            pre_cleanup=pre_cleanup,
165        )
166        return results if has_return_val else None

Run function block-wise over the inputs/outputs.

Args:
- function: Per-block function function(block, inputs, outputs, mask).
- inputs: Input source-like objects (read).
- outputs: Output source-like objects (written in place).
- block_shape: Block shape; defaults to the domain source's chunks.
- halo: Per-axis halo; if given, function receives a BlockWithHalo.
- mask: Optional binary mask source.
- num_workers: Number of parallel workers / tasks.
- block_ids: Restrict processing to these blocks (for re-running failures).
- has_return_val: Whether function returns a value to collect.
- name: A short name for progress display.
- roi: Region of interest to restrict the blocking to.
- pre_cleanup: Optional callback pre_cleanup(tmp_folder) invoked on the orchestrating process with the job temp folder right before it is deleted (distributed backends only, success path only). Use it to read out anything worth keeping from the temp folder (e.g. the per-task timing files under tmp_folder/timings/) before cleanup. Ignored by the local runner, which has no temp folder.
- resume_from: Distributed backends only. Path to the preserved temp folder of a failed run (RunnerError.tmp_folder). Re-runs only the blocks that did not complete and merges them with the already-completed ones, so the result is correct and complete. The run is resumed from the serialized payload, so function/inputs/outputs/block_shape/... from this call are ignored -- pass resume_from to finish the same call, not to start a new one. Mutually exclusive with block_ids.

Returns: The list of per-block return values (in block_ids order) if has_return_val, else None.

def map( self, function: Callable[[int], Any], n_items: Optional[int] = None, *, item_ids: Optional[Sequence[int]] = None, num_workers: int = 1, has_return_val: bool = True, name: str = '', pre_cleanup: Optional[Callable[[str], NoneType]] = None, resume_from: Optional[str] = None) -> Optional[list]:
def map(
    self,
    function: Callable[[int], Any],
    n_items: Optional[int] = None,
    *,
    item_ids: Optional[Sequence[int]] = None,
    num_workers: int = 1,
    has_return_val: bool = True,
    name: str = "",
    pre_cleanup: Optional[Callable[[str], None]] = None,
    resume_from: Optional[str] = None,
) -> Optional[list]:
    """Apply ``function(index)`` to a set of item indices in parallel on any backend.

    This is the per-item counterpart of :meth:`run` used by per-object workflows. It is not
    block-wise -- there is no domain, blocking, source or mask -- so ``function`` receives
    just an integer index and must carry everything else it needs (e.g. a `SourceSpec` to
    reopen, a file path to read) in its cloudpickled closure.

    Args:
        function: Per-item callable ``function(index) -> result``.
        n_items: Item count; processes indices ``0 .. n_items - 1``. Ignored when
            ``item_ids`` is given.
        item_ids: Explicit indices to process (e.g. failed ones to re-run); defaults to
            ``range(n_items)``. Each id is converted with ``int``.
        num_workers: Number of parallel workers / tasks.
        has_return_val: Whether to collect and return ``function``'s results.
        name: Short name for progress display.
        pre_cleanup: ``pre_cleanup(tmp_folder)`` callback, distributed backends only; see
            :meth:`run`.
        resume_from: Distributed backends only: the preserved temp folder of a failed run
            (see :meth:`run`). Only the incomplete items are re-run and merged with the
            finished ones. Cannot be combined with ``item_ids``.

    Returns:
        The per-item results in ``item_ids`` order when ``has_return_val`` is true,
        otherwise ``None``. With ``resume_from``, whatever :meth:`_resume_entry` returns.

    Raises:
        ValueError: If ``resume_from`` is combined with ``item_ids``, or if neither
            ``n_items`` nor ``item_ids`` is given.
    """
    # Resuming replays the serialized payload of the failed run, so no ids may be given.
    if resume_from is not None:
        if item_ids is None:
            return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
        raise ValueError("resume_from and item_ids are mutually exclusive; resume_from "
                         "re-runs the original partition's un-done items.")

    if item_ids is not None:
        ids = [int(idx) for idx in item_ids]
    elif n_items is not None:
        ids = list(range(int(n_items)))
    else:
        raise ValueError("map() requires either n_items or item_ids.")

    collected = self._execute_map(
        function=function,
        item_ids=ids,
        has_return_val=has_return_val,
        num_workers=num_workers,
        name=name,
        pre_cleanup=pre_cleanup,
    )
    if not has_return_val:
        return None
    return collected

Map function(index) over item indices in parallel, across any backend.

Unlike run(), this is not block-wise: there is no domain, blocking, sources or mask. function takes a single integer index and returns its result; it must carry whatever data it needs in its (cloudpickled) closure — e.g. a SourceSpec it reopens and a file path it reads. This is the per-item counterpart used by per-object workflows.

Args:
- function: The per-item function function(index) -> result.
- n_items: The number of items; indices 0 .. n_items - 1 are processed. Ignored if item_ids is given.
- item_ids: Explicit item indices to process (e.g. to re-run failures). Defaults to range(n_items).
- num_workers: Number of parallel workers / tasks.
- has_return_val: Whether function returns a value to collect.
- name: A short name for progress display.
- pre_cleanup: Optional pre_cleanup(tmp_folder) callback (distributed backends only); see run().
- resume_from: Distributed backends only; the preserved temp folder of a failed run (see run()). Re-runs only the incomplete items and merges with those already done. Mutually exclusive with item_ids.

Returns: The list of per-item return values (in item_ids order) if has_return_val, else None.

Raises: ValueError: If neither n_items nor item_ids is given.

class LocalRunner(bioimage_py.runner.Runner):
class LocalRunner(Runner):
    """Run blocks (and mapped items) locally with a thread pool."""

    def _execute(
        self,
        *,
        function: ComputeFn,
        inputs: Sequence[Source],
        outputs: Sequence[Source],
        mask: Optional[Source],
        blocking: Blocking,
        block_ids: Sequence[int],
        halo: Optional[Sequence[int]],
        has_return_val: bool,
        num_workers: int,
        name: str,
        shape: Tuple[int, ...],
        block_shape: Tuple[int, ...],
        roi: Optional[Tuple[slice, ...]],
        pre_cleanup: Optional[Callable[[str], None]] = None,
    ) -> List[Any]:
        """Run the blocks in a thread pool, collecting results and re-raising failures.

        ``pre_cleanup`` is accepted for interface parity but ignored: the local runner has
        no temp folder (and no per-worker concept) to read out before returning.

        Returns:
            The per-block results in ``block_ids`` order (grouping for sharded outputs is
            flattened back in group order).

        Raises:
            RunnerError: If any block fails (see :meth:`_run_pool`).
        """
        def call_one(bid: int) -> Any:
            return run_block(function, blocking, bid, inputs, outputs, mask, halo)

        # For sharded outputs, group blocks so each shard is written by a single thread
        # (a group runs sequentially) and never corrupted by concurrent writes; otherwise
        # each block is its own group, reproducing the plain one-future-per-block path.
        groups = group_blocks_by_shard(blocking, outputs, block_ids)
        if groups is None:
            groups = [[int(b)] for b in block_ids]
        else:
            maybe_warn_imbalance([len(g) for g in groups], num_workers, len(groups), name)
        return self._run_pool(groups, call_one, num_workers, name, unit="block")

    def _execute_map(
        self,
        *,
        function: Callable[[int], Any],
        item_ids: Sequence[int],
        has_return_val: bool,
        num_workers: int,
        name: str,
        pre_cleanup: Optional[Callable[[str], None]] = None,
    ) -> List[Any]:
        """Run ``function(index)`` over ``item_ids`` in a thread pool (``pre_cleanup`` ignored).

        Every item is its own singleton group, so items run fully concurrently.
        """
        groups = [[int(i)] for i in item_ids]
        return self._run_pool(groups, lambda i: function(int(i)), num_workers, name, unit="item")

    @staticmethod
    def _run_pool(groups: Sequence[Sequence[int]], call_one: Callable[[int], Any],
                  num_workers: int, name: str, *, unit: str = "block") -> List[Any]:
        """Run ``call_one(id)`` for each id in a thread pool, ordered, re-raising failures.

        The schedulable unit is a *group*: the ids in a group are run sequentially within one
        worker thread, while distinct groups run concurrently. Singleton groups reproduce the
        one-future-per-id behavior; multi-id groups serialize same-shard writes (see
        :func:`bioimage_py.util.group_blocks_by_shard`).

        Args:
            groups: The work groups; each is a list of ids (block ids or item indices) run
                sequentially. Results are returned in flattened ``groups`` order.
            call_one: The per-id callable returning that id's result.
            num_workers: Number of worker threads (at least one is used).
            name: A short name for the progress bar (disabled when empty).
            unit: The noun used in the failure message ("block" or "item").

        Returns:
            The per-id results in flattened ``groups`` order.

        Raises:
            RunnerError: If any id fails; the sorted, de-duplicated failed ids are attached as
                ``failed_block_ids`` for re-running, and the first worker exception is chained
                as ``__cause__``. When an id in a group fails, the remaining (un-run) ids of
                that group are reported as failed too, since later same-shard writes cannot
                safely proceed.
        """
        groups = [list(g) for g in groups]
        flat_ids = [i for g in groups for i in g]
        result_by_id: Dict[int, Any] = {}
        failed: List[int] = []
        first_error: Optional[BaseException] = None

        # Limit native thread pools (threadpoolctl) to one thread inside each worker --
        # presumably to avoid oversubscribing cores on top of this pool's own parallelism.
        @threadpool_limits.wrap(limits=1)
        def _run_group(group: List[int]):
            local: Dict[int, Any] = {}
            local_failed: List[int] = []
            err: Optional[BaseException] = None
            for k, bid in enumerate(group):
                try:
                    local[bid] = call_one(bid)
                except Exception as error:  # noqa: BLE001 - we re-raise as RunnerError
                    err = error
                    # Stop the group: the failing id and everything after it are not done.
                    local_failed = list(group[k:])
                    break
            return local, local_failed, err

        with futures.ThreadPoolExecutor(max(1, int(num_workers))) as tp:
            fut_to_group = {tp.submit(_run_group, g): g for g in groups}
            with tqdm(total=len(flat_ids), desc=name or None, disable=not name) as pbar:
                for fut in futures.as_completed(fut_to_group):
                    group = fut_to_group[fut]
                    local, local_failed, err = fut.result()
                    result_by_id.update(local)
                    if local_failed:
                        failed.extend(local_failed)
                        if first_error is None:
                            first_error = err
                    pbar.update(len(group))

        if failed:
            failed = sorted(set(failed))
            # Chain the first worker exception so its traceback survives in ``__cause__``
            # instead of being reduced to its repr in the message.
            raise RunnerError(
                f"{len(failed)} {unit}(s) failed in '{name or 'run'}': "
                f"{failed[:10]}. First error: {first_error!r}",
                failed_block_ids=failed,
            ) from first_error
        return [result_by_id[i] for i in flat_ids]

Run blocks locally with a thread pool.

class SubprocessRunner(bioimage_py.runner.distributed._DistributedRunner):
class SubprocessRunner(_DistributedRunner):
    """Distributed runner that launches each task as a local subprocess.

    Exercises the full distributed protocol (cloudpickle payload, generated harness,
    result/sentinel files, ``block_ids`` re-run) without a scheduler.
    """

    def _launch_and_wait(self, tmp: str, n_tasks: int, num_workers: int, name: str,
                         task_ids: Optional[Sequence[int]] = None) -> None:
        """Run each task as a local subprocess, up to ``num_workers`` concurrently.

        The progress bar counts processed *blocks* (summed from the per-task done-logs) rather
        than tasks; a background thread polls the logs while the tasks run. ``task_ids``
        restricts the launch to a subset (resume); the bar still spans all tasks.

        Args:
            tmp: The job temp folder handed to the harness.
            n_tasks: The total number of tasks the run was partitioned into.
            num_workers: Maximum number of subprocesses running at once (at least 1).
            name: Progress-bar label; an empty name disables the bar.
            task_ids: Subset of task indices to launch; ``None`` launches ``0 .. n_tasks - 1``.
        """
        ids = list(range(n_tasks)) if task_ids is None else list(task_ids)
        python = self.config.python_executable or sys.executable
        cmd_base = [python, "-m", "bioimage_py.runner._harness", tmp]

        def _run_task(task_id: int):
            # errors="replace": a worker that crashes while printing non-decodable bytes must not
            # raise UnicodeDecodeError here, which would mask the worker's own failure.
            proc = subprocess.run(cmd_base + [str(task_id)], capture_output=True, text=True,
                                  errors="replace")
            # The harness writes its own error/<id>.txt on a caught exception. But a failure
            # *before* that try (e.g. an import error launching the module) would otherwise be
            # silent, so capture the subprocess output as a fallback error file.
            if proc.returncode != 0:
                err_path = os.path.join(tmp, "error", f"{task_id}.txt")
                if not os.path.exists(err_path):
                    # error/ may not exist yet if the worker died before any setup; create it
                    # so writing the fallback cannot itself fail with FileNotFoundError.
                    os.makedirs(os.path.dirname(err_path), exist_ok=True)
                    with open(err_path, "w") as f:
                        f.write(f"Worker for task {task_id} exited with code {proc.returncode}.\n")
                        if proc.stdout:
                            f.write(f"--- stdout ---\n{proc.stdout}\n")
                        if proc.stderr:
                            f.write(f"--- stderr ---\n{proc.stderr}\n")
            return proc

        # Drive a block-counting progress bar from the done-logs (single source of truth, so no
        # double-counting); clamp to the total in case a resume re-reads prior lines.
        n_blocks = _total_blocks(tmp, n_tasks)
        stop = threading.Event()
        bar_thread = None
        if name:
            def _poll_bar() -> None:
                with tqdm(total=n_blocks, desc=name, unit="block") as pbar:
                    while not stop.wait(0.5):
                        pbar.n = min(_count_done_blocks(tmp, n_tasks), n_blocks)
                        pbar.refresh()
                    # One final refresh so the bar reflects the state after all tasks ended.
                    pbar.n = min(_count_done_blocks(tmp, n_tasks), n_blocks)
                    pbar.refresh()
            bar_thread = threading.Thread(target=_poll_bar, daemon=True)
            bar_thread.start()
        try:
            with futures.ThreadPoolExecutor(max(1, int(num_workers))) as tp:
                list(tp.map(_run_task, ids))
        finally:
            # Always stop and join the bar thread, even if launching a task raised.
            stop.set()
            if bar_thread is not None:
                bar_thread.join()

Distributed runner that launches each task as a local subprocess.

Exercises the full distributed protocol (cloudpickle payload, generated harness, result/sentinel files, block_ids re-run) without a scheduler.

class SlurmRunner(bioimage_py.runner.distributed._DistributedRunner):
class SlurmRunner(_DistributedRunner):
    """Distributed runner that submits one sbatch array job and polls it with ``sacct``.

    Reuses the full distributed protocol from :class:`_DistributedRunner` (cloudpickle
    payload, generated work-lists, per-task result + ``.success`` sentinel files, failure
    reporting and ``block_ids`` re-run) and overrides only how tasks are launched and
    awaited. The per-task sentinel file remains the ground truth for success; ``sacct`` is
    queried only to detect tasks that died without writing a sentinel. A manifest is written
    at submission time so an interrupted run can be picked back up with :meth:`reattach`.
    """

    def __init__(self, config: Optional[RunnerConfig] = None):
        """Create the runner; the configuration must be a :class:`SlurmConfig`.

        Args:
            config: The slurm configuration. When ``None``, the user defaults are loaded
                through :meth:`SlurmConfig.load` (which honours ``BIOIMAGE_PY_NO_CONFIG`` /
                ``BIOIMAGE_PY_CONFIG``). ``tmp_root`` still has to be provided, either here
                or in the config file, before a run starts.

        Raises:
            TypeError: If ``config`` is a plain (non-slurm) ``RunnerConfig``.
        """
        resolved = SlurmConfig.load() if config is None else config
        if isinstance(resolved, SlurmConfig):
            super().__init__(resolved)
            return
        raise TypeError(
            f"SlurmRunner requires a SlurmConfig, got {type(resolved).__name__}. "
            "Pass job_config=SlurmConfig(...) (it carries partition/account/time/etc.)."
        )

    def _launch_and_wait(self, tmp: str, n_tasks: int, num_workers: int, name: str,
                         task_ids: Optional[Sequence[int]] = None) -> None:
        """Submit an sbatch array job for the tasks and poll until they all finish.

        Args:
            tmp: The job temp folder (must live on a shared filesystem).
            n_tasks: The total number of tasks the run was partitioned into.
            num_workers: The array throttle (max tasks running concurrently).
            name: A short name used for the job name and progress display.
            task_ids: Restrict the submitted array to this subset of task indices (used by
                :meth:`resume` to resubmit only the incomplete tasks); ``None`` submits all
                ``0 .. n_tasks - 1``. An empty subset submits nothing and returns at once.

        Raises:
            ValueError: If ``config.tmp_root`` is unset or the array indices exceed the
                cluster's ``MaxArraySize``. The temp folder is removed unless resuming.
            RuntimeError: If sbatch submission fails (the temp folder is preserved).
        """
        launch_ids = list(range(n_tasks)) if task_ids is None else sorted(set(int(t) for t in task_ids))
        is_resume = task_ids is not None

        def _guard_fail(message: str) -> None:
            # On a resume we must never remove the user's preserved temp folder.
            if not is_resume:
                shutil.rmtree(tmp, ignore_errors=True)
            raise ValueError(message)

        if self.config.tmp_root is None:
            _guard_fail(
                "SlurmRunner requires config.tmp_root to be set to a shared filesystem "
                "visible to all compute nodes (node-local /tmp is not usable)."
            )

        if not launch_ids:
            # Nothing to run: an empty id list would render the invalid array spec
            # ``--array=%1`` (rejected by sbatch), and there would be nothing to poll anyway.
            return

        max_array = (self.config.max_array_size if self.config.max_array_size is not None
                     else self._max_array_size())
        # Slurm's MaxArraySize bounds the largest array *index* (indices must be below it),
        # not the number of tasks. For a full run the two checks coincide; a resume may submit
        # a sparse subset whose highest index is what matters.
        if launch_ids[-1] >= max_array:
            _guard_fail(
                f"Run partitioned into {n_tasks} tasks exceeds the maximum array size "
                f"{max_array}. Lower num_workers or use a larger block_shape."
            )

        os.makedirs(os.path.join(tmp, "logs"), exist_ok=True)
        throttle = max(1, min(int(num_workers), len(launch_ids)))
        script_path = os.path.join(tmp, "submit.sh")
        with open(script_path, "w") as f:
            f.write(self._build_script(tmp, launch_ids, throttle, name))

        # Unlike the tmp_root / max_array guards above, a submission failure deliberately does NOT
        # remove the temp folder: the generated submit.sh, payload, and per-task block lists are
        # exactly what's needed to diagnose why sbatch rejected the job. Re-raise naming the folder
        # so the user knows where to look.
        try:
            job_id = self._submit(script_path)
        except RuntimeError as err:
            raise RuntimeError(f"{err} Temp folder preserved for debugging: {tmp}.") from err
        manifest = {
            "job_id": job_id,
            "n_tasks": n_tasks,
            "launch_ids": launch_ids,
            "throttle": throttle,
            "name": name,
            "tmp": tmp,
            "script": script_path,
            "python_executable": self.config.python_executable or sys.executable,
            "submit_time": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        manifest_path = os.path.join(tmp, "manifest.json")
        if is_resume and os.path.exists(manifest_path):  # keep the prior job id for forensics
            try:
                with open(manifest_path) as f:
                    manifest["resumed_from_job_id"] = json.load(f).get("job_id")
            except (OSError, ValueError):
                # Best effort: an unreadable old manifest must not block the resubmission.
                pass
        with open(manifest_path, "w") as f:
            json.dump(manifest, f, indent=2)

        self._poll(job_id, n_tasks, tmp, name, task_ids=launch_ids)

    @staticmethod
    def _format_array_indices(task_ids: Sequence[int], throttle: int) -> str:
        """Compress task ids into an sbatch ``--array`` spec, e.g. ``0,3,7-9%4``."""
        ids = sorted(set(int(t) for t in task_ids))
        parts: List[str] = []
        i = 0
        while i < len(ids):
            j = i
            # Extend j over the run of consecutive ids starting at i.
            while j + 1 < len(ids) and ids[j + 1] == ids[j] + 1:
                j += 1
            parts.append(str(ids[i]) if i == j else f"{ids[i]}-{ids[j]}")
            i = j + 1
        return ",".join(parts) + f"%{throttle}"

    def _build_script(self, tmp: str, task_ids: Sequence[int], throttle: int, name: str) -> str:
        """Render the sbatch array script for the given task indices."""
        cfg = self.config
        shebang, preamble = "#!/bin/bash", ""
        if cfg.shebang:
            lines = cfg.shebang.splitlines()
            if lines and lines[0].startswith("#!"):
                shebang, preamble = lines[0], "\n".join(lines[1:])
            else:
                preamble = cfg.shebang

        # Collapse whitespace/newlines so the name cannot break or inject directives.
        job_name = "_".join((name or "").split()) or "bioimage_py"
        directives = [
            f"--job-name={job_name}",
            f"--array={self._format_array_indices(task_ids, throttle)}",
            f"--cpus-per-task={int(cfg.cpus_per_task)}",
            f"--output={os.path.join(tmp, 'logs', 'slurm-%A_%a.out')}",
            f"--error={os.path.join(tmp, 'logs', 'slurm-%A_%a.err')}",
        ]
        if cfg.partition is not None:
            directives.append(f"--partition={cfg.partition}")
        if cfg.time is not None:
            directives.append(f"--time={cfg.time}")
        if cfg.mem is not None:
            directives.append(f"--mem={cfg.mem}")
        if int(cfg.gpus) > 0:
            directives.append(f"--gpus={int(cfg.gpus)}")
        if cfg.account is not None:
            directives.append(f"--account={cfg.account}")
        if cfg.qos is not None:
            directives.append(f"--qos={cfg.qos}")
        if cfg.constraint is not None:
            directives.append(f"--constraint={cfg.constraint}")

        python = shlex.quote(cfg.python_executable or sys.executable)
        command = f'{python} -m bioimage_py.runner._harness {shlex.quote(tmp)} "${{SLURM_ARRAY_TASK_ID}}"'
        lines = [shebang]
        lines += [f"#SBATCH {d}" for d in directives]
        # The preamble goes after the directives: sbatch stops reading #SBATCH lines at the
        # first non-comment command.
        if preamble:
            lines.append(preamble)
        lines.append(command)
        return "\n".join(lines) + "\n"

    @staticmethod
    def _submit(script_path: str) -> str:
        """Submit ``script_path`` with ``sbatch --parsable`` and return the job id."""
        sbatch = shutil.which("sbatch")
        if sbatch is None:
            raise RuntimeError("sbatch not found on PATH; the slurm CLI must be available.")
        proc = subprocess.run([sbatch, "--parsable", script_path],
                              capture_output=True, text=True)
        if proc.returncode != 0:
            raise RuntimeError(f"sbatch submission failed (exit {proc.returncode}): "
                               f"{proc.stderr.strip() or proc.stdout.strip()}")
        # --parsable prints "<job_id>" or "<job_id>;<cluster>".
        job_id = proc.stdout.strip().split(";")[0].strip()
        if not job_id.isdigit():
            raise RuntimeError(f"Could not parse job id from sbatch output: {proc.stdout!r}")
        return job_id

    @staticmethod
    def _max_array_size() -> int:
        """Return the cluster's ``MaxArraySize`` (or a safe fallback)."""
        scontrol = shutil.which("scontrol")
        if scontrol is None:
            return _DEFAULT_MAX_ARRAY
        try:
            proc = subprocess.run([scontrol, "show", "config"], capture_output=True, text=True)
        except OSError:
            return _DEFAULT_MAX_ARRAY
        match = re.search(r"MaxArraySize\s*=\s*(\d+)", proc.stdout)
        return int(match.group(1)) if match else _DEFAULT_MAX_ARRAY

    @staticmethod
    def _parse_array_range(spec: str) -> List[int]:
        """Expand a pending-collapse range like ``[2-9,11%4]`` into its task indices."""
        body = spec.strip("[]").split("%", 1)[0]
        indices: List[int] = []
        for part in body.split(","):
            part = part.strip()
            if not part:
                continue
            if "-" in part:
                lo, hi = part.split("-", 1)
                indices.extend(range(int(lo), int(hi) + 1))
            else:
                indices.append(int(part))
        return indices

    def _sacct_states(self, job_id: str) -> Optional[Dict[int, str]]:
        """Return ``{array_index: STATE}`` for the array job, or ``None`` on a poll error.

        ``None`` (a transient ``sacct`` failure) means *skip this poll*; an empty dict means
        the job is simply not registered with the scheduler yet. A task absent from the
        result is treated as pending, never as dead.

        Raises:
            RuntimeError: If ``sacct`` is not on ``PATH``.
        """
        sacct = shutil.which("sacct")
        if sacct is None:
            raise RuntimeError("sacct not found on PATH; the slurm CLI must be available.")
        try:
            proc = subprocess.run(
                [sacct, "-X", "-n", "-P", "--format=JobID,State", "-j", str(job_id)],
                capture_output=True, text=True,
            )
        except OSError:
            return None
        if proc.returncode != 0:
            return None

        states: Dict[int, str] = {}
        for line in proc.stdout.splitlines():
            line = line.strip()
            if not line or "|" not in line:
                continue
            jid, _, raw_state = line.partition("|")
            jid = jid.split(";", 1)[0]
            if "." in jid or "_" not in jid:  # step rows (defensive; -X already excludes them)
                continue
            # Take the first token: normalises e.g. "CANCELLED by 12345" -> "CANCELLED".
            tokens = raw_state.split()
            state = tokens[0].upper() if tokens else ""
            suffix = jid.split("_", 1)[1]
            if suffix.startswith("["):
                for idx in self._parse_array_range(suffix):
                    states[idx] = state
            else:
                try:
                    states[int(suffix)] = state
                except ValueError:
                    continue
        return states

    def _job_known(self, job_id: str, attempts: int = 3) -> bool:
        """Whether the job is known to ``sacct``, retrying to tolerate post-submit lag.

        A transient ``sacct`` error (``None``) or any returned row counts as known; only a
        sustained empty result across ``attempts`` polls is treated as unknown.
        """
        for attempt in range(attempts):
            states = self._sacct_states(job_id)
            if states is None or states:
                return True
            if attempt + 1 < attempts:
                time.sleep(self.config.poll_interval)
        return False

    def _poll(self, job_id: str, n_tasks: int, tmp: str, name: str,
              task_ids: Optional[Sequence[int]] = None) -> None:
        """Poll ``sacct`` until every task has a visible sentinel or is confirmed dead.

        The scheduler ``State`` is not subject to NFS lag, but the ``.success`` sentinels the
        compute nodes write can take up to the mount's attribute-cache timeout to become
        visible here. So a ``COMPLETED`` task (its harness exited 0, hence wrote a sentinel)
        is given ``config.latency_wait`` for that sentinel to appear; any other terminal
        state means the harness did not succeed and the task is declared dead after a short
        confirmation grace. Tasks absent from ``sacct`` are pending, never dead.

        A ``KeyboardInterrupt`` at any point of the wait (including during an ``sacct``
        call) prints how to reattach to the still-running job and is then re-raised.

        Args:
            job_id: The submitted array job id.
            n_tasks: The total number of tasks (spans the block-counting progress bar).
            tmp: The job temp folder (where sentinels are written).
            name: A short name for the progress bar (disables it when empty).
            task_ids: The subset of task indices this job actually runs (a resume submits only
                the incomplete tasks); resolution is over this subset, ``None`` means all tasks.
        """
        poll_ids = list(range(n_tasks)) if task_ids is None else sorted(set(int(t) for t in task_ids))

        def has_sentinel(t: int) -> bool:
            return os.path.exists(os.path.join(tmp, "success", f"{t}.success"))

        latency_wait = max(float(self.config.latency_wait), self.config.poll_interval)
        fail_grace = max(self.config.poll_interval, 5.0)
        terminal_since: Dict[int, float] = {}
        terminal_count: Dict[int, int] = {}
        resolved: set = set()
        # The bar counts processed blocks across ALL tasks (a resume credits prior progress).
        n_blocks = _total_blocks(tmp, n_tasks)
        with tqdm(total=n_blocks, desc=name or None, disable=not name, unit="block") as pbar:
            try:
                while len(resolved) < len(poll_ids):
                    states = self._sacct_states(job_id)
                    if states is None:  # transient sacct error: skip this poll.
                        time.sleep(self.config.poll_interval)
                        continue

                    now = time.monotonic()
                    ok = {t for t in poll_ids if has_sentinel(t)}
                    running = sum(1 for s in states.values() if s == "RUNNING")
                    dead = set()
                    for t in poll_ids:
                        if t in ok:
                            terminal_since.pop(t, None)
                            terminal_count.pop(t, None)
                            continue
                        state = states.get(t)
                        if state in _TERMINAL_STATES:
                            terminal_since.setdefault(t, now)
                            terminal_count[t] = terminal_count.get(t, 0) + 1
                            # COMPLETED -> sentinel was written, just wait it out over NFS; any
                            # other terminal state -> the task will never produce a sentinel.
                            grace = latency_wait if state == "COMPLETED" else fail_grace
                            if (terminal_count[t] >= 2 and now - terminal_since[t] >= grace
                                    and not has_sentinel(t)):
                                dead.add(t)
                        else:  # pending/running/requeued: reset the dead countdown.
                            terminal_since.pop(t, None)
                            terminal_count.pop(t, None)

                    resolved = ok | dead
                    pbar.n = min(_count_done_blocks(tmp, n_tasks), n_blocks)
                    pbar.set_postfix(ok=len(ok), failed=len(dead), run=running,
                                     pending=max(0, len(poll_ids) - len(resolved) - running),
                                     refresh=False)
                    pbar.refresh()
                    if len(resolved) >= len(poll_ids):
                        break
                    time.sleep(self.config.poll_interval)
            except KeyboardInterrupt:
                # Cover the whole loop (sacct calls and the transient-error sleep too), so the
                # user always learns how to reattach to the job that keeps running on slurm.
                print(f"\nInterrupted while waiting on slurm job {job_id}. The job was left "
                      f"running; reattach with SlurmRunner(...).reattach({tmp!r}).")
                raise

    def reattach(self, tmp_folder: str, name: str = "reattach",
                 pre_cleanup: Optional[Callable[[str], None]] = None) -> Optional[list]:
        """Reattach to a previously submitted run and finalize it.

        Picks a run back up from its manifest (e.g. after the orchestrating login-node
        process was interrupted) instead of resubmitting. The run's parameters come from the
        manifest and payload rather than from this runner's config (polling still uses its
        ``poll_interval`` and ``latency_wait``), so a freshly constructed ``SlurmRunner``
        can reattach.

        Only the tasks the manifest's job actually launched (its ``launch_ids``, a subset
        after a resume) are polled: a task outside that set without a sentinel is never
        reported by ``sacct`` for this job, so waiting on it would never end. Such a task is
        left for finalization to report as missing.

        Args:
            tmp_folder: The job temp folder containing ``manifest.json`` and ``payload.pkl``.
            name: A short name for the progress display.
            pre_cleanup: Optional ``pre_cleanup(tmp)`` callback invoked right before the temp
                folder is removed (forwarded to :meth:`_finalize`).

        Returns:
            The per-block return values (if the run collected any), else ``None``.

        Raises:
            RunnerError: If any task failed (sentinel missing).
            RuntimeError: If the manifest's job is unknown to slurm and the run did not
                already complete.
        """
        with open(os.path.join(tmp_folder, "manifest.json")) as f:
            manifest = json.load(f)
        job_id, n_tasks = str(manifest["job_id"]), int(manifest["n_tasks"])
        # The task subset this job ran; ``None`` (key absent) polls all tasks.
        launch_ids = manifest.get("launch_ids")
        # NOTE: the payload is our own pickle from the run's temp folder (trusted input).
        with open(os.path.join(tmp_folder, "payload.pkl"), "rb") as f:
            has_return_val = bool(cloudpickle.load(f)["has_return_val"])

        # Reconstruct the partition in numeric task order (never glob: it sorts lexically).
        tasks: List[List[int]] = []
        for task_id in range(n_tasks):
            with open(os.path.join(tmp_folder, "blocks", f"{task_id}.json")) as f:
                tasks.append(json.load(f))
        block_ids = [b for task in tasks for b in task]

        all_done = all(os.path.exists(os.path.join(tmp_folder, "success", f"{t}.success"))
                       for t in range(n_tasks))
        if not all_done:
            # Only a job that stays unknown to sacct across retries (not registration lag
            # right after submit, nor a transient error) is treated as unrecoverable.
            if not self._job_known(job_id):
                raise RuntimeError(
                    f"Slurm job {job_id} is not known to the scheduler and the run did not "
                    f"complete. Inspect {tmp_folder} or resubmit."
                )
            self._poll(job_id, n_tasks, tmp_folder, name, task_ids=launch_ids)

        results = self._finalize(tmp_folder, n_tasks, tasks, block_ids, has_return_val, name,
                                 pre_cleanup=pre_cleanup)
        return results if has_return_val else None

Distributed runner that submits one sbatch array job and polls it with sacct.

Reuses the full distributed protocol from _DistributedRunner (cloudpickle payload, generated work-lists, per-task result + .success sentinel files, failure reporting and block_ids re-run) and overrides only how tasks are launched and awaited. The per-task sentinel file remains the ground truth for success; sacct is queried only to detect tasks that died without writing a sentinel. A manifest is written at submission time so an interrupted run can be picked back up with reattach().

SlurmRunner(config: Optional[RunnerConfig] = None)
516    def __init__(self, config: Optional[RunnerConfig] = None):
517        """Create the runner, requiring a :class:`SlurmConfig`.
518
519        Args:
520            config: The slurm configuration. ``None`` loads the user defaults from the config
521                file via :meth:`SlurmConfig.load` (honoring ``BIOIMAGE_PY_NO_CONFIG`` /
522                ``BIOIMAGE_PY_CONFIG``); ``tmp_root`` must still be set, here or in the file,
523                before running.
524
525        Raises:
526            TypeError: If ``config`` is a non-slurm ``RunnerConfig``.
527        """
528        if config is None:
529            config = SlurmConfig.load()
530        if not isinstance(config, SlurmConfig):
531            raise TypeError(
532                f"SlurmRunner requires a SlurmConfig, got {type(config).__name__}. "
533                "Pass job_config=SlurmConfig(...) (it carries partition/account/time/etc.)."
534            )
535        super().__init__(config)

Create the runner, requiring a SlurmConfig.

Args: config: The slurm configuration. None loads the user defaults from the config file via SlurmConfig.load() (honoring BIOIMAGE_PY_NO_CONFIG / BIOIMAGE_PY_CONFIG); tmp_root must still be set, here or in the file, before running.

Raises: TypeError: If config is a non-slurm RunnerConfig.

def reattach( self, tmp_folder: str, name: str = 'reattach', pre_cleanup: Optional[Callable[[str], NoneType]] = None) -> Optional[list]:
845    def reattach(self, tmp_folder: str, name: str = "reattach",
846                 pre_cleanup: Optional[Callable[[str], None]] = None) -> Optional[list]:
847        """Reattach to a previously submitted run and finalize it.
848
849        Picks a run back up from its manifest (e.g. after the orchestrating login-node
850        process was interrupted) instead of resubmitting. Only ``poll_interval`` is read
851        from this runner's config, so a freshly constructed ``SlurmRunner`` can reattach.
852
853        Args:
854            tmp_folder: The job temp folder containing ``manifest.json`` and ``payload.pkl``.
855            name: A short name for the progress display.
856            pre_cleanup: Optional ``pre_cleanup(tmp)`` callback invoked right before the temp
857                folder is removed (forwarded to :meth:`_finalize`).
858
859        Returns:
860            The per-block return values (if the run collected any), else ``None``.
861
862        Raises:
863            RunnerError: If any task failed (sentinel missing).
864            RuntimeError: If the manifest's job is unknown to slurm and the run did not
865                already complete.
866        """
867        with open(os.path.join(tmp_folder, "manifest.json")) as f:
868            manifest = json.load(f)
869        job_id, n_tasks = str(manifest["job_id"]), int(manifest["n_tasks"])
870        with open(os.path.join(tmp_folder, "payload.pkl"), "rb") as f:
871            has_return_val = bool(cloudpickle.load(f)["has_return_val"])
872
873        # Reconstruct the partition in numeric task order (never glob: it sorts lexically).
874        tasks: List[List[int]] = []
875        for task_id in range(n_tasks):
876            with open(os.path.join(tmp_folder, "blocks", f"{task_id}.json")) as f:
877                tasks.append(json.load(f))
878        block_ids = [b for task in tasks for b in task]
879
880        all_done = all(os.path.exists(os.path.join(tmp_folder, "success", f"{t}.success"))
881                       for t in range(n_tasks))
882        if not all_done:
883            # Only a job that stays unknown to sacct across retries (not registration lag
884            # right after submit, nor a transient error) is treated as unrecoverable.
885            if not self._job_known(job_id):
886                raise RuntimeError(
887                    f"Slurm job {job_id} is not known to the scheduler and the run did not "
888                    f"complete. Inspect {tmp_folder} or resubmit."
889                )
890            self._poll(job_id, n_tasks, tmp_folder, name)
891
892        results = self._finalize(tmp_folder, n_tasks, tasks, block_ids, has_return_val, name,
893                                 pre_cleanup=pre_cleanup)
894        return results if has_return_val else None

Reattach to a previously submitted run and finalize it.

Picks a run back up from its manifest (e.g. after the orchestrating login-node process was interrupted) instead of resubmitting. The run's parameters are read from the manifest and payload rather than from this runner's config (polling still uses its poll_interval and latency_wait), so a freshly constructed SlurmRunner can reattach.

Args: tmp_folder: The job temp folder containing manifest.json and payload.pkl. name: A short name for the progress display. pre_cleanup: Optional pre_cleanup(tmp) callback invoked right before the temp folder is removed (forwarded to _finalize()).

Returns: The per-block return values (if the run collected any), else None.

Raises: RunnerError: If any task failed (sentinel missing). RuntimeError: If the manifest's job is unknown to slurm and the run did not already complete.

class RunnerError(builtins.RuntimeError):
class RunnerError(RuntimeError):
    """Raised when one or more blocks fail.

    Attributes:
        failed_block_ids: The ids of the blocks that failed (re-run with these).
        tmp_folder: The preserved temp folder for distributed jobs (``None`` for local).
    """

    def __init__(self, message: str, failed_block_ids: Optional[Sequence[int]] = None,
                 tmp_folder: Optional[str] = None):
        """Create the error.

        Args:
            message: Human-readable description of the failure.
            failed_block_ids: Ids of the failed blocks; any iterable of integer-likes
                (e.g. a list, tuple or NumPy integer array). ``None`` means no ids.
            tmp_folder: The preserved temp folder of a distributed run, if any.
        """
        super().__init__(message)
        # Compare against None explicitly rather than relying on truthiness: a NumPy array
        # ``[0]`` is falsy (which would silently drop block 0) and a multi-element array
        # raises ValueError when converted to bool.
        ids = () if failed_block_ids is None else failed_block_ids
        self.failed_block_ids: List[int] = [int(b) for b in ids]
        self.tmp_folder: Optional[str] = tmp_folder

Raised when one or more blocks fail.

Attributes: failed_block_ids: The ids of the blocks that failed (re-run with these). tmp_folder: The preserved temp folder for distributed jobs (None for local).

RunnerError( message: str, failed_block_ids: Optional[Sequence[int]] = None, tmp_folder: Optional[str] = None)
28    def __init__(self, message: str, failed_block_ids: Optional[Sequence[int]] = None,
29                 tmp_folder: Optional[str] = None):
30        super().__init__(message)
31        self.failed_block_ids: List[int] = [int(b) for b in (failed_block_ids or [])]
32        self.tmp_folder = tmp_folder
failed_block_ids: List[int]
tmp_folder
@dataclass
class RunnerConfig:
@dataclass
class RunnerConfig:
    """Settings common to every runner backend.

    Attributes:
        poll_interval: Delay in seconds between two status checks of a distributed run.
        tmp_root: Parent directory for per-job temp folders (``None``: the system default).
            Distributed runs need a location on a filesystem shared with the workers.
        python_executable: Interpreter used to start worker tasks (``None``:
            ``sys.executable``, i.e. the interpreter running this process).
    """

    # Seconds to wait between successive status polls (distributed runners).
    poll_interval: float = 10.0
    # Root for job temp folders; None keeps the system default. Must be shared for
    # distributed jobs so that every worker sees the same files.
    tmp_root: Optional[str] = None
    # Interpreter for worker tasks; None means the current interpreter (sys.executable).
    python_executable: Optional[str] = None

Base configuration shared by all runners.

Attributes: poll_interval: Seconds between status polls (distributed runners). tmp_root: Root directory for job temp folders. None uses the system default. For distributed jobs this must be on a shared filesystem. python_executable: Interpreter used to launch worker tasks. None uses the current interpreter (sys.executable).

RunnerConfig( poll_interval: float = 10.0, tmp_root: Optional[str] = None, python_executable: Optional[str] = None)
poll_interval: float = 10.0
tmp_root: Optional[str] = None
python_executable: Optional[str] = None
@dataclass
class SlurmConfig(bioimage_py.runner.RunnerConfig):
 29@dataclass
 30class SlurmConfig(RunnerConfig):
 31    """Configuration for the slurm runner.
 32
 33    Inherits ``poll_interval``, ``tmp_root`` and ``python_executable`` from
 34    :class:`RunnerConfig`. For slurm, ``tmp_root`` is **required** and must point at a
 35    shared filesystem visible to all compute nodes (not node-local ``/tmp``), and
 36    ``num_workers`` (passed to the op / ``run``) is interpreted as the array throttle — the
 37    maximum number of tasks allowed to run concurrently — independently of how many tasks
 38    the work is partitioned into.
 39
 40    Cluster-specific values (``partition``, ``account``, ``constraint``, ``tmp_root``, ...)
 41    can be stored once in a user config file and reused as defaults; see
 42    :meth:`load` and :func:`write_slurm_config`.
 43
 44    Attributes:
 45        partition: The slurm partition to submit to.
 46        time: The per-task time limit (slurm time format, e.g. ``"01:00:00"``).
 47        mem: The per-task memory limit (e.g. ``"8G"``).
 48        cpus_per_task: Number of CPUs requested per task.
 49        gpus: Number of GPUs requested per task (emitted as ``--gpus`` only when > 0).
 50        account: The accounting project to charge.
 51        qos: The quality-of-service to request.
 52        constraint: A node feature constraint.
 53        shebang: Optional environment setup for the generated job script. If given, its
 54            first line must be an interpreter line (starting with ``#!``) which is placed at
 55            the top of the script; any remaining lines are emitted as an activation preamble
 56            *after* the ``#SBATCH`` directives (so the directives are still honoured). The
 57            preamble is for making the package importable on the node (e.g. ``module load``
 58            / ``LD_LIBRARY_PATH`` exports), not for choosing the interpreter: the worker is
 59            always launched with the absolute ``python_executable`` (defaulting to the
 60            submitting ``sys.executable``). ``None`` uses ``#!/bin/bash`` and that absolute
 61            interpreter, which needs no activation when the env lives on a shared
 62            filesystem. Example::
 63
 64                shebang = "#!/bin/bash\\nmodule load gcc\\nexport LD_LIBRARY_PATH=...:$LD_LIBRARY_PATH"
 65
 66        max_array_size: Override for the maximum number of array tasks per job. ``None``
 67            queries the cluster's ``MaxArraySize`` (falling back to a safe default). A run
 68            partitioned into more tasks than this is rejected up front with a clear error.
 69        latency_wait: Seconds to wait for a finished task's ``.success`` sentinel to become
 70            visible on a shared (NFS) filesystem before giving up on it. A task that the
 71            scheduler reports ``COMPLETED`` wrote its sentinel, but the orchestrating node's
 72            attribute cache can lag the compute node by up to the mount's ``acdirmax``
 73            (typically 60 s); this must comfortably exceed that. It only bounds the wait on a
 74            ``COMPLETED``-but-not-yet-visible task — a task is resolved the moment its
 75            sentinel appears, so a generous value does not slow down successful runs.
 76    """
 77
 78    partition: Optional[str] = None
 79    time: Optional[str] = None
 80    mem: Optional[str] = None
 81    cpus_per_task: int = 1
 82    gpus: int = 0
 83    account: Optional[str] = None
 84    qos: Optional[str] = None
 85    constraint: Optional[str] = None
 86    shebang: Optional[str] = None
 87    max_array_size: Optional[int] = None
 88    latency_wait: float = 120.0
 89
 90    @classmethod
 91    def load(cls, path: Optional[str] = None, **overrides: Any) -> "SlurmConfig":
 92        """Build a config from the user config file, with explicit overrides taking precedence.
 93
 94        Precedence is ``overrides`` > config file ``[slurm]`` section > dataclass defaults.
 95        This is the way to combine the stored user defaults with per-run tweaks; constructing
 96        ``SlurmConfig(...)`` directly does **not** consult the file (an explicitly built config
 97        is used verbatim).
 98
 99        Args:
100            path: Path to the config file. ``None`` resolves the default location (see
101                :func:`config_file_path`). A missing file is treated as empty.
102            **overrides: Field values that override the file defaults. Each name must be a
103                valid ``SlurmConfig`` field.
104
105        Returns:
106            A :class:`SlurmConfig` with file defaults filled in and overrides applied.
107
108        Raises:
109            ValueError: If the file or ``overrides`` contain an unknown field name.
110        """
111        _validate_keys(overrides, "load() overrides")
112        merged: Dict[str, Any] = dict(_read_slurm_defaults(path))
113        merged.update(overrides)
114        return cls(**merged)

Configuration for the slurm runner.

Inherits poll_interval, tmp_root and python_executable from RunnerConfig. For slurm, tmp_root is required and must point at a shared filesystem visible to all compute nodes (not node-local /tmp), and num_workers (passed to the op / run) is interpreted as the array throttle — the maximum number of tasks allowed to run concurrently — independently of how many tasks the work is partitioned into.

Cluster-specific values (partition, account, constraint, tmp_root, ...) can be stored once in a user config file and reused as defaults; see load() and write_slurm_config().

Attributes: partition: The slurm partition to submit to. time: The per-task time limit (slurm time format, e.g. "01:00:00"). mem: The per-task memory limit (e.g. "8G"). cpus_per_task: Number of CPUs requested per task. gpus: Number of GPUs requested per task (emitted as --gpus only when > 0). account: The accounting project to charge. qos: The quality-of-service to request. constraint: A node feature constraint. shebang: Optional environment setup for the generated job script. If given, its first line must be an interpreter line (starting with #!) which is placed at the top of the script; any remaining lines are emitted as an activation preamble after the #SBATCH directives (so the directives are still honoured). The preamble is for making the package importable on the node (e.g. module load / LD_LIBRARY_PATH exports), not for choosing the interpreter: the worker is always launched with the absolute python_executable (defaulting to the submitting sys.executable). None uses #!/bin/bash and that absolute interpreter, which needs no activation when the env lives on a shared filesystem. Example:

        shebang = "#!/bin/bash\nmodule load gcc\nexport LD_LIBRARY_PATH=...:$LD_LIBRARY_PATH"

max_array_size: Override for the maximum number of array tasks per job. None queries the cluster's MaxArraySize (falling back to a safe default). A run partitioned into more tasks than this is rejected up front with a clear error. latency_wait: Seconds to wait for a finished task's .success sentinel to become visible on a shared (NFS) filesystem before giving up on it. A task that the scheduler reports COMPLETED wrote its sentinel, but the orchestrating node's attribute cache can lag the compute node by up to the mount's acdirmax (typically 60 s); this must comfortably exceed that. It only bounds the wait on a COMPLETED-but-not-yet-visible task — a task is resolved the moment its sentinel appears, so a generous value does not slow down successful runs.

SlurmConfig( poll_interval: float = 10.0, tmp_root: Optional[str] = None, python_executable: Optional[str] = None, partition: Optional[str] = None, time: Optional[str] = None, mem: Optional[str] = None, cpus_per_task: int = 1, gpus: int = 0, account: Optional[str] = None, qos: Optional[str] = None, constraint: Optional[str] = None, shebang: Optional[str] = None, max_array_size: Optional[int] = None, latency_wait: float = 120.0)
partition: Optional[str] = None
time: Optional[str] = None
mem: Optional[str] = None
cpus_per_task: int = 1
gpus: int = 0
account: Optional[str] = None
qos: Optional[str] = None
constraint: Optional[str] = None
shebang: Optional[str] = None
max_array_size: Optional[int] = None
latency_wait: float = 120.0
@classmethod
def load( cls, path: Optional[str] = None, **overrides: Any) -> SlurmConfig:
 90    @classmethod
 91    def load(cls, path: Optional[str] = None, **overrides: Any) -> "SlurmConfig":
 92        """Build a config from the user config file, with explicit overrides taking precedence.
 93
 94        Precedence is ``overrides`` > config file ``[slurm]`` section > dataclass defaults.
 95        This is the way to combine the stored user defaults with per-run tweaks; constructing
 96        ``SlurmConfig(...)`` directly does **not** consult the file (an explicitly built config
 97        is used verbatim).
 98
 99        Args:
100            path: Path to the config file. ``None`` resolves the default location (see
101                :func:`config_file_path`). A missing file is treated as empty.
102            **overrides: Field values that override the file defaults. Each name must be a
103                valid ``SlurmConfig`` field.
104
105        Returns:
106            A :class:`SlurmConfig` with file defaults filled in and overrides applied.
107
108        Raises:
109            ValueError: If the file or ``overrides`` contain an unknown field name.
110        """
111        _validate_keys(overrides, "load() overrides")
112        merged: Dict[str, Any] = dict(_read_slurm_defaults(path))
113        merged.update(overrides)
114        return cls(**merged)

Build a config from the user config file, with explicit overrides taking precedence.

Precedence is overrides > config file [slurm] section > dataclass defaults. This is the way to combine the stored user defaults with per-run tweaks; constructing SlurmConfig(...) directly does not consult the file (an explicitly built config is used verbatim).

Args: path: Path to the config file. None resolves the default location (see config_file_path()). A missing file is treated as empty. **overrides: Field values that override the file defaults. Each name must be a valid SlurmConfig field.

Returns: A SlurmConfig with file defaults filled in and overrides applied.

Raises: ValueError: If the file or overrides contain an unknown field name.

def config_file_path(path: Optional[str] = None) -> pathlib.Path:
117def config_file_path(path: Optional[str] = None) -> Path:
118    """Resolve the path to the user config file.
119
120    Resolution order: an explicit ``path`` argument, then the ``BIOIMAGE_PY_CONFIG``
121    environment variable, then ``$XDG_CONFIG_HOME/bioimage-py/config.toml`` (falling back to
122    ``~/.config/bioimage-py/config.toml``).
123
124    Args:
125        path: An explicit path that short-circuits the resolution. ``None`` resolves the
126            default location.
127
128    Returns:
129        The resolved path (not guaranteed to exist).
130    """
131    if path is not None:
132        return Path(path).expanduser()
133    env = os.environ.get("BIOIMAGE_PY_CONFIG")
134    if env:
135        return Path(env).expanduser()
136    base = os.environ.get("XDG_CONFIG_HOME") or os.path.join(os.path.expanduser("~"), ".config")
137    return Path(base) / "bioimage-py" / "config.toml"

Resolve the path to the user config file.

Resolution order: an explicit path argument, then the BIOIMAGE_PY_CONFIG environment variable, then $XDG_CONFIG_HOME/bioimage-py/config.toml (falling back to ~/.config/bioimage-py/config.toml).

Args: path: An explicit path that short-circuits the resolution. None resolves the default location.

Returns: The resolved path (not guaranteed to exist).

def write_slurm_config( path: Optional[str] = None, *, replace: bool = False, **fields: Any) -> str:
179def write_slurm_config(path: Optional[str] = None, *, replace: bool = False, **fields: Any) -> str:
180    """Create or update the user config file with default slurm settings.
181
182    This is the supported way to set up cluster-specific defaults (partition, account,
183    constraint, ``tmp_root``, ...) instead of editing the file by hand. Provided fields are
184    merged into the existing ``[slurm]`` table by default (so the file can be built up over
185    several calls); ``None`` values are skipped, and any other top-level tables in the file
186    (reserved for future named profiles) are preserved.
187
188    Args:
189        path: Path to write to. ``None`` resolves the default location (see
190            :func:`config_file_path`); the parent directory is created if needed.
191        replace: If ``True``, replace the whole ``[slurm]`` table instead of merging into it.
192        **fields: Default field values to store. Each name must be a valid ``SlurmConfig``
193            field.
194
195    Returns:
196        The path that was written.
197
198    Raises:
199        ValueError: If ``fields`` contains an unknown field name.
200    """
201    _validate_keys(fields, "write_slurm_config()")
202    provided = {k: v for k, v in fields.items() if v is not None}
203    fp = config_file_path(path)
204    data = _parse_toml(fp)
205    section = {} if replace else dict(data.get("slurm", {}))
206    section.update(provided)
207    data["slurm"] = section
208
209    import tomli_w  # local import: only the writer needs the (optional-at-runtime) dependency.
210
211    fp.parent.mkdir(parents=True, exist_ok=True)
212    with open(fp, "wb") as f:
213        tomli_w.dump(data, f)
214    return str(fp)

Create or update the user config file with default slurm settings.

This is the supported way to set up cluster-specific defaults (partition, account, constraint, tmp_root, ...) instead of editing the file by hand. Provided fields are merged into the existing [slurm] table by default (so the file can be built up over several calls); None values are skipped, and any other top-level tables in the file (reserved for future named profiles) are preserved.

Args: path: Path to write to. None resolves the default location (see config_file_path()); the parent directory is created if needed. replace: If True, replace the whole [slurm] table instead of merging into it. **fields: Default field values to store. Each name must be a valid SlurmConfig field.

Returns: The path that was written.

Raises: ValueError: If fields contains an unknown field name.

def get_runner( job_type: str, config: Optional[RunnerConfig] = None) -> Runner:
18def get_runner(job_type: str, config: Optional[RunnerConfig] = None) -> Runner:
19    """Return a runner for the given job type.
20
21    Args:
22        job_type: One of ``"local"``, ``"subprocess"`` or ``"slurm"``.
23        config: Optional runner configuration.
24
25    Returns:
26        A :class:`~bioimage_py.runner.base.Runner` instance.
27
28    Raises:
29        ValueError: If ``job_type`` is unknown.
30    """
31    try:
32        cls = _RUNNERS[job_type.lower()]
33    except KeyError:
34        raise ValueError(f"Unknown job_type {job_type!r}; expected one of {sorted(_RUNNERS)}.")
35    return cls(config)

Return a runner for the given job type.

Args: job_type: One of "local", "subprocess" or "slurm". config: Optional runner configuration.

Returns: A Runner instance.

Raises: ValueError: If job_type is unknown.

def run_block( function: Callable[[Union[bioimage_cpp._core.Block, bioimage_cpp._core.BlockWithHalo], Sequence[bioimage_py.sources.Source], Sequence[bioimage_py.sources.Source], Optional[bioimage_py.sources.Source]], Any], blocking: bioimage_cpp._core.Blocking, block_id: int, inputs: Sequence[bioimage_py.sources.Source], outputs: Sequence[bioimage_py.sources.Source], mask: Optional[bioimage_py.sources.Source], halo: Optional[Sequence[int]]) -> Any:
35def run_block(function: ComputeFn, blocking: Blocking, block_id: int,
36              inputs: Sequence[Source], outputs: Sequence[Source],
37              mask: Optional[Source], halo: Optional[Sequence[int]]) -> Any:
38    """Run the per-block ``function`` for a single block.
39
40    This is the single per-block code path shared by every backend (local and
41    distributed), which is what guarantees identical results across backends.
42
43    Args:
44        function: The per-block function ``function(block, inputs, outputs, mask)``.
45        blocking: A ``bioimage_cpp.utils.Blocking``.
46        block_id: The block id to process.
47        inputs: Tuple of opened input sources.
48        outputs: Tuple of opened output sources.
49        mask: An opened mask source or ``None``.
50        halo: A per-axis halo list, or ``None`` for no halo.
51
52    Returns:
53        The per-block return value of ``function`` (may be ``None``).
54    """
55    if halo is None:
56        block = blocking.get_block(int(block_id))
57    else:
58        block = blocking.get_block_with_halo(int(block_id), [int(h) for h in halo])
59    return function(block, inputs, outputs, mask)

Run the per-block function for a single block.

This is the single per-block code path shared by every backend (local and distributed), which is what guarantees identical results across backends.

Args: function: The per-block function function(block, inputs, outputs, mask). blocking: A bioimage_cpp.utils.Blocking. block_id: The block id to process. inputs: Tuple of opened input sources. outputs: Tuple of opened output sources. mask: An opened mask source or None. halo: A per-axis halo list, or None for no halo.

Returns: The per-block return value of function (may be None).