Distributed training

Damacy is single-rank-aware: each rank constructs its own Pipeline, bound to its own GPU, draining its own slice of the sample list. There is no cross-rank coordination inside the pipeline. Throughput scales linearly with rank count when the storage layer keeps up.

Binding each rank to its GPU

A multi-rank launch needs two lines that do two different things:

torch.cuda.set_device(local_rank)              # for PyTorch
cfg = damacy.Config(..., device=local_rank)    # for damacy

torch.cuda.set_device(local_rank) tells PyTorch where new tensors land. Without it, every rank's training tensors go to GPU 0.

Config(device=local_rank) tells damacy to retain that device's primary CUDA context internally and run all pipeline work there. Without device=..., damacy captures whatever CUcontext is current on the calling thread — which silently lands every rank on GPU 0 if set_device was forgotten.

Pass local_rank to both. The two settings are cross-checked at construction: Pipeline(cfg) raises damacy.InvalidArgument if Config.device and the already-current context disagree on a device, and emits a UserWarning when LOCAL_RANK is set in the environment but the bound device differs.
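
For a concrete picture of the cross-check, here is a minimal sketch of the mismatch case, assuming a host with at least two GPUs (the Config arguments mirror the torchrun example below):

import damacy
import torch

torch.cuda.set_device(1)               # PyTorch side: GPU 1
torch.empty(1, device="cuda")          # prime the CUDA context on GPU 1

cfg = damacy.Config(
    batch_size=8,
    host_buffer_bytes=1 << 30,
    device_buffer_bytes=1 << 30,
    dtype="bf16",
    n_io_threads=4,
    device=0,                          # damacy side: GPU 0 (mismatch)
)
damacy.Pipeline(cfg)                   # raises damacy.InvalidArgument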

A complete torchrun example

# train.py — launch with:
#   torchrun --standalone --nproc_per_node=8 train.py
import os
from collections.abc import Iterator

import damacy
import torch
import torch.distributed as dist


def crops_for_rank(rank: int, world_size: int) -> Iterator[damacy.Sample]:
    """Yield an indefinite stream of `damacy.Sample` for this rank.
    Sharding policy lives here — see "Sharding samples across ranks"
    below. The main README covers sample-construction patterns
    (random crops, tile grids, curriculums)."""
    ...


def main() -> None:
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = dist.get_world_size()

    # See "Binding each rank to its GPU" above for why both lines.
    torch.cuda.set_device(local_rank)

    cfg = damacy.Config(
        batch_size=8,
        host_buffer_bytes=1 << 30,    # per-rank; see "Sizing per rank"
        device_buffer_bytes=1 << 30,  # per-rank
        dtype="bf16",
        n_io_threads=4,                # per-rank
        device=local_rank,
    )

    model = ...        # your DDP-wrapped module
    optimizer = ...    # your optimizer
    n_steps = 10_000

    with damacy.Pipeline(cfg) as p:
        # Hand the generator off once; the pipeline pulls lazily as
        # each pop frees a slot, so memory stays bounded even though
        # crops_for_rank is unbounded.
        p.push(crops_for_rank(local_rank, world_size))

        for step in range(n_steps):
            with p.pop() as batch:
                x = torch.from_dlpack(batch)  # zero-copy, stream-fenced
                # forward / backward / optimizer step:
                # loss = model(x).mean(); loss.backward(); optimizer.step()
                ...

    dist.destroy_process_group()


if __name__ == "__main__":
    main()

Sharding samples across ranks

crops_for_rank decides which samples each rank sees. Damacy doesn't enforce a sharding policy — whatever your generator yields is what that rank works on. Two common patterns:

  • Strided (all_samples[rank::world_size]): each rank's consecutive batches draw from nearby URIs, which preserves shard-index and zarr-metadata cache reuse between adjacent batches on the same rank.
  • Contiguous (all_samples[rank * n : (rank + 1) * n]): use this when the original order encodes a curriculum you don't want to interleave away.

Generators that draw random crops can shard implicitly by seeding their RNG with (base_seed, rank, epoch) so each rank's sequence is deterministic and disjoint.
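
A sketch of all three patterns (damacy.Sample is from the example above; make_crop is a hypothetical helper that builds one sample from RNG draws):

import random
from collections.abc import Iterator

import damacy


def strided(all_samples: list[damacy.Sample], rank: int, world_size: int) -> Iterator[damacy.Sample]:
    # Strided: consecutive batches on a rank draw from nearby URIs.
    yield from all_samples[rank::world_size]


def contiguous(all_samples: list[damacy.Sample], rank: int, world_size: int) -> Iterator[damacy.Sample]:
    # Contiguous: keeps any curriculum encoded in the original order.
    n = len(all_samples) // world_size
    yield from all_samples[rank * n : (rank + 1) * n]


def random_crops(base_seed: int, rank: int, epoch: int) -> Iterator[damacy.Sample]:
    # Implicit sharding: a per-(seed, rank, epoch) RNG stream is
    # deterministic and disjoint across ranks.
    rng = random.Random(f"{base_seed}-{rank}-{epoch}")
    while True:
        yield make_crop(rng)  # hypothetical: build a damacy.Sample from rng draws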

Sizing per rank

Most Config knobs apply per rank. Aggregate cost on a node is (ranks per node) × (per-rank value):

  knob                  per-rank cost
  host_buffer_bytes     pinned RAM staging pool
  device_buffer_bytes   device decompress scratch
  n_io_threads          I/O worker threads
  n_compute_threads     host blosc1 parse workers

Tune n_io_threads to your storage tier (NVMe pool, parallel filesystem, object store). n_compute_threads defaults to 0 (parse runs serially on the calling thread, which is fine when waves are small); raise it when chunks-per-wave is large enough that the parse shows up in stats.decompress_parse. When stacking multiple ranks on one GPU (uncommon, but valid), divide the device-side budgets so the per-GPU total fits below max_gpu_memory_bytes.
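
For instance, a back-of-envelope check against the Config in the torchrun example (a sketch; the eight ranks per node and the 40 GiB GPU figure are assumptions):

ranks_per_node = 8
host_buffer_bytes = 1 << 30      # per rank, from the example Config
n_io_threads = 4                 # per rank

pinned_ram = ranks_per_node * host_buffer_bytes   # 8 GiB of pinned RAM on the node
io_threads = ranks_per_node * n_io_threads        # 32 I/O threads against storage

# Two ranks stacked on one GPU: split the device-side budget so the
# per-GPU total stays under max_gpu_memory_bytes (assumed 40 GiB here).
ranks_per_gpu = 2
max_gpu_memory_bytes = 40 << 30
device_buffer_bytes = max_gpu_memory_bytes // ranks_per_gpu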

CUDA streams

DLPack handles synchronization. torch.from_dlpack(batch) records an event on damacy's output stream and makes the consuming PyTorch stream wait on it, so train kernels see a fenced tensor.

Damacy's internal streams are non-blocking, so a default-stream operation elsewhere in your code won't accidentally serialize the pipeline. The flip side: don't assume the legacy default stream sees damacy's work either. If you bypass DLPack and read BatchInfo.device_ptr directly, record an event on BatchInfo.ready_stream and make your consumer stream wait on it (cuStreamWaitEvent) before launching your kernel; the implicit default-stream sync is gone.
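
A minimal sketch of that manual fence using torch's stream API (assuming BatchInfo.ready_stream is a raw CUDA stream handle and that batch exposes its BatchInfo as batch.info; both accessors are assumptions here):

import torch

with p.pop() as batch:
    info = batch.info                                         # assumed BatchInfo accessor
    producer = torch.cuda.ExternalStream(info.ready_stream)   # wrap damacy's stream
    evt = torch.cuda.Event()
    evt.record(producer)                                      # event after damacy's work
    torch.cuda.current_stream().wait_event(evt)               # fence the consumer stream
    # info.device_ptr is now safe to read from kernels on the current stream.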

Common failures

  • Symptom: ~1/N throughput, but training otherwise looks fine.
    Likely cause: missing torch.cuda.set_device(local_rank). Damacy warns when LOCAL_RANK is set but its bound device disagrees.

  • Symptom: damacy.InvalidArgument at Pipeline(cfg) with "Config.device=N but … current on device M".
    Likely cause: Config(device=N) and set_device(M) were called with different N and M. Make them match.

  • Symptom: damacy.InvalidArgument ("no CUcontext is current").
    Likely cause: Config.device was not set and no CUDA context has been primed yet. Either pass device=local_rank or run a torch.empty(1, device="cuda") first.

  • Symptom: Pipeline construction succeeds but the first pop blocks forever.
    Likely cause: the push never reached the lookahead, or the consumer never frees a slot. Push itself doesn't block (the Python wrapper queues the overflow), so check pipeline.pending and confirm you're popping at the rate you push.
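
When diagnosing that last row, a quick probe (a sketch; treating pipeline.pending as a count of queued-but-unconsumed samples is an assumption):

import time

with damacy.Pipeline(cfg) as p:
    p.push(crops_for_rank(local_rank, world_size))
    for _ in range(5):
        print(p.pending)   # steadily growing: you push faster than you pop
        time.sleep(1.0)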