API reference

The full public surface of the damacy package.

Pipeline

Pipeline

Pipeline(config: Config)

Streaming GPU data pipeline. Drive :meth:push, :meth:pop. Stages are plan → host I/O → H2D copy → on-device decompress → assemble; output batches are double-buffered (B=2) and waves are double-buffered internally.

A CUcontext must be current on the calling thread when this is constructed; PyTorch sets one up implicitly. For bare-Python use, call :func:damacy._native.cuda_init_primary once first.

Constructed from a :class:Config::

cfg = damacy.Config(samples_per_batch=8, ...)
with damacy.Pipeline(cfg) as p:
    ...

Resource caps are fixed at construction; nothing grows after that.

Source code in python/damacy/__init__.py
def __init__(self, config: Config) -> None:
    try:
        self._native = _native.Pipeline(
            samples_per_batch=config.samples_per_batch,
            lookahead_samples=config.lookahead_samples,
            dtype=int(config.dtype),  # already coerced by Config.__init__
            max_chunk_uncompressed_bytes=config.max_chunk_uncompressed_bytes,
            max_read_op_bytes=config.max_read_op_bytes,
            max_gpu_memory_bytes=config.max_gpu_memory_bytes,
            host_buffer_waves=config.host_buffer_waves,
            max_chunks_per_wave=config.max_chunks_per_wave,
            max_substreams_per_chunk=config.max_substreams_per_chunk,
            n_io_threads=config.n_io_threads,
            metadata_io_concurrency=config.metadata_io_concurrency,
            n_array_meta_cache=config.n_array_meta_cache,
            n_shard_index_cache=config.n_shard_index_cache,
            n_chunk_layout_cache=config.n_chunk_layout_cache,
            max_shards_per_sample=config.max_shards_per_sample,
            sample_shape=tuple(config.sample_shape),
            device=-1 if config.device is None else int(config.device),
            enable_gds=_gds_to_native(config.enable_gds),
            numa_strategy=int(config.numa_strategy),
            numa_node=config.numa_node,
            metadata_latency_baseline_ns=config.metadata_latency.baseline_ns,
            metadata_latency_lognormal_mu_ln_ns=(
                config.metadata_latency.lognormal_mu_ln_ns
            ),
            metadata_latency_lognormal_sigma_ln_ns=(
                config.metadata_latency.lognormal_sigma_ln_ns
            ),
            metadata_latency_cap_ns=config.metadata_latency.cap_ns,
            metadata_latency_seed=config.metadata_latency.seed,
        )
    except _native.DamacyError as exc:
        _reraise_typed(exc)
    self._closed = False
    self._config = config
    # User-side queue of pending sample iterators. push() appends
    # here and best-effort drains; pop() tops up before
    # touching native. This makes push() consume-everything from
    # the user's perspective and lets generators flow naturally.
    # _pending_buf is the head iterator's already-pulled-but-not-yet-
    # pushed samples; held flat to avoid wrapping `it` in successive
    # itertools.chain() layers under sustained backpressure.
    self._pending: deque[Iterator[Sample]] = deque()
    self._pending_buf: list[Sample] = []
    # damacy_pop has no timed variant; on timeout the worker stays
    # parked inside it and the next pop() adopts the same thread.
    self._pop_lock = threading.Lock()
    self._pop_done = threading.Event()
    self._pop_thread: threading.Thread | None = None
    self._pop_result: _native.Batch | None = None
    self._pop_err: BaseException | None = None
    bound = self._native.device
    if not _warn_if_local_rank_disagrees(config.device, bound):
        _warn_if_multi_gpu_implicit(config.device, bound)

device property

device: int

CUDA device index this pipeline is bound to.

config property

config: Config

The :class:Config this pipeline was built from.

pending property

pending: bool

True if push() has accepted samples that haven't yet entered the native lookahead. Becomes False as :meth:pop frees space.

close

close() -> None

Release the underlying handle. Idempotent: a second close() is a no-op. Later calls to other pipeline methods, such as :meth:push, :meth:pop, and :meth:stats, raise :class:ShutdownError.

Source code in python/damacy/__init__.py
def close(self) -> None:
    """Release the underlying handle. Idempotent: a second ``close()``
    is a no-op. Later calls to other pipeline methods, such as
    :meth:`push`, :meth:`pop`, and :meth:`stats`, raise
    :class:`ShutdownError`."""
    if not self._closed:
        self._closed = True
        del self._native
        t = self._pop_thread
        if t is not None:
            t.join()  # damacy_destroy already woke it with SHUTDOWN
            self._pop_thread = None

push

push(samples: Iterable[Sample]) -> None

Queue samples for processing. Accepts any iterable (list, generator, infinite generator, …); large or unbounded sources are pulled lazily as :meth:pop frees space.

Local validation (shape/rank against Config.sample_shape) raises the matching :class:DamacyError subclass here and discards the offending iterator. Errors that depend on store contents — :class:NotFound, :class:DtypeMismatch, per-array :class:RankMismatch, decode failures — surface at :meth:pop instead, since the pipeline fetches metadata asynchronously after push returns. Once any such error fires, the pipeline is terminal — rebuild a fresh :class:Pipeline to recover. Calling :meth:push on a terminal pipeline raises :class:ShutdownError.

Batching is drop_last=True: only complete batches of Config.samples_per_batch are emitted, so trailing samples beyond the last whole multiple are never returned. (Emitting the ragged final batch is not yet supported — issue #139.)

Source code in python/damacy/__init__.py
def push(self, samples: Iterable[Sample]) -> None:
    """Queue samples for processing. Accepts any iterable (list,
    generator, infinite generator, …); large or unbounded sources
    are pulled lazily as :meth:`pop` frees space.

    Local validation (shape/rank against ``Config.sample_shape``)
    raises the matching :class:`DamacyError` subclass here and
    discards the offending iterator. Errors that depend on store
    contents — :class:`NotFound`, :class:`DtypeMismatch`,
    per-array :class:`RankMismatch`, decode failures — surface at
    :meth:`pop` instead, since the pipeline fetches metadata
    asynchronously after push returns. Once any such error fires,
    the pipeline is terminal — rebuild a fresh :class:`Pipeline`
    to recover. Calling :meth:`push` on a terminal pipeline raises
    :class:`ShutdownError`.

    Batching is ``drop_last=True``: only complete batches of
    ``Config.samples_per_batch`` are emitted, so trailing samples
    beyond the last whole multiple are never returned. (Emitting
    the ragged final batch is not yet supported — issue #139.)
    """
    self._check_open()
    self._pending.append(iter(samples))
    self._drain_pending()
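
The lazy pulling described above can be modelled without a GPU. The following is a minimal sketch, not damacy's implementation: `LazyFeeder`, `capacity`, and `accepted` are hypothetical stand-ins for the pipeline, its native lookahead depth, and the native queue, and the sketch leaves out the `_pending_buf` staging list.

```python
from collections import deque
from itertools import count, islice


class LazyFeeder:
    """Toy model of push()/pop() backpressure (hypothetical, not damacy code)."""

    def __init__(self, capacity):
        self.capacity = capacity   # stand-in for the native lookahead depth
        self.accepted = []         # stand-in for samples inside the native queue
        self._pending = deque()    # iterators that may still hold samples

    @property
    def pending(self):
        # True while some pushed iterator has not been fully drained.
        return bool(self._pending)

    def _drain(self):
        while self._pending:
            room = self.capacity - len(self.accepted)
            if room <= 0:
                return  # backpressure: leave the rest inside the iterator
            pulled = list(islice(self._pending[0], room))
            if not pulled:
                self._pending.popleft()  # head iterator is exhausted
                continue
            self.accepted.extend(pulled)

    def push(self, samples):
        self._pending.append(iter(samples))
        self._drain()

    def pop(self, n):
        self._drain()  # top up before taking anything out
        out, self.accepted = self.accepted[:n], self.accepted[n:]
        return out


feeder = LazyFeeder(capacity=4)
feeder.push(count())      # infinite generator: only 4 items are pulled
first = feeder.pop(2)
second = feeder.pop(2)    # the drain refills the freed space first
```

Because the generator is only advanced when there is room, pushing an unbounded source returns immediately, and `pending` stays true until the source runs dry.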

pop

pop() -> Batch

Block until the next batch is on-device-ready. Returns a :class:Batch you can hand to torch.from_dlpack (or any DLPack consumer) — preferably inside a with block.

Raises :class:PoolStarved if no batch arrives within Config.pop_timeout_s seconds (default 30). Usually that means tensors from previous batches are still being held — drop them, or .clone() if you need to keep them. It also fires if you pop past the batches produced: only len(pushed) // Config.samples_per_batch exist (drop_last), and a further pop waits on a batch that is never sealed.

Store-derived errors — :class:NotFound, :class:DtypeMismatch, per-array :class:RankMismatch, decode failures — surface here rather than at push. Once any such error fires, the pipeline is terminal; subsequent calls re-raise the same status.
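
The batch count in the PoolStarved paragraph is plain integer division. A small sketch of that arithmetic, where `complete_batches` is a hypothetical helper and not part of damacy:

```python
def complete_batches(n_pushed: int, samples_per_batch: int) -> tuple[int, int]:
    """Return (batches emitted, trailing samples dropped) under drop_last."""
    if samples_per_batch < 1:
        raise ValueError("samples_per_batch must be >= 1")
    return divmod(n_pushed, samples_per_batch)


emitted, dropped = complete_batches(20, 8)
```

With 20 pushed samples and samples_per_batch=8, two batches are emitted and four samples are never returned; a third pop() would wait until the timeout expires.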

Source code in python/damacy/__init__.py
def pop(self) -> Batch:
    """Block until the next batch is on-device-ready. Returns a
    :class:`Batch` you can hand to ``torch.from_dlpack`` (or any
    DLPack consumer) — preferably inside a ``with`` block.

    Raises :class:`PoolStarved` if no batch arrives within
    ``Config.pop_timeout_s`` seconds (default 30). Usually that
    means tensors from previous batches are still being held —
    drop them, or ``.clone()`` if you need to keep them. It also
    fires if you pop past the batches produced: only
    ``len(pushed) // Config.samples_per_batch`` exist (drop_last),
    and a further pop waits on a batch that is never sealed.

    Store-derived errors — :class:`NotFound`,
    :class:`DtypeMismatch`, per-array :class:`RankMismatch`,
    decode failures — surface here rather than at push. Once any
    such error fires, the pipeline is terminal; subsequent calls
    re-raise the same status."""
    self._check_open()
    # Swallow ShutdownError from drain so the sticky error from the
    # native pipeline (NotFound/DtypeMismatch/…) surfaces from pop,
    # not the secondary SHUTDOWN raised by re-pushing into a terminal.
    with contextlib.suppress(ShutdownError):
        self._drain_pending()
    timeout = self._config.pop_timeout_s
    if timeout is None:
        try:
            return Batch(self._native.pop())
        except _native.DamacyError as exc:
            _reraise_typed(exc)
    return self._pop_with_timeout(timeout)
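
`_pop_with_timeout` is not shown in this reference. The comment in `Pipeline.__init__` says the native pop has no timed variant, so after a timeout the worker stays parked inside it and the next pop() adopts the same thread. The sketch below illustrates that general pattern under stated assumptions: `ParkedCall` and its attributes are hypothetical names, and the built-in `TimeoutError` stands in for PoolStarved.

```python
import threading


class ParkedCall:
    """Add a timeout to a blocking call that has no timed variant (sketch)."""

    def __init__(self, blocking_call):
        self._call = blocking_call
        self._lock = threading.Lock()
        self._done = threading.Event()
        self._thread = None
        self._result = None
        self._err = None

    def _run(self):
        try:
            self._result = self._call()
        except BaseException as exc:  # hand any failure to the waiting caller
            self._err = exc
        finally:
            self._done.set()

    def get(self, timeout):
        with self._lock:
            if self._thread is None:
                # No parked worker: start one for this call.
                self._done.clear()
                self._result = self._err = None
                self._thread = threading.Thread(target=self._run, daemon=True)
                self._thread.start()
            # Otherwise an earlier get() timed out; wait on the same worker.
            if not self._done.wait(timeout):
                raise TimeoutError(f"no result within {timeout} s")
            self._thread.join()
            self._thread = None
            result, err = self._result, self._err
        if err is not None:
            raise err
        return result


gate = threading.Event()


def slow_source():
    gate.wait()  # blocks until released below
    return "batch-0"


call = ParkedCall(slow_source)
try:
    call.get(timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True  # the worker is still parked inside slow_source()
gate.set()
value = call.get(timeout=5.0)  # adopts the parked worker's result
```

The design choice is that a timeout never abandons the in-flight call: its result is delivered to whichever caller waits next, so no batch is lost.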

batches

batches(n: int) -> Iterator[Batch]

Pop n batches as an iterator. Each call to :meth:pop blocks until that batch is on-device-ready.

Pair with a with block so the slot is released::

for batch in d.batches(8):
    with batch as t:
        x = torch.from_dlpack(t)
        ...

Source code in python/damacy/__init__.py
def batches(self, n: int) -> Iterator[Batch]:
    """Pop *n* batches as an iterator. Each call to :meth:`pop`
    blocks until that batch is on-device-ready.

    Pair with a ``with`` block so the slot is released::

        for batch in d.batches(8):
            with batch as t:
                x = torch.from_dlpack(t)
                ...
    """
    for _ in range(n):
        yield self.pop()

stats

stats() -> Stats

Cumulative pipeline metrics as a :class:Stats snapshot.

The snapshot is taken at call time; pipeline counters keep accumulating in the background. Per-stage :class:Metric fields carry cumulative milliseconds, the best single observation, input/output byte totals, and a sample count. Cache hit/miss counters and lifetime totals (batches emitted, waves emitted, chunks dispatched, …) round out the snapshot.

gpu_bytes_committed reflects the live GPU footprint counted against Config.max_gpu_memory_bytes; it grows from wave-init to first pop (lazy batch-output sizing) and stays flat afterward.

Use :meth:stats_reset to zero the cumulative timing and metadata-latency counters. gpu_bytes_committed is not reset — it reflects the live commitment, not a delta.

Raises:

Type Description
ShutdownError

If the pipeline has been closed.

Source code in python/damacy/__init__.py
def stats(self) -> Stats:
    """Cumulative pipeline metrics as a :class:`Stats` snapshot.

    The snapshot is taken at call time; pipeline counters keep
    accumulating in the background. Per-stage :class:`Metric`
    fields carry cumulative milliseconds, the best single
    observation, input/output byte totals, and a sample count.
    Cache hit/miss counters and lifetime totals (batches emitted,
    waves emitted, chunks dispatched, …) round out the snapshot.

    ``gpu_bytes_committed`` reflects the live GPU footprint
    counted against ``Config.max_gpu_memory_bytes``; it grows from
    wave-init to first pop (lazy batch-output sizing) and stays
    flat afterward.

    Use :meth:`stats_reset` to zero the cumulative timing and
    metadata-latency counters. ``gpu_bytes_committed`` is not reset
    — it reflects the live commitment, not a delta.

    Raises:
        ShutdownError: If the pipeline has been closed.
    """
    self._check_open()
    return Stats._from_native(self._native.stats())

stats_reset

stats_reset() -> None

Zero the cumulative timing counters and per-stage rolling totals. Cache hit/miss counters and gpu_bytes_committed are left alone — they reflect live state, not deltas. Metadata latency counters are reset with the timing counters.

Raises:

Type Description
ShutdownError

If the pipeline has been closed.

Source code in python/damacy/__init__.py
def stats_reset(self) -> None:
    """Zero the cumulative timing counters and per-stage rolling
    totals. Cache hit/miss counters and ``gpu_bytes_committed`` are
    left alone — they reflect live state, not deltas. Metadata
    latency counters are reset with the timing counters.

    Raises:
        ShutdownError: If the pipeline has been closed.
    """
    self._check_open()
    self._native.stats_reset()

Config dataclass

Config(
    *,
    samples_per_batch: int,
    sample_shape: Sequence[int],
    max_gpu_memory_bytes: int,
    dtype: Dtype | str | int = F32,
    lookahead_samples: int | None = None,
    max_chunk_uncompressed_bytes: int = DEFAULT_CHUNK_UNCOMPRESSED_BYTES,
    max_read_op_bytes: int = DEFAULT_READ_OP_MAX_BYTES,
    host_buffer_waves: int = DEFAULT_HOST_BUFFER_WAVES,
    max_chunks_per_wave: int = DEFAULT_MAX_CHUNKS_PER_WAVE,
    max_substreams_per_chunk: int = DEFAULT_MAX_SUBSTREAMS_PER_CHUNK,
    n_io_threads: int = DEFAULT_IO_THREADS,
    metadata_io_concurrency: int = DEFAULT_METADATA_IO_CONCURRENCY,
    n_array_meta_cache: int = DEFAULT_ARRAY_META_CACHE,
    n_shard_index_cache: int = DEFAULT_SHARD_INDEX_CACHE,
    n_chunk_layout_cache: int = DEFAULT_CHUNK_LAYOUT_CACHE,
    max_shards_per_sample: int = DEFAULT_MAX_SHARDS_PER_SAMPLE,
    device: int | None = None,
    pop_timeout_s: float | None = 30.0,
    enable_gds: bool | None = None,
    numa_strategy: NumaStrategy | str | int = AUTO,
    numa_node: int = -1,
    metadata_latency: LatencyModel | None = None,
)

All resource caps and pipeline shape, fixed at create time.

Build variants with :func:dataclasses.replace:

>>> import dataclasses
>>> base = Config(samples_per_batch=8, sample_shape=(8, 16),
...               max_gpu_memory_bytes=1 << 30)
>>> base.dtype is Dtype.F32
True
>>> dataclasses.replace(base, samples_per_batch=64, lookahead_samples=128).samples_per_batch
64

Validation runs in __init__ so invalid configs fail before we touch CUDA. The constructor accepts :class:Dtype, an int, or one of "f32" / "float32" / "bf16" / "bfloat16" for the dtype argument; the stored field is always a :class:Dtype.

>>> Config(samples_per_batch=0, sample_shape=(8, 16), max_gpu_memory_bytes=1 << 30)
Traceback (most recent call last):
    ...
ValueError: samples_per_batch must be >= 1 (got 0)

Attributes:

Name Type Description
samples_per_batch int

Samples per batch (>= 1).

max_gpu_memory_bytes int

Primary GPU budget knob. Hard cap on GPU memory allocated for wave-resident buffers, decoder scratch, per-wave fanout SOAs, and batch-output pools. Required — no default. A value too small for the requested batch geometry raises :class:BudgetExceeded from Pipeline(cfg). Internal sizing (host slab, dev decompress arena, nvcomp temp) is derived from this value; the create-time resolver also reserves the worst-case observe-and-grow footprint so grows inside a successfully-created instance never trip the cap.

dtype Dtype

Destination dtype for assembled batches.

lookahead_samples int

User-side push-queue depth in samples. Defaults to two full output batches.

n_io_threads int

Bulk data IO worker threads (>= 1). Defaults to 64.

metadata_io_concurrency int

Async metadata request concurrency (>= 1).

n_array_meta_cache int

LRU cap for zarr-metadata entries. Must be >= lookahead_samples + 2 * samples_per_batch so the in-flight working set plus the staging lag fits.

n_shard_index_cache int

LRU cap for shard-index entries. Must be >= (lookahead_samples + 2 * samples_per_batch) * max_shards_per_sample.

n_chunk_layout_cache int

LRU cap for per-array blosc1 chunk-layout entries. Must be >= lookahead_samples + 2 * samples_per_batch.

max_shards_per_sample int

Declared upper bound on the number of shards a single sample's AABB may intersect (>= 1). Sizes the n_shard_index_cache floor so metadata-cache saturation cannot occur, and caps per-sample shard enumeration at runtime: a sample intersecting more shards is rejected with :class:InvalidArgument.

max_chunk_uncompressed_bytes int

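Largest uncompressed chunk size the pipeline accepts. Defaults to DEFAULT_CHUNK_UNCOMPRESSED_BYTES; the C default is 512 KB. The constructor checks 1 <= max_chunk_uncompressed_bytes <= MAX_CHUNK_BYTES, so an explicit 0 raises ValueError rather than selecting the default.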

max_read_op_bytes int

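Cap on the size of a single coalesced read issued to storage. Defaults to DEFAULT_READ_OP_MAX_BYTES; the constructor checks 1 <= max_read_op_bytes <= MAX_READ_OP_BYTES, so an explicit 0 raises ValueError rather than selecting the default. Tune against your storage tier: small values keep the queue deep and the read pattern fine-grained; large values amortize per-syscall overhead at the cost of latency spikes.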

device int | None

CUDA device index to bind. None (default) captures the current CUcontext on the calling thread; pass an int (e.g. local_rank) to retain that device's primary context internally — recommended under torchrun / MPI.

pop_timeout_s float | None

How long :meth:Pipeline.pop waits for the next batch before raising :class:PoolStarved. Defaults to 30 seconds; pass None to wait forever.

enable_gds bool | None

GPUDirect Storage opt-in. True forces cuFile reads of compressed bytes straight to device memory, bypassing host-staging slabs; False forces the host-staging path. None (default) defers to env DAMACY_GDS_ENABLE=1. Explicit True/False wins over the env var. Requires libcufile.so.0 and a successful cuFileDriverOpen at create time.

numa_strategy NumaStrategy

How to pin pinned-host slabs and worker threads to a host-NUMA node. :attr:NumaStrategy.AUTO (default) resolves the GPU's host-NUMA node from the CUDA driver. :attr:NumaStrategy.PIN_TO uses :attr:numa_node. :attr:NumaStrategy.DISABLED is a full no-op. All three are silent no-ops when libnuma.so.1 is unavailable at runtime; AUTO is also a no-op on single-node hosts.

numa_node int

Explicit host-NUMA node when numa_strategy=NumaStrategy.PIN_TO. Must be >= 0 in that mode; must be -1 (default) otherwise — the constructor rejects a node hint paired with a non-PIN_TO strategy rather than silently dropping it.

metadata_latency LatencyModel

Synthetic metadata-store latency model used for benchmarking prefetch behavior. None disables it.
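
The three cache floors listed above share one formula. The helper below is a hypothetical sketch of that arithmetic for sizing a config by hand; it is not the C validator, which remains the source of truth for the floors.

```python
def cache_floors(lookahead_samples: int, samples_per_batch: int,
                 max_shards_per_sample: int) -> dict[str, int]:
    """Minimum LRU sizes implied by the documented floors."""
    # In-flight working set plus the staging lag, in samples.
    working_set = lookahead_samples + 2 * samples_per_batch
    return {
        "n_array_meta_cache": working_set,
        "n_shard_index_cache": working_set * max_shards_per_sample,
        "n_chunk_layout_cache": working_set,
    }


floors = cache_floors(lookahead_samples=16, samples_per_batch=8,
                      max_shards_per_sample=4)
```

For lookahead_samples=16, samples_per_batch=8, and max_shards_per_sample=4, the floors are 32 entries for the array-metadata and chunk-layout caches and 128 for the shard-index cache.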

Source code in python/damacy/__init__.py
def __init__(
    self,
    *,
    samples_per_batch: int,
    sample_shape: Sequence[int],
    max_gpu_memory_bytes: int,
    dtype: Dtype | str | int = Dtype.F32,
    lookahead_samples: int | None = None,
    max_chunk_uncompressed_bytes: int = _native.DEFAULT_CHUNK_UNCOMPRESSED_BYTES,
    max_read_op_bytes: int = _native.DEFAULT_READ_OP_MAX_BYTES,
    host_buffer_waves: int = _native.DEFAULT_HOST_BUFFER_WAVES,
    max_chunks_per_wave: int = _native.DEFAULT_MAX_CHUNKS_PER_WAVE,
    max_substreams_per_chunk: int = _native.DEFAULT_MAX_SUBSTREAMS_PER_CHUNK,
    n_io_threads: int = _native.DEFAULT_IO_THREADS,
    metadata_io_concurrency: int = _native.DEFAULT_METADATA_IO_CONCURRENCY,
    n_array_meta_cache: int = _native.DEFAULT_ARRAY_META_CACHE,
    n_shard_index_cache: int = _native.DEFAULT_SHARD_INDEX_CACHE,
    n_chunk_layout_cache: int = _native.DEFAULT_CHUNK_LAYOUT_CACHE,
    max_shards_per_sample: int = _native.DEFAULT_MAX_SHARDS_PER_SAMPLE,
    device: int | None = None,
    pop_timeout_s: float | None = 30.0,
    enable_gds: bool | None = None,
    numa_strategy: NumaStrategy | str | int = NumaStrategy.AUTO,
    numa_node: int = -1,
    metadata_latency: LatencyModel | None = None,
) -> None:
    # Custom __init__ rather than __post_init__ so the constructor
    # signature accepts the polymorphic dtype input while reads of
    # `cfg.dtype` always type as Dtype. dataclass(init=False) keeps
    # the auto-generated __eq__ / __hash__ / __repr__.
    if samples_per_batch < 1:
        raise ValueError(
            f"samples_per_batch must be >= 1 (got {samples_per_batch})"
        )
    if lookahead_samples is None:
        lookahead_samples = 2 * samples_per_batch
    if lookahead_samples < samples_per_batch:
        raise ValueError(
            "lookahead_samples must cover at least one full batch "
            f"(got {lookahead_samples}, samples_per_batch={samples_per_batch})"
        )
    if not 1 <= n_io_threads <= _native.MAX_IO_THREADS:
        raise ValueError(
            "n_io_threads must be in "
            f"[1, {_native.MAX_IO_THREADS}] (got {n_io_threads})"
        )
    if metadata_io_concurrency < 1:
        raise ValueError(
            f"metadata_io_concurrency must be >= 1 (got {metadata_io_concurrency})"
        )
    if not 1 <= max_chunk_uncompressed_bytes <= _native.MAX_CHUNK_BYTES:
        raise ValueError(
            "max_chunk_uncompressed_bytes must be in "
            f"[1, {_native.MAX_CHUNK_BYTES}] (got {max_chunk_uncompressed_bytes})"
        )
    if not 1 <= max_read_op_bytes <= _native.MAX_READ_OP_BYTES:
        raise ValueError(
            "max_read_op_bytes must be in "
            f"[1, {_native.MAX_READ_OP_BYTES}] (got {max_read_op_bytes})"
        )
    if max_gpu_memory_bytes < 1:
        raise ValueError(
            f"max_gpu_memory_bytes must be >= 1 (got {max_gpu_memory_bytes})"
        )
    if not _native.N_WAVES <= host_buffer_waves <= _native.MAX_HOST_BUFFER_WAVES:
        raise ValueError(
            "host_buffer_waves must be in "
            f"[{_native.N_WAVES}, {_native.MAX_HOST_BUFFER_WAVES}] "
            f"(got {host_buffer_waves})"
        )
    if not 1 <= max_chunks_per_wave <= _native.HARD_MAX_CHUNKS_PER_WAVE:
        raise ValueError(
            "max_chunks_per_wave must be in "
            f"[1, {_native.HARD_MAX_CHUNKS_PER_WAVE}] (got {max_chunks_per_wave})"
        )
    if not 1 <= max_substreams_per_chunk <= _native.HARD_MAX_SUBSTREAMS_PER_CHUNK:
        raise ValueError(
            "max_substreams_per_chunk must be in "
            f"[1, {_native.HARD_MAX_SUBSTREAMS_PER_CHUNK}] "
            f"(got {max_substreams_per_chunk})"
        )
    # Cache floor validation (n_*_cache >= lookahead_samples +
    # 2*samples_per_batch; n_shard_index_cache scales by
    # max_shards_per_sample) plus the > 0 checks are enforced by the C
    # config validator, which surfaces an actionable message via
    # InvalidArgument from Pipeline(). Not duplicated here so a single
    # source of truth owns the floors.
    if pop_timeout_s is not None and pop_timeout_s <= 0:
        raise ValueError(f"pop_timeout_s must be > 0 or None (got {pop_timeout_s})")
    ns = NumaStrategy.coerce(numa_strategy)
    if ns is NumaStrategy.PIN_TO:
        if numa_node < 0:
            raise ValueError(
                f"numa_node must be >= 0 when numa_strategy=PIN_TO (got {numa_node})"
            )
    elif numa_node != -1:
        raise ValueError(
            f"numa_node must be -1 when numa_strategy={ns.name} (got {numa_node})"
        )
    if metadata_latency is not None and not isinstance(
        metadata_latency, LatencyModel
    ):
        raise TypeError("metadata_latency must be a LatencyModel or None")
    shape_t = tuple(int(x) for x in sample_shape)
    if not shape_t:
        raise ValueError("sample_shape must be non-empty")
    if any(d <= 0 for d in shape_t):
        raise ValueError(f"sample_shape entries must be > 0 (got {shape_t})")
    set_ = object.__setattr__  # frozen=True forbids `self.x = ...`
    set_(self, "samples_per_batch", samples_per_batch)
    set_(self, "dtype", Dtype.coerce(dtype))
    set_(self, "lookahead_samples", lookahead_samples)
    set_(self, "max_chunk_uncompressed_bytes", max_chunk_uncompressed_bytes)
    set_(self, "max_read_op_bytes", max_read_op_bytes)
    set_(self, "max_gpu_memory_bytes", max_gpu_memory_bytes)
    set_(self, "host_buffer_waves", host_buffer_waves)
    set_(self, "max_chunks_per_wave", max_chunks_per_wave)
    set_(self, "max_substreams_per_chunk", max_substreams_per_chunk)
    set_(self, "n_io_threads", n_io_threads)
    set_(self, "metadata_io_concurrency", metadata_io_concurrency)
    set_(self, "n_array_meta_cache", n_array_meta_cache)
    set_(self, "n_shard_index_cache", n_shard_index_cache)
    set_(self, "n_chunk_layout_cache", n_chunk_layout_cache)
    set_(self, "max_shards_per_sample", max_shards_per_sample)
    set_(self, "sample_shape", shape_t)
    set_(self, "device", device)
    set_(self, "pop_timeout_s", pop_timeout_s)
    set_(self, "enable_gds", None if enable_gds is None else bool(enable_gds))
    set_(self, "numa_strategy", ns)
    set_(self, "numa_node", int(numa_node))
    set_(self, "metadata_latency", metadata_latency or LatencyModel())
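
The comment at the top of this constructor describes a general pattern: a frozen dataclass declared with init=False, a hand-written `__init__` that validates and normalises its inputs, and `object.__setattr__` to store the fields. A standalone sketch of that pattern follows; `Geometry` is a hypothetical two-field stand-in, not damacy's Config.

```python
import dataclasses
from collections.abc import Sequence


@dataclasses.dataclass(frozen=True, init=False)
class Geometry:
    samples_per_batch: int
    sample_shape: tuple[int, ...]

    def __init__(self, *, samples_per_batch: int,
                 sample_shape: Sequence[int]) -> None:
        if samples_per_batch < 1:
            raise ValueError(
                f"samples_per_batch must be >= 1 (got {samples_per_batch})"
            )
        # frozen=True blocks normal assignment, so write through object.__setattr__.
        object.__setattr__(self, "samples_per_batch", samples_per_batch)
        object.__setattr__(self, "sample_shape",
                           tuple(int(x) for x in sample_shape))


base = Geometry(samples_per_batch=8, sample_shape=[8, 16])
# replace() calls the custom __init__ again, so validation and coercion re-run.
bigger = dataclasses.replace(base, samples_per_batch=64)
```

Because `dataclasses.replace` passes every field back through the constructor as a keyword, the hand-written `__init__` must accept each field name, and an invalid override fails the same way a fresh construction would.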

default classmethod

default(
    *,
    samples_per_batch: int,
    sample_shape: Sequence[int],
    max_gpu_memory_bytes: int,
    dtype: Dtype | str | int = F32,
    **overrides: Any,
) -> Self

Recommended config for networked-FS (NFS) training.

Convenience over the constructor: the tuning field defaults already carry the cluster-tuned values (see damacy_tuning_defaults). The caller supplies the workload geometry; any field may be overridden by keyword.

Source code in python/damacy/__init__.py
@classmethod
def default(
    cls,
    *,
    samples_per_batch: int,
    sample_shape: Sequence[int],
    max_gpu_memory_bytes: int,
    dtype: Dtype | str | int = Dtype.F32,
    **overrides: Any,
) -> Self:
    """Recommended config for networked-FS (NFS) training.

    Convenience over the constructor: the tuning field defaults already
    carry the cluster-tuned values (see damacy_tuning_defaults). The caller
    supplies the workload geometry; any field may be overridden by keyword.
    """
    return cls(
        samples_per_batch=samples_per_batch,
        sample_shape=sample_shape,
        max_gpu_memory_bytes=max_gpu_memory_bytes,
        dtype=dtype,
        **overrides,
    )

Sample dataclass

Sample(uri: str, aabb: Iterable[slice | tuple[int, int]])

One sample request. aabb is a per-axis half-open interval list in level-0 voxel indices, in the zarr's stored axis order.

Each axis may be a (start, stop) 2-tuple or a Python slice; the tuple of slices that numpy.s_[...] produces is accepted directly. The stored form is always tuple[tuple[int, int], ...] regardless of how it was spelled, so equivalent inputs hash and compare equal:

>>> a = Sample(uri="cell.zarr", aabb=[(0, 64), (0, 256), (0, 256)])
>>> b = Sample(
...     uri="cell.zarr",
...     aabb=[slice(0, 64), slice(0, 256), slice(0, 256)],
... )
>>> c = Sample(
...     uri="cell.zarr",
...     aabb=[slice(None, 64), slice(None, 256), slice(None, 256)],
... )
>>> a == b == c
True
>>> hash(a) == hash(b) == hash(c)
True
>>> a.aabb
((0, 64), (0, 256), (0, 256))

Bare ints in aabb are rejected so the behaviour stays consistent with NumPy/zarr indexing semantics (np.s_[64] means "point 64", not "extent (0, 64)"):

>>> Sample(uri="cell.zarr", aabb=[64, 256, 256])
Traceback (most recent call last):
    ...
TypeError: aabb axis 0: expected slice or (start, stop) tuple; got int

Slice validation rejects strided slices and unbounded stops:

>>> Sample(uri="cell.zarr", aabb=[slice(0, 64, 2), slice(0, 256), slice(0, 256)])
Traceback (most recent call last):
    ...
ValueError: aabb axis 0: slice step must be 1 or omitted (got step=2)
>>> Sample(uri="cell.zarr", aabb=[slice(0, None), slice(0, 256), slice(0, 256)])
Traceback (most recent call last):
    ...
ValueError: aabb axis 0: slice stop is required (got slice(0, None, None))

Source code in python/damacy/__init__.py
def __init__(
    self,
    uri: str,
    aabb: Iterable[slice | tuple[int, int]],
) -> None:
    # Custom __init__ rather than __post_init__ so pyright sees the
    # polymorphic input shape on construction *and* the precise
    # canonical shape on field reads. dataclass(init=False) keeps
    # the auto-generated __eq__ / __hash__ / __repr__.
    object.__setattr__(self, "uri", uri)
    object.__setattr__(
        self,
        "aabb",
        tuple(_normalise_axis(x, axis=i) for i, x in enumerate(aabb)),
    )
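
The `_normalise_axis` helper called above is not shown in this reference. The function below is a hypothetical reimplementation, written only to be consistent with the doctests in this section; the real helper may validate more (for example negative or reversed bounds).

```python
def normalise_axis(x, *, axis: int) -> tuple[int, int]:
    """Canonicalise one aabb axis to a (start, stop) pair (sketch)."""
    if isinstance(x, slice):
        if x.step not in (None, 1):
            raise ValueError(
                f"aabb axis {axis}: slice step must be 1 or omitted "
                f"(got step={x.step})"
            )
        if x.stop is None:
            raise ValueError(
                f"aabb axis {axis}: slice stop is required (got {x!r})"
            )
        start = 0 if x.start is None else x.start  # omitted start means 0
        return (int(start), int(x.stop))
    if isinstance(x, tuple) and len(x) == 2:
        return (int(x[0]), int(x[1]))
    raise TypeError(
        f"aabb axis {axis}: expected slice or (start, stop) tuple; "
        f"got {type(x).__name__}"
    )


aabb = tuple(
    normalise_axis(x, axis=i)
    for i, x in enumerate([slice(None, 64), (0, 256), slice(0, 256)])
)
```

Storing one canonical tuple form is what lets differently spelled but equivalent requests compare and hash equal.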

Batch

Batch(native_batch: Batch)

A batch of samples on the device, ready for consumption.

Use as a context manager to release the slot back to the pool::

with d.pop() as batch:
    x = torch.from_dlpack(batch)

The DLPack capsule (batch.__dlpack__()) keeps the underlying storage alive as long as the consumer holds it; releasing the Batch object while a tensor still views it is safe.

Deferred release. If the consumer kicks off an async D2D copy on a side stream, the default with block forces a host-side cuStreamSynchronize on the producer stream before the slot is reused. To avoid that block, call :meth:release explicitly with the consumer's stream or event — damacy will stream-wait on it before re-assembling into the slot's buffer::

batch = d.pop()
tensor = torch.empty_like(...)  # on side_stream
with torch.cuda.stream(side_stream):
    tensor.copy_(torch.from_dlpack(batch))
batch.release(event=side_stream)  # no host sync

Source code in python/damacy/__init__.py
def __init__(self, native_batch: _native.Batch) -> None:
    self._native = native_batch

info property

info: BatchInfo

Snapshot of the on-device batch geometry. Raises after release.

release

release(*, event: object | None = None) -> None

Return the slot to the pool. Idempotent.

Parameters:

Name Type Description Default
event object | None

If None (default), the slot is freed immediately; damacy may reuse the buffer right away, so callers must have host-synced any work that reads it. Otherwise the slot reuse waits on the supplied CUDA event before damacy's assemble kernel writes the buffer again — the host returns at once. Accepted forms:

  • int: raw CUevent handle.
  • torch.cuda.Event: its .cuda_event is read.
  • torch.cuda.Stream: record_event() is called on it; the recorded event captures the stream's current position.

Default: None

Raises:

Type Description
DamacyError

If the deferred-release CUDA call fails. On failure the slot is still released back to the pool; damacy logs and re-raises rather than silently leaking.

Source code in python/damacy/__init__.py
def release(
    self,
    *,
    event: object | None = None,
) -> None:
    """Return the slot to the pool. Idempotent.

    Args:
        event: If ``None`` (default), the slot is freed immediately;
            damacy may reuse the buffer right away, so callers must
            have host-synced any work that reads it. Otherwise the
            slot reuse waits on the supplied CUDA event before
            damacy's assemble kernel writes the buffer again — the
            host returns at once. Accepted forms:

            * ``int`` — raw CUevent handle.
            * ``torch.cuda.Event`` — its ``.cuda_event`` is read.
            * ``torch.cuda.Stream`` — ``record_event()`` is called
              on it; the recorded event captures the stream's
              current position.

    Raises:
        DamacyError: If the deferred-release CUDA call fails. On
            failure the slot is still released back to the pool;
            damacy logs and re-raises rather than silently leaking.
    """
    handle = _coerce_cuda_event_handle(event)
    if handle is None:
        self._native.release()
    else:
        self._native.release(handle)

Value types

BatchInfo dataclass

BatchInfo(
    device_ptr: int,
    shape: tuple[int, ...],
    dtype: Dtype,
    ready_stream: int,
    batch_id: int,
)

Snapshot of the on-device batch geometry.

Metric dataclass

Metric(
    name: str,
    ms: float,
    best_ms: float,
    input_bytes: float,
    output_bytes: float,
    count: int,
)

One pipeline-stage metric. ms is cumulative; best_ms is the best single observation (a large sentinel value until the first sample is recorded).
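
Per-observation averages and throughput can be derived from these fields. The sketch below uses a local stand-in dataclass with the same field names so it runs without damacy; `mean_ms` and `output_gb_per_s` are hypothetical helpers, and they assume that ms is the summed time of the count observations, which the description implies but does not state outright.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Metric:
    """Local stand-in with the same fields as the documented Metric."""
    name: str
    ms: float
    best_ms: float
    input_bytes: float
    output_bytes: float
    count: int


def mean_ms(m: Metric) -> float:
    """Average milliseconds per observation; 0.0 before any sample."""
    return m.ms / m.count if m.count else 0.0


def output_gb_per_s(m: Metric) -> float:
    """Output bytes per second of cumulative stage time, in GB/s."""
    return (m.output_bytes / 1e9) / (m.ms / 1e3) if m.ms > 0 else 0.0


decode = Metric("decode", ms=200.0, best_ms=1.5,
                input_bytes=4e9, output_bytes=8e9, count=100)
avg = mean_ms(decode)
rate = output_gb_per_s(decode)
```

Checking count before dividing also avoids reading best_ms while it still holds its sentinel.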

Stats dataclass

Stats(
    plan: Metric,
    io: Metric,
    input_transfer: Metric,
    decode: Metric,
    post_decode: Metric,
    decode_gap: Metric,
    assemble: Metric,
    bind_wait: Metric,
    pop_wait: Metric,
    array_meta_hits: int,
    array_meta_misses: int,
    shard_index_hits: int,
    shard_index_misses: int,
    chunk_layout_hits: int,
    chunk_layout_misses: int,
    metadata_latency_ops: int,
    metadata_latency_stat_ops: int,
    metadata_latency_submit_ops: int,
    metadata_latency_active: int,
    metadata_latency_max_active: int,
    metadata_latency_total_sleep_ns: int,
    metadata_latency_max_sleep_ns: int,
    metadata_backend_read_jobs: int,
    metadata_backend_read_active: int,
    metadata_backend_read_max_active: int,
    batches_emitted: int,
    waves_emitted: int,
    chunks_planned: int,
    chunks_to_load: int,
    chunks_dispatched: int,
    reads_issued: int,
    worker_steps: int,
    gpu_bytes_committed: int,
)

Cumulative pipeline metrics. Reset with :meth:Pipeline.stats_reset.
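
The paired *_hits / *_misses counters lend themselves to a hit-rate summary per metadata cache. The following is a minimal sketch that reads only the six documented counter fields; the function name cache_hit_rates is illustrative, and a SimpleNamespace stands in for a real Stats object so the example runs on its own.

```python
from types import SimpleNamespace

# Cache prefixes taken from the documented Stats fields.
CACHES = ("array_meta", "shard_index", "chunk_layout")


def cache_hit_rates(stats) -> dict[str, float]:
    """Return hits / (hits + misses) per cache; 0.0 for a cache never queried."""
    rates = {}
    for name in CACHES:
        hits = getattr(stats, f"{name}_hits")
        misses = getattr(stats, f"{name}_misses")
        total = hits + misses
        rates[name] = hits / total if total else 0.0
    return rates


# Made-up counter values, for illustration only.
fake_stats = SimpleNamespace(
    array_meta_hits=90, array_meta_misses=10,
    shard_index_hits=3, shard_index_misses=1,
    chunk_layout_hits=0, chunk_layout_misses=0,
)
rates = cache_hit_rates(fake_stats)
```

Because the counters are cumulative, resetting the stats before a measurement window keeps warm-up misses out of the rate.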

Enums

Dtype

Bases: IntEnum

Destination dtype for assembled batches. Sources may differ; the assemble kernel casts each element to this type.

coerce classmethod

coerce(value: str | int | Dtype) -> Dtype

Accept a Dtype, an int, or one of the strings {"f32", "float32", "bf16", "bfloat16"} (matched case-insensitively).

>>> Dtype.coerce("f32") is Dtype.F32
True
>>> Dtype.coerce("BFloat16") is Dtype.BF16
True
>>> Dtype.coerce(Dtype.F32) is Dtype.F32
True
>>> Dtype.coerce("nope")
Traceback (most recent call last):
    ...
ValueError: unknown dtype: 'nope'

Source code in python/damacy/__init__.py
@classmethod
def coerce(cls, value: str | int | Dtype) -> Dtype:
    """Accept enum / int / one of {"f32", "float32", "bf16", "bfloat16"}.

    ```pycon
    >>> Dtype.coerce("f32") is Dtype.F32
    True
    >>> Dtype.coerce("BFloat16") is Dtype.BF16
    True
    >>> Dtype.coerce(Dtype.F32) is Dtype.F32
    True
    >>> Dtype.coerce("nope")
    Traceback (most recent call last):
        ...
    ValueError: unknown dtype: 'nope'

    ```
    """
    if isinstance(value, cls):
        return value
    if isinstance(value, int):
        return cls(value)
    s = str(value).lower()
    if s in ("f32", "float32"):
        return cls.F32
    if s in ("bf16", "bfloat16"):
        return cls.BF16
    raise ValueError(f"unknown dtype: {value!r}")

Status

Bases: IntEnum

Mirrors enum damacy_status.

Exceptions

DamacyError

Bases: DamacyError

Base class for all damacy errors. .status is a :class:Status; .what names the failing stage (e.g. "create", "pop").

TryAgain

Bases: DamacyError

Non-blocking call would block (lookahead queue full).

InvalidArgument

Bases: DamacyError

Bad arguments / configuration rejected at create or push.

NotFound

Bases: DamacyError

Sample uri did not resolve to a zarr.

DtypeMismatch

Bases: DamacyError

Source dtype has no cast path to the configured destination dtype.

RankMismatch

Bases: DamacyError

Sample rank is incompatible with the resolved zarr rank.

StorageError

Bases: DamacyError

Read or open failure on a shard file. Named StorageError rather than IOError so that callers who use from damacy import * do not shadow :class:builtins.IOError.

DecodeError

Bases: DamacyError

Codec parse or decompression failure.

NativeCudaError

Bases: DamacyError

CUDA driver/runtime call failed.

OutOfMemory

Bases: DamacyError

Host allocation failed (malloc/calloc returned NULL). Distinct from :class:BudgetExceeded: here the OS denied the allocation, and no configured cap was the limiting factor.

BudgetExceeded

Bases: DamacyError

A configured cap is too small to satisfy the request. Most commonly: Config.max_gpu_memory_bytes cannot fit the requested batch geometry, or a chunk's uncompressed size exceeds Config.max_chunk_uncompressed_bytes. Raise the relevant cap and retry.

ShutdownError

Bases: DamacyError

Pipeline destroyed or in failed state.

PoolStarved

Bases: DamacyError

Raised when :meth:Pipeline.pop waits longer than Config.pop_timeout_s for the next batch.

The usual cause is your loop holding on to tensors from previous batches — for example by stashing them in a list — which keeps damacy from reusing that memory. Drop those references before the next pop(), or call .clone() if you need to keep them.
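
The mechanism behind this error can be illustrated with a toy fixed-size pool. This is only a model of the idea, not damacy's implementation: SlotPool, its methods, and the use of TimeoutError are invented for the sketch. The only detail carried over from the reference is that output batches are double-buffered, hence two slots.

```python
import queue


class SlotPool:
    """Toy fixed-size pool: acquire() times out when no slot has been released."""

    def __init__(self, n_slots: int) -> None:
        self._free: "queue.Queue[int]" = queue.Queue()
        for slot in range(n_slots):
            self._free.put(slot)

    def acquire(self, timeout_s: float) -> int:
        try:
            return self._free.get(timeout=timeout_s)
        except queue.Empty:
            raise TimeoutError("pool starved: no slot was released in time") from None

    def release(self, slot: int) -> None:
        self._free.put(slot)


pool = SlotPool(n_slots=2)  # two slots, matching double-buffered output

# Anti-pattern: stash every batch and never give a slot back.
held = [pool.acquire(0.05), pool.acquire(0.05)]
try:
    pool.acquire(0.05)
    starved = False
except TimeoutError:
    starved = True  # both slots are still held, so the third acquire times out

# Fix: let go of an old batch before asking for the next one.
pool.release(held.pop(0))
next_slot = pool.acquire(0.05)
```

In a real loop the equivalent of release() is dropping the reference to the old tensor, or copying it with .clone() first when the data must outlive the batch.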

Logging

set_log_level

set_log_level(level: int) -> None

Set the threshold for the C-side stderr sink (TRACE=0..FATAL=5). The Python sink (logging.getLogger("damacy")) is independent.

Source code in python/damacy/__init__.py
def set_log_level(level: int) -> None:
    """Set the threshold for the C-side stderr sink (TRACE=0..FATAL=5).
    The Python sink (``logging.getLogger("damacy")``) is independent."""
    _native.set_log_level(level)

set_log_quiet

set_log_quiet(quiet: bool) -> None

Toggle the C-side stderr sink. The Python logging sink keeps firing regardless.

Source code in python/damacy/__init__.py
def set_log_quiet(quiet: bool) -> None:
    """Toggle the C-side stderr sink. The Python ``logging`` sink keeps
    firing regardless."""
    _native.set_log_quiet(quiet)
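
Because set_log_level and set_log_quiet only affect the C-side stderr sink, the Python sink is configured separately through the standard logging module. The sketch below uses only standard-library calls on the "damacy" logger named above; the chosen level and format string are arbitrary examples, and no mapping between the C-side numeric levels and Python's logging levels is assumed.

```python
import logging

# The Python sink is the ordinary logger named "damacy".
logger = logging.getLogger("damacy")
logger.setLevel(logging.WARNING)  # drop DEBUG/INFO records on the Python side

# Route records to stderr with a timestamped format.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
logger.addHandler(handler)

# Avoid duplicate lines if the root logger also has a handler attached.
logger.propagate = False
```

To silence Python-side output entirely, raising the logger's level above logging.CRITICAL or attaching a logging.NullHandler are the usual options.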