API reference¶
The full public surface of the damacy package.
Pipeline¶
Pipeline¶
Pipeline(config: Config)
Streaming GPU data pipeline. Drive :meth:`push`, :meth:`pop`,
:meth:`flush`. Stages are plan → host I/O → H2D copy → on-device
decompress → assemble; output batches are double-buffered (B=2)
and waves are double-buffered internally.
A `CUcontext` must be current on the calling thread when this is
constructed; PyTorch sets one up implicitly. For bare-Python use,
call :func:`damacy._native.cuda_init_primary` once first.
Constructed from a :class:`Config`::

    cfg = damacy.Config(batch_size=8, ...)
    with damacy.Pipeline(cfg) as p:
        ...
Resource caps are fixed at construction; nothing grows after that.
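A minimal end-to-end sketch under these rules; the URI, box extents,
and byte budgets below are illustrative placeholders::

    import torch

    import damacy

    cfg = damacy.Config(
        batch_size=8,
        host_buffer_bytes=1 << 30,
        device_buffer_bytes=1 << 30,
    )
    samples = [
        damacy.Sample(uri="cell.zarr", aabb=[(0, 64), (0, 256), (0, 256)])
        for _ in range(32)
    ]
    with damacy.Pipeline(cfg) as p:
        p.push(samples)  # pulled lazily as lookahead slots free up
        for batch in p.batches(len(samples) // cfg.batch_size):
            with batch as t:  # the with block returns the slot on exit
                x = torch.from_dlpack(t)  # on-device tensor view

Here 32 samples fill every batch of 8 exactly, so no :meth:`flush` is
needed before the last :meth:`pop`.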
Source code in python/damacy/__init__.py
pending property¶
True if :meth:`push` has accepted samples that haven't yet entered
the native lookahead. Becomes False as :meth:`pop` frees space.
close¶
Release the underlying handle. Idempotent. Subsequent calls
on the pipeline raise :class:`ShutdownError`.
push¶
push(samples: Iterable[Sample]) -> None
Queue samples for processing. Accepts any iterable (list,
generator, infinite generator, …); large or unbounded sources
are pulled lazily as :meth:`pop` frees space.
Fatal errors from the C-side validator (`NotFound`,
`DtypeMismatch`, `RankMismatch`, …) raise the matching
:class:`DamacyError` subclass; the offending iterator is
discarded, but samples accepted by earlier push calls are
unaffected.
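A sketch of both behaviours; the URI is a placeholder, and a validator
error may equally surface later, once the lazy pull reaches the bad
sample::

    def stream():
        # Unbounded generator: push() accepts it and pulls lazily.
        while True:
            yield damacy.Sample(
                uri="cell.zarr", aabb=[(0, 64), (0, 256), (0, 256)]
            )

    try:
        p.push(stream())
    except damacy.NotFound:
        ...  # this iterator is discarded; earlier pushes keep flowing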
Source code in python/damacy/__init__.py
pop¶
pop() -> Batch
Block until the next batch is on-device-ready. Returns a
:class:`Batch` you can hand to `torch.from_dlpack` (or any
DLPack consumer), preferably inside a `with` block.
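torch is not special here; any DLPack consumer can adopt the batch.
A sketch with CuPy::

    import cupy as cp

    with p.pop() as batch:
        arr = cp.from_dlpack(batch)  # zero-copy view of the device batch
        print(arr.shape, arr.dtype)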
Source code in python/damacy/__init__.py
flush¶
Drain pending samples into the pipeline (best-effort) and
ready any partial last batch for :meth:`pop`. Idempotent. Pending
samples that don't fit before flush are dropped; pop until
:attr:`pending` reads False if you want every queued sample to
emit as a batch.
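One way to drain without drops, following the note above; handle()
is a placeholder::

    while p.pending:  # empty the user-side queue first
        with p.pop() as batch:
            handle(torch.from_dlpack(batch))
    p.flush()  # now only a partial tail batch, if any, remains to pop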
Source code in python/damacy/__init__.py
batches¶
batches(n: int) -> Iterator[Batch]
Pop `n` batches as an iterator. Each call to :meth:`pop`
blocks until that batch is on-device-ready.
Pair with a `with` block so the slot is released::

    for batch in d.batches(8):
        with batch as t:
            x = torch.from_dlpack(t)
            ...
Source code in python/damacy/__init__.py
Config dataclass¶
Config(
*,
batch_size: int,
host_buffer_bytes: int,
device_buffer_bytes: int,
dtype: Dtype | str | int = F32,
lookahead_batches: int = 2,
n_io_threads: int = 4,
n_compute_threads: int = 0,
n_zarrs_meta_cache: int = 64,
n_shards_meta_cache: int = 256,
max_chunk_uncompressed_bytes: int = 0,
max_gpu_memory_bytes: int = 0,
max_bytes_per_element: int = 0,
device: int | None = None,
)
All resource caps and pipeline shape, fixed at create time.
Build variants with :func:`dataclasses.replace`:

>>> import dataclasses
>>> base = Config(batch_size=8,
...     host_buffer_bytes=1 << 30, device_buffer_bytes=1 << 30)
>>> base.dtype is Dtype.F32
True
>>> dataclasses.replace(base, batch_size=64).batch_size
64
Validation runs in `__init__` so invalid configs fail before we
touch CUDA. The constructor accepts a :class:`Dtype`, an int, or one
of `"f32"` / `"float32"` / `"bf16"` / `"bfloat16"` for the
`dtype` argument; the stored field is always a :class:`Dtype`.
>>> Config(batch_size=0,
...     host_buffer_bytes=1, device_buffer_bytes=1)
Traceback (most recent call last):
    ...
ValueError: batch_size must be >= 1 (got 0)
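The same coercion is visible on the stored field; for instance (buffer
sizes as in the example above):

>>> Config(batch_size=8, host_buffer_bytes=1 << 30,
...        device_buffer_bytes=1 << 30, dtype="bf16").dtype is Dtype.BF16
True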
Attributes:

| Name | Type | Description |
|---|---|---|
| `batch_size` | `int` | Samples per batch (>= 1). |
| `host_buffer_bytes` | `int` | Pinned-host staging budget; sized for I/O bandwidth. |
| `device_buffer_bytes` | `int` | Device decompress-scratch budget. |
| `dtype` | `Dtype` | Destination dtype for assembled batches. |
| `lookahead_batches` | `int` | User-side push-queue depth (>= 2). |
| `n_io_threads` | `int` | IO worker threads (>= 1). |
| `n_compute_threads` | `int` | Background workers for blosc1 chunk-header parsing (>= 0). 0 runs parsing serially on the calling thread; > 0 spawns a fork-join pool. Total parallelism is … |
| `n_zarrs_meta_cache` | `int` | LRU cap for zarr-metadata entries. |
| `n_shards_meta_cache` | `int` | LRU cap for shard-index entries. |
| `max_chunk_uncompressed_bytes` | `int` | Largest uncompressed chunk size the pipeline accepts; 0 selects the C default (512 KB). Values exceeding :data:`…` |
| `max_gpu_memory_bytes` | `int` | Hard cap on GPU memory allocated for wave-resident buffers and batch-output pools; 0 = no cap. |
| `max_bytes_per_element` | `int` | Largest source dtype size (bytes) the pipeline will accept; 0 = the codec ceiling. |
| `device` | `int \| None` | CUDA device index to bind. |
Source code in python/damacy/__init__.py
Sample dataclass¶
One sample request. `aabb` is a per-axis half-open interval
list in level-0 voxel indices, in the zarr's stored axis order.
Each axis may be a `(start, stop)` 2-tuple or a Python `slice`;
the tuple of slices that `numpy.s_[...]` produces is accepted
directly. The stored form is always
`tuple[tuple[int, int], ...]` regardless of how it was spelled,
so equivalent inputs hash and compare equal:
>>> a = Sample(uri="cell.zarr", aabb=[(0, 64), (0, 256), (0, 256)])
>>> b = Sample(
...     uri="cell.zarr",
...     aabb=[slice(0, 64), slice(0, 256), slice(0, 256)],
... )
>>> c = Sample(
...     uri="cell.zarr",
...     aabb=[slice(None, 64), slice(None, 256), slice(None, 256)],
... )
>>> a == b == c
True
>>> hash(a) == hash(b) == hash(c)
True
>>> a.aabb
((0, 64), (0, 256), (0, 256))
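And because `numpy.s_[...]` yields exactly such a tuple of slices, it
should interchange with the spellings above:

>>> import numpy as np
>>> Sample(uri="cell.zarr", aabb=np.s_[0:64, 0:256, 0:256]) == a
True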
Bare ints in `aabb` are rejected so the behaviour stays
consistent with NumPy/zarr indexing semantics
(`np.s_[64]` means "point 64", not "extent (0, 64)"):
Sample(uri="cell.zarr", aabb=[64, 256, 256]) Traceback (most recent call last): ... TypeError: aabb axis 0: expected slice or (start, stop) tuple; got int
Slice validation rejects strided slices and unbounded stops:
Sample(uri="cell.zarr", aabb=[slice(0, 64, 2), slice(0, 256), slice(0, 256)]) Traceback (most recent call last): ... ValueError: aabb axis 0: slice step must be 1 or omitted (got step=2) Sample(uri="cell.zarr", aabb=[slice(0, None), slice(0, 256), slice(0, 256)]) Traceback (most recent call last): ... ValueError: aabb axis 0: slice stop is required (got slice(0, None, None))
Source code in python/damacy/__init__.py
Batch¶
A batch of samples on the device, ready for consumption.
Use as a context manager to release the slot back to the pool::
    with d.pop() as batch:
        x = torch.from_dlpack(batch)
The DLPack capsule (`batch.__dlpack__()`) keeps the underlying
storage alive as long as the consumer holds it; releasing the
`Batch` object while a tensor still views it is safe.
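So a tensor may outlive the block that produced it; a sketch::

    with d.pop() as batch:
        x = torch.from_dlpack(batch)
    # Releasing the Batch above is safe: the DLPack capsule held by x
    # keeps the underlying storage alive until x is dropped.
    y = x.float().mean()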
Source code in python/damacy/__init__.py
Value types¶
BatchInfo dataclass¶
BatchInfo(
device_ptr: int,
shape: tuple[int, ...],
dtype: Dtype,
ready_stream: int,
batch_id: int,
)
Snapshot of the on-device batch geometry.
Metric dataclass¶
Metric(
name: str,
ms: float,
best_ms: float,
input_bytes: float,
output_bytes: float,
count: int,
)
One pipeline-stage metric. `ms` is cumulative; `best_ms` is the
best single observation (a large sentinel when no samples yet).
Stats dataclass¶
Stats(
plan: Metric,
io: Metric,
h2d: Metric,
decompress: Metric,
assemble: Metric,
pop_wait_io: Metric,
pop_wait_compute: Metric,
flush_wait: Metric,
zarr_meta_hits: int,
zarr_meta_misses: int,
shard_idx_hits: int,
shard_idx_misses: int,
batches_emitted: int,
batches_truncated: int,
waves_emitted: int,
gpu_bytes_committed: int,
)
Cumulative pipeline metrics. Reset with :meth:`Pipeline.stats_reset`.
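A reporting sketch; only stats_reset is documented here, so the
stats() accessor below is a hypothetical name::

    s = p.stats()  # hypothetical accessor returning a Stats snapshot
    for m in (s.plan, s.io, s.h2d, s.decompress, s.assemble):
        print(f"{m.name:12s} {m.ms:9.1f} ms total  "
              f"best {m.best_ms:8.3f} ms  calls {m.count}")
    hits, misses = s.zarr_meta_hits, s.zarr_meta_misses
    print(f"zarr meta hit rate: {hits / max(1, hits + misses):.1%}")
    p.stats_reset()  # start a fresh measurement window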
Enums¶
Dtype¶
Bases: IntEnum
Destination dtype for assembled batches. Sources may differ; the assemble kernel casts each element to this type.
coerce classmethod¶
Accept a :class:`Dtype`, an int, or one of `"f32"`, `"float32"`, `"bf16"`, `"bfloat16"`.
Dtype.coerce("f32") is Dtype.F32 True Dtype.coerce("BFloat16") is Dtype.BF16 True Dtype.coerce(Dtype.F32) is Dtype.F32 True Dtype.coerce("nope") Traceback (most recent call last): ... ValueError: unknown dtype: 'nope'
Source code in python/damacy/__init__.py
Status¶
Bases: IntEnum
Mirrors the C-side enum `damacy_status`.
Exceptions¶
DamacyError¶
Bases: DamacyError
Base class for all damacy errors. `.status` is a :class:`Status`;
`.what` names the failing stage (e.g. `"create"`, `"pop"`).
TryAgain¶
InvalidArgument¶
NotFound¶
DtypeMismatch¶
RankMismatch¶
StorageError¶
Bases: DamacyError
Read or open failure on a shard file. Named `StorageError` rather
than `IOError` to avoid shadowing :class:`builtins.IOError` for
callers that `from damacy import *`.
DecodeError¶
NativeCudaError¶
OutOfMemory¶
ShutdownError¶
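A handling sketch using the base-class fields; which subclass fires
depends on the failure::

    import logging

    log = logging.getLogger(__name__)

    try:
        with p.pop() as batch:
            x = torch.from_dlpack(batch)
    except damacy.StorageError as e:
        # Shard read/open failure; .what and .status come from DamacyError.
        log.warning("stage %r failed with status %s: %s", e.what, e.status, e)
    except damacy.DamacyError:
        raise  # DecodeError, NativeCudaError, OutOfMemory, ...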
Logging¶
set_log_level¶
Set the threshold for the C-side stderr sink (TRACE=0 .. FATAL=5).
The Python sink (`logging.getLogger("damacy")`) is independent.
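For example; the names of the integer levels between TRACE=0 and
FATAL=5 are assumed::

    import logging

    import damacy

    damacy.set_log_level(4)  # C-side sink: 4 is assumed to mean ERROR-and-up
    logging.getLogger("damacy").setLevel(logging.DEBUG)  # Python sink, independent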