AI on Multiple GPUs: Point-to-Point and Collective Operations

This post is part of a series about distributed AI on multiple GPUs:
- Part 1: Understanding the Host and Device Paradigm
- Part 2: Point-to-Point and Collective Operations (This article)
- Part 3: How GPUs Interact (coming soon)
- Part 4: Gradient Collection & Distributed Data Parallelism (DDP) (coming soon)
- Part 5: ZeRO (coming soon)
- Part 6: Tensor Parallelism (coming soon)
Introduction
In a previous post, we established the host-device paradigm and introduced the concept of ranks in multi-GPU workloads. Now, we will explore the communication patterns provided by PyTorch's torch.distributed module to coordinate work and exchange data between these ranks. These operations, known as collectives, are the building blocks of distributed workloads.
Although PyTorch exposes these functions, under the hood it calls a backend library that implements the communication. For NVIDIA GPUs this is NCCL (NVIDIA Collective Communications Library), while for AMD it is RCCL (ROCm Communication Collectives Library).
NCCL provides multi-GPU and multi-node communication primitives for NVIDIA GPUs. It automatically detects the current topology (communication channels such as PCIe, NVLink, InfiniBand) and selects the most efficient one.
Disclaimer 1: Since NVIDIA GPUs are more common, we will focus on NCCL as the backend for this post.
Disclaimer 2: To keep things short, the code below only passes the most relevant arguments to each method instead of all available arguments.
Disclaimer 3: For simplicity, we do not show how tensor memory is managed, but note that an operation such as scatter will not automatically free memory on the source rank (if you don't understand what I mean, that's okay, it will become clear soon).
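All the code snippets in this post assume that a process group has already been initialized and that device refers to the GPU owned by the current process. A minimal setup sketch, assuming the script is launched with torchrun (one process per GPU); the launcher and environment variables are an assumption for illustration:
import os
import torch
import torch.distributed

# torchrun sets RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT and LOCAL_RANK for us
torch.distributed.init_process_group(backend="nccl")

# Bind this process to its own GPU and build the `device` used in later snippets
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")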
Communication: Blocking vs. Non-blocking
To work together, GPUs must exchange data. The CPU initiates the communication by enqueueing NCCL kernels into a CUDA stream (if you don't know what CUDA streams are, check out the first blog post of this series), but the actual data transfer happens directly between GPUs over the interconnect, bypassing the CPU's main memory entirely. Ideally, the GPUs are connected via a high-speed link such as NVLink or InfiniBand (these interconnects are covered in the third post of this series).
This communication can be synchronous (blocking) or asynchronous (non-blocking), which we explore below.
Synchronous (Blocking) Communication
- Behavior: When you call a synchronous communication method, the host process stops and waits until the NCCL kernel has been enqueued in the currently active CUDA stream. Once the kernel has been enqueued, the function returns. This is usually simple and reliable. Note that the host does not wait for the transfer to complete, only for the work to be queued. However, that particular stream is blocked from moving on to the next task until the NCCL kernel finishes.
Asynchronous (Non-blocking) Communication
- Behavior: When you call an asynchronous communication method, the call returns immediately and the enqueueing happens in the background. The kernel does not go into the currently active stream, but into NCCL's dedicated internal stream for each device. This allows your CPU to continue with other tasks, a technique known as overlapping computation and communication. The asynchronous API is trickier because it can lead to undefined behavior if you do not call .wait() (described below) correctly and you modify the data while it is being transferred. However, understanding it is the key to unlocking high performance in large-scale distributed training.
Point-to-point (One-to-one)
These operations are not considered collectives, but they are the basic building blocks of communication. They enable direct data transfer between two specific ranks and are essential for tasks where one GPU needs to send specific information to another.
- Synchronous (Blocking): The host process waits for the operation to be enqueued in CUDA before continuing. The kernel is enqueued in the currently active stream.
  - torch.distributed.send(tensor, dst): Sends a tensor to a specific destination rank.
  - torch.distributed.recv(tensor, src): Receives a tensor from a source rank. The receiving tensor must be pre-allocated with the appropriate shape and dtype.
- Asynchronous (Non-blocking): The host process enqueues the operation and immediately resumes other work. The kernel is enqueued in NCCL's dedicated internal stream for each device, allowing communication and computation to overlap. These functions return a request (technically a Work object) that can be used to track the state of the operation.
  - request = torch.distributed.isend(tensor, dst): Starts an asynchronous send operation.
  - request = torch.distributed.irecv(tensor, src): Starts an asynchronous receive operation.
  - request.wait(): Blocks the host only until the operation has been enqueued in the CUDA stream. However, it prevents the currently active CUDA stream from launching later kernels until this specific asynchronous operation completes.
  - request.wait(timeout): If you provide a timeout argument, the host's behavior changes: it blocks the CPU thread until the NCCL operation finishes or the timeout expires (which raises an exception). In normal situations you do not need to set a timeout.
  - request.is_completed(): Returns True if the operation has been successfully enqueued in the CUDA stream. It can be used for polling, but it does not guarantee that the actual data has been transferred.
When PyTorch launches the NCCL kernel, it automatically installs a dependency (i.e., forces synchronization) between your currently active stream and the NCCL stream. This means that the NCCL stream will not start until all the work previously queued in the active stream has finished, a guarantee that the tensor being sent already has its final values.
Likewise, calling request.wait() installs a dependency in the other direction. Any kernel you enqueue in the current stream after request.wait() will not run until the NCCL operation has finished, so you can safely use the received tensors, as in the sketch below.
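Putting these pieces together, here is a minimal sketch (assuming two ranks and the setup shown earlier) where the transfer started with isend/irecv overlaps with an unrelated matrix multiplication, and request.wait() makes it safe to use the received tensor:
rank = torch.distributed.get_rank()

if rank == 0:
    t = torch.ones(1024, device=device)
    request = torch.distributed.isend(t, dst=1)  # enqueued on NCCL's internal stream
else:  # rank == 1 (assuming only 2 ranks)
    t = torch.empty(1024, device=device)
    request = torch.distributed.irecv(t, src=0)

# Independent work that can overlap with the transfer
x = torch.randn(512, 512, device=device)
y = x @ x

request.wait()  # kernels enqueued after this point see the transferred data
print(f"Rank {rank} sum: {t.sum().item()}")  # both ranks print 1024.0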
Major “Gotchas” in NCCL
While send and recv are labeled as “synchronous,” their behavior with NCCL can be confusing. A synchronous call on a CUDA tensor blocks the host CPU thread only until the data transfer kernel has been enqueued, not until the data transfer is complete. The CPU is then free to queue other tasks.
There is an exception: the very first call to torch.distributed.recv() truly blocks and waits for the transfer to complete, probably due to internal NCCL warm-up. Subsequent calls only block until the kernel has been enqueued.
Consider this example, where rank 1 hangs because the CPU is trying to print a tensor that the GPU has not received yet:
rank = torch.distributed.get_rank()
if rank == 0:
    t = torch.tensor([1, 2, 3], dtype=torch.float32, device=device)
    # torch.distributed.send(t, dst=1)  # No send operation is performed
else:  # rank == 1 (assuming only 2 ranks)
    t = torch.empty(3, dtype=torch.float32, device=device)
    torch.distributed.recv(t, src=0)  # Blocks only until enqueued (after first run)
    print("This WILL print if NCCL is warmed-up")
    print(t)  # Hangs here: printing needs the tensor's data, which never arrives
    print("This will NOT print")
The CPU process on rank 1 hangs on print(t).
If you run this code multiple times, be aware that "This WILL print if NCCL is warmed-up" will not be printed in later runs either, as the CPU is still stuck at print(t).
Collectives
All collective operations support both synchronous and asynchronous execution through the async_op argument, as illustrated below. It defaults to False, which means synchronous execution.
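For example, an all_reduce (described in detail below) can be launched asynchronously and later awaited; this is a minimal sketch, assuming the setup shown at the beginning of the post:
rank = torch.distributed.get_rank()
tensor = torch.tensor([float(rank)], device=device)

# async_op=True returns a Work handle instead of running synchronously
work = torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM, async_op=True)

# ... other independent GPU work could be enqueued here ...

work.wait()  # the current stream waits for the reduction before `tensor` is used
print(f"Rank {rank} after async all_reduce: {tensor.item()}")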
One-to-All Collectives
These functions involve one rank sending data to all other ranks in the group.
Broadcast
torch.distributed.broadcast(tensor, src): Copies a tensor from a single source rank (src) to all other ranks. Every process ends up with the same copy of the tensor. The tensor parameter serves two purposes: (1) when the process rank equals src, tensor holds the data to be sent; (2) otherwise, tensor is used to store the received data.
rank = torch.distributed.get_rank()
if rank == 0: # source rank
tensor = torch.tensor([1,2,3], dtype=torch.int64, device=device)
else: # destination ranks
tensor = torch.empty(3, dtype=torch.int64, device=device)
torch.distributed.broadcast(tensor, src=0)
Scatter
torch.distributed.scatter(tensor, scatter_list, src): Distributes chunks of data from the source rank to all ranks. On the source rank, scatter_list contains one tensor per rank, and each rank (including the source) receives one of these tensors into its tensor argument. Non-source ranks simply pass None as scatter_list.
# The scatter_list must be None for all non-source ranks.
scatter_list = None if rank != 0 else [torch.tensor([i, i+1]).to(device) for i in range(0,4,2)]
tensor = torch.empty(2, dtype=torch.int64).to(device)
torch.distributed.scatter(tensor, scatter_list, src=0)
print(f'Rank {rank} received: {tensor}')

All-to-One Collectives
These functions collect data from all ranks and consolidate it on a single destination rank.
Reduce
torch.distributed.reduce(tensor, dst, op): Takes the tensor from each rank, combines them using a reduction operation (such as SUM, MAX, MIN), and stores the final result on the destination rank (dst) only.
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], device=device)
torch.distributed.reduce(tensor, dst=0, op=torch.distributed.ReduceOp.SUM)
print(tensor)

Gather
torch.distributed.gather(tensor, gather_list, dst): Collects the tensor from all ranks into a list of tensors on the destination rank. gather_list should be a list of pre-allocated tensors (with the correct shape and dtype) on the destination rank and None everywhere else.
# The gather_list must be None for all non-destination ranks.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
gather_list = None if rank != 0 else [torch.zeros(3, dtype=torch.int64).to(device) for _ in range(world_size)]
t = torch.tensor([0+rank, 1+rank, 2+rank], dtype=torch.int64).to(device)
torch.distributed.gather(t, gather_list, dst=0)
print(f'After op, Rank {rank} has: {gather_list}')
The variable world_size is the total number of ranks. It can be obtained with torch.distributed.get_world_size(). But don't worry about the implementation details yet; the most important thing is to grasp the concepts.

All-to-All Collectives
In these operations, each rank both sends data to and receives data from all other ranks.
All-Reduce
torch.distributed.all_reduce(tensor, op): Same as reduce, but the result is stored on all ranks instead of only one.
# Example for torch.distributed.all_reduce
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], dtype=torch.float32, device=device)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} after all_reduce: {tensor}")

All-Gather
torch.distributed.all_gather(tensor_list, tensor): Same as gather, but the gathered list of tensors is available on all ranks.
# Example for torch.distributed.all_gather
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_tensor = torch.tensor([rank], dtype=torch.float32, device=device)
tensor_list = [torch.empty(1, dtype=torch.float32, device=device) for _ in range(world_size)]
torch.distributed.all_gather(tensor_list, input_tensor)
print(f"Rank {rank} gathered: {[t.item() for t in tensor_list]}")

Reduce Scatter
torch.distributed.reduce_scatter(output, input_list, op): Equivalent to performing a reduction over a list of tensors and then scattering the results. Each rank receives a different chunk of the reduced output.
# Example for torch.distributed.reduce_scatter
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_list = [torch.tensor([rank + i], dtype=torch.float32, device=device) for i in range(world_size)]
output = torch.empty(1, dtype=torch.float32, device=device)
torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} received reduced value: {output.item()}")

Synchronization
The two most commonly used synchronization functions are request.wait() and torch.cuda.synchronize(). It is important to understand the difference between the two:
- request.wait(): Used for asynchronous operations. It synchronizes the currently active CUDA stream with that operation, ensuring that the stream waits for the communication to complete before continuing. In other words, it blocks the currently active CUDA stream until the data transfer is complete. On the host side, it only makes the host wait until the kernel has been enqueued; the host does not wait for the data transfer to complete.
- torch.cuda.synchronize(): A much more powerful command that stops the host CPU thread until all work previously submitted to the GPU has completed. It ensures that the GPU is completely idle before the CPU continues, but it can cause performance bottlenecks if used improperly. Whenever you run benchmarks, you should use it to make sure you capture the real GPU execution time, as in the sketch below.
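As a simple illustration, here is a sketch of timing an all_reduce with torch.cuda.synchronize(); the timing code itself is an assumption for illustration, but without the synchronize calls you would only measure how long it takes to enqueue the kernel, not the transfer itself:
import time

tensor = torch.randn(10_000_000, device=device)

torch.cuda.synchronize()  # make sure all previously submitted GPU work is done
start = time.perf_counter()

torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)

torch.cuda.synchronize()  # wait for the all_reduce to actually finish on the GPU
elapsed = time.perf_counter() - start
print(f"all_reduce of {tensor.numel()} floats took {elapsed * 1000:.2f} ms")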
Conclusion
Congratulations on reaching the end! In this post, we learned about:
- Point-to-Point Operations
- Sync and Async in NCCL
- Collective operations
- Synchronization methods
In the next blog post we'll get into PCIe, NVLink, and other methods that enable communication in a distributed environment!


