API reference
The full public surface of the damacy package.
Pipeline
Pipeline
Pipeline(config: Config)
Streaming GPU data pipeline, driven with push() and pop().
Stages are plan → host I/O → H2D copy → on-device
decompress → assemble; output batches are double-buffered (B=2)
and waves are double-buffered internally.
A CUDA context (CUcontext) must be current on the calling thread when
the pipeline is constructed; PyTorch sets one up implicitly. For bare-Python use,
call damacy._native.cuda_init_primary once first.
Constructed from a Config:
cfg = damacy.Config(samples_per_batch=8, ...)
with damacy.Pipeline(cfg) as p:
    ...
Resource caps are fixed at construction; nothing grows after that.
Source code in python/damacy/__init__.py
pending (property)
True if push() has accepted samples that have not yet entered
the native lookahead. Becomes False as pop() frees space and those
samples are pulled in.
close
Release the underlying handle. Idempotent: repeated close() calls
are harmless, but any other call on a closed pipeline raises ShutdownError.
push
push(samples: Iterable[Sample]) -> None
Queue samples for processing. Accepts any iterable (list,
generator, infinite generator, …); large or unbounded sources
are pulled lazily as pop() frees space.
Local validation (shape/rank against Config.sample_shape)
raises the matching DamacyError subclass here and
discards the offending iterator. Errors that depend on store
contents (NotFound, DtypeMismatch, per-array RankMismatch,
decode failures) surface at pop() instead, because the pipeline
fetches metadata asynchronously after push() returns. Once any such
error fires, the pipeline is terminal: rebuild a fresh Pipeline
to recover. Calling push() on a terminal pipeline raises
ShutdownError.
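The local shape/rank check can be pictured with a small pure-Python sketch. It assumes something the reference does not spell out: a sample passes when its aabb has the same rank as Config.sample_shape and each axis extent (stop - start) equals the matching sample_shape entry. check_sample_shape is a hypothetical helper, not part of damacy, and it raises a plain ValueError where damacy raises a DamacyError subclass.

```python
from typing import Sequence, Tuple

Interval = Tuple[int, int]  # half-open [start, stop)


def check_sample_shape(aabb: Sequence[Interval], sample_shape: Sequence[int]) -> None:
    """Hypothetical model of push()'s local check; not damacy's code.

    Assumes rank must match and each axis extent must equal sample_shape.
    Raises ValueError where damacy would raise a DamacyError subclass.
    """
    if len(aabb) != len(sample_shape):
        raise ValueError(
            f"rank mismatch: aabb has {len(aabb)} axes, "
            f"sample_shape has {len(sample_shape)}"
        )
    for axis, ((start, stop), want) in enumerate(zip(aabb, sample_shape)):
        extent = stop - start  # size of the half-open interval on this axis
        if extent != want:
            raise ValueError(f"axis {axis}: extent {extent} != sample_shape {want}")


# Passes: the box is 64 x 256 x 256 wherever it starts.
check_sample_shape([(10, 74), (0, 256), (0, 256)], (64, 256, 256))
```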
Batching is drop_last=True: only complete batches of
Config.samples_per_batch are emitted, so trailing samples
beyond the last whole multiple are never returned. (Emitting
the ragged final batch is not yet supported — issue #139.)
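The drop_last rule is plain integer division. This minimal sketch uses batches_emitted, a hypothetical helper rather than a damacy function. It computes how many batches a given number of pushed samples yields, which is also how many pop() calls can succeed, and how many trailing samples are dropped.

```python
def batches_emitted(n_pushed: int, samples_per_batch: int) -> tuple[int, int]:
    """Return (complete batches, dropped trailing samples) under drop_last=True."""
    if samples_per_batch < 1:
        raise ValueError("samples_per_batch must be >= 1")
    full, dropped = divmod(n_pushed, samples_per_batch)
    return full, dropped


# 100 samples at 8 per batch: 12 batches are emitted, 4 samples are never returned.
print(batches_emitted(100, 8))  # (12, 4)
```

A thirteenth pop() after pushing those 100 samples would wait on a batch that is never sealed.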
pop
pop() -> Batch
Block until the next batch is ready on the device. Returns a
Batch you can hand to torch.from_dlpack (or any other
DLPack consumer), preferably inside a with block.
Raises PoolStarved if no batch arrives within
Config.pop_timeout_s seconds (default 30). Usually that
means tensors from previous batches are still being held:
drop them, or .clone() them if you need to keep them. It also
fires if you pop past the batches produced: only
len(pushed) // Config.samples_per_batch batches exist (drop_last),
so any further pop() waits on a batch that is never sealed.
Store-derived errors (NotFound, DtypeMismatch,
per-array RankMismatch, decode failures) surface here rather
than at push(). Once any such error fires, the pipeline is terminal;
subsequent pop() calls re-raise the same status.
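A toy model of a fixed pool of output slots shows why held tensors starve pop(). This illustrates the behaviour under stated assumptions; it is not damacy's implementation. ToySlotPool, its two-slot capacity (chosen to mirror the B=2 double-buffered outputs) and the short timeout are all hypothetical. The local PoolStarved class only stands in for damacy's exception.

```python
import threading


class PoolStarved(Exception):
    """Local stand-in for damacy.PoolStarved (hypothetical)."""


class ToySlotPool:
    """Hypothetical fixed pool of output slots; not damacy's implementation."""

    def __init__(self, n_slots: int = 2, timeout_s: float = 30.0) -> None:
        self._free = threading.Semaphore(n_slots)
        self._timeout_s = timeout_s

    def pop(self) -> object:
        # Wait at most timeout_s for a free slot, like Config.pop_timeout_s.
        if not self._free.acquire(timeout=self._timeout_s):
            raise PoolStarved(f"no free slot within {self._timeout_s}s")
        return object()  # placeholder for a batch occupying the slot

    def release(self) -> None:
        # Called when the caller is done with a batch (e.g. leaving a with block).
        self._free.release()


pool = ToySlotPool(n_slots=2, timeout_s=0.05)
held = [pool.pop(), pool.pop()]  # caller keeps both batches alive
try:
    pool.pop()  # no slot can be reused, so this times out
except PoolStarved as exc:
    print("starved:", exc)
held.pop()
pool.release()  # dropping one batch frees its slot...
batch = pool.pop()  # ...and the next pop succeeds
```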
batches
batches(n: int) -> Iterator[Batch]
Pop n batches as an iterator. Each underlying pop() call
blocks until that batch is ready on the device.
Pair with a with block so the slot is released:
import torch
for batch in d.batches(8):
    with batch as t:
        x = torch.from_dlpack(t)
        ...
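Conceptually, batches(n) can be modelled as a generator that calls pop() once per step. The sketch below assumes exactly that: each advance of the iterator triggers one blocking pop(). batches_from is a hypothetical free function that takes any pop callable, so it runs without a GPU.

```python
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")


def batches_from(pop: Callable[[], T], n: int) -> Iterator[T]:
    """Hypothetical model of Pipeline.batches(n): one pop() per iteration step."""
    for _ in range(n):
        yield pop()  # blocks here, not when the iterator is created


calls = []


def fake_pop() -> int:
    calls.append(len(calls))
    return len(calls)


it = batches_from(fake_pop, 3)
print(calls)     # []  (creating the generator pops nothing)
print(list(it))  # [1, 2, 3]
print(calls)     # [0, 1, 2]
```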
stats
stats() -> Stats
Cumulative pipeline metrics as a Stats snapshot.
The snapshot is taken at call time; the pipeline's counters keep
accumulating in the background. Per-stage Metric
fields carry cumulative milliseconds, the best single
observation, input/output byte totals, and a sample count.
Cache hit/miss counters and lifetime totals (batches emitted,
waves emitted, chunks dispatched, …) round out the snapshot.
gpu_bytes_committed reflects the live GPU footprint
counted against Config.max_gpu_memory_bytes; it grows from
wave initialization to the first pop (batch outputs are sized lazily)
and stays flat afterward.
Use stats_reset() to zero the cumulative timing and
metadata-latency counters. gpu_bytes_committed is not reset,
because it reflects the live commitment, not a delta.
Raises:
| Type | Description |
|---|---|
| ShutdownError | If the pipeline has been closed. |
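Because Metric fields are cumulative, per-stage averages and rates have to be derived from them. This minimal sketch assumes the field meanings described above. summarize and hit_rate are hypothetical helpers, and the local Metric only mirrors the documented fields rather than importing damacy's class. Measuring throughput as output_bytes over cumulative ms is one reasonable choice, not a damacy-defined metric.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Metric:
    """Local mirror of damacy's documented Metric fields."""

    name: str
    ms: float            # cumulative milliseconds
    best_ms: float       # best single observation (large sentinel while count == 0)
    input_bytes: float
    output_bytes: float
    count: int


def summarize(m: Metric) -> dict:
    """Mean time per observation and output throughput for one stage."""
    if m.count == 0 or m.ms <= 0:
        # Nothing observed yet: avoid dividing by zero and ignore the best_ms sentinel.
        return {"name": m.name, "mean_ms": None, "best_ms": None, "out_GBps": None}
    return {
        "name": m.name,
        "mean_ms": m.ms / m.count,
        "best_ms": m.best_ms,
        "out_GBps": m.output_bytes / (m.ms / 1e3) / 1e9,
    }


def hit_rate(hits: int, misses: int) -> float | None:
    """Fraction of cache lookups that hit, or None before any lookup."""
    total = hits + misses
    return hits / total if total else None


decode = Metric("decode", ms=200.0, best_ms=1.5, input_bytes=1e9, output_bytes=4e9, count=100)
print(summarize(decode))
print(hit_rate(90, 10))  # 0.9
```

To measure an interval without calling stats_reset(), the same arithmetic can be applied to the difference between two snapshots.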
stats_reset
Zero the cumulative timing counters and per-stage rolling
totals, together with the metadata-latency counters. Cache
hit/miss counters and gpu_bytes_committed are left alone,
since they reflect live state rather than deltas.
Raises:
| Type | Description |
|---|---|
| ShutdownError | If the pipeline has been closed. |
Config (dataclass)
Config(
*,
samples_per_batch: int,
sample_shape: Sequence[int],
max_gpu_memory_bytes: int,
dtype: Dtype | str | int = F32,
lookahead_samples: int | None = None,
max_chunk_uncompressed_bytes: int = DEFAULT_CHUNK_UNCOMPRESSED_BYTES,
max_read_op_bytes: int = DEFAULT_READ_OP_MAX_BYTES,
host_buffer_waves: int = DEFAULT_HOST_BUFFER_WAVES,
max_chunks_per_wave: int = DEFAULT_MAX_CHUNKS_PER_WAVE,
max_substreams_per_chunk: int = DEFAULT_MAX_SUBSTREAMS_PER_CHUNK,
n_io_threads: int = DEFAULT_IO_THREADS,
metadata_io_concurrency: int = DEFAULT_METADATA_IO_CONCURRENCY,
n_array_meta_cache: int = DEFAULT_ARRAY_META_CACHE,
n_shard_index_cache: int = DEFAULT_SHARD_INDEX_CACHE,
n_chunk_layout_cache: int = DEFAULT_CHUNK_LAYOUT_CACHE,
max_shards_per_sample: int = DEFAULT_MAX_SHARDS_PER_SAMPLE,
device: int | None = None,
pop_timeout_s: float | None = 30.0,
enable_gds: bool | None = None,
numa_strategy: NumaStrategy | str | int = AUTO,
numa_node: int = -1,
metadata_latency: LatencyModel | None = None,
)
All resource caps and the pipeline shape, fixed at creation time.
Build variants with dataclasses.replace:
>>> import dataclasses
>>> base = Config(samples_per_batch=8, sample_shape=(8, 16),
... max_gpu_memory_bytes=1 << 30)
>>> base.dtype is Dtype.F32
True
>>> dataclasses.replace(base, samples_per_batch=64, lookahead_samples=128).samples_per_batch
64
Validation runs in __init__, so invalid configs fail before any
CUDA call is made. For the dtype argument, the constructor accepts
a Dtype, an int, or one of "f32" / "float32" / "bf16" / "bfloat16";
the stored field is always a Dtype.
>>> Config(samples_per_batch=0, sample_shape=(8, 16), max_gpu_memory_bytes=1 << 30)
Traceback (most recent call last):
...
ValueError: samples_per_batch must be >= 1 (got 0)
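Validating at construction also means dataclasses.replace cannot produce an invalid variant, because replace builds the new object by calling the constructor again. This minimal sketch uses MiniConfig, a hypothetical cut-down Config with only three fields. Its error message is modelled on the doctest above, and damacy's real checks may differ.

```python
import dataclasses
from typing import Tuple


@dataclasses.dataclass(frozen=True)
class MiniConfig:
    """Hypothetical cut-down Config; validates as soon as it is built."""

    samples_per_batch: int
    sample_shape: Tuple[int, ...]
    max_gpu_memory_bytes: int

    def __post_init__(self) -> None:
        if self.samples_per_batch < 1:
            raise ValueError(
                f"samples_per_batch must be >= 1 (got {self.samples_per_batch})"
            )
        # Store any sequence as a tuple so instances stay hashable.
        object.__setattr__(self, "sample_shape", tuple(self.sample_shape))


base = MiniConfig(samples_per_batch=8, sample_shape=[8, 16], max_gpu_memory_bytes=1 << 30)
bigger = dataclasses.replace(base, samples_per_batch=64)  # constructor (and check) runs again
try:
    dataclasses.replace(base, samples_per_batch=0)
except ValueError as exc:
    print(exc)  # samples_per_batch must be >= 1 (got 0)
```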
Attributes:
| Name | Type | Description |
|---|---|---|
| samples_per_batch | int | Samples per batch (>= 1). |
| max_gpu_memory_bytes | int | Primary GPU budget knob. Hard cap on GPU memory allocated for wave-resident buffers, decoder scratch, per-wave fanout SOAs, and batch-output pools. Required; there is no default. A value too small for the requested batch geometry raises BudgetExceeded. |
| dtype | Dtype | Destination dtype for assembled batches. |
| lookahead_samples | int | User-side push-queue depth in samples. Defaults to two full output batches. |
| n_io_threads | int | Bulk data I/O worker threads (>= 1). Defaults to 64. |
| metadata_io_concurrency | int | Async metadata request concurrency (>= 1). |
| n_array_meta_cache | int | LRU cap for zarr-metadata entries. Must be … |
| n_shard_index_cache | int | LRU cap for shard-index entries. Must be … |
| n_chunk_layout_cache | int | LRU cap for per-array blosc1 chunk-layout entries. Must be … |
| max_shards_per_sample | int | Declared upper bound on the number of shards a single sample's AABB may intersect (>= 1). Sizes the … |
| max_chunk_uncompressed_bytes | int | Largest uncompressed chunk size the pipeline accepts; 0 selects the C default (512 KB). |
| max_read_op_bytes | int | Cap on the size of a single coalesced read issued to storage; 0 selects the C default. Tune against your storage tier: small values keep the queue deep and the read pattern fine-grained, while large values amortize per-syscall overhead at the cost of latency spikes. |
| device | int \| None | CUDA device index to bind. |
| pop_timeout_s | float \| None | How long pop() waits for the next batch before raising PoolStarved (default 30 seconds). |
| enable_gds | bool \| None | GPUDirect Storage opt-in. |
| numa_strategy | NumaStrategy | How to pin pinned-host slabs and worker threads to a host-NUMA node. … |
| numa_node | int | Explicit host-NUMA node when … |
| metadata_latency | LatencyModel | Synthetic metadata-store latency model used for benchmarking prefetch behavior. |
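This generic sketch makes the max_read_op_bytes trade-off concrete. It coalesces byte ranges into reads no larger than a cap. It illustrates the concept described for that field and is not damacy's read planner. coalesce_reads is hypothetical, and two things are assumptions for the example: input ranges do not overlap, and only exactly contiguous ranges are merged.

```python
from typing import List, Tuple

Range = Tuple[int, int]  # half-open byte range [start, stop)


def coalesce_reads(ranges: List[Range], max_read_bytes: int) -> List[Range]:
    """Merge contiguous, non-overlapping byte ranges into reads of at most max_read_bytes."""
    if max_read_bytes < 1:
        raise ValueError("max_read_bytes must be >= 1")
    reads: List[Range] = []
    for start, stop in sorted(ranges):
        # Split any single range larger than the cap into cap-sized pieces.
        for p_start in range(start, stop, max_read_bytes):
            p_stop = min(p_start + max_read_bytes, stop)
            if reads and reads[-1][1] == p_start and p_stop - reads[-1][0] <= max_read_bytes:
                reads[-1] = (reads[-1][0], p_stop)  # contiguous and under the cap: extend
            else:
                reads.append((p_start, p_stop))  # gap or cap reached: start a new read
    return reads


chunks = [(0, 100), (100, 200), (200, 300), (500, 600)]
print(coalesce_reads(chunks, 250))   # [(0, 200), (200, 300), (500, 600)]
print(coalesce_reads(chunks, 1000))  # [(0, 300), (500, 600)]
```

A smaller cap yields more, finer-grained reads. A larger cap yields fewer, bigger ones.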
default (classmethod)
default(
*,
samples_per_batch: int,
sample_shape: Sequence[int],
max_gpu_memory_bytes: int,
dtype: Dtype | str | int = F32,
**overrides: Any,
) -> Self
Recommended config for networked-FS (NFS) training.
Convenience wrapper around the constructor: the tuning fields' defaults already carry the cluster-tuned values (see damacy_tuning_defaults). The caller supplies the workload geometry, and any field may be overridden by keyword.
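The default() pattern combines tuned field defaults, caller-supplied geometry and keyword overrides. It can be sketched as a classmethod that forwards everything to the constructor, so validation and unknown-field errors come from __init__ as usual. TunedConfig is a hypothetical stand-in with a single tuning field; the value 64 matches the documented n_io_threads default. damacy's real default() may be implemented differently.

```python
from __future__ import annotations

import dataclasses
from typing import Any, Tuple


@dataclasses.dataclass
class TunedConfig:
    """Hypothetical stand-in for Config with one tuning field."""

    samples_per_batch: int
    sample_shape: Tuple[int, ...]
    max_gpu_memory_bytes: int
    n_io_threads: int = 64  # tuning field whose default already is the tuned value

    @classmethod
    def default(
        cls,
        *,
        samples_per_batch: int,
        sample_shape: Tuple[int, ...],
        max_gpu_memory_bytes: int,
        **overrides: Any,
    ) -> "TunedConfig":
        # Geometry comes from the caller; any other field may be overridden.
        # Unknown keys fail in the generated __init__ with a TypeError.
        return cls(
            samples_per_batch=samples_per_batch,
            sample_shape=tuple(sample_shape),
            max_gpu_memory_bytes=max_gpu_memory_bytes,
            **overrides,
        )


cfg = TunedConfig.default(samples_per_batch=8, sample_shape=(8, 16), max_gpu_memory_bytes=1 << 30)
more_io = TunedConfig.default(
    samples_per_batch=8, sample_shape=(8, 16), max_gpu_memory_bytes=1 << 30, n_io_threads=128
)
```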
Sample (dataclass)
One sample request. aabb (the sample's axis-aligned bounding box)
is a list of per-axis half-open intervals in level-0 voxel indices,
in the zarr's stored axis order.
Each axis may be a (start, stop) 2-tuple or a Python slice;
the tuple of slices that numpy.s_[...] produces is accepted
directly. The stored form is always
tuple[tuple[int, int], ...] regardless of how it was spelled,
so equivalent inputs hash and compare equal:
>>> a = Sample(uri="cell.zarr", aabb=[(0, 64), (0, 256), (0, 256)])
>>> b = Sample(
... uri="cell.zarr",
... aabb=[slice(0, 64), slice(0, 256), slice(0, 256)],
... )
>>> c = Sample(
... uri="cell.zarr",
... aabb=[slice(None, 64), slice(None, 256), slice(None, 256)],
... )
>>> a == b == c
True
>>> hash(a) == hash(b) == hash(c)
True
>>> a.aabb
((0, 64), (0, 256), (0, 256))
Bare ints in aabb are rejected so the behaviour stays
consistent with NumPy/zarr indexing semantics
(np.s_[64] means "point 64", not "extent (0, 64)"):
>>> Sample(uri="cell.zarr", aabb=[64, 256, 256])
Traceback (most recent call last):
...
TypeError: aabb axis 0: expected slice or (start, stop) tuple; got int
Slice validation rejects strided slices and unbounded stops:
>>> Sample(uri="cell.zarr", aabb=[slice(0, 64, 2), slice(0, 256), slice(0, 256)])
Traceback (most recent call last):
...
ValueError: aabb axis 0: slice step must be 1 or omitted (got step=2)
>>> Sample(uri="cell.zarr", aabb=[slice(0, None), slice(0, 256), slice(0, 256)])
Traceback (most recent call last):
...
ValueError: aabb axis 0: slice stop is required (got slice(0, None, None))
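The normalization rules above fit in a short pure-Python sketch. normalize_aabb is a hypothetical re-implementation written from the documented behaviour: it accepts tuples and unit-step slices and treats a missing start as 0. It rejects bare ints, strided slices and missing stops. The error texts mirror the doctests, but damacy's actual code may check more, for example negative or reversed bounds.

```python
from typing import Any, Iterable, Tuple


def normalize_aabb(aabb: Iterable[Any]) -> Tuple[Tuple[int, int], ...]:
    """Hypothetical model of Sample's aabb normalization."""
    out = []
    for axis, item in enumerate(aabb):
        if isinstance(item, slice):
            if item.step not in (None, 1):
                raise ValueError(
                    f"aabb axis {axis}: slice step must be 1 or omitted (got step={item.step})"
                )
            if item.stop is None:
                raise ValueError(f"aabb axis {axis}: slice stop is required (got {item!r})")
            start = 0 if item.start is None else int(item.start)
            out.append((start, int(item.stop)))
        elif isinstance(item, tuple) and len(item) == 2:
            out.append((int(item[0]), int(item[1])))
        else:
            raise TypeError(
                f"aabb axis {axis}: expected slice or (start, stop) tuple; "
                f"got {type(item).__name__}"
            )
    return tuple(out)


# Same box as np.s_[0:64, :256, 0:256], spelled without NumPy.
print(normalize_aabb((slice(0, 64), slice(None, 256), slice(0, 256))))  # ((0, 64), (0, 256), (0, 256))
```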
Batch
A batch of samples on the device, ready for consumption.
Use as a context manager to release the slot back to the pool:
with d.pop() as batch:
    x = torch.from_dlpack(batch)
The DLPack capsule (batch.__dlpack__()) keeps the underlying
storage alive as long as the consumer holds it; releasing the
Batch object while a tensor still views it is safe.
Deferred release. If the consumer kicks off an async D2D copy on
a side stream, the default with block forces a host-side
cuStreamSynchronize on the producer stream before the slot is
reused. To avoid that block, call release() explicitly with
the consumer's stream or event; damacy will stream-wait on it
before re-assembling into the slot's buffer:
import torch
side_stream = torch.cuda.Stream()
batch = d.pop()
with torch.cuda.stream(side_stream):
    src = torch.from_dlpack(batch)  # zero-copy view of the slot's buffer
    tensor = torch.empty_like(src)  # allocated for use on side_stream
    tensor.copy_(src)               # async D2D copy on side_stream
batch.release(event=side_stream)    # damacy stream-waits; no host sync
release
Return the slot to the pool. Idempotent.
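Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | object \| None | If given, the consumer's CUDA stream or event; damacy stream-waits on it before re-assembling into the slot's buffer, so no host sync is needed. | None |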
Raises:
| Type | Description |
|---|---|
| DamacyError | If the deferred-release CUDA call fails. On failure the slot is still released back to the pool; damacy logs and re-raises rather than silently leaking. |
Value types
BatchInfo (dataclass)
BatchInfo(
device_ptr: int,
shape: tuple[int, ...],
dtype: Dtype,
ready_stream: int,
batch_id: int,
)
Snapshot of the on-device batch geometry.
Metric (dataclass)
Metric(
name: str,
ms: float,
best_ms: float,
input_bytes: float,
output_bytes: float,
count: int,
)
One pipeline-stage metric. ms is cumulative; best_ms is the
best single observation (a large sentinel value while count is 0).
Stats (dataclass)
Stats(
plan: Metric,
io: Metric,
input_transfer: Metric,
decode: Metric,
post_decode: Metric,
decode_gap: Metric,
assemble: Metric,
bind_wait: Metric,
pop_wait: Metric,
array_meta_hits: int,
array_meta_misses: int,
shard_index_hits: int,
shard_index_misses: int,
chunk_layout_hits: int,
chunk_layout_misses: int,
metadata_latency_ops: int,
metadata_latency_stat_ops: int,
metadata_latency_submit_ops: int,
metadata_latency_active: int,
metadata_latency_max_active: int,
metadata_latency_total_sleep_ns: int,
metadata_latency_max_sleep_ns: int,
metadata_backend_read_jobs: int,
metadata_backend_read_active: int,
metadata_backend_read_max_active: int,
batches_emitted: int,
waves_emitted: int,
chunks_planned: int,
chunks_to_load: int,
chunks_dispatched: int,
reads_issued: int,
worker_steps: int,
gpu_bytes_committed: int,
)
Cumulative pipeline metrics. Timing and metadata-latency counters can be reset with Pipeline.stats_reset().
Enums
Dtype
Bases: IntEnum
Destination dtype for assembled batches. Sources may differ; the assemble kernel casts each element to this type.
coerce (classmethod)
Accepts a Dtype member, an int, or one of {"f32", "float32", "bf16", "bfloat16"}; string matching ignores case.
>>> Dtype.coerce("f32") is Dtype.F32
True
>>> Dtype.coerce("BFloat16") is Dtype.BF16
True
>>> Dtype.coerce(Dtype.F32) is Dtype.F32
True
>>> Dtype.coerce("nope")
Traceback (most recent call last):
...
ValueError: unknown dtype: 'nope'
Status
Bases: IntEnum
Mirrors enum damacy_status.
Exceptions
DamacyError
Bases: DamacyError
Base class for all damacy errors. .status is a Status;
.what names the failing stage (e.g. "create", "pop").
TryAgain
InvalidArgument
NotFound
DtypeMismatch
RankMismatch
StorageError
Bases: DamacyError
Read or open failure on a shard file. Named StorageError rather
than IOError to avoid shadowing builtins.IOError for
callers that do from damacy import *.
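The shadowing concern can be reproduced with the standard library alone. The sketch builds two throwaway in-memory modules. One is hypothetical and exports a class named IOError; the other exports StorageError. Each is star-imported into a fresh namespace. In Python 3, IOError is an alias of OSError, so only the second namespace still resolves IOError to the builtin.

```python
import sys
import types


def make_module(name: str, source: str) -> types.ModuleType:
    """Create and register an in-memory module from source text."""
    mod = types.ModuleType(name)
    exec(source, mod.__dict__)
    sys.modules[name] = mod
    return mod


# Hypothetical modules: one shadows the builtin name, one does not.
make_module("shadowing_demo", "class IOError(Exception):\n    pass\n")
make_module("storage_demo", "class StorageError(Exception):\n    pass\n")

shadowed_ns: dict = {}
exec("from shadowing_demo import *", shadowed_ns)
safe_ns: dict = {}
exec("from storage_demo import *", safe_ns)

# In the shadowed namespace, `except IOError` would no longer catch OSError.
print(eval("IOError is OSError", shadowed_ns))  # False
print(eval("IOError is OSError", safe_ns))      # True
```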
DecodeError
NativeCudaError
OutOfMemory
Bases: DamacyError
Host allocation failed (malloc/calloc returned NULL). Distinct
from BudgetExceeded: here the OS denied the memory, and the
configured cap was not the limiting factor.
BudgetExceeded
Bases: DamacyError
A configured cap is too small to satisfy the request. Most
commonly: Config.max_gpu_memory_bytes cannot fit the
requested batch geometry, or a chunk's uncompressed size exceeds
Config.max_chunk_uncompressed_bytes. Raise the relevant cap
and retry.
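A quick sanity check before raising Config.max_gpu_memory_bytes is to size the batch-output pool on its own. This sketch makes several assumptions. Each batch is a dense samples_per_batch × prod(sample_shape) array. An element takes 4 bytes for F32 and 2 for BF16. There are two output buffers, matching the B=2 double-buffering noted under Pipeline. Wave-resident buffers, decoder scratch and fanout SOAs are ignored, so the result is a lower bound and not damacy's own accounting. output_pool_lower_bound is a hypothetical helper.

```python
from math import prod
from typing import Sequence

ITEMSIZE = {"f32": 4, "bf16": 2}  # bytes per element of the destination dtype


def output_pool_lower_bound(
    samples_per_batch: int,
    sample_shape: Sequence[int],
    dtype: str = "f32",
    n_output_buffers: int = 2,  # assumption: B=2 double-buffered batch outputs
) -> int:
    """Bytes for the batch-output buffers alone (a lower bound on the GPU budget)."""
    per_batch = samples_per_batch * prod(sample_shape) * ITEMSIZE[dtype]
    return n_output_buffers * per_batch


# 8 samples of 64x256x256 in f32: 128 MiB per batch, 256 MiB for two buffers.
need = output_pool_lower_bound(8, (64, 256, 256))
print(need, need / 2**20)
```

If max_gpu_memory_bytes is below this figure, the configuration cannot fit regardless of the other caps.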
ShutdownError
PoolStarved
Bases: DamacyError
Raised when Pipeline.pop() waits longer than
Config.pop_timeout_s for the next batch.
The usual cause is your loop holding on to tensors from previous
batches (for example by stashing them in a list), which keeps
damacy from reusing that memory. Drop those references before
the next pop(), or call .clone() if you need to keep
them.
Logging
set_log_level
Set the severity threshold for the C-side stderr sink, from TRACE=0 to FATAL=5.
The Python sink (logging.getLogger("damacy")) is independent and is configured through the standard logging module.
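Because the Python sink is an ordinary stdlib logger, it is configured with the logging module and is unaffected by set_log_level. This minimal sketch uses only the standard library; the handler and format string are arbitrary choices.

```python
import logging

# Route damacy's Python-side log records to stderr at DEBUG and above.
logger = logging.getLogger("damacy")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# The C-side stderr sink keeps its own threshold, set separately with
# damacy.set_log_level (not called here, so this block runs without damacy).
```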