P3: Distributed Deep Graph Learning at Scale

Paper: Gandhi and Iyer, Microsoft Research, USENIX OSDI 2021.

Core contribution: P3 eliminates the feature-movement bottleneck in distributed GNN training by (1) independently hash-partitioning the graph structure and the node features, (2) using push-pull parallelism: pushing the computation graph to all machines for intra-layer model parallelism at layer 1, then pulling only small partial activations (not features) across the network, and (3) pipelining three concurrent minibatches to overlap communication with compute. P3 achieves up to 7x speedup over DGL and 2.2x over ROC on billion-edge graphs.


Fig 1: System Overview Block Diagram

┌──────────────────────────────────────────────────────────────┐
│                    P3 Training System                        │
│                                                              │
│  ┌───────────────────────────────────────────────────────┐   │
│  │  Distributed Storage (across N machines)              │   │
│  │                                                       │   │
│  │  ┌─────────────────────┐  ┌──────────────────────┐    │   │
│  │  │  Graph Topology     │  │  Node Features       │    │   │
│  │  │  (hash partitioned  │  │  (hash partitioned   │    │   │
│  │  │   by node id)       │  │   by feature dim F/N)│    │   │
│  │  └─────────────────────┘  └──────────────────────┘    │   │
│  └───────────────────────────────────────────────────────┘   │
│                         │                                    │
│                         ▼                                    │
│  ┌───────────────────────────────────────────────────────┐   │
│  │  Push-Pull Parallel Execution Engine                  │   │
│  │                                                       │   │
│  │  ┌──────────────────────────────────────────────────┐ │   │
│  │  │  Layer 1: Intra-Layer Model Parallel (Layer 1_M) │ │   │
│  │  │  PUSH: computation graph → all machines          │ │   │
│  │  │  Each machine computes partial activations       │ │   │
│  │  │  PULL: partial activations via reduce (sync)     │ │   │
│  │  └──────────────────────────────────────────────────┘ │   │
│  │                         │                             │   │
│  │  ┌──────────────────────────────────────────────────┐ │   │
│  │  │  Layer 1_D → Layer K: Data Parallel              │ │   │
│  │  │  Standard DNN layers, gradient AllReduce         │ │   │
│  │  └──────────────────────────────────────────────────┘ │   │
│  └───────────────────────────────────────────────────────┘   │
│                         │                                    │
│  ┌───────────────────────────────────────────────────────┐   │
│  │  Pipelining Engine (3 concurrent minibatches)         │   │
│  │  Overlaps model-parallel comm with data-parallel      │   │
│  │  compute from adjacent minibatches                    │   │
│  └───────────────────────────────────────────────────────┘   │
│                         │                                    │
│  ┌───────────────────────────────────────────────────────┐   │
│  │  KVStore (RPC-based partial activation sync)          │   │
│  │  Stores partial activations; orchestrates movement    │   │
│  └───────────────────────────────────────────────────────┘   │
└──────────────────────────────────────────────────────────────┘
▲ Fig 1: P3 full system — independent graph+feature partitioning,
  push-pull parallel execution, and pipelined 3-minibatch schedule.
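
The independent partitioning in Fig 1 can be sketched in a few lines of plain Python (function names are ours for illustration, not P3's actual API): topology is 1-D hash-partitioned by node id, while the feature matrix is split along the feature dimension, so machine i holds columns [i*F/N, (i+1)*F/N) of every node's feature vector.

```python
def partition_topology(edges, num_machines):
    """Assign each (src, dst) edge to the machine owning src (hash by node id)."""
    parts = [[] for _ in range(num_machines)]
    for src, dst in edges:
        parts[src % num_machines].append((src, dst))
    return parts

def partition_features(features, num_machines):
    """Split an N_nodes x F feature matrix column-wise: F/N dims per machine."""
    F = len(features[0])
    per = F // num_machines
    return [[row[i * per:(i + 1) * per] for row in features]
            for i in range(num_machines)]
```

Because every machine holds a slice of every node's features, layer-1 computation never needs to fetch remote feature columns; only the (small) partial activations move.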

Fig 2: Key Architecture Diagram — Push-Pull Parallelism

  LAYER 1: INTRA-LAYER MODEL PARALLEL (push phase)

  ┌────────────────────────────────────────────────────────┐
  │  Computation graph (k-hop subgraph) for minibatch m    │
  │  PUSHED to all N machines                              │
  │                                                        │
  │  Machine 0          Machine 1          Machine N-1     │
  │  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐  │
  │  │ holds F_0   │   │ holds F_1   │   │ holds F_N-1 │  │
  │  │ (cols 0..   │   │ (cols F/N.. │   │ (cols (N-1) │  │
  │  │  F/N-1)     │   │  2F/N-1)    │   │  F/N..F-1)  │  │
  │  │             │   │             │   │             │  │
  │  │ computes    │   │ computes    │   │ computes    │  │
  │  │ partial_act │   │ partial_act │   │ partial_act │  │
  │  │ for layer 1 │   │ for layer 1 │   │ for layer 1 │  │
  │  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘  │
  │         └─────────────────┴──────────────────┘         │
  │                           │                            │
  │              reduce (AllReduce / sum)                   │
  │              partial activations size: hidden_dim       │
  │              NOT feature_dim — much smaller!            │
  └────────────────────────────────────────────────────────┘
                           │
                           ▼ (pull phase: node owns its activation)

  LAYER 1_D → LAYER K: DATA PARALLEL

  ┌─────────────────────────────────────────────────────────┐
  │  Each machine now has full activation for its nodes     │
  │  Executes remaining GNN layers (layers 2..K) locally    │
  │  Uses standard DNN data parallelism                     │
  │  Gradient sync via AllReduce at each layer boundary     │
  └─────────────────────────────────────────────────────────┘
▲ Fig 2: Push-pull parallelism. Layer 1 uses model parallelism over
  feature partitions; layers 2..K use data parallelism. Communication
  is over partial activations (hidden_dim), not input features (F).
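
Fig 2's reduction is exact for the linear part of layer 1: splitting X column-wise (feature shards) and W row-wise yields partial products whose elementwise sum equals X·W. A dependency-free sketch (our toy matmul and names, not P3 code) makes this checkable:

```python
def matmul(A, B):
    """Plain-Python matrix product for the check below."""
    return [[sum(a * b for a, b in zip(row, col)) for col in zip(*B)] for row in A]

def partial_activations(X, W, num_machines):
    """Sum of per-shard partial products == full X @ W (P3's layer-1 reduce)."""
    F = len(X[0])
    per = F // num_machines
    partials = []
    for i in range(num_machines):
        Xi = [row[i * per:(i + 1) * per] for row in X]  # local feature shard
        Wi = W[i * per:(i + 1) * per]                   # matching weight rows
        partials.append(matmul(Xi, Wi))
    # "pull" phase: reduce (sum) the partial activations across machines
    return [[sum(p[r][c] for p in partials) for c in range(len(W[0]))]
            for r in range(len(X))]
```

Each partial has hidden_dim columns, which is why the reduce traffic is bounded by hidden_dim rather than F.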

Fig 3: Data Flow Diagram — Feature vs. Activation Traffic

  EXISTING FRAMEWORKS (e.g., DGL): pull features + graph structure

  ┌──────────────┐                        ┌──────────────┐
  │  Machine 0   │══ k-hop features ═════►│  Machine 1   │
  │  (GNN owner) │   (F × neighborhood    │  (data store)│
  │              │◄══ + graph structure ══│              │
  └──────────────┘    SIZE: ~71 MB        └──────────────┘
                      per minibatch (2-hop, F=100)

  P3: push graph structure, pull partial activations only

  ┌──────────────┐  ══ graph structure ══►┌──────────────┐
  │  Machine 0   │   (topology only,      │  Machine 1   │
  │  (initiator) │    lightweight)        │  (compute)   │
  └──────────────┘                        └──────────────┘
        ▲                                        │
        │                              computes partial
        │                              activations
        │                              size: hidden_dim
        │                              (e.g., 16 dims)
        │                                        │
        └──════ partial activations (5 MB) ══════┘
                (3 machines × 24703 nodes × 16 dims)
                vs 71 MB for feature pull — 14x smaller

  KEY INSIGHT: activation size = hidden_dim (model config, small)
               feature size   = F (graph property, can be 1000s)
▲ Fig 3: P3 eliminates feature movement. Communication is bounded by
  hidden dimension, not input feature dimension. 14x less network traffic
  on representative GraphSAGE OGB-Product benchmark.
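
A quick sanity check of Fig 3's numbers, assuming float32 (4 B) values; the stated 14x comes from rounding the activation traffic up to 5 MB:

```python
BYTES_PER_VAL = 4  # float32 assumed

# Partial-activation traffic: 3 remote machines x 24703 nodes x 16 hidden dims
act_mb = 3 * 24703 * 16 * BYTES_PER_VAL / 1e6   # ~4.7 MB ("5 MB" in Fig 3)

# Measured feature-pull traffic for the same minibatch (from the figure): ~71 MB
ratio = 71 / act_mb                              # ~15x; Fig 3 rounds to 14x
print(f"activations: {act_mb:.1f} MB, reduction: {ratio:.0f}x")
```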

Fig 4: Control Flow Diagram — One Training Step (Forward + Backward)

  START: minibatch m arrives, computation graph generated locally
    │
    ▼
① [Pull k-hop graph structure from neighbors]
    │  topology only — no features transferred
    │  (if graph fits in host memory, fully local)
    ▼
② [PUSH: broadcast computation graph to all N machines]
    │  each machine now knows which nodes and edges to process
    ▼
③ [Layer 1_M: intra-layer model parallel forward pass]
    │
    ├── each machine computes partial activations
    │   using its local feature shard (F/N columns)
    │
    ├── partial activations accumulated via reduce
    │   (AllReduce with op=sum across machines)
    │   Communication: N_nodes × hidden_dim
    ▼
④ [SWITCH: mode switches from model-parallel to data-parallel]
    │  nodes sent to machine that owns them (by hash partition)
    ▼
⑤ [Layers 2..K: data parallel forward pass (local)]
    │  each machine executes its portion of the minibatch
    │  no cross-machine communication until layer boundary
    ▼
⑥ [Loss computation and backward pass (layers K..2)]
    │  data parallel; gradient AllReduce at each layer boundary
    ▼
⑦ [Layer 1_D backward: gradient AllReduce]
    │  (DP-mode gradient sync at layer 1 boundary)
    ▼
⑧ [SWITCH BACK: model-parallel for layer 1_M backward]
    │  push error gradients to all machines
    │  each machine computes gradient for its feature shard
    ▼
⑨ [Weight update: each machine updates its local weight shard]
    │
  END — pipeline advances to next minibatch
▲ Fig 4: P3 forward+backward control flow. Push at step ② and step ⑧
  bookend the model-parallel phases; data parallelism runs between them.

Fig 5: State Machine — Pipelined 3-Minibatch Schedule

  P3 overlaps 3 concurrent minibatches using static scheduling.
  Phase labels: nM = model-parallel phase of minibatch n
                nD = data-parallel phase of minibatch n

  TIME →
  ┌──────────────────────────────────────────────────────────┐
  │  mb 1:  [1M_fwd]─────[1D_fwd]─[1D_bwd]─[1M_bwd]          │
  │                │            │                            │
  │  mb 2:  ───────[2M_fwd]─────────────[2D_fwd]─[2D_bwd]    │
  │                         │                  │             │
  │  mb 3:  ────────────────[3M_fwd]           │[3D_fwd]...  │
  │                                            │             │
  │  OVERLAP: when 3D_fwd starts communication,              │
  │           it has a data dependency on 3M_fwd.            │
  │           P3 schedules 2 fwd + 2 bwd from other          │
  │           minibatches during this window.                │
  │                                                          │
  │  STALENESS: delay of 3 optimizer steps                   │
  │  w_{t+1} = w_t - α·∇f(w_{t-3})                           │
  │  Fixed, bounded staleness — same as PipeDream.           │
  └──────────────────────────────────────────────────────────┘

  Per-minibatch state machine:

           new minibatch
  [IDLE] ─────────────► [GRAPH_GEN]
                              │
                         k-hop pulled
                              │
                              ▼
                        [LAYER1_MODEL_PARALLEL] ◄── push graph
                              │
                         activations reduced
                              │
                              ▼
                        [DATA_PARALLEL_FWD]
                              │
                         loss computed
                              │
                              ▼
                        [DATA_PARALLEL_BWD]
                              │
                         grads synced
                              │
                              ▼
                        [LAYER1_MODEL_PARALLEL_BWD]
                              │
                         weight update
                              │
                              ▼
                           [DONE]
▲ Fig 5: Pipelined 3-minibatch schedule (top) and per-minibatch state
  machine (bottom). Staleness is bounded at 3, same as PipeDream.
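
Fig 5's bounded staleness can be verified with a toy queue model (our construction, not P3 code): each minibatch reads the current weight version at launch, but its gradient is applied `depth` optimizer steps later, giving w_{t+1} = w_t - lr·grad(w_{t-3}) in steady state.

```python
from collections import deque

def staleness_trace(steps, depth=3):
    """Return (apply_version, read_version) per optimizer step of the pipeline."""
    in_flight = deque()
    version = 0
    trace = []
    for _ in range(steps):
        in_flight.append(version)          # launch minibatch against w_version
        if len(in_flight) > depth:
            read = in_flight.popleft()     # its gradient completes now
            trace.append((version, read))
            version += 1                   # optimizer step
    return trace
```

After warm-up the gap apply_version - read_version is exactly `depth`, i.e. fixed and bounded rather than growing.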

Fig 6: Layered Software Stack

┌──────────────────────────────────────────────────────────┐
│  User GNN model (P-TAGS API: partition, transform,       │
│  apply, gather, scatter, sync)                           │
├──────────────────────────────────────────────────────────┤
│  P3 Runtime                                              │
│  (push-pull scheduler, pipelining engine, mode switch)   │
├──────────────────────────────────────────────────────────┤
│  Deep Graph Library (DGL)                                │
│  (k-hop sampling, message passing primitives)            │
│  [extended: independent topology+feature partitioning]   │
├──────────────────────────────────────────────────────────┤
│  KVStore (RPC-based partial activation movement)         │
│  PyTorch DistributedDataParallel (gradient AllReduce)    │
├──────────────────────────────────────────────────────────┤
│  NCCL (AllReduce for partial activations + gradients)    │
├──────────────────────────────────────────────────────────┤
│  10 Gbps Ethernet (inter-node) / PCIe (intra-node)       │
└──────────────────────────────────────────────────────────┘
▲ Fig 6: P3 software stack. P3 extends DGL with independent
  partitioning and wraps it with a push-pull runtime. NCCL handles
  the AllReduce for partial activations and DP gradient sync.

Fig 7: Concept Map — Independent Partitioning vs. Joint Partitioning

                 ┌──────────────────────────────┐
                 │  GNN Training Communication  │
                 │  Bottleneck Analysis         │
                 └──────────────┬───────────────┘
                                │
              ┌─────────────────┴─────────────────┐
              │                                   │
   ┌──────────▼──────────┐             ┌──────────▼──────────┐
   │  Joint Partitioning │             │  Independent        │
   │  (DGL/METIS/GRID)   │             │  Partitioning (P3)  │
   │                     │             │                     │
   │  Optimizes locality │             │  Graph: hash by     │
   │  for first hop only │             │  node id (1D)       │
   │                     │             │                     │
   │  Fails at k>1 hops: │             │  Features: hash by  │
   │  exponential neigh- │             │  feature dimension  │
   │  borhood explosion  │             │  (F/N per machine)  │
   │                     │             │                     │
   │  Moves features +   │             │  Moves only partial │
   │  graph structure    │             │  activations        │
   │  (large, F dims)    │             │  (hidden_dim, small)│
   └─────────────────────┘             └─────────────────────┘
         HIGH network traffic                LOW network traffic
         GPU idle 80% of time               GPU busy 85% of time
▲ Fig 7: Why intelligent partitioning hurts GNNs. Independent hash
  partitioning unlocks push-pull parallelism, eliminating feature movement.

Design Trade-off Analysis

Each entry lists the design decision, Alternative A, Alternative B (P3's choice), the winner, and why.

1. Graph partitioning strategy
   A: Intelligent (METIS, GRID, vertex-cut)  |  B (P3): Independent random hash  |  Winner: B
   Why: Intelligent partitioners incur 35-4264 s of preprocessing overhead, their locality benefits reach only the last hop, and they hurt multi-layer GNNs.

2. Feature movement during training
   A: Pull features to the compute node (DGL)  |  B (P3): Distribute features; never move them  |  Winner: B
   Why: Features (100s-1000s of dims) are much larger than activations (hidden_dim); 14x traffic reduction.

3. Layer 1 execution model
   A: Data parallelism (requires feature pull)  |  B (P3): Intra-layer model parallelism (push graph)  |  Winner: B
   Why: Model parallelism over the feature dimension avoids all feature movement; only activations cross the network.

4. Communication primitive at layer 1
   A: Point-to-point feature transfer  |  B (P3): AllReduce of partial activations  |  Winner: B
   Why: AllReduce size = hidden_dim × nodes, not feature_dim × nodes; 14x smaller messages.

5. Pipelining granularity
   A: No pipelining (blocking)  |  B (P3): 3 concurrent minibatches  |  Winner: B
   Why: Hides 30-50% of communication latency; staleness is bounded at 3 (acceptable convergence).

6. Caching strategy
   A: No caching (minimal memory)  |  B (P3): Greedily cache graph and feature partitions  |  Winner: B
   Why: Independent partitioning enables independent caching; 1.7x additional speedup.

7. Implementation base
   A: Build from scratch  |  B (P3): Extend DGL with custom partitioning  |  Winner: B
   Why: Reuses DGL's k-hop sampling; avoids reinventing message-passing primitives.

8. P3 applicability
   A: Universal (all GNNs)  |  B (P3): GNNs where hidden_dim << feature_dim  |  Winner: Context-dependent
   Why: P3's benefit shrinks as hidden_dim approaches feature_dim, but it degrades gracefully.

For the DynamICCL context: P3 generates two distinct AllReduce workloads: small, high-frequency AllReduces for partial activations at layer 1 (size proportional to hidden_dim), and standard data-parallel gradient AllReduces at layers 2..K (size proportional to the model weights). These call for different NCCL configurations.


What to Borrow for DynamICCL

1. Partial-activation AllReduce as a new small-message, high-frequency workload. P3's layer-1 model-parallel phase generates one AllReduce per minibatch (model parallelism applies only at layer 1) of size nodes_in_minibatch × hidden_dim. For a 16-hidden-dim GNN and a 1000-node minibatch this is 64 KB — well within NCCL's LL protocol territory (sub-256 KB). DynamICCL's Config Agent should classify these collectives by their size fingerprint and immediately apply the LL protocol with a low channel count. The key signal is the combination of small message size and high call frequency (one AllReduce per forward pass), which distinguishes this workload from the gradient AllReduces of DNN training.
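
A hypothetical sketch of the size-fingerprint rule item 1 proposes. The thresholds, field names, and return shape are our assumptions for illustration, not DynamICCL's interface or NCCL's published internals:

```python
LL_CUTOFF = 256 * 1024  # bytes; the "LL territory" bound cited above

def classify_allreduce(size_bytes, calls_per_sec):
    """Suggest an NCCL-style (protocol, channel count) from a size fingerprint."""
    if size_bytes < LL_CUTOFF and calls_per_sec > 10:
        # small + frequent: latency-bound partial-activation AllReduce
        return {"protocol": "LL", "channels": 2}
    # large and/or infrequent: bandwidth-bound gradient AllReduce
    return {"protocol": "Simple", "channels": 8}

# P3 layer-1 example from item 1: 1000 nodes x 16 hidden dims x 4 B = 64 KB
layer1_bytes = 1000 * 16 * 4
```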

2. AllReduce size as a proxy for the model-parallel vs. data-parallel regime. P3 alternates between model-parallel AllReduce (small, hidden_dim-sized) and data-parallel gradient AllReduce (large, weight-matrix-sized). DynamICCL's LSTM encoder should detect this alternating pattern in the sequence of collective sizes — a short AllReduce immediately followed by several larger AllReduces identifies the P3 push-pull pattern. Once identified, the Config Agent should apply LL protocol to the short AllReduce and Simple protocol to the large gradient AllReduces without needing to probe both configurations independently. Pattern recognition in the LSTM reduces probe overhead.
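
A minimal heuristic stand-in for the pattern the LSTM would learn (the threshold and run length are illustrative assumptions): flag a small activation AllReduce followed by a run of large gradient AllReduces.

```python
def find_push_pull_pattern(sizes, small=256 * 1024, min_large_run=2):
    """Return indices where a small AllReduce precedes >= min_large_run large ones."""
    hits = []
    for i, s in enumerate(sizes):
        if s < small:
            run = 0
            for nxt in sizes[i + 1:]:
                if nxt >= small:
                    run += 1       # count the run of large AllReduces
                else:
                    break          # run ends at the next small AllReduce
            if run >= min_large_run:
                hits.append(i)     # small-then-large: P3 push-pull signature
    return hits
```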

3. KVStore RPC traffic as a congestion precursor signal. P3 uses RPC calls through the KVStore to coordinate partial activation movement between the model-parallel and data-parallel modes. These RPC calls are out-of-band relative to NCCL's communication path but share the same 10 GbE network fabric. DynamICCL's Trigger Agent should treat elevated RPC traffic (observable as background network utilization before the AllReduce) as a pre-congestion signal — the partial activation RPCs consume bandwidth that reduces effective AllReduce throughput. The CUSUM detector should account for this background load when setting the congestion threshold.
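
A one-sided CUSUM sketch with an RPC-adjusted baseline, per item 3. The baseline latency, slack k, threshold h, and the scaling factor alpha are illustrative assumptions, not DynamICCL values:

```python
def cusum_alarm(latencies_ms, rpc_util, base_ms=5.0, k=0.5, h=8.0, alpha=10.0):
    """Return the first index where cumulative positive drift exceeds h.

    Expected latency is inflated by alpha * rpc_util, so background RPC load
    raises the threshold instead of tripping the detector on its own."""
    s = 0.0
    for i, (lat, util) in enumerate(zip(latencies_ms, rpc_util)):
        expected = base_ms + alpha * util        # baseline adjusted for RPC load
        s = max(0.0, s + (lat - expected - k))   # one-sided CUSUM statistic
        if s > h:
            return i
    return None
```

Note the third test below: latencies elevated purely by background RPC traffic do not raise an alarm, which is the behavior item 3 asks for.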

4. Independent feature partitioning as a communication-volume reduction analogy for NCCL buffer sizing. P3's core insight is that moving partial activations (bounded by hidden_dim) is always cheaper than moving features (bounded by input_dim). The analogous insight for DynamICCL is that NCCL's chunkSize parameter controls how much data is in flight per step of the ring algorithm — and smaller chunks reduce the minimum latency for the first receiver to get its data, at the cost of more steps. For P3's small partial-activation AllReduces, DynamICCL should set chunkSize small (LL protocol handles this automatically) to minimize first-byte latency, since the total message is already small and throughput is not the constraint.
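
To make item 4's trade-off concrete, a textbook alpha-beta cost sketch of a chunked, pipelined ring AllReduce (a toy model of ours, not NCCL's actual scheduler; alpha = per-step launch latency, beta = seconds per byte):

```python
import math

def ring_allreduce_cost(msg_bytes, chunk_bytes, n, alpha=5e-6, beta=1e-9):
    """Toy (first_byte, total) latency for an n-node chunked ring AllReduce."""
    chunks = math.ceil(msg_bytes / chunk_bytes)
    steps = 2 * (n - 1)                        # reduce-scatter + all-gather
    per_step = alpha + chunk_bytes * beta
    first_byte = per_step                      # smaller chunks => lower latency
    total = (steps + chunks - 1) * per_step    # chunks pipelined across steps
    return first_byte, total
```

For a 64 KB partial-activation message, shrinking the chunk lowers first-byte latency while total time stays dominated by per-step alpha, which is why a latency-oriented (LL-style) setting fits this workload.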

5. Pipelining staleness as a regime-transition signal for the LSTM. P3's 3-minibatch pipeline means weights are stale by 3 optimizer steps. This staleness changes the gradient AllReduce timing: gradients from minibatch k-3 are applied when minibatch k is already in the model-parallel phase. For DynamICCL, this means the gradient AllReduce for minibatch k-3 and the partial-activation AllReduce for minibatch k may overlap on the network. DynamICCL's Trigger Agent should detect concurrent AllReduce calls (same process group, overlapping time windows) as a load indicator, and the Config Agent should reduce numChannels for both to prevent NIC saturation from two simultaneous large ring operations competing for the same bandwidth.