
API reference

The full public surface of the damacy package.

Pipeline

Pipeline

Pipeline(config: Config)

Streaming GPU data pipeline. Drive :meth:`push`, :meth:`pop`, :meth:`flush`. Stages are plan → host I/O → H2D copy → on-device decompress → assemble; output batches are double-buffered (B=2) and waves are double-buffered internally.

A CUcontext must be current on the calling thread when this is constructed; PyTorch sets one up implicitly. For bare-Python use, call :func:`damacy._native.cuda_init_primary` once first.

Constructed from a :class:`Config`::

cfg = damacy.Config(batch_size=8, ...)
with damacy.Pipeline(cfg) as p:
    ...

Resource caps are fixed at construction; nothing grows after that.

Source code in python/damacy/__init__.py
def __init__(self, config: Config) -> None:
    try:
        self._native = _native.Pipeline(
            batch_size=config.batch_size,
            lookahead_batches=config.lookahead_batches,
            n_io_threads=config.n_io_threads,
            n_compute_threads=config.n_compute_threads,
            host_buffer_bytes=config.host_buffer_bytes,
            device_buffer_bytes=config.device_buffer_bytes,
            n_zarrs_meta_cache=config.n_zarrs_meta_cache,
            n_shards_meta_cache=config.n_shards_meta_cache,
            dtype=int(config.dtype),  # already coerced by Config.__init__
            max_chunk_uncompressed_bytes=config.max_chunk_uncompressed_bytes,
            max_gpu_memory_bytes=config.max_gpu_memory_bytes,
            max_bytes_per_element=config.max_bytes_per_element,
            device=-1 if config.device is None else int(config.device),
        )
    except _native.DamacyError as exc:
        _reraise_typed(exc)
    self._closed = False
    self._config = config
    # User-side queue of pending sample iterators. push() appends
    # here and best-effort drains; pop()/flush() top up before
    # touching native. This makes push() consume-everything from
    # the user's perspective and lets generators flow naturally.
    # _pending_buf is the head iterator's already-pulled-but-not-yet-
    # pushed samples; held flat to avoid wrapping `it` in successive
    # itertools.chain() layers under sustained backpressure.
    self._pending: deque[Iterator[Sample]] = deque()
    self._pending_buf: list[Sample] = []
    _warn_if_local_rank_disagrees(config.device, self._native.device)

device property

device: int

CUDA device index this pipeline is bound to.

config property

config: Config

The :class:`Config` this loader was built from.

pending property

pending: bool

True if :meth:`push` has accepted samples that haven't yet entered the native lookahead. Becomes False as :meth:`pop` frees space.

close

close() -> None

Release the underlying handle. Idempotent. Subsequent calls on the pipeline raise :class:`ShutdownError`.

Source code in python/damacy/__init__.py
def close(self) -> None:
    """Release the underlying handle. Idempotent. Subsequent calls
    on the pipeline raise :class:`ShutdownError`."""
    if not self._closed:
        self._closed = True
        del self._native

push

push(samples: Iterable[Sample]) -> None

Queue samples for processing. Accepts any iterable (list, generator, infinite generator, …); large or unbounded sources are pulled lazily as :meth:`pop` frees space.

Fatal errors from the C-side validator (``NotFound``, ``DtypeMismatch``, ``RankMismatch``, …) raise the matching :class:`DamacyError` subclass; the offending iterator is discarded, but samples accepted by earlier ``push`` calls are unaffected.
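The lazy-drain contract (push accepts everything; only what fits enters the native lookahead, and pop tops it back up) can be modelled in plain Python. Everything below (`ToyQueue`, its `capacity`, the list standing in for the native lookahead) is an illustrative invention, not the library's internals:

```python
from collections import deque
from itertools import islice


class ToyQueue:
    """Toy model of the push()/pop() lazy-drain contract."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.native: list = []          # stands in for the native lookahead
        self.pending: deque = deque()   # iterators not yet fully drained

    def push(self, samples) -> None:
        # Accept the whole iterable up front; pull from it lazily.
        self.pending.append(iter(samples))
        self._drain()

    def _drain(self) -> None:
        # Best-effort: move items from pending iterators into the
        # "native" buffer until it is full.
        while self.pending and len(self.native) < self.capacity:
            room = self.capacity - len(self.native)
            got = list(islice(self.pending[0], room))
            self.native.extend(got)
            if len(got) < room:         # head iterator exhausted
                self.pending.popleft()

    def pop(self):
        item = self.native.pop(0)
        self._drain()                   # top up, as the real pop() does
        return item


q = ToyQueue(capacity=2)
q.push(range(5))  # all five accepted; only two occupy the buffer
```

Each `pop` frees a slot and immediately pulls the next sample from the head iterator, which is how unbounded generators flow through a fixed-size lookahead.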

Source code in python/damacy/__init__.py
def push(self, samples: Iterable[Sample]) -> None:
    """Queue samples for processing. Accepts any iterable (list,
    generator, infinite generator, …); large or unbounded sources
    are pulled lazily as :meth:`pop` frees space.

    Fatal errors from the C-side validator (``NotFound``,
    ``DtypeMismatch``, ``RankMismatch``, …) raise the matching
    :class:`DamacyError` subclass; the offending iterator is
    discarded but samples accepted by earlier ``push`` calls are
    unaffected.
    """
    self._check_open()
    self._pending.append(iter(samples))
    self._drain_pending()

pop

pop() -> Batch

Block until the next batch is on-device-ready. Returns a :class:`Batch` you can hand to ``torch.from_dlpack`` (or any DLPack consumer) — preferably inside a ``with`` block.

Source code in python/damacy/__init__.py
def pop(self) -> Batch:
    """Block until the next batch is on-device-ready. Returns a
    :class:`Batch` you can hand to ``torch.from_dlpack`` (or any
    DLPack consumer) — preferably inside a ``with`` block."""
    self._check_open()
    # Top up native from the pending queue so the planner has work.
    self._drain_pending()
    try:
        return Batch(self._native.pop())
    except _native.DamacyError as exc:
        _reraise_typed(exc)

flush

flush() -> None

Drain pending samples into the pipeline (best-effort) and ready any partial last batch for pop. Idempotent. Pending samples that don't fit before flush are dropped — pop until :attr:`pending` reads False if you want every queued sample to emit as a batch.

Source code in python/damacy/__init__.py
def flush(self) -> None:
    """Drain pending samples into the pipeline (best-effort) and
    ready any partial last batch for pop. Idempotent. Pending
    samples that don't fit before flush are dropped — pop until
    :attr:`pending` reads False if you want every queued sample to
    emit as a batch."""
    self._check_open()
    self._drain_pending()
    try:
        self._native.flush()
    except _native.DamacyError as exc:
        _reraise_typed(exc)

batches

batches(n: int) -> Iterator[Batch]

Pop *n* batches as an iterator. Each call to :meth:`pop` blocks until that batch is on-device-ready.

Pair with a ``with`` block so the slot is released::

for batch in d.batches(8):
    with batch as t:
        x = torch.from_dlpack(t)
        ...
Source code in python/damacy/__init__.py
def batches(self, n: int) -> Iterator[Batch]:
    """Pop *n* batches as an iterator. Each call to :meth:`pop`
    blocks until that batch is on-device-ready.

    Pair with a ``with`` block so the slot is released::

        for batch in d.batches(8):
            with batch as t:
                x = torch.from_dlpack(t)
                ...
    """
    for _ in range(n):
        yield self.pop()

Config dataclass

Config(
    *,
    batch_size: int,
    host_buffer_bytes: int,
    device_buffer_bytes: int,
    dtype: Dtype | str | int = F32,
    lookahead_batches: int = 2,
    n_io_threads: int = 4,
    n_compute_threads: int = 0,
    n_zarrs_meta_cache: int = 64,
    n_shards_meta_cache: int = 256,
    max_chunk_uncompressed_bytes: int = 0,
    max_gpu_memory_bytes: int = 0,
    max_bytes_per_element: int = 0,
    device: int | None = None,
)

All resource caps and pipeline shape, fixed at create time.

Build variants with :func:`dataclasses.replace`:

>>> import dataclasses
>>> base = Config(batch_size=8,
...     host_buffer_bytes=1 << 30, device_buffer_bytes=1 << 30)
>>> base.dtype is Dtype.F32
True
>>> dataclasses.replace(base, batch_size=64).batch_size
64

Validation runs in ``__init__`` so invalid configs fail before we touch CUDA. The constructor accepts :class:`Dtype`, an int, or one of ``"f32"`` / ``"float32"`` / ``"bf16"`` / ``"bfloat16"`` for the ``dtype`` argument; the stored field is always a :class:`Dtype`.

>>> Config(batch_size=0,
...     host_buffer_bytes=1, device_buffer_bytes=1)
Traceback (most recent call last):
    ...
ValueError: batch_size must be >= 1 (got 0)

Attributes:

batch_size (int)
    Samples per batch (>= 1).

host_buffer_bytes (int)
    Pinned-host staging budget; sized for IO bandwidth.

device_buffer_bytes (int)
    Device decompress-scratch budget.

dtype (Dtype)
    Destination dtype for assembled batches.

lookahead_batches (int)
    User-side push-queue depth (>= 2).

n_io_threads (int)
    IO worker threads (>= 1).

n_compute_threads (int)
    Background workers for blosc1 chunk-header parsing (>= 0). 0 runs parsing serially on the calling thread; > 0 spawns a fork-join pool. Total parallelism is n_compute_threads + 1 (the caller participates as tid 0).

n_zarrs_meta_cache (int)
    LRU cap for zarr-metadata entries.

n_shards_meta_cache (int)
    LRU cap for shard-index entries.

max_chunk_uncompressed_bytes (int)
    Largest uncompressed chunk size the pipeline accepts; 0 selects the C default (512 KB). Values exceeding :data:`MAX_CHUNK_UNCOMPRESSED_BYTES` are rejected at create time.

max_gpu_memory_bytes (int)
    Hard cap on GPU memory allocated for wave-resident buffers and batch-output pools. 0 = no cap.

max_bytes_per_element (int)
    Largest source dtype size (bytes) the pipeline will accept; 0 = the codec ceiling.

device (int | None)
    CUDA device index to bind. None (default) captures the current CUcontext on the calling thread; pass an int (e.g. local_rank) to retain that device's primary context internally — recommended under torchrun / MPI.
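Under torchrun, for example, the per-rank binding described above is typically spelled by reading LOCAL_RANK. A sketch (the budget values are placeholders, not recommendations):

```python
import os

import damacy

# One process per GPU: bind each rank's pipeline to its local device.
# torchrun exports LOCAL_RANK; fall back to 0 for single-process runs.
cfg = damacy.Config(
    batch_size=8,
    host_buffer_bytes=1 << 30,    # placeholder budget
    device_buffer_bytes=1 << 30,  # placeholder budget
    device=int(os.environ.get("LOCAL_RANK", "0")),
)
```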

Source code in python/damacy/__init__.py
def __init__(
    self,
    *,
    batch_size: int,
    host_buffer_bytes: int,
    device_buffer_bytes: int,
    dtype: Dtype | str | int = Dtype.F32,
    lookahead_batches: int = 2,
    n_io_threads: int = 4,
    n_compute_threads: int = 0,
    n_zarrs_meta_cache: int = 64,
    n_shards_meta_cache: int = 256,
    max_chunk_uncompressed_bytes: int = 0,
    max_gpu_memory_bytes: int = 0,
    max_bytes_per_element: int = 0,
    device: int | None = None,
) -> None:
    # Custom __init__ rather than __post_init__ so the constructor
    # signature accepts the polymorphic dtype input while reads of
    # `cfg.dtype` always type as Dtype. dataclass(init=False) keeps
    # the auto-generated __eq__ / __hash__ / __repr__.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1 (got {batch_size})")
    if lookahead_batches < 2:
        raise ValueError(
            f"lookahead_batches must be >= 2 (got {lookahead_batches})"
        )
    if n_io_threads < 1:
        raise ValueError(f"n_io_threads must be >= 1 (got {n_io_threads})")
    if n_compute_threads < 0:
        raise ValueError(
            f"n_compute_threads must be >= 0 (got {n_compute_threads})"
        )
    if host_buffer_bytes <= 0 or device_buffer_bytes <= 0:
        raise ValueError("host/device_buffer_bytes must be positive")
    if max_chunk_uncompressed_bytes < 0:
        raise ValueError("max_chunk_uncompressed_bytes must be >= 0")
    set_ = object.__setattr__  # frozen=True forbids `self.x = ...`
    set_(self, "batch_size", batch_size)
    set_(self, "host_buffer_bytes", host_buffer_bytes)
    set_(self, "device_buffer_bytes", device_buffer_bytes)
    set_(self, "dtype", Dtype.coerce(dtype))
    set_(self, "lookahead_batches", lookahead_batches)
    set_(self, "n_io_threads", n_io_threads)
    set_(self, "n_compute_threads", n_compute_threads)
    set_(self, "n_zarrs_meta_cache", n_zarrs_meta_cache)
    set_(self, "n_shards_meta_cache", n_shards_meta_cache)
    set_(self, "max_chunk_uncompressed_bytes", max_chunk_uncompressed_bytes)
    set_(self, "max_gpu_memory_bytes", max_gpu_memory_bytes)
    set_(self, "max_bytes_per_element", max_bytes_per_element)
    set_(self, "device", device)

Sample dataclass

Sample(uri: str, aabb: Iterable[slice | tuple[int, int]])

One sample request. ``aabb`` is a per-axis half-open interval list in level-0 voxel indices, in the zarr's stored axis order.

Each axis may be a ``(start, stop)`` 2-tuple or a Python ``slice``; the tuple of slices that ``numpy.s_[...]`` produces is accepted directly. The stored form is always ``tuple[tuple[int, int], ...]`` regardless of how it was spelled, so equivalent inputs hash and compare equal:

>>> a = Sample(uri="cell.zarr", aabb=[(0, 64), (0, 256), (0, 256)])
>>> b = Sample(
...     uri="cell.zarr",
...     aabb=[slice(0, 64), slice(0, 256), slice(0, 256)],
... )
>>> c = Sample(
...     uri="cell.zarr",
...     aabb=[slice(None, 64), slice(None, 256), slice(None, 256)],
... )
>>> a == b == c
True
>>> hash(a) == hash(b) == hash(c)
True
>>> a.aabb
((0, 64), (0, 256), (0, 256))

Bare ints in ``aabb`` are rejected so the behaviour stays consistent with NumPy/zarr indexing semantics (``np.s_[64]`` means "point 64", not "extent (0, 64)"):

>>> Sample(uri="cell.zarr", aabb=[64, 256, 256])
Traceback (most recent call last):
    ...
TypeError: aabb axis 0: expected slice or (start, stop) tuple; got int

Slice validation rejects strided slices and unbounded stops:

>>> Sample(uri="cell.zarr", aabb=[slice(0, 64, 2), slice(0, 256), slice(0, 256)])
Traceback (most recent call last):
    ...
ValueError: aabb axis 0: slice step must be 1 or omitted (got step=2)
>>> Sample(uri="cell.zarr", aabb=[slice(0, None), slice(0, 256), slice(0, 256)])
Traceback (most recent call last):
    ...
ValueError: aabb axis 0: slice stop is required (got slice(0, None, None))
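The normalisation and validation rules above can be sketched as a standalone function. This is a mimic for illustration, not the library's actual `_normalise_axis`:

```python
def normalise_axis(x, axis: int) -> tuple:
    """Canonicalise one aabb axis to a (start, stop) pair."""
    if isinstance(x, slice):
        # Strided slices are rejected; a missing start means 0,
        # but a missing stop has no sensible default.
        if x.step not in (None, 1):
            raise ValueError(
                f"aabb axis {axis}: slice step must be 1 or omitted"
                f" (got step={x.step})"
            )
        if x.stop is None:
            raise ValueError(
                f"aabb axis {axis}: slice stop is required (got {x!r})"
            )
        return (0 if x.start is None else int(x.start), int(x.stop))
    if isinstance(x, tuple) and len(x) == 2:
        return (int(x[0]), int(x[1]))
    raise TypeError(
        f"aabb axis {axis}: expected slice or (start, stop) tuple;"
        f" got {type(x).__name__}"
    )
```

For example, `normalise_axis(slice(None, 64), axis=0)` yields `(0, 64)`, the same canonical form the doctests show for `a.aabb`.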

Source code in python/damacy/__init__.py
def __init__(
    self,
    uri: str,
    aabb: Iterable[slice | tuple[int, int]],
) -> None:
    # Custom __init__ rather than __post_init__ so pyright sees the
    # polymorphic input shape on construction *and* the precise
    # canonical shape on field reads. dataclass(init=False) keeps
    # the auto-generated __eq__ / __hash__ / __repr__.
    object.__setattr__(self, "uri", uri)
    object.__setattr__(
        self,
        "aabb",
        tuple(_normalise_axis(x, axis=i) for i, x in enumerate(aabb)),
    )

Batch

Batch(native_batch: Batch)

A batch of samples on the device, ready for consumption.

Use as a context manager to release the slot back to the pool::

with d.pop() as batch:
    x = torch.from_dlpack(batch)

The DLPack capsule (``batch.__dlpack__()``) keeps the underlying storage alive as long as the consumer holds it; releasing the Batch object while a tensor still views it is safe.

Source code in python/damacy/__init__.py
def __init__(self, native_batch: _native.Batch) -> None:
    self._native = native_batch

release

release() -> None

Return the slot to the pool. Idempotent.

Source code in python/damacy/__init__.py
def release(self) -> None:
    """Return the slot to the pool. Idempotent."""
    self._native.release()

Value types

BatchInfo dataclass

BatchInfo(
    device_ptr: int,
    shape: tuple[int, ...],
    dtype: Dtype,
    ready_stream: int,
    batch_id: int,
)

Snapshot of the on-device batch geometry.

Metric dataclass

Metric(
    name: str,
    ms: float,
    best_ms: float,
    input_bytes: float,
    output_bytes: float,
    count: int,
)

One pipeline-stage metric. ``ms`` is cumulative; ``best_ms`` is the best single observation (a large sentinel when no samples have been recorded yet).
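Because ``ms`` is cumulative, a stage's average throughput falls out directly as bytes over time. A small sketch using a local mirror of the dataclass (the helper name `throughput_gbps` is an invention, not part of the API):

```python
from dataclasses import dataclass


@dataclass
class Metric:
    """Local mirror of the documented fields."""
    name: str
    ms: float
    best_ms: float
    input_bytes: float
    output_bytes: float
    count: int


def throughput_gbps(m: Metric) -> float:
    # Cumulative output bytes over cumulative seconds, in GB/s.
    return (m.output_bytes / 1e9) / (m.ms / 1e3) if m.ms else 0.0


m = Metric("decompress", ms=2000.0, best_ms=1.5,
           input_bytes=4e9, output_bytes=8e9, count=100)
print(throughput_gbps(m))  # 8 GB emitted over 2 s -> 4.0 GB/s
```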

Stats dataclass

Stats(
    plan: Metric,
    io: Metric,
    h2d: Metric,
    decompress: Metric,
    assemble: Metric,
    pop_wait_io: Metric,
    pop_wait_compute: Metric,
    flush_wait: Metric,
    zarr_meta_hits: int,
    zarr_meta_misses: int,
    shard_idx_hits: int,
    shard_idx_misses: int,
    batches_emitted: int,
    batches_truncated: int,
    waves_emitted: int,
    gpu_bytes_committed: int,
)

Cumulative pipeline metrics. Reset with :meth:`Pipeline.stats_reset`.

Enums

Dtype

Bases: IntEnum

Destination dtype for assembled batches. Sources may differ; the assemble kernel casts each element to this type.

coerce classmethod

coerce(value: str | int | Dtype) -> Dtype

Accept enum / int / one of {"f32", "float32", "bf16", "bfloat16"}.

>>> Dtype.coerce("f32") is Dtype.F32
True
>>> Dtype.coerce("BFloat16") is Dtype.BF16
True
>>> Dtype.coerce(Dtype.F32) is Dtype.F32
True
>>> Dtype.coerce("nope")
Traceback (most recent call last):
    ...
ValueError: unknown dtype: 'nope'

Source code in python/damacy/__init__.py
@classmethod
def coerce(cls, value: str | int | Dtype) -> Dtype:
    """Accept enum / int / one of {"f32", "float32", "bf16", "bfloat16"}.

    >>> Dtype.coerce("f32") is Dtype.F32
    True
    >>> Dtype.coerce("BFloat16") is Dtype.BF16
    True
    >>> Dtype.coerce(Dtype.F32) is Dtype.F32
    True
    >>> Dtype.coerce("nope")
    Traceback (most recent call last):
        ...
    ValueError: unknown dtype: 'nope'
    """
    if isinstance(value, cls):
        return value
    if isinstance(value, int):
        return cls(value)
    s = str(value).lower()
    if s in ("f32", "float32"):
        return cls.F32
    if s in ("bf16", "bfloat16"):
        return cls.BF16
    raise ValueError(f"unknown dtype: {value!r}")

Status

Bases: IntEnum

Mirrors enum damacy_status.

Exceptions

DamacyError

Bases: _native.DamacyError

Base class for all damacy errors. ``.status`` is a :class:`Status`; ``.what`` names the failing stage (e.g. "create", "pop").
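The typed-reraise pattern that `_reraise_typed` implements can be sketched with toy stand-ins. The classes and status values below are inventions for illustration, not the real damacy definitions:

```python
from enum import IntEnum


class Status(IntEnum):
    # Toy subset; real values come from enum damacy_status.
    NOT_FOUND = 3
    SHUTDOWN = 10


class DamacyError(Exception):
    def __init__(self, status: Status, what: str) -> None:
        super().__init__(f"{what}: {status.name}")
        self.status = status   # machine-readable status code
        self.what = what       # name of the failing stage


class NotFound(DamacyError):
    pass


class ShutdownError(DamacyError):
    pass


_BY_STATUS = {Status.NOT_FOUND: NotFound, Status.SHUTDOWN: ShutdownError}


def reraise_typed(exc: DamacyError) -> None:
    # Map the flat native error onto the matching typed subclass,
    # keeping the original as the cause.
    raise _BY_STATUS.get(exc.status, DamacyError)(exc.status, exc.what) from exc
```

Callers can then catch either the specific subclass or the base `DamacyError` and inspect `.status` / `.what`.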

TryAgain

Bases: DamacyError

Non-blocking call would block (lookahead queue full).

InvalidArgument

Bases: DamacyError

Bad arguments / configuration rejected at create or push.

NotFound

Bases: DamacyError

Sample ``uri`` did not resolve to a zarr.

DtypeMismatch

Bases: DamacyError

Source dtype has no cast path to the configured destination dtype.

RankMismatch

Bases: DamacyError

Sample rank is incompatible with the resolved zarr rank.

StorageError

Bases: DamacyError

Read or open failure on a shard file. Named ``StorageError`` rather than ``IOError`` to avoid shadowing :class:`builtins.IOError` for callers that ``from damacy import *``.

DecodeError

Bases: DamacyError

Codec parse or decompression failure.

NativeCudaError

Bases: DamacyError

CUDA driver/runtime call failed.

OutOfMemory

Bases: DamacyError

Configured GPU memory cap would be exceeded.

ShutdownError

Bases: DamacyError

Pipeline destroyed or in failed state.

Logging

set_log_level

set_log_level(level: int) -> None

Set the threshold for the C-side stderr sink (TRACE=0..FATAL=5). The Python sink (``logging.getLogger("damacy")``) is independent.

Source code in python/damacy/__init__.py
def set_log_level(level: int) -> None:
    """Set the threshold for the C-side stderr sink (TRACE=0..FATAL=5).
    The Python sink (``logging.getLogger("damacy")``) is independent."""
    _native.set_log_level(level)

set_log_quiet

set_log_quiet(quiet: bool) -> None

Toggle the C-side stderr sink. The Python ``logging`` sink keeps firing regardless.

Source code in python/damacy/__init__.py
def set_log_quiet(quiet: bool) -> None:
    """Toggle the C-side stderr sink. The Python ``logging`` sink keeps
    firing regardless."""
    _native.set_log_quiet(quiet)