bioimage_py.runner
Runner implementations: local, subprocess (distributed protocol), and slurm (stub).
1"""Runner implementations: local, subprocess (distributed protocol), and slurm (stub).""" 2from .base import LocalRunner, Runner, RunnerError, run_block 3from .config import RunnerConfig, SlurmConfig, config_file_path, write_slurm_config 4from .distributed import SlurmRunner, SubprocessRunner 5from .factory import get_runner 6 7__all__ = [ 8 "Runner", 9 "LocalRunner", 10 "SubprocessRunner", 11 "SlurmRunner", 12 "RunnerError", 13 "RunnerConfig", 14 "SlurmConfig", 15 "config_file_path", 16 "write_slurm_config", 17 "get_runner", 18 "run_block", 19]
class Runner(ABC):
    """Backend-agnostic base class for block-wise and per-item execution.

    :meth:`run` and :meth:`map` validate and normalize their arguments and then delegate to
    the abstract hooks :meth:`_execute` and :meth:`_execute_map`, which every concrete
    runner implements for its backend.
    """

    def __init__(self, config: Optional[RunnerConfig] = None):
        """Keep ``config``; a falsy value is replaced by a default :class:`RunnerConfig`."""
        self.config = config if config else RunnerConfig()

    def run(
        self,
        function: ComputeFn,
        inputs: Sequence[SourceLike],
        outputs: Sequence[SourceLike] = (),
        *,
        block_shape: Optional[Tuple[int, ...]] = None,
        halo: Optional[Sequence[int]] = None,
        mask: Optional[SourceLike] = None,
        num_workers: int = 1,
        block_ids: Optional[Sequence[int]] = None,
        has_return_val: bool = False,
        name: str = "",
        roi: Optional[Tuple[slice, ...]] = None,
        pre_cleanup: Optional[Callable[[str], None]] = None,
        resume_from: Optional[str] = None,
    ) -> Optional[list]:
        """Apply ``function`` to every block of the domain spanned by the sources.

        The domain is the first input, or the first output when there are no inputs.

        Args:
            function: Per-block function ``function(block, inputs, outputs, mask)``.
            inputs: Source-like objects that are read.
            outputs: Source-like objects that are written in place.
            block_shape: Block shape; when omitted it is derived from the domain source.
            halo: Per-axis halo; if given, ``function`` receives a ``BlockWithHalo``.
            mask: Optional binary mask source.
            num_workers: Number of parallel workers / tasks.
            block_ids: Process only these blocks (e.g. to re-run failures).
            has_return_val: Whether ``function`` returns a value that should be collected.
            name: Short name for progress display.
            roi: Region of interest the blocking is restricted to.
            pre_cleanup: Optional callback ``pre_cleanup(tmp_folder)`` called on the
                orchestrating process with the job temp folder right before it is deleted
                (distributed backends only, success path only), e.g. to rescue the per-task
                timing files under ``tmp_folder/timings/``. The local runner has no temp
                folder and ignores it.
            resume_from: Distributed backends only. The preserved temp folder of a failed
                run (``RunnerError.tmp_folder``). Only the blocks that did not complete are
                re-run and merged with the completed ones. The serialized payload is
                reused, so ``function``/``inputs``/``outputs``/``block_shape``/... of this
                call are **ignored**: use it to finish the same call, not to start a new
                one. Mutually exclusive with ``block_ids``.

        Returns:
            The per-block return values (in ``block_ids`` order) if ``has_return_val``,
            otherwise ``None``.

        Raises:
            ValueError: If ``resume_from`` is combined with ``block_ids``, if no source is
                given, or if a source/output shape is incompatible with the domain.
        """
        # Resuming bypasses all argument processing: the payload in the temp folder wins.
        if resume_from is not None:
            if block_ids is None:
                return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
            raise ValueError("resume_from and block_ids are mutually exclusive; resume_from "
                             "re-runs the original partition's un-done blocks.")

        in_sources = [as_source(src) for src in inputs]
        out_sources = [as_source(dst) for dst in outputs]
        mask_source = None if mask is None else as_source(mask)

        domain = None
        if in_sources:
            domain = in_sources[0]
        elif out_sources:
            domain = out_sources[0]
        if domain is None:
            raise ValueError("run() requires at least one input or output source.")

        # Every input and the mask have to cover exactly the domain.
        dom_shape = tuple(domain.shape)
        must_match = list(in_sources)
        if mask_source is not None:
            must_match.append(mask_source)
        for src in must_match:
            if tuple(src.shape) != dom_shape:
                raise ValueError(
                    f"Shape mismatch: source with shape {src.shape} does not match the "
                    f"domain shape {domain.shape}."
                )

        # An output may have an extra leading channel axis; only its trailing (spatial)
        # dimensions have to agree with the domain, since the block roi indexes those.
        n_spatial = len(dom_shape)
        for out in out_sources:
            out_shape = tuple(out.shape)
            if out_shape[-n_spatial:] != dom_shape:
                raise ValueError(
                    f"Output shape {out_shape} is incompatible with the domain shape "
                    f"{dom_shape}: its trailing dimensions must match the domain."
                )

        resolved_block_shape = derive_block_shape(domain, block_shape)
        halo_n = None if halo is None else normalize_halo(halo, domain.ndim)
        self._validate_write_safety(out_sources, resolved_block_shape)

        blocking = get_blocking(domain.shape, resolved_block_shape, roi)
        if block_ids is None:
            selected_ids = list(range(int(blocking.number_of_blocks)))
        else:
            selected_ids = [int(b) for b in block_ids]

        results = self._execute(
            function=function,
            inputs=in_sources,
            outputs=out_sources,
            mask=mask_source,
            blocking=blocking,
            block_ids=selected_ids,
            halo=halo_n,
            has_return_val=has_return_val,
            num_workers=num_workers,
            name=name,
            shape=tuple(domain.shape),
            block_shape=resolved_block_shape,
            roi=roi,
            pre_cleanup=pre_cleanup,
        )
        if has_return_val:
            return results
        return None

    def map(
        self,
        function: Callable[[int], Any],
        n_items: Optional[int] = None,
        *,
        item_ids: Optional[Sequence[int]] = None,
        num_workers: int = 1,
        has_return_val: bool = True,
        name: str = "",
        pre_cleanup: Optional[Callable[[str], None]] = None,
        resume_from: Optional[str] = None,
    ) -> Optional[list]:
        """Call ``function(index)`` for a set of item indices, on any backend.

        This is the per-item counterpart of :meth:`run`: no domain, blocking, sources or
        mask are involved. ``function`` receives one integer index and has to carry all
        data it needs in its (cloudpickled) closure.

        Args:
            function: The per-item function ``function(index) -> result``.
            n_items: Number of items; indices ``0 .. n_items - 1`` are processed. Ignored
                if ``item_ids`` is given.
            item_ids: Explicit item indices to process (e.g. to re-run failures).
                Defaults to ``range(n_items)``.
            num_workers: Number of parallel workers / tasks.
            has_return_val: Whether ``function`` returns a value that should be collected.
            name: Short name for progress display.
            pre_cleanup: Optional ``pre_cleanup(tmp_folder)`` callback (distributed
                backends only); see :meth:`run`.
            resume_from: Distributed backends only; the preserved temp folder of a failed
                run (see :meth:`run`). Mutually exclusive with ``item_ids``.

        Returns:
            The per-item return values (in ``item_ids`` order) if ``has_return_val``,
            otherwise ``None``.

        Raises:
            ValueError: If neither ``n_items`` nor ``item_ids`` is given, or if
                ``resume_from`` is combined with ``item_ids``.
        """
        if resume_from is not None:
            if item_ids is None:
                return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
            raise ValueError("resume_from and item_ids are mutually exclusive; resume_from "
                             "re-runs the original partition's un-done items.")

        if item_ids is not None:
            selected_ids = [int(i) for i in item_ids]
        elif n_items is not None:
            selected_ids = list(range(int(n_items)))
        else:
            raise ValueError("map() requires either n_items or item_ids.")

        results = self._execute_map(
            function=function,
            item_ids=selected_ids,
            has_return_val=has_return_val,
            num_workers=num_workers,
            name=name,
            pre_cleanup=pre_cleanup,
        )
        if has_return_val:
            return results
        return None

    def _resume_entry(self, tmp_folder: str, *, name: str,
                      pre_cleanup: Optional[Callable[[str], None]]) -> Optional[list]:
        """Resume a failed run from ``tmp_folder``; distributed runners override this.

        The base implementation always raises, because a runner without a temp folder has
        nothing to resume from.

        Raises:
            ValueError: Always.
        """
        raise ValueError(
            "resume_from is only valid for distributed backends (subprocess/slurm); the local "
            "runner keeps no temp folder. Re-run the operation to recompute in-process "
            "(optionally with block_ids=err.failed_block_ids for a subset)."
        )

    @staticmethod
    def _validate_write_safety(outputs: Sequence[Source], block_shape: Sequence[int]) -> None:
        """Reject block shapes that are not a multiple of an output's chunk shape.

        Two blocks that share a chunk could write it concurrently and corrupt it, so every
        block extent has to be a multiple of the matching chunk extent.

        Outputs are skipped when they are sharded (``out.shards is not None``; those are
        protected by shard-exclusive routing, see
        :func:`bioimage_py.util.group_blocks_by_shard`), when they have no chunks, or when
        they have fewer chunk axes than ``block_shape`` has entries.

        ``block_shape`` is spatial-only, so for an output with leading extra axes (e.g.
        channels) only the trailing chunk axes are compared.

        Raises:
            ValueError: If a block extent is not a multiple of the chunk extent.
        """
        n_axes = len(block_shape)
        for out in outputs:
            if out.shards is not None:
                continue
            chunks = out.chunks
            if chunks is None or len(chunks) < n_axes:
                continue
            spatial_chunks = tuple(chunks[-n_axes:])
            misaligned = any(
                extent % chunk != 0 for extent, chunk in zip(block_shape, spatial_chunks)
            )
            if misaligned:
                raise ValueError(
                    f"Unsafe block shape for writing: {tuple(block_shape)} is not a multiple "
                    f"of the output (spatial) chunk shape {spatial_chunks}. Concurrent writes "
                    "could corrupt shared chunks; use a block shape that is a chunk multiple."
                )

    @abstractmethod
    def _execute(
        self,
        *,
        function: ComputeFn,
        inputs: Sequence[Source],
        outputs: Sequence[Source],
        mask: Optional[Source],
        blocking: Blocking,
        block_ids: Sequence[int],
        halo: Optional[Sequence[int]],
        has_return_val: bool,
        num_workers: int,
        name: str,
        shape: Tuple[int, ...],
        block_shape: Tuple[int, ...],
        roi: Optional[Tuple[slice, ...]],
        pre_cleanup: Optional[Callable[[str], None]] = None,
    ) -> List[Any]:
        """Backend hook: run the per-block function over ``block_ids``, return ordered results."""
        ...

    @abstractmethod
    def _execute_map(
        self,
        *,
        function: Callable[[int], Any],
        item_ids: Sequence[int],
        has_return_val: bool,
        num_workers: int,
        name: str,
        pre_cleanup: Optional[Callable[[str], None]] = None,
    ) -> List[Any]:
        """Backend hook: run ``function(index)`` over ``item_ids``, return ordered results."""
        ...
Abstract runner. Subclasses implement _execute() and _execute_map() for a specific backend.
def run(
    self,
    function: ComputeFn,
    inputs: Sequence[SourceLike],
    outputs: Sequence[SourceLike] = (),
    *,
    block_shape: Optional[Tuple[int, ...]] = None,
    halo: Optional[Sequence[int]] = None,
    mask: Optional[SourceLike] = None,
    num_workers: int = 1,
    block_ids: Optional[Sequence[int]] = None,
    has_return_val: bool = False,
    name: str = "",
    roi: Optional[Tuple[slice, ...]] = None,
    pre_cleanup: Optional[Callable[[str], None]] = None,
    resume_from: Optional[str] = None,
) -> Optional[list]:
    """Apply ``function`` to every block of the domain spanned by the sources.

    The domain is the first input, or the first output when there are no inputs.

    Args:
        function: Per-block function ``function(block, inputs, outputs, mask)``.
        inputs: Source-like objects that are read.
        outputs: Source-like objects that are written in place.
        block_shape: Block shape; when omitted it is derived from the domain source.
        halo: Per-axis halo; if given, ``function`` receives a ``BlockWithHalo``.
        mask: Optional binary mask source.
        num_workers: Number of parallel workers / tasks.
        block_ids: Process only these blocks (e.g. to re-run failures).
        has_return_val: Whether ``function`` returns a value that should be collected.
        name: Short name for progress display.
        roi: Region of interest the blocking is restricted to.
        pre_cleanup: Optional callback ``pre_cleanup(tmp_folder)`` called on the
            orchestrating process with the job temp folder right before it is deleted
            (distributed backends only, success path only), e.g. to rescue the per-task
            timing files under ``tmp_folder/timings/``. The local runner has no temp
            folder and ignores it.
        resume_from: Distributed backends only. The preserved temp folder of a failed run
            (``RunnerError.tmp_folder``). Only the blocks that did not complete are re-run
            and merged with the completed ones. The serialized payload is reused, so
            ``function``/``inputs``/``outputs``/``block_shape``/... of this call are
            **ignored**: use it to finish the same call, not to start a new one.
            Mutually exclusive with ``block_ids``.

    Returns:
        The per-block return values (in ``block_ids`` order) if ``has_return_val``,
        otherwise ``None``.

    Raises:
        ValueError: If ``resume_from`` is combined with ``block_ids``, if no source is
            given, or if a source/output shape is incompatible with the domain.
    """
    # Resuming bypasses all argument processing: the payload in the temp folder wins.
    if resume_from is not None:
        if block_ids is None:
            return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
        raise ValueError("resume_from and block_ids are mutually exclusive; resume_from "
                         "re-runs the original partition's un-done blocks.")

    in_sources = [as_source(src) for src in inputs]
    out_sources = [as_source(dst) for dst in outputs]
    mask_source = None if mask is None else as_source(mask)

    domain = None
    if in_sources:
        domain = in_sources[0]
    elif out_sources:
        domain = out_sources[0]
    if domain is None:
        raise ValueError("run() requires at least one input or output source.")

    # Every input and the mask have to cover exactly the domain.
    dom_shape = tuple(domain.shape)
    must_match = list(in_sources)
    if mask_source is not None:
        must_match.append(mask_source)
    for src in must_match:
        if tuple(src.shape) != dom_shape:
            raise ValueError(
                f"Shape mismatch: source with shape {src.shape} does not match the "
                f"domain shape {domain.shape}."
            )

    # An output may have an extra leading channel axis; only its trailing (spatial)
    # dimensions have to agree with the domain, since the block roi indexes those.
    n_spatial = len(dom_shape)
    for out in out_sources:
        out_shape = tuple(out.shape)
        if out_shape[-n_spatial:] != dom_shape:
            raise ValueError(
                f"Output shape {out_shape} is incompatible with the domain shape "
                f"{dom_shape}: its trailing dimensions must match the domain."
            )

    resolved_block_shape = derive_block_shape(domain, block_shape)
    halo_n = None if halo is None else normalize_halo(halo, domain.ndim)
    self._validate_write_safety(out_sources, resolved_block_shape)

    blocking = get_blocking(domain.shape, resolved_block_shape, roi)
    if block_ids is None:
        selected_ids = list(range(int(blocking.number_of_blocks)))
    else:
        selected_ids = [int(b) for b in block_ids]

    results = self._execute(
        function=function,
        inputs=in_sources,
        outputs=out_sources,
        mask=mask_source,
        blocking=blocking,
        block_ids=selected_ids,
        halo=halo_n,
        has_return_val=has_return_val,
        num_workers=num_workers,
        name=name,
        shape=tuple(domain.shape),
        block_shape=resolved_block_shape,
        roi=roi,
        pre_cleanup=pre_cleanup,
    )
    if has_return_val:
        return results
    return None
Run function block-wise over the inputs/outputs.
Args:
function: Per-block function function(block, inputs, outputs, mask).
inputs: Input source-like objects (read).
outputs: Output source-like objects (written in place).
block_shape: Block shape; defaults to the domain source's chunks.
halo: Per-axis halo; if given, function receives a BlockWithHalo.
mask: Optional binary mask source.
num_workers: Number of parallel workers / tasks.
block_ids: Restrict processing to these blocks (for re-running failures).
has_return_val: Whether function returns a value to collect.
name: A short name for progress display.
roi: Region of interest to restrict the blocking to.
pre_cleanup: Optional callback pre_cleanup(tmp_folder) invoked on the
orchestrating process with the job temp folder right before it is deleted
(distributed backends only, success path only). Use it to read out anything
worth keeping from the temp folder (e.g. the per-task timing files under
tmp_folder/timings/) before cleanup. Ignored by the local runner, which
has no temp folder.
resume_from: Distributed backends only. Path to the preserved temp folder of a
failed run (RunnerError.tmp_folder). Re-runs only the blocks that did not
complete and merges them with the already-completed ones, so the result is
correct and complete. The run is resumed from the serialized payload, so
function/inputs/outputs/block_shape/... from this call are
ignored -- pass resume_from to finish the same call, not to start a
new one. Mutually exclusive with block_ids.
Returns:
The list of per-block return values (in block_ids order) if
has_return_val, else None.
def map(
    self,
    function: Callable[[int], Any],
    n_items: Optional[int] = None,
    *,
    item_ids: Optional[Sequence[int]] = None,
    num_workers: int = 1,
    has_return_val: bool = True,
    name: str = "",
    pre_cleanup: Optional[Callable[[str], None]] = None,
    resume_from: Optional[str] = None,
) -> Optional[list]:
    """Call ``function(index)`` for a set of item indices, on any backend.

    This is the per-item counterpart of :meth:`run`: no domain, blocking, sources or mask
    are involved. ``function`` receives one integer index and has to carry all data it
    needs in its (cloudpickled) closure.

    Args:
        function: The per-item function ``function(index) -> result``.
        n_items: Number of items; indices ``0 .. n_items - 1`` are processed. Ignored if
            ``item_ids`` is given.
        item_ids: Explicit item indices to process (e.g. to re-run failures). Defaults to
            ``range(n_items)``.
        num_workers: Number of parallel workers / tasks.
        has_return_val: Whether ``function`` returns a value that should be collected.
        name: Short name for progress display.
        pre_cleanup: Optional ``pre_cleanup(tmp_folder)`` callback (distributed backends
            only); see :meth:`run`.
        resume_from: Distributed backends only; the preserved temp folder of a failed run
            (see :meth:`run`). Mutually exclusive with ``item_ids``.

    Returns:
        The per-item return values (in ``item_ids`` order) if ``has_return_val``,
        otherwise ``None``.

    Raises:
        ValueError: If neither ``n_items`` nor ``item_ids`` is given, or if
            ``resume_from`` is combined with ``item_ids``.
    """
    if resume_from is not None:
        if item_ids is None:
            return self._resume_entry(resume_from, name=name, pre_cleanup=pre_cleanup)
        raise ValueError("resume_from and item_ids are mutually exclusive; resume_from "
                         "re-runs the original partition's un-done items.")

    # Explicit ids take precedence over the count.
    if item_ids is not None:
        selected_ids = [int(i) for i in item_ids]
    elif n_items is not None:
        selected_ids = list(range(int(n_items)))
    else:
        raise ValueError("map() requires either n_items or item_ids.")

    results = self._execute_map(
        function=function,
        item_ids=selected_ids,
        has_return_val=has_return_val,
        num_workers=num_workers,
        name=name,
        pre_cleanup=pre_cleanup,
    )
    if has_return_val:
        return results
    return None
Map function(index) over item indices in parallel, across any backend.
Unlike run(), this is not block-wise: there is no domain, blocking, sources or
mask. function takes a single integer index and returns its result; it must carry
whatever data it needs in its (cloudpickled) closure — e.g. a SourceSpec it reopens
and a file path it reads. This is the per-item counterpart used by per-object
workflows.
Args:
function: The per-item function function(index) -> result.
n_items: The number of items; indices 0 .. n_items - 1 are processed. Ignored
if item_ids is given.
item_ids: Explicit item indices to process (e.g. to re-run failures). Defaults to
range(n_items).
num_workers: Number of parallel workers / tasks.
has_return_val: Whether function returns a value to collect.
name: A short name for progress display.
pre_cleanup: Optional pre_cleanup(tmp_folder) callback (distributed backends
only); see run().
resume_from: Distributed backends only; the preserved temp folder of a failed run
(see run()). Re-runs only the incomplete items and merges with those
already done. Mutually exclusive with item_ids.
Returns:
The list of per-item return values (in item_ids order) if has_return_val,
else None.
Raises:
ValueError: If neither n_items nor item_ids is given.
class LocalRunner(Runner):
    """In-process runner that executes blocks or items on a thread pool."""

    def _execute(
        self,
        *,
        function: ComputeFn,
        inputs: Sequence[Source],
        outputs: Sequence[Source],
        mask: Optional[Source],
        blocking: Blocking,
        block_ids: Sequence[int],
        halo: Optional[Sequence[int]],
        has_return_val: bool,
        num_workers: int,
        name: str,
        shape: Tuple[int, ...],
        block_shape: Tuple[int, ...],
        roi: Optional[Tuple[slice, ...]],
        pre_cleanup: Optional[Callable[[str], None]] = None,
    ) -> List[Any]:
        """Process ``block_ids`` on a thread pool and return the per-block results.

        ``pre_cleanup`` exists only for interface parity with the distributed runners and
        is not used: this runner has no temp folder to hand to it.

        NOTE(review): results follow the flattened group order produced by
        ``group_blocks_by_shard``; whether that always equals ``block_ids`` order is not
        visible from here.
        """
        def process_block(block_id: int) -> Any:
            return run_block(function, blocking, block_id, inputs, outputs, mask, halo)

        # With sharded outputs every shard's blocks form one group that a single thread
        # works through sequentially, so a shard is never written concurrently. Without
        # sharding each block is a group of its own (one future per block).
        shard_groups = group_blocks_by_shard(blocking, outputs, block_ids)
        if shard_groups is not None:
            group_sizes = [len(g) for g in shard_groups]
            maybe_warn_imbalance(group_sizes, num_workers, len(shard_groups), name)
            work = shard_groups
        else:
            work = [[int(b)] for b in block_ids]
        return self._run_pool(work, process_block, num_workers, name, unit="block")

    def _execute_map(
        self,
        *,
        function: Callable[[int], Any],
        item_ids: Sequence[int],
        has_return_val: bool,
        num_workers: int,
        name: str,
        pre_cleanup: Optional[Callable[[str], None]] = None,
    ) -> List[Any]:
        """Call ``function(index)`` per item on a thread pool (``pre_cleanup`` is unused)."""
        def process_item(index: int) -> Any:
            return function(int(index))

        singletons = [[int(i)] for i in item_ids]
        return self._run_pool(singletons, process_item, num_workers, name, unit="item")

    @staticmethod
    def _run_pool(groups: Sequence[Sequence[int]], call_one: Callable[[int], Any],
                  num_workers: int, name: str, *, unit: str = "block") -> List[Any]:
        """Execute ``call_one(id)`` for all ids on a thread pool and collect the results.

        Work is scheduled per *group*: one worker thread runs the ids of a group one after
        the other, different groups run concurrently. Groups of size one therefore give
        plain one-future-per-id execution, larger groups serialize same-shard writes (see
        :func:`bioimage_py.util.group_blocks_by_shard`).

        Args:
            groups: Work groups, each a sequence of ids (block ids or item indices).
            call_one: Callable producing the result for a single id.
            num_workers: Number of worker threads (at least one is always used).
            name: Short name for the progress bar; an empty name disables the bar.
            unit: Noun used in the failure message ("block" or "item").

        Returns:
            The per-id results, ordered like the flattened ``groups``.

        Raises:
            RunnerError: If any id failed. It carries the sorted, de-duplicated failed ids.
                After a failure inside a group, the ids of that group that were not run
                yet are reported as failed as well.
        """
        work = [list(g) for g in groups]
        ordered_ids = [ident for group in work for ident in group]
        collected: Dict[int, Any] = {}
        failed_ids: List[int] = []
        first_error: Optional[BaseException] = None

        @threadpool_limits.wrap(limits=1)
        def _run_group(group: List[int]):
            # Returns (results so far, ids counted as failed, the exception or None).
            done: Dict[int, Any] = {}
            for pos, ident in enumerate(group):
                try:
                    done[ident] = call_one(ident)
                except Exception as error:  # noqa: BLE001 - we re-raise as RunnerError
                    # Stop the group: the failing id and everything after it is failed.
                    return done, list(group[pos:]), error
            return done, [], None

        pool_size = max(1, int(num_workers))
        with futures.ThreadPoolExecutor(pool_size) as pool:
            submitted = {pool.submit(_run_group, group): group for group in work}
            with tqdm(total=len(ordered_ids), desc=name or None, disable=not name) as pbar:
                for fut in futures.as_completed(submitted):
                    done, group_failed, err = fut.result()
                    collected.update(done)
                    if group_failed:
                        failed_ids.extend(group_failed)
                        if first_error is None:
                            first_error = err
                    # The bar advances by the whole group, failed or not.
                    pbar.update(len(submitted[fut]))

        if not failed_ids:
            return [collected[ident] for ident in ordered_ids]

        failed_ids = sorted(set(failed_ids))
        label = name or 'run'
        raise RunnerError(
            f"{len(failed_ids)} {unit}(s) failed in '{label}': "
            f"{failed_ids[:10]}. First error: {first_error!r}",
            failed_block_ids=failed_ids,
        )
Run blocks locally with a thread pool.
class SubprocessRunner(_DistributedRunner):
    """Distributed runner that launches each task as a local subprocess.

    Exercises the full distributed protocol (cloudpickle payload, generated harness,
    result/sentinel files, ``block_ids`` re-run) without a scheduler.
    """

    def _launch_and_wait(self, tmp: str, n_tasks: int, num_workers: int, name: str,
                         task_ids: Optional[Sequence[int]] = None) -> None:
        """Run each task as a local subprocess, up to ``num_workers`` concurrently.

        The progress bar counts processed *blocks* (summed from the per-task done-logs)
        rather than tasks; a background thread polls the logs while the tasks run.

        Args:
            tmp: The job temp folder handed to the harness.
            n_tasks: The total number of tasks the run was partitioned into.
            num_workers: Maximum number of subprocesses running at the same time (at least
                one is always used).
            name: Short name for the progress bar; an empty name disables the bar.
            task_ids: Restrict the launch to this subset of task indices (resume); the bar
                still spans all tasks. ``None`` launches ``0 .. n_tasks - 1``.
        """
        ids = list(range(n_tasks)) if task_ids is None else list(task_ids)
        python = self.config.python_executable or sys.executable
        cmd_base = [python, "-m", "bioimage_py.runner._harness", tmp]
        error_dir = os.path.join(tmp, "error")

        def _run_task(task_id: int):
            # errors="replace": a worker that prints bytes which are not valid in the
            # locale encoding must not crash the orchestrator with a UnicodeDecodeError.
            proc = subprocess.run(cmd_base + [str(task_id)], capture_output=True, text=True,
                                  errors="replace")
            # The harness writes its own error/<id>.txt on a caught exception. But a failure
            # *before* that try (e.g. an import error launching the module) would otherwise
            # be silent, so capture the subprocess output as a fallback error file.
            if proc.returncode != 0:
                err_path = os.path.join(error_dir, f"{task_id}.txt")
                if not os.path.exists(err_path):
                    # The worker may have died before the error folder was created; without
                    # this the open() below would raise and mask the real failure.
                    os.makedirs(error_dir, exist_ok=True)
                    with open(err_path, "w", errors="replace") as f:
                        f.write(f"Worker for task {task_id} exited with code {proc.returncode}.\n")
                        if proc.stdout:
                            f.write(f"--- stdout ---\n{proc.stdout}\n")
                        if proc.stderr:
                            f.write(f"--- stderr ---\n{proc.stderr}\n")
            return proc

        # Drive a block-counting progress bar from the done-logs (single source of truth,
        # so no double-counting); clamp to the total in case a resume re-reads prior lines.
        n_blocks = _total_blocks(tmp, n_tasks)
        stop = threading.Event()
        bar_thread = None
        if name:
            def _poll_bar() -> None:
                with tqdm(total=n_blocks, desc=name, unit="block") as pbar:
                    # Event.wait returns False on timeout, i.e. while the tasks still run.
                    while not stop.wait(0.5):
                        pbar.n = min(_count_done_blocks(tmp, n_tasks), n_blocks)
                        pbar.refresh()
                    # One final read so the bar shows the state after the last task ended.
                    pbar.n = min(_count_done_blocks(tmp, n_tasks), n_blocks)
                    pbar.refresh()
            bar_thread = threading.Thread(target=_poll_bar, daemon=True)
            bar_thread.start()
        try:
            with futures.ThreadPoolExecutor(max(1, int(num_workers))) as tp:
                # list() drains the iterator so an exception raised in a worker thread
                # propagates here.
                list(tp.map(_run_task, ids))
        finally:
            # Always stop and join the poller, also when a task launch raised.
            stop.set()
            if bar_thread is not None:
                bar_thread.join()
Distributed runner that launches each task as a local subprocess.
Exercises the full distributed protocol (cloudpickle payload, generated harness,
result/sentinel files, block_ids re-run) without a scheduler.
class SlurmRunner(_DistributedRunner):
    """Distributed runner that submits one sbatch array job and polls it with ``sacct``.

    Reuses the full distributed protocol from :class:`_DistributedRunner` (cloudpickle
    payload, generated work-lists, per-task result + ``.success`` sentinel files, failure
    reporting and ``block_ids`` re-run) and overrides only how tasks are launched and
    awaited. The per-task sentinel file remains the ground truth for success; ``sacct`` is
    queried only to detect tasks that died without writing a sentinel. A manifest is written
    at submission time so an interrupted run can be picked back up with :meth:`reattach`.
    """

    def __init__(self, config: Optional[RunnerConfig] = None):
        """Create the runner, requiring a :class:`SlurmConfig`.

        Args:
            config: The slurm configuration. ``None`` loads the user defaults from the config
                file via :meth:`SlurmConfig.load` (honoring ``BIOIMAGE_PY_NO_CONFIG`` /
                ``BIOIMAGE_PY_CONFIG``); ``tmp_root`` must still be set, here or in the file,
                before running.

        Raises:
            TypeError: If ``config`` is a non-slurm ``RunnerConfig``.
        """
        if config is None:
            config = SlurmConfig.load()
        if not isinstance(config, SlurmConfig):
            raise TypeError(
                f"SlurmRunner requires a SlurmConfig, got {type(config).__name__}. "
                "Pass job_config=SlurmConfig(...) (it carries partition/account/time/etc.)."
            )
        super().__init__(config)

    def _launch_and_wait(self, tmp: str, n_tasks: int, num_workers: int, name: str,
                         task_ids: Optional[Sequence[int]] = None) -> None:
        """Submit an sbatch array job for the tasks and poll until they all finish.

        Args:
            tmp: The job temp folder (must live on a shared filesystem).
            n_tasks: The total number of tasks the run was partitioned into.
            num_workers: The array throttle (max tasks running concurrently).
            name: A short name used for the job name and progress display.
            task_ids: Restrict the submitted array to this subset of task indices (used by
                :meth:`resume` to resubmit only the incomplete tasks); ``None`` submits all
                ``0 .. n_tasks - 1``.

        Raises:
            ValueError: If ``config.tmp_root`` is unset or the number of tasks to launch
                exceeds the maximum array size.
            RuntimeError: If the sbatch submission fails (the temp folder is preserved).
        """
        launch_ids = list(range(n_tasks)) if task_ids is None else sorted(set(int(t) for t in task_ids))
        is_resume = task_ids is not None

        def _guard_fail(message: str) -> None:
            # On a resume we must never remove the user's preserved temp folder.
            if not is_resume:
                shutil.rmtree(tmp, ignore_errors=True)
            raise ValueError(message)

        if self.config.tmp_root is None:
            _guard_fail(
                "SlurmRunner requires config.tmp_root to be set to a shared filesystem "
                "visible to all compute nodes (node-local /tmp is not usable)."
            )

        max_array = (self.config.max_array_size if self.config.max_array_size is not None
                     else self._max_array_size())
        if len(launch_ids) > max_array:
            _guard_fail(
                f"Run partitioned into {len(launch_ids)} tasks exceeds the maximum array size "
                f"{max_array}. Lower num_workers or use a larger block_shape."
            )

        os.makedirs(os.path.join(tmp, "logs"), exist_ok=True)
        throttle = max(1, min(int(num_workers), len(launch_ids)))
        script_path = os.path.join(tmp, "submit.sh")
        with open(script_path, "w") as f:
            f.write(self._build_script(tmp, launch_ids, throttle, name))

        # Unlike the tmp_root / max_array guards above, a submission failure deliberately does NOT
        # remove the temp folder: the generated submit.sh, payload, and per-task block lists are
        # exactly what's needed to diagnose why sbatch rejected the job. Re-raise naming the folder
        # so the user knows where to look.
        try:
            job_id = self._submit(script_path)
        except RuntimeError as err:
            raise RuntimeError(f"{err} Temp folder preserved for debugging: {tmp}.") from err
        manifest = {
            "job_id": job_id,
            "n_tasks": n_tasks,
            "launch_ids": launch_ids,
            "throttle": throttle,
            "name": name,
            "tmp": tmp,
            "script": script_path,
            "python_executable": self.config.python_executable or sys.executable,
            "submit_time": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        manifest_path = os.path.join(tmp, "manifest.json")
        if is_resume and os.path.exists(manifest_path):  # keep the prior job id for forensics
            try:
                with open(manifest_path) as f:
                    manifest["resumed_from_job_id"] = json.load(f).get("job_id")
            except (OSError, ValueError):
                # Best effort only: an unreadable old manifest must not block the resume.
                pass
        with open(manifest_path, "w") as f:
            json.dump(manifest, f, indent=2)

        self._poll(job_id, n_tasks, tmp, name, task_ids=launch_ids)

    @staticmethod
    def _format_array_indices(task_ids: Sequence[int], throttle: int) -> str:
        """Compress task ids into an sbatch ``--array`` spec, e.g. ``0,3,7-9%4``."""
        ids = sorted(set(int(t) for t in task_ids))
        parts: List[str] = []
        i = 0
        while i < len(ids):
            # Extend j to the end of the run of consecutive ids starting at i.
            j = i
            while j + 1 < len(ids) and ids[j + 1] == ids[j] + 1:
                j += 1
            parts.append(str(ids[i]) if i == j else f"{ids[i]}-{ids[j]}")
            i = j + 1
        return ",".join(parts) + f"%{throttle}"

    def _build_script(self, tmp: str, task_ids: Sequence[int], throttle: int, name: str) -> str:
        """Render the sbatch array script for the given task indices.

        The script consists of the interpreter line, the ``#SBATCH`` directives, the
        optional activation preamble from ``config.shebang`` and finally the harness
        command, which receives the temp folder and ``SLURM_ARRAY_TASK_ID``.
        """
        cfg = self.config
        shebang, preamble = "#!/bin/bash", ""
        if cfg.shebang:
            lines = cfg.shebang.splitlines()
            if lines and lines[0].startswith("#!"):
                shebang, preamble = lines[0], "\n".join(lines[1:])
            else:
                preamble = cfg.shebang

        # Collapse whitespace/newlines so the name cannot break or inject directives.
        job_name = "_".join((name or "").split()) or "bioimage_py"
        directives = [
            f"--job-name={job_name}",
            f"--array={self._format_array_indices(task_ids, throttle)}",
            f"--cpus-per-task={int(cfg.cpus_per_task)}",
            f"--output={os.path.join(tmp, 'logs', 'slurm-%A_%a.out')}",
            f"--error={os.path.join(tmp, 'logs', 'slurm-%A_%a.err')}",
        ]
        if cfg.partition is not None:
            directives.append(f"--partition={cfg.partition}")
        if cfg.time is not None:
            directives.append(f"--time={cfg.time}")
        if cfg.mem is not None:
            directives.append(f"--mem={cfg.mem}")
        if int(cfg.gpus) > 0:
            directives.append(f"--gpus={int(cfg.gpus)}")
        if cfg.account is not None:
            directives.append(f"--account={cfg.account}")
        if cfg.qos is not None:
            directives.append(f"--qos={cfg.qos}")
        if cfg.constraint is not None:
            directives.append(f"--constraint={cfg.constraint}")

        python = shlex.quote(cfg.python_executable or sys.executable)
        command = f'{python} -m bioimage_py.runner._harness {shlex.quote(tmp)} "${{SLURM_ARRAY_TASK_ID}}"'
        lines = [shebang]
        lines += [f"#SBATCH {d}" for d in directives]
        if preamble:
            lines.append(preamble)
        lines.append(command)
        return "\n".join(lines) + "\n"

    @staticmethod
    def _submit(script_path: str) -> str:
        """Submit ``script_path`` with ``sbatch --parsable`` and return the job id.

        Raises:
            RuntimeError: If ``sbatch`` is not on ``PATH``, exits non-zero, or prints
                something that does not start with a numeric job id.
        """
        sbatch = shutil.which("sbatch")
        if sbatch is None:
            raise RuntimeError("sbatch not found on PATH; the slurm CLI must be available.")
        proc = subprocess.run([sbatch, "--parsable", script_path],
                              capture_output=True, text=True)
        if proc.returncode != 0:
            raise RuntimeError(f"sbatch submission failed (exit {proc.returncode}): "
                               f"{proc.stderr.strip() or proc.stdout.strip()}")
        # Only the part before the first ';' is kept as the job id.
        job_id = proc.stdout.strip().split(";")[0].strip()
        if not job_id.isdigit():
            raise RuntimeError(f"Could not parse job id from sbatch output: {proc.stdout!r}")
        return job_id

    @staticmethod
    def _max_array_size() -> int:
        """Return the cluster's ``MaxArraySize`` (or a safe fallback)."""
        scontrol = shutil.which("scontrol")
        if scontrol is None:
            return _DEFAULT_MAX_ARRAY
        try:
            proc = subprocess.run([scontrol, "show", "config"], capture_output=True, text=True)
        except OSError:
            return _DEFAULT_MAX_ARRAY
        match = re.search(r"MaxArraySize\s*=\s*(\d+)", proc.stdout)
        return int(match.group(1)) if match else _DEFAULT_MAX_ARRAY

    @staticmethod
    def _parse_array_range(spec: str) -> List[int]:
        """Expand a pending-collapse range like ``[2-9,11%4]`` into its task indices.

        Plain indices (``11``), ranges (``2-9``) and stepped ranges (``0-10:2``) are
        expanded; the ``%throttle`` suffix is ignored. A part that cannot be parsed (for
        example a ``...`` marker, should the scheduler abbreviate a long expression) is
        skipped instead of raising, so one odd row cannot abort polling: the affected
        tasks are simply absent from the result, which callers treat as pending.
        """
        body = spec.strip("[]").split("%", 1)[0]
        indices: List[int] = []
        for part in body.split(","):
            part = part.strip()
            if not part:
                continue
            try:
                if "-" in part:
                    bounds, _, step = part.partition(":")
                    lo, hi = bounds.split("-", 1)
                    indices.extend(range(int(lo), int(hi) + 1, int(step) if step else 1))
                else:
                    indices.append(int(part))
            except ValueError:
                # Non-numeric token or a zero step: not expandable, leave it out.
                continue
        return indices

    def _sacct_states(self, job_id: str) -> Optional[Dict[int, str]]:
        """Return ``{array_index: STATE}`` for the array job, or ``None`` on a poll error.

        ``None`` (a transient ``sacct`` failure) means *skip this poll*; an empty dict means
        the job is simply not registered with the scheduler yet. A task absent from the
        result is treated as pending, never as dead.

        Raises:
            RuntimeError: If ``sacct`` is not on ``PATH``.
        """
        sacct = shutil.which("sacct")
        if sacct is None:
            raise RuntimeError("sacct not found on PATH; the slurm CLI must be available.")
        try:
            proc = subprocess.run(
                [sacct, "-X", "-n", "-P", "--format=JobID,State", "-j", str(job_id)],
                capture_output=True, text=True,
            )
        except OSError:
            return None
        if proc.returncode != 0:
            return None

        states: Dict[int, str] = {}
        for line in proc.stdout.splitlines():
            line = line.strip()
            if not line or "|" not in line:
                continue
            jid, _, raw_state = line.partition("|")
            jid = jid.split(";", 1)[0]
            if "." in jid or "_" not in jid:  # step rows (defensive; -X already excludes them)
                continue
            # Take the first token: normalises e.g. "CANCELLED by 12345" -> "CANCELLED".
            tokens = raw_state.split()
            state = tokens[0].upper() if tokens else ""
            suffix = jid.split("_", 1)[1]
            if suffix.startswith("["):
                # A collapsed row stands for several tasks that share the same state.
                for idx in self._parse_array_range(suffix):
                    states[idx] = state
            else:
                try:
                    states[int(suffix)] = state
                except ValueError:
                    continue
        return states

    def _job_known(self, job_id: str, attempts: int = 3) -> bool:
        """Whether the job is known to ``sacct``, retrying to tolerate post-submit lag.

        A transient ``sacct`` error (``None``) or any returned row counts as known; only a
        sustained empty result across ``attempts`` polls is treated as unknown.
        """
        for attempt in range(attempts):
            states = self._sacct_states(job_id)
            if states is None or states:
                return True
            if attempt + 1 < attempts:
                time.sleep(self.config.poll_interval)
        return False

    def _poll(self, job_id: str, n_tasks: int, tmp: str, name: str,
              task_ids: Optional[Sequence[int]] = None) -> None:
        """Poll ``sacct`` until every task has a visible sentinel or is confirmed dead.

        The scheduler ``State`` is not subject to NFS lag, but the ``.success`` sentinels the
        compute nodes write can take up to the mount's attribute-cache timeout to become
        visible here. So a ``COMPLETED`` task (its harness exited 0, hence wrote a sentinel)
        is given ``config.latency_wait`` for that sentinel to appear; any other terminal
        state means the harness did not succeed and the task is declared dead after a short
        confirmation grace. Tasks absent from ``sacct`` are pending, never dead.

        A ``KeyboardInterrupt`` raised anywhere in the loop (while sleeping or while
        querying ``sacct``) prints how to reattach and is then re-raised; the slurm job
        itself is not cancelled.

        Args:
            job_id: The submitted array job id.
            n_tasks: The total number of tasks (spans the block-counting progress bar).
            tmp: The job temp folder (where sentinels are written).
            name: A short name for the progress bar (disables it when empty).
            task_ids: The subset of task indices this job actually runs (a resume submits only
                the incomplete tasks); resolution is over this subset, ``None`` means all tasks.
        """
        poll_ids = list(range(n_tasks)) if task_ids is None else sorted(set(int(t) for t in task_ids))

        def has_sentinel(t: int) -> bool:
            return os.path.exists(os.path.join(tmp, "success", f"{t}.success"))

        latency_wait = max(float(self.config.latency_wait), self.config.poll_interval)
        fail_grace = max(self.config.poll_interval, 5.0)
        terminal_since: Dict[int, float] = {}
        terminal_count: Dict[int, int] = {}
        resolved: set = set()
        # The bar counts processed blocks across ALL tasks (a resume credits prior progress).
        n_blocks = _total_blocks(tmp, n_tasks)
        with tqdm(total=n_blocks, desc=name or None, disable=not name, unit="block") as pbar:
            try:
                while len(resolved) < len(poll_ids):
                    states = self._sacct_states(job_id)
                    if states is None:  # transient sacct error: skip this poll.
                        time.sleep(self.config.poll_interval)
                        continue

                    now = time.monotonic()
                    ok = {t for t in poll_ids if has_sentinel(t)}
                    running = sum(1 for s in states.values() if s == "RUNNING")
                    dead = set()
                    for t in poll_ids:
                        if t in ok:
                            terminal_since.pop(t, None)
                            terminal_count.pop(t, None)
                            continue
                        state = states.get(t)
                        if state in _TERMINAL_STATES:
                            terminal_since.setdefault(t, now)
                            terminal_count[t] = terminal_count.get(t, 0) + 1
                            # COMPLETED -> sentinel was written, just wait it out over NFS; any
                            # other terminal state -> the task will never produce a sentinel.
                            grace = latency_wait if state == "COMPLETED" else fail_grace
                            if (terminal_count[t] >= 2 and now - terminal_since[t] >= grace
                                    and not has_sentinel(t)):
                                dead.add(t)
                        else:  # pending/running/requeued: reset the dead countdown.
                            terminal_since.pop(t, None)
                            terminal_count.pop(t, None)

                    resolved = ok | dead
                    pbar.n = min(_count_done_blocks(tmp, n_tasks), n_blocks)
                    pbar.set_postfix(ok=len(ok), failed=len(dead), run=running,
                                     pending=max(0, len(poll_ids) - len(resolved) - running),
                                     refresh=False)
                    pbar.refresh()
                    if len(resolved) >= len(poll_ids):
                        break
                    time.sleep(self.config.poll_interval)
            except KeyboardInterrupt:
                # Covers every point of the loop, not just one sleep, so the hint is never lost.
                print(f"\nInterrupted while waiting on slurm job {job_id}. The job was left "
                      f"running; reattach with SlurmRunner(...).reattach({tmp!r}).")
                raise

    def reattach(self, tmp_folder: str, name: str = "reattach",
                 pre_cleanup: Optional[Callable[[str], None]] = None) -> Optional[list]:
        """Reattach to a previously submitted run and finalize it.

        Picks a run back up from its manifest (e.g. after the orchestrating login-node
        process was interrupted) instead of resubmitting. Nothing is submitted here; the
        polling reads only ``poll_interval`` and ``latency_wait`` from this runner's config,
        so a freshly constructed ``SlurmRunner`` can reattach.

        Args:
            tmp_folder: The job temp folder containing ``manifest.json`` and ``payload.pkl``.
            name: A short name for the progress display.
            pre_cleanup: Optional ``pre_cleanup(tmp)`` callback invoked right before the temp
                folder is removed (forwarded to :meth:`_finalize`).

        Returns:
            The per-block return values (if the run collected any), else ``None``.

        Raises:
            RunnerError: If any task failed (sentinel missing).
            RuntimeError: If the manifest's job is unknown to slurm and the run did not
                already complete.
        """
        with open(os.path.join(tmp_folder, "manifest.json")) as f:
            manifest = json.load(f)
        job_id, n_tasks = str(manifest["job_id"]), int(manifest["n_tasks"])
        # NOTE: unpickling executes code; only reattach to temp folders you created yourself.
        with open(os.path.join(tmp_folder, "payload.pkl"), "rb") as f:
            has_return_val = bool(cloudpickle.load(f)["has_return_val"])

        # Reconstruct the partition in numeric task order (never glob: it sorts lexically).
        tasks: List[List[int]] = []
        for task_id in range(n_tasks):
            with open(os.path.join(tmp_folder, "blocks", f"{task_id}.json")) as f:
                tasks.append(json.load(f))
        block_ids = [b for task in tasks for b in task]

        all_done = all(os.path.exists(os.path.join(tmp_folder, "success", f"{t}.success"))
                       for t in range(n_tasks))
        if not all_done:
            # Only a job that stays unknown to sacct across retries (not registration lag
            # right after submit, nor a transient error) is treated as unrecoverable.
            if not self._job_known(job_id):
                raise RuntimeError(
                    f"Slurm job {job_id} is not known to the scheduler and the run did not "
                    f"complete. Inspect {tmp_folder} or resubmit."
                )
            self._poll(job_id, n_tasks, tmp_folder, name)

        results = self._finalize(tmp_folder, n_tasks, tasks, block_ids, has_return_val, name,
                                 pre_cleanup=pre_cleanup)
        return results if has_return_val else None
Distributed runner that submits one sbatch array job and polls it with sacct.
Reuses the full distributed protocol from _DistributedRunner (cloudpickle
payload, generated work-lists, per-task result + .success sentinel files, failure
reporting and block_ids re-run) and overrides only how tasks are launched and
awaited. The per-task sentinel file remains the ground truth for success; sacct is
queried only to detect tasks that died without writing a sentinel. A manifest is written
at submission time so an interrupted run can be picked back up with reattach().
def __init__(self, config: Optional[RunnerConfig] = None):
    """Set up a slurm runner around a :class:`SlurmConfig`.

    Args:
        config: The slurm configuration to use. When ``None``, the configuration is
            obtained from :meth:`SlurmConfig.load`, i.e. from the user config file.

    Raises:
        TypeError: If the resulting configuration is not a ``SlurmConfig``.
    """
    effective = SlurmConfig.load() if config is None else config
    if isinstance(effective, SlurmConfig):
        super().__init__(effective)
        return
    # Anything else (e.g. a plain RunnerConfig) lacks the slurm-specific settings.
    raise TypeError(
        f"SlurmRunner requires a SlurmConfig, got {type(effective).__name__}. "
        "Pass job_config=SlurmConfig(...) (it carries partition/account/time/etc.)."
    )
Create the runner, requiring a SlurmConfig.
Args:
config: The slurm configuration. None loads the user defaults from the config
file via SlurmConfig.load() (honoring BIOIMAGE_PY_NO_CONFIG /
BIOIMAGE_PY_CONFIG); tmp_root must still be set, here or in the file,
before running.
Raises:
TypeError: If config is a non-slurm RunnerConfig.
def reattach(self, tmp_folder: str, name: str = "reattach",
             pre_cleanup: Optional[Callable[[str], None]] = None) -> Optional[list]:
    """Resume waiting on an already submitted run and finalize it.

    The job id and task count are read back from ``manifest.json`` in the temp folder,
    so nothing is resubmitted. If some ``.success`` sentinel is still missing, the job
    is polled again (using this runner's ``poll_interval`` and ``latency_wait``) before
    the results are finalized.

    Args:
        tmp_folder: Temp folder of the run; must contain ``manifest.json``,
            ``payload.pkl`` and the per-task ``blocks/<task>.json`` lists.
        name: Short label for the progress display.
        pre_cleanup: Optional ``pre_cleanup(tmp)`` callback handed on to
            :meth:`_finalize`.

    Returns:
        Whatever :meth:`_finalize` returns when the payload has ``has_return_val`` set,
        otherwise ``None``.

    Raises:
        RuntimeError: If sentinels are missing and the job is not known to ``sacct``.
    """
    with open(os.path.join(tmp_folder, "manifest.json")) as manifest_file:
        manifest = json.load(manifest_file)
    job_id = str(manifest["job_id"])
    n_tasks = int(manifest["n_tasks"])

    with open(os.path.join(tmp_folder, "payload.pkl"), "rb") as payload_file:
        payload = cloudpickle.load(payload_file)
    has_return_val = bool(payload["has_return_val"])

    # Read the work-lists by numeric task index; a directory glob would sort lexically.
    blocks_dir = os.path.join(tmp_folder, "blocks")
    tasks: List[List[int]] = []
    block_ids = []
    for index in range(n_tasks):
        with open(os.path.join(blocks_dir, f"{index}.json")) as blocks_file:
            tasks.append(json.load(blocks_file))
    for work_list in tasks:
        block_ids.extend(work_list)

    success_dir = os.path.join(tmp_folder, "success")
    unfinished = any(
        not os.path.exists(os.path.join(success_dir, f"{index}.success"))
        for index in range(n_tasks)
    )
    if unfinished:
        # _job_known retries, so neither registration lag nor a transient sacct error
        # is mistaken for a lost job.
        if not self._job_known(job_id):
            raise RuntimeError(
                f"Slurm job {job_id} is not known to the scheduler and the run did not "
                f"complete. Inspect {tmp_folder} or resubmit."
            )
        self._poll(job_id, n_tasks, tmp_folder, name)

    results = self._finalize(tmp_folder, n_tasks, tasks, block_ids, has_return_val, name,
                             pre_cleanup=pre_cleanup)
    if has_return_val:
        return results
    return None
Reattach to a previously submitted run and finalize it.
Picks a run back up from its manifest (e.g. after the orchestrating login-node
process was interrupted) instead of resubmitting. Polling reads only poll_interval and
latency_wait from this runner's config, so a freshly constructed SlurmRunner can reattach.
Args:
tmp_folder: The job temp folder containing manifest.json and payload.pkl.
name: A short name for the progress display.
pre_cleanup: Optional pre_cleanup(tmp) callback invoked right before the temp
folder is removed (forwarded to _finalize()).
Returns:
The per-block return values (if the run collected any), else None.
Raises:
RunnerError: If any task failed (sentinel missing).
RuntimeError: If the manifest's job is unknown to slurm and the run did not
already complete.
class RunnerError(RuntimeError):
    """Error signalling that at least one block could not be processed.

    Attributes:
        failed_block_ids: Ids of the failed blocks as a list of ``int`` (empty when none
            were given); pass these back as ``block_ids`` to re-run only the failures.
        tmp_folder: The temp folder kept for inspection, or ``None`` if not given.
    """

    def __init__(self, message: str, failed_block_ids: Optional[Sequence[int]] = None,
                 tmp_folder: Optional[str] = None):
        super().__init__(message)
        # Normalise to plain ints; a missing or empty sequence becomes an empty list.
        given_ids = failed_block_ids or ()
        self.failed_block_ids: List[int] = list(map(int, given_ids))
        self.tmp_folder = tmp_folder
Raised when one or more blocks fail.
Attributes:
failed_block_ids: The ids of the blocks that failed (re-run with these).
tmp_folder: The preserved temp folder for distributed jobs (None for local).
@dataclass
class RunnerConfig:
    """Settings common to every runner backend.

    Attributes:
        poll_interval: Number of seconds a distributed runner waits between two status
            polls. Defaults to ``10.0``.
        tmp_root: Directory under which job temp folders are created. With ``None`` the
            system default is used; distributed jobs need a shared filesystem here.
        python_executable: Interpreter that launches the worker tasks. With ``None`` the
            current interpreter (``sys.executable``) is used.
    """

    # Field order is part of the positional constructor signature; keep it stable.
    poll_interval: float = 10.0
    tmp_root: Optional[str] = None
    python_executable: Optional[str] = None
Base configuration shared by all runners.
Attributes:
poll_interval: Seconds between status polls (distributed runners).
tmp_root: Root directory for job temp folders. None uses the system default.
For distributed jobs this must be on a shared filesystem.
python_executable: Interpreter used to launch worker tasks. None uses the
current interpreter (sys.executable).
@dataclass
class SlurmConfig(RunnerConfig):
    """Settings for the slurm runner.

    ``poll_interval``, ``tmp_root`` and ``python_executable`` come from
    :class:`RunnerConfig`. The slurm runner **requires** ``tmp_root`` and expects it on a
    filesystem shared with all compute nodes (node-local ``/tmp`` does not work). The
    ``num_workers`` argument of the op / ``run`` acts as the array throttle, i.e. the
    maximum number of concurrently running tasks, regardless of how many tasks the work
    is split into.

    Values that are specific to a cluster (``partition``, ``account``, ``constraint``,
    ``tmp_root``, ...) may be kept in a user config file and picked up as defaults; see
    :meth:`load` and :func:`write_slurm_config`.

    Attributes:
        partition: Slurm partition to submit to.
        time: Per-task time limit in slurm time format (e.g. ``"01:00:00"``).
        mem: Per-task memory limit (e.g. ``"8G"``).
        cpus_per_task: CPUs requested per task.
        gpus: GPUs requested per task; ``--gpus`` is only emitted when this is > 0.
        account: Accounting project to charge.
        qos: Quality-of-service to request.
        constraint: Node feature constraint.
        shebang: Optional environment setup for the generated job script. Its first line
            should be an interpreter line (``#!...``), which goes to the top of the
            script; the remaining lines become an activation preamble placed *after* the
            ``#SBATCH`` directives so that those are still honoured. The preamble is
            meant to make the package importable on the node (``module load``,
            ``LD_LIBRARY_PATH`` exports, ...) and does not select the interpreter: the
            worker always runs with the absolute ``python_executable`` (default: the
            submitting ``sys.executable``). With ``None`` the script starts with
            ``#!/bin/bash`` and has no preamble. Example::

                shebang = "#!/bin/bash\\nmodule load gcc\\nexport LD_LIBRARY_PATH=...:$LD_LIBRARY_PATH"

        max_array_size: Upper bound on the number of array tasks per job. ``None`` asks
            the cluster for its ``MaxArraySize`` (with a safe fallback). Runs split into
            more tasks than this are rejected before submission.
        latency_wait: Seconds to wait for the ``.success`` sentinel of a task the
            scheduler reports as ``COMPLETED`` to show up on the shared (NFS) filesystem
            before the task is given up on. It should comfortably exceed the mount's
            attribute-cache timeout. A task is resolved as soon as its sentinel is
            visible, so a large value does not slow down successful runs.
    """

    partition: Optional[str] = None
    time: Optional[str] = None
    mem: Optional[str] = None
    cpus_per_task: int = 1
    gpus: int = 0
    account: Optional[str] = None
    qos: Optional[str] = None
    constraint: Optional[str] = None
    shebang: Optional[str] = None
    max_array_size: Optional[int] = None
    latency_wait: float = 120.0

    @classmethod
    def load(cls, path: Optional[str] = None, **overrides: Any) -> "SlurmConfig":
        """Create a config from the user config file plus explicit overrides.

        Values are layered as ``overrides`` > config file > dataclass defaults. Building
        ``SlurmConfig(...)`` directly never reads the file.

        Args:
            path: Config file to read; ``None`` selects the default location (see
                :func:`config_file_path`).
            **overrides: Field values that win over the file; each key must name a
                ``SlurmConfig`` field.

        Returns:
            The resulting :class:`SlurmConfig`.

        Raises:
            ValueError: If the file or ``overrides`` contain an unknown field name.
        """
        # Reject unknown override names before the file is even read.
        _validate_keys(overrides, "load() overrides")
        file_defaults = dict(_read_slurm_defaults(path))
        # Later entries win, so explicit overrides shadow the file defaults.
        return cls(**{**file_defaults, **overrides})
Configuration for the slurm runner.
Inherits poll_interval, tmp_root and python_executable from
RunnerConfig. For slurm, tmp_root is required and must point at a
shared filesystem visible to all compute nodes (not node-local /tmp), and
num_workers (passed to the op / run) is interpreted as the array throttle — the
maximum number of tasks allowed to run concurrently — independently of how many tasks
the work is partitioned into.
Cluster-specific values (partition, account, constraint, tmp_root, ...)
can be stored once in a user config file and reused as defaults; see
load() and write_slurm_config().
Attributes:
partition: The slurm partition to submit to.
time: The per-task time limit (slurm time format, e.g. "01:00:00").
mem: The per-task memory limit (e.g. "8G").
cpus_per_task: Number of CPUs requested per task.
gpus: Number of GPUs requested per task (emitted as --gpus only when > 0).
account: The accounting project to charge.
qos: The quality-of-service to request.
constraint: A node feature constraint.
shebang: Optional environment setup for the generated job script. If given, its
first line must be an interpreter line (starting with #!) which is placed at
the top of the script; any remaining lines are emitted as an activation preamble
after the #SBATCH directives (so the directives are still honoured). The
preamble is for making the package importable on the node (e.g. module load
/ LD_LIBRARY_PATH exports), not for choosing the interpreter: the worker is
always launched with the absolute python_executable (defaulting to the
submitting sys.executable). None uses #!/bin/bash and that absolute
interpreter, which needs no activation when the env lives on a shared
filesystem. Example::
shebang = "#!/bin/bash\nmodule load gcc\nexport LD_LIBRARY_PATH=...:$LD_LIBRARY_PATH"
max_array_size: Override for the maximum number of array tasks per job. None
queries the cluster's MaxArraySize (falling back to a safe default). A run
partitioned into more tasks than this is rejected up front with a clear error.
latency_wait: Seconds to wait for a finished task's .success sentinel to become
visible on a shared (NFS) filesystem before giving up on it. A task that the
scheduler reports COMPLETED wrote its sentinel, but the orchestrating node's
attribute cache can lag the compute node by up to the mount's acdirmax
(typically 60 s); this must comfortably exceed that. It only bounds the wait on a
COMPLETED-but-not-yet-visible task — a task is resolved the moment its
sentinel appears, so a generous value does not slow down successful runs.
@classmethod
def load(cls, path: Optional[str] = None, **overrides: Any) -> "SlurmConfig":
    """Assemble a config from stored user defaults and per-call overrides.

    The layers are applied in increasing priority: dataclass defaults, then the values
    read from the config file, then ``overrides``. Instantiating ``SlurmConfig(...)``
    directly bypasses the file entirely.

    Args:
        path: Location of the config file. ``None`` uses the default location (see
            :func:`config_file_path`).
        **overrides: Field values taking precedence over the file. Every key has to be
            a ``SlurmConfig`` field name.

    Returns:
        A :class:`SlurmConfig` built from the merged values.

    Raises:
        ValueError: If the file or ``overrides`` contain an unknown field name.
    """
    _validate_keys(overrides, "load() overrides")
    values: Dict[str, Any] = {}
    values.update(_read_slurm_defaults(path))
    # Applied last so that explicit arguments replace anything stored in the file.
    values.update(overrides)
    return cls(**values)
Build a config from the user config file, with explicit overrides taking precedence.
Precedence is overrides > config file [slurm] section > dataclass defaults.
This is the way to combine the stored user defaults with per-run tweaks; constructing
SlurmConfig(...) directly does not consult the file (an explicitly built config
is used verbatim).
Args:
path: Path to the config file. None resolves the default location (see
config_file_path()). A missing file is treated as empty.
**overrides: Field values that override the file defaults. Each name must be a
valid SlurmConfig field.
Returns:
A SlurmConfig with file defaults filled in and overrides applied.
Raises:
ValueError: If the file or overrides contain an unknown field name.
def config_file_path(path: Optional[str] = None) -> Path:
    """Work out where the user config file lives.

    The first match wins:

    1. the ``path`` argument, if it is not ``None``;
    2. a non-empty ``BIOIMAGE_PY_CONFIG`` environment variable;
    3. ``bioimage-py/config.toml`` below ``$XDG_CONFIG_HOME``, or below ``~/.config``
       when that variable is unset or empty.

    A leading ``~`` is expanded for cases 1 and 2.

    Args:
        path: Explicit location that bypasses the lookup; ``None`` runs the lookup.

    Returns:
        The resolved path. The file is not checked for existence.
    """
    if path is None:
        from_env = os.environ.get("BIOIMAGE_PY_CONFIG")
        if from_env:
            return Path(from_env).expanduser()
        config_home = os.environ.get("XDG_CONFIG_HOME")
        if not config_home:
            config_home = os.path.join(os.path.expanduser("~"), ".config")
        return Path(config_home, "bioimage-py", "config.toml")
    return Path(path).expanduser()
Resolve the path to the user config file.
Resolution order: an explicit path argument, then the BIOIMAGE_PY_CONFIG
environment variable, then $XDG_CONFIG_HOME/bioimage-py/config.toml (falling back to
~/.config/bioimage-py/config.toml).
Args:
path: An explicit path that short-circuits the resolution. None resolves the
default location.
Returns:
The resolved path (not guaranteed to exist).
def write_slurm_config(path: Optional[str] = None, *, replace: bool = False, **fields: Any) -> str:
    """Store default slurm settings in the user config file.

    Use this rather than hand-editing the file to record cluster-specific defaults
    (partition, account, constraint, ``tmp_root``, ...). By default the given fields are
    merged into the existing ``[slurm]`` table, which allows building the file up over
    several calls. Fields whose value is ``None`` are ignored, and every other top-level
    table found in the file is written back unchanged.

    Args:
        path: File to write. ``None`` selects the default location (see
            :func:`config_file_path`). Missing parent directories are created.
        replace: When ``True`` the ``[slurm]`` table is rebuilt from ``fields`` alone
            instead of being merged with the stored one.
        **fields: Defaults to store; each key must name a ``SlurmConfig`` field.

    Returns:
        The path of the written file, as a string.

    Raises:
        ValueError: If ``fields`` contains an unknown field name.
    """
    _validate_keys(fields, "write_slurm_config()")
    target = config_file_path(path)
    document = _parse_toml(target)

    # Start from the stored table unless the caller asked for a clean slate.
    table = dict(document.get("slurm", {})) if not replace else {}
    for key, value in fields.items():
        if value is not None:
            table[key] = value
    document["slurm"] = table

    import tomli_w  # local import: only the writer needs the (optional-at-runtime) dependency.

    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "wb") as handle:
        tomli_w.dump(document, handle)
    return str(target)
Create or update the user config file with default slurm settings.
This is the supported way to set up cluster-specific defaults (partition, account,
constraint, tmp_root, ...) instead of editing the file by hand. Provided fields are
merged into the existing [slurm] table by default (so the file can be built up over
several calls); None values are skipped, and any other top-level tables in the file
(reserved for future named profiles) are preserved.
Args:
path: Path to write to. None resolves the default location (see
config_file_path()); the parent directory is created if needed.
replace: If True, replace the whole [slurm] table instead of merging into it.
**fields: Default field values to store. Each name must be a valid SlurmConfig
field.
Returns:
The path that was written.
Raises:
ValueError: If fields contains an unknown field name.
def get_runner(job_type: str, config: Optional[RunnerConfig] = None) -> Runner:
    """Return a runner for the given job type.

    The job type is looked up case-insensitively in the module's ``_RUNNERS`` registry
    and the registered class is instantiated with ``config``.

    Args:
        job_type: One of ``"local"``, ``"subprocess"`` or ``"slurm"``.
        config: Optional runner configuration, passed to the runner class as is.

    Returns:
        A :class:`~bioimage_py.runner.base.Runner` instance.

    Raises:
        ValueError: If ``job_type`` is unknown.
    """
    try:
        cls = _RUNNERS[job_type.lower()]
    except KeyError:
        # `from None`: the registry KeyError is an implementation detail and would only
        # clutter the traceback shown for what is a plain bad-argument error.
        raise ValueError(
            f"Unknown job_type {job_type!r}; expected one of {sorted(_RUNNERS)}."
        ) from None
    return cls(config)
Return a runner for the given job type.
Args:
job_type: One of "local", "subprocess" or "slurm".
config: Optional runner configuration.
Returns:
A bioimage_py.runner.base.Runner instance.
Raises:
ValueError: If job_type is unknown.
def run_block(function: ComputeFn, blocking: Blocking, block_id: int,
              inputs: Sequence[Source], outputs: Sequence[Source],
              mask: Optional[Source], halo: Optional[Sequence[int]]) -> Any:
    """Apply the per-block ``function`` to one block of ``blocking``.

    Every backend (local and distributed) goes through this one function to process a
    block, which keeps the results identical across backends.

    Args:
        function: Callable invoked as ``function(block, inputs, outputs, mask)``.
        blocking: Object providing ``get_block`` and ``get_block_with_halo``
            (a ``bioimage_cpp.utils.Blocking``).
        block_id: Id of the block to process; converted with ``int``.
        inputs: The opened input sources, passed through to ``function``.
        outputs: The opened output sources, passed through to ``function``.
        mask: The opened mask source or ``None``, passed through to ``function``.
        halo: Per-axis halo (each entry converted with ``int``); ``None`` requests the
            plain block without halo.

    Returns:
        Whatever ``function`` returns for the block (possibly ``None``).
    """
    index = int(block_id)
    if halo is not None:
        # With a halo the function receives the block-with-halo object instead.
        block = blocking.get_block_with_halo(index, list(map(int, halo)))
    else:
        block = blocking.get_block(index)
    return function(block, inputs, outputs, mask)
Run the per-block function for a single block.
This is the single per-block code path shared by every backend (local and distributed), which is what guarantees identical results across backends.
Args:
function: The per-block function function(block, inputs, outputs, mask).
blocking: A bioimage_cpp.utils.Blocking.
block_id: The block id to process.
inputs: Tuple of opened input sources.
outputs: Tuple of opened output sources.
mask: An opened mask source or None.
halo: A per-axis halo list, or None for no halo.
Returns:
The per-block return value of function (may be None).