Machine Learning

AI on Multiple GPUs: Point-to-Point and Collective Operations

This article is part of a series about distributed AI on multiple GPUs:

  • Part 1: Understanding the Host and Device Paradigm
  • Part 2: Point-to-Point and Collective Operations (This article)
  • Part 3: How GPUs Interact (coming soon)
  • Part 4: Gradient Collection & Distributed Data Parallelism (DDP) (coming soon)
  • Part 5: ZeRO (coming soon)
  • Part 6: Tensor Parallelism (coming soon)

Introduction

In a previous post, we established the host-device paradigm and introduced the concept of ranks in multi-GPU workloads. Now, we will explore the communication patterns provided by PyTorch's torch.distributed module to coordinate work and exchange data between these ranks. These operations, known as collectives, are the building blocks of distributed workloads.

Although PyTorch exposes these functions, under the hood it calls a backend library that implements the communication. For NVIDIA GPUs this is NCCL (NVIDIA Collective Communications Library), while for AMD GPUs it is RCCL (ROCm Communication Collectives Library).

NCCL provides multi-GPU and multi-node communication primitives for NVIDIA GPUs. It automatically detects the current topology (interconnects such as PCIe, NVLink, InfiniBand) and selects the most efficient path.

Disclaimer 1: Since NVIDIA GPUs are more common, we will focus on NCCL as the backend for this post.

Disclaimer 2: To keep things short, the code below passes only the minimum required arguments for each method instead of all available arguments.

Disclaimer 3: For simplicity, we do not show the memory of transferred tensors being freed; an operation such as scatter will not automatically free memory on the source rank (if you don't understand what I mean, that's okay, it will become clear soon).

Communication: Blocking vs. Non-blocking

To work together, GPUs must exchange data. The CPU initiates the communication by enqueuing NCCL kernels into a CUDA stream (if you don't know what CUDA streams are, check out the first blog post of this series), but the actual data transfer happens directly between GPUs over the interconnect, bypassing the CPU's main memory entirely. Ideally, the GPUs are connected via a high-speed interconnect such as NVLink or InfiniBand (these interconnects are covered in the third post of this series).

This communication can be synchronous (blocking) or asynchronous (non-blocking), which we explore below.

Synchronous (Blocking) Communication

  • Behavior: When you call a synchronous communication method, the host process stops and waits until the NCCL kernel has been enqueued in the currently active CUDA stream. Once the kernel is enqueued, the function returns. This is usually simple and reliable. Note that the host does not wait for the transfer to complete, only for the work to be queued. However, the operation does block that particular stream from moving on to the next task until the NCCL kernel has finished.

Asynchronous (Non-blocking) Communication

  • Behavior: When you call an asynchronous communication method, the call returns immediately and the operation is enqueued in the background. The kernel is not enqueued in the currently active stream, but in NCCL's dedicated internal stream for each device. This allows your CPU to continue with other tasks, a technique known as overlapping computation and communication. The asynchronous API is trickier because it can lead to undefined behavior if you don't use .wait() (described below) properly and modify the data while it is still being transmitted. However, understanding it is the key to unlocking high performance in large-scale distributed training.

Point-to-point (One-to-one)

These operations are not considered collectives, but they are the basic building blocks of communication. They provide direct data transfer between two specific ranks and are essential for tasks where one GPU needs to send specific information to another.

  • Synchronous (Blocking): The host process waits until the operation is enqueued into CUDA before continuing. The kernel is enqueued in the currently active stream.
    • torch.distributed.send(tensor, dst): Sends a tensor to a specific destination rank.
    • torch.distributed.recv(tensor, src): Receives a tensor from the source rank. The receiving tensor must be pre-allocated with the correct shape and dtype.
  • Asynchronous (Non-blocking): The host process starts the enqueue operation and immediately resumes other work. The kernel is enqueued in NCCL's dedicated internal stream for each device, allowing communication and computation to overlap. These functions return a request (technically a Work object) that can be used to track the state of the operation (see the sketch after this list).
    • request = torch.distributed.isend(tensor, dst): Starts an asynchronous send operation.
    • request = torch.distributed.irecv(tensor, src): Starts an asynchronous receive operation.
    • request.wait(): Blocks the host until the operation has been successfully enqueued in the CUDA stream. It also prevents the currently active CUDA stream from launching subsequent kernels until this specific asynchronous operation completes.
    • request.wait(timeout): If you provide a timeout argument, the host's behavior changes: it blocks the CPU thread until the NCCL operation finishes or the timeout expires (an exception is raised on timeout). In normal situations, users do not need to set a timeout.
    • request.is_completed(): Returns True if the operation has been successfully enqueued in the CUDA stream. It can be used for polling, but it does not guarantee that the actual data has been transferred.
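
To make these calls concrete, here is a minimal sketch of an asynchronous transfer between two ranks. It assumes the NCCL process group is already initialized with one process per GPU on a single node (a full setup sketch appears later in this post); the values and sizes are illustrative.

import torch
import torch.distributed

rank = torch.distributed.get_rank()
device = torch.device(f"cuda:{rank}") # assumes one GPU per process on a single node
if rank == 0:
    t = torch.tensor([1., 2., 3.], device=device)
    req = torch.distributed.isend(t, dst=1) # returns immediately with a Work object
    req.wait() # kernels enqueued after this run only once the send has finished
else: # rank == 1 (assuming only 2 ranks)
    t = torch.empty(3, device=device)
    req = torch.distributed.irecv(t, src=0) # returns immediately
    req.wait() # kernels enqueued after this can safely read the received data
    print(f"Rank 1 received: {t}")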

When PyTorch launches the NCCL kernel, it automatically installs a dependency (i.e., forces synchronization) between your currently active stream and the NCCL stream. This means that the NCCL stream will not start until all the work previously enqueued in the active stream has finished, a guarantee that the tensor being sent already holds its final values.

Likewise, calling request.wait() installs a dependency in the other direction. Any kernel you enqueue in the current stream after request.wait() will not run until the NCCL operation has finished, so you can safely use the received tensors.
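
As an illustration of these implicit stream dependencies, the sketch below (same assumptions as the previous one) overlaps an independent matrix multiplication with an asynchronous transfer; only the kernel enqueued after request.wait() depends on the transferred data. The sizes are arbitrary.

rank = torch.distributed.get_rank()
device = torch.device(f"cuda:{rank}") # assumes one GPU per process on a single node
a = torch.randn(512, 512, device=device)
b = torch.randn(512, 512, device=device)
if rank == 0:
    payload = torch.ones(512, device=device)
    req = torch.distributed.isend(payload, dst=1)
else: # rank == 1 (assuming only 2 ranks)
    payload = torch.empty(512, device=device)
    req = torch.distributed.irecv(payload, src=0)
c = a @ b # independent work: can overlap with the transfer
req.wait() # installs the stream dependency described above
d = c + payload # enqueued after wait(), so it is guaranteed to see the final values of payload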

Major “Gotchas” in NCCL

While send and recv are labeled as “synchronous,” their behavior with NCCL can be confusing. A synchronous call on a CUDA tensor blocks the host CPU thread only until the data-transfer kernel has been enqueued, not until the data transfer is complete. The CPU is then free to queue other tasks.

There is an exception: the very first call to torch.distributed.recv() really does block and wait for the transfer to complete, probably due to NCCL's internal warm-up. Subsequent calls only block until the operation has been enqueued.

Consider this example, where rank 1 hangs because the CPU tries to access a tensor that the GPU has not yet received:

rank = torch.distributed.get_rank()
if rank == 0:
   t = torch.tensor([1,2,3], dtype=torch.float32, device=device)
   # torch.distributed.send(t, dst=1) # No send operation is performed
else: # rank == 1 (assuming only 2 ranks)
   t = torch.empty(3, dtype=torch.float32, device=device)
   torch.distributed.recv(t, src=0) # Blocks only until enqueued (after first run)
   print("This WILL print if NCCL is warmed-up")
   print(t) # Hangs here: printing forces a device-to-host copy, which waits for the transfer to complete
   print("This will NOT print")

The CPU process on rank 1 hangs at print(t).

If you run this code several times in the same process, be aware that "This WILL print if NCCL is warmed-up" will not be printed on later iterations either, as the CPU is still stuck at print(t).

Collectives

All collective operations support both synchronous and asynchronous execution via the async_op argument, which defaults to False (synchronous operation).
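
For example, the broadcast described in the next section can be launched asynchronously and waited on later. This is a minimal sketch under the same assumptions as the other snippets (initialized NCCL process group, device already set):

rank = torch.distributed.get_rank()
if rank == 0:
    tensor = torch.tensor([1, 2, 3], device=device)
else:
    tensor = torch.empty(3, dtype=torch.int64, device=device)
work = torch.distributed.broadcast(tensor, src=0, async_op=True) # returns a Work object
# ... independent computation could be enqueued here ...
work.wait() # kernels enqueued after this point can safely read tensor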

One-to-All Collectives

These operations involve one rank sending data to all other ranks in the group.

Broadcast

  • torch.distributed.broadcast(tensor, src): Copies a tensor from a single source rank (src) to all other ranks. Every process ends up with an identical copy of the tensor. The tensor parameter serves two purposes: (1) when the process rank equals src, tensor holds the data to be sent; (2) otherwise, tensor is used to store the received data.
rank = torch.distributed.get_rank()
if rank == 0: # source rank
  tensor = torch.tensor([1,2,3], dtype=torch.int64, device=device)
else: # destination ranks
  tensor = torch.empty(3, dtype=torch.int64, device=device)
torch.distributed.broadcast(tensor, src=0)
Image by author: Broadcast visual animation

Scatter

  • torch.distributed.scatter(tensor, scatter_list, src): Distributes chunks of data from the source rank to all ranks. The scatter_list on the source rank contains one tensor per rank, and each rank (including the source) receives one of these tensors into its tensor argument. Non-source ranks simply pass None as scatter_list.
# The scatter_list must be None for all non-source ranks.
rank = torch.distributed.get_rank()
scatter_list = None if rank != 0 else [torch.tensor([i, i+1]).to(device) for i in range(0,4,2)] # two chunks, assuming world_size == 2
tensor = torch.empty(2, dtype=torch.int64).to(device)
torch.distributed.scatter(tensor, scatter_list, src=0)
print(f'Rank {rank} received: {tensor}')
Image by the author: Scatter visual animation

All-to-One Collectives

These operations collect data from all ranks and consolidate it on a single destination rank.

Reduce

  • torch.distributed.reduce(tensor, dst, op): Takes a tensor from each rank, combines them using a reduction operation (such as SUM, MAX, MIN), and stores the final result on the destination rank (dst) only.
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], device=device)
torch.distributed.reduce(tensor, dst=0, op=torch.distributed.ReduceOp.SUM)
print(tensor)
Photo by author: Reduce visual animation

Gather

  • torch.distributed.gather(tensor, gather_list, dst): Collects the tensors from all ranks into a list of tensors on the destination rank. The gather_list should be a list of pre-allocated tensors (with the correct shape and dtype) on the destination rank, and None everywhere else.
# The gather_list must be None for all non-destination ranks.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
gather_list = None if rank != 0 else [torch.zeros(3, dtype=torch.int64).to(device) for _ in range(world_size)]
t = torch.tensor([0+rank, 1+rank, 2+rank], dtype=torch.int64).to(device)
torch.distributed.gather(t, gather_list, dst=0)
print(f'After op, Rank {rank} has: {gather_list}')

The variable world_size is the total number of ranks, and it can be obtained with torch.distributed.get_world_size(). Don't worry about the implementation details yet; the most important thing is to grasp the concepts (a minimal setup sketch is shown below).

Image by author: Gather visual animation
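
For reference, here is a minimal sketch of the setup that all of these snippets assume: one process per GPU on a single node, launched with something like torchrun --nproc_per_node=2 script.py (the script name and GPU count are illustrative).

import os
import torch
import torch.distributed

torch.distributed.init_process_group(backend="nccl") # torchrun supplies rank, world size, and master address via environment variables
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
local_rank = int(os.environ["LOCAL_RANK"]) # set by torchrun
device = torch.device(f"cuda:{local_rank}")
torch.cuda.set_device(device)

# ... the point-to-point and collective snippets from this post go here ...

torch.distributed.destroy_process_group()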

All-to-All Collectives

In these operations, every rank sends data to and receives data from all other ranks.

All Reduce

  • torch.distributed.all_reduce(tensor, op): The same as reduce, but the result is stored on all ranks instead of only on one.
# Example for torch.distributed.all_reduce
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], dtype=torch.float32, device=device)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} after all_reduce: {tensor}")
Image by author: All Reduce visual animation

All Gather

  • torch.distributed.all_gather(tensor_list, tensor): The same as gather, but the gathered list of tensors is available on all ranks.
# Example for torch.distributed.all_gather
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_tensor = torch.tensor([rank], dtype=torch.float32, device=device)
tensor_list = [torch.empty(1, dtype=torch.float32, device=device) for _ in range(world_size)]
torch.distributed.all_gather(tensor_list, input_tensor)
print(f"Rank {rank} gathered: {[t.item() for t in tensor_list]}")
Image by author: All Gather visual animation

Reduce Scatter

  • torch.distributed.reduce_scatter(output, input_list): Equivalent to performing a reduce over a list of tensors and then scattering the result. Each rank receives a different chunk of the reduced output.
# Example for torch.distributed.reduce_scatter
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_list = [torch.tensor([rank + i], dtype=torch.float32, device=device) for i in range(world_size)]
output = torch.empty(1, dtype=torch.float32, device=device)
torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} received reduced value: {output.item()}")
Image by author: Reduce Scatter visuals

Synchronization

The two most commonly used synchronization functions are request.wait() and torch.cuda.synchronize(). It is important to understand the difference between the two:

  • request.wait(): This is used with asynchronous operations. It synchronizes the currently active CUDA stream with that operation, ensuring the stream waits for the communication to complete before continuing. In other words, it blocks the currently active CUDA stream until the data transfer is complete. On the host side, it only makes the host wait until the kernel has been enqueued; the host does not wait for the data transfer itself to finish.
  • torch.cuda.synchronize(): This is a much stronger command that stops the host CPU thread until all tasks previously submitted to the GPU have completed. It ensures that the GPU is completely idle before the CPU continues, but it can cause performance bottlenecks if used improperly. Whenever you run benchmarks, you should use it to make sure you measure the time the GPUs actually spend on their work (see the sketch after this list).
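
As an example of the benchmarking pattern mentioned above, here is a minimal sketch (same assumptions as the other snippets); the tensor size is arbitrary:

import time

tensor = torch.randn(1_000_000, device=device)
torch.cuda.synchronize() # make sure previously enqueued GPU work is done before starting the clock
start = time.perf_counter()
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
torch.cuda.synchronize() # wait for the all_reduce to actually finish on the GPU
print(f"Rank {torch.distributed.get_rank()}: all_reduce took {(time.perf_counter() - start) * 1000:.2f} ms")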

Conclusion

Congratulations on reaching the end! In this post, you learned about:

  • Point-to-Point Operations
  • Sync and Async in NCCL
  • Collective operations
  • Synchronization methods

In the next blog post we'll dive into PCIe, NVLink, and the other interconnects that enable communication in a distributed environment!
