Architecture & Design Analysis

EMLIO: Minimizing I/O Latency and Energy Consumption for Large-Scale AI Training

Source: Jamil, Nine, Kosar — University at Buffalo (SUNY) & Tennessee Tech, SC2025 Workshop, arXiv:2508.11035v1


1. System Overview Block Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                      EMLIO System                                   │
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                   Storage Cluster                            │  │
│  │                                                              │  │
│  │  ┌─────────────────────────────────────────────────────┐    │  │
│  │  │  TFRecord Shards (mmap-accessible on storage nodes) │    │  │
│  │  └──────────────────┬──────────────────────────────────┘    │  │
│  │                     │                                        │  │
│  │                     ▼                                        │  │
│  │  ┌─────────────────────────────────────────────────────┐    │  │
│  │  │  Global Planner                                      │    │  │
│  │  │  Input: shard metadata, node list, batch size B,    │    │  │
│  │  │         epochs E                                     │    │  │
│  │  │  Output: batch plan (shard→node→epoch mapping)      │    │  │
│  │  │  Assigns shards round-robin; splits each node's     │    │  │
│  │  │  work into T thread subsets (SendWorkers)           │    │  │
│  │  └──────────────────┬──────────────────────────────────┘    │  │
│  │                     │ batch plan                             │  │
│  │         ┌───────────┴──────────────┐                        │  │
│  │         ▼                          ▼                         │  │
│  │  ┌─────────────┐          ┌─────────────┐                   │  │
│  │  │ EMLIO Daemon│          │ EMLIO Daemon│  ... (Node N)     │  │
│  │  │ (Node 1)    │          │ (Node 2)    │                   │  │
│  │  │             │          │             │                   │  │
│  │  │ T SendWorker│          │ T SendWorker│                   │  │
│  │  │ threads:    │          │ threads:    │                   │  │
│  │  │ mmap shard  │          │ mmap shard  │                   │  │
│  │  │ → slice B   │          │ → slice B   │                   │  │
│  │  │   records   │          │   records   │                   │  │
│  │  │ → msgpack   │          │ → msgpack   │                   │  │
│  │  │   serialize │          │   serialize │                   │  │
│  │  │ → ZMQ PUSH  │          │ → ZMQ PUSH  │                   │  │
│  │  │   (HWM=16)  │          │   (HWM=16)  │                   │  │
│  │  └──────┬──────┘          └──────┬──────┘                   │  │
│  └─────────┼─────────────────────── ┼────────────────────────── ┘  │
│            │                        │                               │
│            └─── TCP/ZMQ streams ────┘                               │
│                       │                                             │
│                       ▼ (Serialized msgpack batches over network)   │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                   Compute Cluster                            │  │
│  │                                                              │  │
│  │  ┌──────────────────────────────────────────────────────┐   │  │
│  │  │  EMLIO Receiver (per GPU node)                        │   │  │
│  │  │  ┌────────────────────────────────────────────────┐  │   │  │
│  │  │  │  ZMQ PULL socket + ZMQ receiver thread         │  │   │  │
│  │  │  │  → deserialize msgpack → shared in-memory queue│  │   │  │
│  │  │  └──────────────────────┬─────────────────────────┘  │   │  │
│  │  │                         │ pre-batched samples          │   │  │
│  │  │  ┌──────────────────────▼─────────────────────────┐  │   │  │
│  │  │  │  BatchProvider (Queue consumer)                 │  │   │  │
│  │  │  │  → DALI external_source                        │  │   │  │
│  │  │  └──────────────────────┬─────────────────────────┘  │   │  │
│  │  │                         │ encoded JPEG bytes + labels  │   │  │
│  │  │  ┌──────────────────────▼─────────────────────────┐  │   │  │
│  │  │  │  NVIDIA DALI Pipeline (GPU-accelerated)         │  │   │  │
│  │  │  │  decode JPEG → resize → crop → normalize        │  │   │  │
│  │  │  └──────────────────────┬─────────────────────────┘  │   │  │
│  │  │                         │ images + labels              │   │  │
│  │  │  ┌──────────────────────▼─────────────────────────┐  │   │  │
│  │  │  │  DDP Training Loop (PyTorch)                    │  │   │  │
│  │  │  │  forward → loss → backward → AllReduce grads    │  │   │  │
│  │  │  └────────────────────────────────────────────────┘  │   │  │
│  │  └──────────────────────────────────────────────────────┘   │  │
│  └──────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘
▲ Fig 1: EMLIO system overview — storage-side daemons batch+serialize
         shards and push over ZMQ; compute-side receivers feed DALI
         pipelines directly, hiding RTT behind pipeline parallelism

Interpretation. The key architectural choice is moving all batching and serialization logic out of the compute node into a storage-side daemon. This means the compute node GPU never waits for data assembly — it only consumes pre-packaged msgpack payloads. The ZMQ HWM=16 backpressure mechanism prevents the storage daemons from over-producing when the compute side falls behind, making the system self-regulating under GPU compute bursts.
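
The sketch below illustrates the storage-side SendWorker path described above: mmap a shard, slice B records, msgpack-serialize the batch, and PUSH it over ZMQ with a send high-water mark of 16. It is a minimal illustration, not the paper's code; the function name and the (offset, size, label) record layout are assumptions based on Fig 1 and Algorithm 2.

  import mmap
  import msgpack
  import zmq

  def send_worker(shard_path, records, batch_size, ip, port):
      """Illustrative SendWorker: mmap a TFRecord shard, slice B records,
      serialize each batch with msgpack, and PUSH it over ZMQ (HWM=16)."""
      ctx = zmq.Context.instance()
      sock = ctx.socket(zmq.PUSH)
      sock.setsockopt(zmq.SNDHWM, 16)        # backpressure: send blocks once 16 payloads are queued
      sock.connect(f"tcp://{ip}:{port}")
      with open(shard_path, "rb") as f, \
           mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
          for i in range(0, len(records), batch_size):
              batch = [(bytes(mm[off:off + size]), label)          # raw record bytes + label
                       for off, size, label in records[i:i + batch_size]]
              sock.send(msgpack.packb(batch, use_bin_type=True))   # one payload per batch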


2. Key Architecture Diagram — Energy Monitoring Framework

┌────────────────────────────────────────────────────────────────┐
│         EnergyMonitor — Distributed Fine-Grained               │
│         Energy Measurement Framework                           │
│                                                                │
│  Per Compute Node:                                             │
│  ┌──────────────────────────────────────────────────────────┐ │
│  │   Barrier synchronization at t_k (NTP-aligned)           │ │
│  │                                                          │ │
│  │  ┌─────────────────┐    ┌──────────────────────────────┐ │ │
│  │  │ CPU/DRAM Sampler│    │  GPU Sampler (optional)      │ │ │
│  │  │ Source: Linux   │    │  Source: NVIDIA NVML API     │ │ │
│  │  │ perf stat -e    │    │  v5.0                        │ │ │
│  │  │ power/energy-   │    │  compute: Σ P_i * δ / 1000  │ │ │
│  │  │ pkg,energy-ram  │    │  (integrated power readings) │ │ │
│  │  │ Interval: 100ms │    │  Interval: 100ms             │ │ │
│  │  └────────┬────────┘    └────────────┬─────────────────┘ │ │
│  │           │ (E_pkg, E_ram)           │ E_gpu              │ │
│  │           └──────────────────────────┘                   │ │
│  │                         │                                 │ │
│  │                         ▼                                 │ │
│  │  ┌──────────────────────────────────────────────────┐    │ │
│  │  │  Accumulator                                     │    │ │
│  │  │  merge CPU+DRAM+GPU at t_k                       │    │ │
│  │  │  interpolate holes (missed samples)              │    │ │
│  │  │  forward aligned tuples                          │    │ │
│  │  └────────────────────────┬─────────────────────────┘    │ │
│  │                           │ N aligned tuples              │ │
│  │                           ▼                               │ │
│  │  ┌──────────────────────────────────────────────────┐    │ │
│  │  │  Batch Writer                                    │    │ │
│  │  │  tag with node_id                                │    │ │
│  │  │  write to Local TSDB (InfluxDB)                  │    │ │
│  │  │  OR forward to Central TSDB                      │    │ │
│  │  └────────────────────────────────────────────────── ┘   │ │
│  └──────────────────────────────────────────────────────────┘ │
│                                                               │
│  Cross-node aggregation:                                      │
│  Local TSDBs ──► Central TSDB (query by start/end timestamp)  │
└────────────────────────────────────────────────────────────────┘
▲ Fig 2: EnergyMonitor architecture — barrier-synchronized per-node
         samplers at 100ms, NTP-aligned timestamps enable cross-node
         energy aggregation with <5% measurement error
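
A minimal sketch of the GPU sampler in Fig 2, integrating NVML power readings at a 100ms interval into joules (E = Σ P_i * δ / 1000, with P_i in milliwatts). The function name and return shape are illustrative; the paper's EnergyMonitor additionally merges perf-based CPU/DRAM counters and writes aligned tuples to a TSDB, which is omitted here.

  import time
  import pynvml

  def sample_gpu_energy(duration_s, interval_s=0.1, gpu_index=0):
      """Integrate GPU power (mW) over a window at ~100ms granularity."""
      pynvml.nvmlInit()
      handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
      energy_j, samples = 0.0, []
      t_end = time.monotonic() + duration_s
      while time.monotonic() < t_end:
          ts = time.time_ns()                                   # wall-clock timestamp (NTP-disciplined)
          power_mw = pynvml.nvmlDeviceGetPowerUsage(handle)     # instantaneous draw in mW
          energy_j += (power_mw / 1000.0) * interval_s          # P_i * δ, converted to joules
          samples.append((ts, power_mw))
          time.sleep(interval_s)
      pynvml.nvmlShutdown()
      return energy_j, samples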

3. Control Flow & Data Flow Diagrams

3a. Control Flow — EMLIO Sender (Planning & Dispatch, Algorithm 2)

  START: given TFRecord dir D, node list N, batch size B, epochs E
    │
    ▼
① [Load shard metadata: parse mapping_shard_*.json for
   all (offset, size, label) tuples across all shards]
    │
    ▼
② [Build global label map from all shards]
    │
    ▼
③ [For each epoch c = 1 to E:]
    │
    ├── Shuffle shard list (epoch-level randomization)
    ├── Assign shards to nodes round-robin
    │
    └── For each node (id, ip, port):
        │
        ├── Split node's shard list into T subsets
        └── Launch T SendWorker threads (ThreadPoolExecutor)
            │
            Each SendWorker:
            ├── mmap assigned TFRecord shard
            ├── Slice B records per batch
            ├── msgpack-serialize → binary payload
            └── ZMQ PUSH to (ip, port) with HWM=16 backpressure
    │
    ▼
  END: all epochs dispatched
▲ Fig 3: Control flow for EMLIO sender — global plan then per-node
         parallel SendWorkers push pre-batched msgpack payloads

3b. Control Flow — EMLIO Receiver + DALI Integration (Algorithm 3)

  START: given bind address ip, port p, prefetch depth Q, epochs E
    │
    ▼
① [Start ZMQ PULL socket on (ip, port)]
    │
    ▼
② [Spawn zmq_receiver thread → push msgpack batches into shared Queue]
    │
    ▼
③ [Build DALI pipeline with BatchProvider(Queue), prefetch=Q]
    │
    ▼
④ [Warm up: manually run Q pipeline iterations to fill prefetch buffers]
    │
    ▼
⑤ [For epoch = 1 to E:]
    │
    └── While data is available:
        │
        ├── pipe.run()  ← deserializes + GPU-preprocesses one batch
        │   (decode JPEG, resize, crop, normalize)
        │
        └── feed output to DDP training step
            (forward, loss, backward, AllReduce)
    │
    ▼
  END: teardown pipeline and sockets
▲ Fig 4: Control flow for EMLIO receiver — ZMQ pull feeds DALI
         via BatchProvider; Q-batch prefetch hides network RTT
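
A minimal sketch of the receiver path in Fig 4: a ZMQ PULL thread deserializes msgpack payloads into a bounded in-memory queue, and a BatchProvider callable hands pre-batched samples to DALI's fn.external_source. Names, the port, and the queue depth are illustrative; DALI pipeline construction and the DDP loop are elided.

  import queue
  import threading

  import msgpack
  import numpy as np
  import zmq

  def zmq_receiver(bind_ip, port, out_q):
      """Receiver thread: PULL serialized batches and enqueue them for DALI."""
      ctx = zmq.Context.instance()
      sock = ctx.socket(zmq.PULL)
      sock.bind(f"tcp://{bind_ip}:{port}")
      while True:
          payload = sock.recv()
          batch = msgpack.unpackb(payload, raw=False)   # [(jpeg_bytes, label), ...]
          out_q.put(batch)                              # blocks when the pipeline falls behind

  class BatchProvider:
      """Callable for DALI fn.external_source(num_outputs=2, batch=True):
      each call yields one pre-batched (encoded images, labels) pair."""
      def __init__(self, q):
          self.q = q
      def __call__(self):
          batch = self.q.get()
          jpegs = [np.frombuffer(rec, dtype=np.uint8) for rec, _ in batch]
          labels = np.array([label for _, label in batch], dtype=np.int64)
          return jpegs, labels

  batch_q = queue.Queue(maxsize=16)   # bounded queue mirrors the sender-side HWM idea
  threading.Thread(target=zmq_receiver, args=("0.0.0.0", 5555, batch_q),
                   daemon=True).start()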

3c. Data Flow — End-to-End Pipeline Parallelism

  Storage Node                    Network               Compute Node
  ┌───────────────────┐           │           ┌──────────────────────┐
  │                   │           │           │                      │
  │  Thread 1:        │           │           │  ZMQ recv thread:    │
  │  mmap shard →     │           │           │  pull batch N+2 →    │
  │  slice B records  │           │           │  deser → Queue       │
  │                   │           │           │                      │
  │  Thread 2:        │══ batch N+1 (TCP) ═══►│  BatchProvider:      │
  │  serialize+push   │           │           │  pull batch N+1 →    │
  │  batch N+1        │           │           │  DALI pipeline       │
  │                   │           │           │                      │
  │  Thread 3:        │══ batch N (TCP) ═════►│  DALI running:       │
  │  serialize+push   │           │           │  batch N → GPU       │
  │  batch N          │           │           │  preprocess          │
  │                   │           │           │                      │
  │  (HWM=16 limits   │           │           │  Training loop:      │
  │   send queue)     │           │           │  consuming batch N-1 │
  └───────────────────┘           │           └──────────────────────┘

  At any time t, four stages run concurrently in overlap:
  ① Storage: reading+serializing batch N+2
  ② Network: transmitting batch N+1
  ③ GPU: preprocessing batch N
  ④ Training: consuming batch N-1

  This 4-stage pipeline eliminates all per-batch blocking waits.
  RTT is hidden as long as: t_serialize + t_network < t_GPU_step
▲ Fig 5: EMLIO 4-stage pipeline — read, serialize, transmit, and
         GPU preprocessing all overlap, hiding network RTT entirely

4. Design Trade-off Analysis

Each decision below lists Alternative A, Alternative B (the EMLIO choice), the winner, and the rationale.

Batching location
  A: Compute-side (DataLoader reads small files, assembles batch)
  B (EMLIO): Storage-side daemon pre-batches entire B-record payloads
  Winner: B. Storage-side batching converts N small random reads into 1 large sequential mmap scan; eliminates N-1 kernel filesystem calls per batch; reduces compute-side I/O CPU load.

Serialization format
  A: JSON/pickle (verbose, slow)
  B (EMLIO): msgpack binary (compact, fast encode/decode)
  Winner: B. msgpack achieves both space efficiency and fast encode; enables high-throughput network utilization; a single payload covers an entire batch without per-sample framing.

Backpressure mechanism
  A: None (sender floods receiver)
  B (EMLIO): ZMQ HWM=16 (implicit flow control via blocked PUSH)
  Winner: B. Without backpressure, the sender runs the receiver queue out of memory; HWM=16 provides cooperative rate matching with zero explicit control messages.

Randomization scope
  A: Per-sample shuffle (random reads into TFRecord)
  B (EMLIO): Per-epoch shard shuffle (sequential read within shard)
  Winner: B. Per-sample random access on remote storage has prohibitive seek cost; shard-level shuffle provides sufficient randomization for SGD convergence while enabling sequential mmap reads.

GPU preprocessing location
  A: CPU-side decode+augment (PyTorch DataLoader)
  B (EMLIO): GPU-side decode+augment (NVIDIA DALI)
  Winner: B. GPU decode/resize/crop runs concurrently with the training backward pass via DALI exec_async; CPU-side preprocessing blocks GPU utilization during the data preprocessing stage.

Energy measurement granularity
  A: End-to-end power meter (rack-level)
  B (EMLIO): Per-component synchronized sampling (CPU+DRAM+GPU at 100ms)
  Winner: B. Component-level measurement accurately attributes energy to I/O vs. compute; 100ms sampling captures dynamic workload behavior; NTP alignment enables cross-node correlation.

RTT resilience
  A: Double-buffering (1-batch lookahead)
  B (EMLIO): Out-of-order prefetch pipeline (Q-batch prefetch)
  Winner: B. At 30ms RTT with a 1-batch lookahead, gaps still occur when RTT variance exceeds one batch time; Q-batch prefetch (Q≥4) fully absorbs RTT jitter within the normal training step duration.

Horizontal scaling
  A: Central storage server (bandwidth bottleneck)
  B (EMLIO): Sharded daemons, one EMLIO Daemon per storage node
  Winner: B. Each daemon handles its local shards independently; aggregate throughput scales linearly with storage node count; no central coordination is required during training.

For DynamICCL, the B alternatives are the ones to emulate: the pipeline-parallelism pattern (overlapping multiple stages to hide latency) applies directly to NCCL config decisions. DynamICCL should pipeline its config probe phase (analogous to EMLIO's serialize stage) with the ongoing collective execution (analogous to the training stage), so that config re-evaluation overhead is hidden behind collective completion time.


5. What to Borrow for DynamICCL

5.1 Multi-Stage Pipeline Overlap as Latency Hiding Template

EMLIO's core insight is that I/O latency is hidden when serialization, transmission, and compute are pipelined with sufficient prefetch depth Q. DynamICCL can apply this same template to its config inference pipeline: while the GPU executes collective N using the current NCCL config, the Config Agent (on CPU) should be evaluating the next config for collective N+1 using the state observations from collective N. This requires the Trigger Agent and Config Agent to run on separate threads with a 1-step look-ahead buffer.

Concrete design: Implement a 2-stage pipeline: Stage 1 (background CPU thread) runs LSTM+CUSUM on telemetry from collective k to decide if config re-evaluation is needed; Stage 2 (another background thread) runs the DQN forward pass for collective k+1. The GPU executes collectives on the main thread without blocking for agent decisions. This is EMLIO's 4-stage pipeline reduced to 2 stages for DynamICCL's simpler pipeline.
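
A minimal sketch of that two-stage overlap, assuming hypothetical trigger_agent.check(state) and config_agent.select(state) interfaces (these names are illustrative of DynamICCL's agents, not EMLIO code):

  import queue
  import threading

  telemetry_q = queue.Queue(maxsize=4)   # collective-k telemetry -> Trigger Agent
  reeval_q = queue.Queue(maxsize=4)      # Trigger Agent -> Config Agent
  config_buf = queue.Queue(maxsize=1)    # 1-step look-ahead buffer for the next config

  def trigger_stage(trigger_agent):
      """Stage 1 (background CPU thread): LSTM+CUSUM decides whether the
      config for the next collective needs re-evaluation."""
      while True:
          state_k = telemetry_q.get()
          if trigger_agent.check(state_k):
              reeval_q.put(state_k)

  def config_stage(config_agent):
      """Stage 2 (background CPU thread): DQN forward pass picks the config
      for collective k+1 while the GPU still executes collective k."""
      while True:
          state_k = reeval_q.get()
          next_cfg = config_agent.select(state_k)
          if config_buf.full():
              config_buf.get_nowait()      # keep only the freshest config
          config_buf.put(next_cfg)

  def config_for_launch(default_cfg):
      """Main thread (collective launches): never blocks on agent decisions."""
      try:
          return config_buf.get_nowait()
      except queue.Empty:
          return default_cfg               # fall back to the current config
  # Start the two stages with:
  # threading.Thread(target=trigger_stage, args=(trigger,), daemon=True).start()
  # threading.Thread(target=config_stage, args=(config,), daemon=True).start()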

5.2 Backpressure via Bounded Queue Between Agent Stages

EMLIO uses ZMQ HWM=16 to prevent the sender from overwhelming the receiver. DynamICCL needs an analogous mechanism between the Trigger Agent (fast: runs CUSUM on every collective's telemetry) and the Config Agent (slow: runs DQN forward pass which is more expensive). A bounded FIFO queue of depth K=4 between the two agents ensures that when the Config Agent falls behind, the Trigger Agent drops non-critical telemetry frames (from non-congested periods) rather than stalling the collective execution path.

Concrete design: Use a Python queue.Queue(maxsize=4) between the Trigger Agent output (congestion signal) and Config Agent input. If the queue is full when a new signal arrives and the signal is "no congestion detected," drop it silently. If the signal is "congestion detected," block briefly (≤1ms) to ensure the Config Agent processes it.
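
A minimal sketch of that drop-or-block policy between the two agents; the signal object and the ~1ms bound are the assumptions stated above:

  import queue

  signal_q = queue.Queue(maxsize=4)   # Trigger Agent -> Config Agent, bounded like ZMQ HWM

  def publish_signal(signal, congestion_detected):
      """Trigger Agent side: bounded hand-off that never stalls the collective path."""
      if not congestion_detected:
          try:
              signal_q.put_nowait(signal)          # benign telemetry: drop if the queue is full
          except queue.Full:
              pass
      else:
          try:
              signal_q.put(signal, timeout=0.001)  # congestion: wait at most ~1ms
          except queue.Full:
              try:
                  signal_q.get_nowait()            # Config Agent badly behind: evict oldest
              except queue.Empty:
                  pass
              signal_q.put_nowait(signal)          # deliver the congestion signal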

5.3 Energy-Aware Reward Signal from NVML

EMLIO demonstrates that GPU energy (measured via NVML) varies significantly across I/O loading strategies — up to 60x difference between DALI and EMLIO under WAN RTT. DynamICCL currently uses only collective completion time (latency) as its reward signal. Adding GPU energy consumption (via NVML nvmlDeviceGetPowerUsage()) as a secondary reward component would align DynamICCL with energy-efficient HPC operation. High-energy configs (e.g., excessive nChannels creating unnecessary SM contention) should receive a joint latency+energy penalty.

Reward formulation: R = -α * t_collective - β * E_gpu_during_collective, where α and β are tunable weights. E_gpu is measured by integrating NVML power readings over the collective's duration. This transforms DynamICCL from a pure-latency optimizer into a Pareto-optimal latency-energy optimizer — the same joint objective that EMLIO demonstrates is achievable with the right system design.
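
A minimal sketch of that joint reward, using the same NVML pattern as the EnergyMonitor sketch above; the coarse before/after power average, the weights, and the run_collective callable are illustrative assumptions:

  import time
  import pynvml

  def collective_reward(run_collective, alpha=1.0, beta=0.1, gpu_index=0):
      """R = -alpha * t_collective - beta * E_gpu_during_collective."""
      pynvml.nvmlInit()
      handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
      p_start_w = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000.0   # mW -> W
      t0 = time.perf_counter()
      run_collective()                    # e.g. dist.all_reduce(...) plus a CUDA synchronize
      t_collective = time.perf_counter() - t0
      p_end_w = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000.0
      e_gpu_j = 0.5 * (p_start_w + p_end_w) * t_collective          # trapezoidal energy estimate
      pynvml.nvmlShutdown()
      return -alpha * t_collective - beta * e_gpu_j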

5.4 Storage-Side Pre-Computation Analogy: Config Pre-Computation

EMLIO's daemon pre-computes batches on storage nodes before the compute node requests them. DynamICCL can apply the same pattern to config precomputation: rather than evaluating the DQN policy at the moment a collective begins, precompute the config for the next collective as soon as the current collective starts. This transforms config selection from an on-critical-path operation into an off-critical-path precomputation, with the result cached in a 1-slot buffer.

Concrete design: When collective k begins, immediately submit a background task: future_config = executor.submit(config_agent.select, state_k). When collective k+1 is about to launch, retrieve future_config.result(). If the future is not ready (Config Agent too slow), fall back to the previous config (safe default). This mirrors EMLIO's daemon pre-pushing batches before the training loop requests them.
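
A minimal sketch of that off-critical-path precomputation with a 1-slot buffer and safe fallback (ConfigPrecomputer and the select(state) interface are illustrative, not from the paper):

  from concurrent.futures import ThreadPoolExecutor

  class ConfigPrecomputer:
      """Precompute the config for collective k+1 while collective k runs;
      `config_agent` is any object exposing select(state) -> config."""
      def __init__(self, config_agent, default_config):
          self.agent = config_agent
          self.current = default_config          # safe fallback config
          self.future = None
          self.pool = ThreadPoolExecutor(max_workers=1)

      def on_collective_start(self, state_k):
          # As collective k launches, start selecting the config for k+1.
          self.future = self.pool.submit(self.agent.select, state_k)

      def next_config(self):
          # Called just before collective k+1: use the result only if ready,
          # otherwise keep the previous config (the Config Agent was too slow).
          if self.future is not None and self.future.done():
              self.current = self.future.result()
          return self.current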

5.5 TFRecord Sharding → Collective Batching Analogy

EMLIO shards its dataset into fixed-size TFRecord files to enable sequential mmap reads and parallel daemon operation. DynamICCL can draw an analogy: NCCL collectives within a training iteration naturally group into "phases" (e.g., all AllReduce calls for layer k's gradients form one group). DynamICCL should evaluate configs at the phase granularity rather than per-collective, just as EMLIO batches at the shard granularity rather than per-sample. This reduces Config Agent inference frequency by the number of collectives per phase (typically 8-32 layers), dramatically reducing agent overhead.

Concrete design: Group consecutive AllReduce calls that share the same message size bin into a "phase." Run Config Agent inference once per phase, applying the selected config to all collectives in the phase. This is the temporal analog of EMLIO's B-record batch granularity.
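
A minimal sketch of that phase grouping; the power-of-two size binning and the (name, message_bytes) representation are illustrative assumptions:

  import math

  def size_bin(nbytes):
      """Bin message sizes by power of two so similar collectives share a phase."""
      return int(math.log2(max(nbytes, 1)))

  def group_into_phases(collectives):
      """Group consecutive collectives with the same size bin into one phase;
      `collectives` is a list of (name, message_bytes) tuples."""
      phases, current, current_bin = [], [], None
      for name, nbytes in collectives:
          b = size_bin(nbytes)
          if current and b != current_bin:
              phases.append(current)
              current = []
          current.append((name, nbytes))
          current_bin = b
      if current:
          phases.append(current)
      return phases

  # One Config Agent inference per phase instead of one per collective:
  # for phase in group_into_phases(iteration_collectives):
  #     cfg = config_agent.select(summarize(phase))
  #     for coll in phase: launch_with(cfg, coll)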

5.6 NTP-Aligned Timestamps for Cross-Agent Correlation

EMLIO uses NTP-aligned timestamps to correlate energy measurements across nodes. DynamICCL's two agents (Trigger Agent, Config Agent) run on different timescales and must correlate their observations with actual collective latency measurements. Each collective's telemetry event should be timestamped using a monotonic clock (Python time.perf_counter_ns()), and both agents should share this timestamp as part of their state representation. This enables post-hoc analysis of which config decisions corresponded to which latency outcomes — essential for offline RL training data collection and debugging.
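
A minimal sketch of the shared-timestamp convention (the TelemetryEvent fields are illustrative):

  import time
  from dataclasses import dataclass

  @dataclass
  class TelemetryEvent:
      """One collective's telemetry record, keyed by a monotonic timestamp that
      both the Trigger Agent and Config Agent carry in their state."""
      ts_ns: int              # time.perf_counter_ns() at collective launch
      collective_id: int
      latency_us: float
      congestion_signal: bool

  def make_event(collective_id, latency_us, congestion_signal):
      # Keying agent observations and config decisions on ts_ns lets them be
      # joined against measured latency outcomes offline (RL data collection).
      return TelemetryEvent(time.perf_counter_ns(), collective_id,
                            latency_us, congestion_signal)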