colossalai.communication

colossalai.communication.all_gather(tensor, dim, parallel_mode, async_op=False)[source]

Gathers all tensors from the parallel group and concatenates them along a specific dimension.

Note

The parallel_mode should be a member of ParallelMode. More details about ParallelMode can be found in parallel_mode.

Parameters
  • tensor (torch.Tensor) – Tensor to be gathered.

  • dim (int) – The dimension to concatenate along.

  • parallel_mode (colossalai.context.ParallelMode) – Parallel group mode used in this communication.

  • async_op (bool, optional) – Whether operations are asynchronous.

Returns

The result of all-gather, if async_op is set to False. A tuple of the all-gather output and the async work handle, if async_op is set to True.

Return type

Union[tuple(torch.Tensor, work handle), torch.Tensor]
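
Example

A minimal sketch, assuming the distributed environment has already been initialized (e.g. via colossalai.launch_from_torch) so that the ParallelMode.DATA group exists.

  import torch
  from colossalai.communication import all_gather
  from colossalai.context import ParallelMode

  # each rank holds a local shard of shape (4, 8)
  local_tensor = torch.randn(4, 8).cuda()

  # blocking call: returns the concatenated tensor of shape (4 * world_size, 8)
  gathered = all_gather(local_tensor, dim=0, parallel_mode=ParallelMode.DATA)

  # asynchronous call: returns (tensor, work handle); wait before using the result
  gathered, handle = all_gather(local_tensor, dim=0, parallel_mode=ParallelMode.DATA, async_op=True)
  handle.wait()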

colossalai.communication.reduce_scatter(tensor, dim, parallel_mode, op=<torch.distributed.distributed_c10d.ReduceOp object>, async_op=False)[source]

Reduces all tensors, then scatters the result along a specific dimension to all members of the parallel group.

Note

The parallel_mode should be a member of ParallelMode. More details about ParallelMode can be found in parallel_mode.

Parameters
  • tensor (torch.Tensor) – Tensor to be reduce-scattered.

  • dim (int) – The dimension to scatter along.

  • parallel_mode (colossalai.context.ParallelMode) – Parallel group mode used in this communication.

  • op (torch.distributed.ReduceOp, optional) – The type of reduce operation, which should be one of [SUM, AVG, PRODUCT, MIN, MAX, BAND, BOR, BXOR]. For more details, please refer to ReduceOp.

  • async_op (bool, optional) – Whether operations are asynchronous.

Returns

The result of reduce-scatter, if async_op is set to False. A tuple of the reduce-scatter output and the async work handle, if async_op is set to True.

Return type

Union[tuple(torch.Tensor, work handle), torch.Tensor]
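
Example

A minimal sketch, assuming the ParallelMode.DATA group has been initialized (e.g. through colossalai.launch_from_torch) and that the tensor size along dim is divisible by the group's world size.

  import torch
  from torch.distributed import ReduceOp
  from colossalai.communication import reduce_scatter
  from colossalai.context import ParallelMode

  # every rank provides a tensor of the same full shape
  full = torch.randn(16, 8).cuda()

  # every rank receives one summed chunk of shape (16 // world_size, 8)
  shard = reduce_scatter(full, dim=0, parallel_mode=ParallelMode.DATA, op=ReduceOp.SUM)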

colossalai.communication.all_reduce(tensor, parallel_mode, op=<torch.distributed.distributed_c10d.ReduceOp object>, async_op=False)[source]

Reduces the tensor data across the whole parallel group in such a way that every member obtains the final result.

Note

The parallel_mode should be a member of ParallelMode. More details about ParallelMode can be found in parallel_mode.

Parameters
  • tensor (torch.Tensor) – Tensor to be all-reduced.

  • parallel_mode (colossalai.context.ParallelMode) – Parallel group mode used in this communication.

  • op (torch.distributed.ReduceOp, optional) – The type of reduce operation, which should be one of [SUM, AVG, PRODUCT, MIN, MAX, BAND, BOR, BXOR]. For more details, please refer to ReduceOp.

  • async_op (bool, optional) – Whether operations are asynchronous.

Returns

The result of all-reduce, if async_op is set to False. A tuple of the all-reduce output and the async work handle, if async_op is set to True.

Return type

Union[tuple(torch.Tensor, work handle), torch.Tensor]
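
Example

A minimal sketch, assuming the ParallelMode.DATA group has been initialized. After the call, every rank holds the element-wise sum of grad across the group.

  import torch
  from torch.distributed import ReduceOp
  from colossalai.communication import all_reduce
  from colossalai.context import ParallelMode

  grad = torch.ones(4).cuda()

  # blocking sum across the data-parallel group
  reduced = all_reduce(grad, ParallelMode.DATA, op=ReduceOp.SUM)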

colossalai.communication.broadcast(tensor, src, parallel_mode, async_op=False)[source]

Broadcasts a tensor to the whole parallel group. The tensor must have the same number of elements on all processes participating in the collective.

Note

The parallel_mode should be a member of ParallelMode. More details about ParallelMode can be found in parallel_mode.

Parameters
  • tensor (torch.Tensor) – Tensor to be broadcast.

  • src (int) – Source rank.

  • parallel_mode (colossalai.context.ParallelMode) – Parallel group mode used in this communication.

  • async_op (bool, optional) – Whether operations are asynchronous.

Returns

The broadcast tensor, if async_op is set to False. A tuple of the broadcast tensor and the async work handle, if async_op is set to True.

Return type

Union[tuple(torch.Tensor, work handle), torch.Tensor]
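
Example

A minimal sketch, assuming the ParallelMode.DATA group has been initialized; src is assumed to be the global rank of the source process, looked up here through the global context.

  import torch
  from colossalai.communication import broadcast
  from colossalai.context import ParallelMode
  from colossalai.core import global_context as gpc

  tensor = torch.zeros(4).cuda()
  if gpc.get_local_rank(ParallelMode.DATA) == 0:
      # only the first member of the group holds the authoritative values
      tensor = torch.arange(4, dtype=torch.float).cuda()

  # global rank of the group's first member
  src = gpc.get_ranks_in_group(ParallelMode.DATA)[0]
  tensor = broadcast(tensor, src=src, parallel_mode=ParallelMode.DATA)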

colossalai.communication.reduce(tensor, dst, parallel_mode, op=<torch.distributed.distributed_c10d.ReduceOp object>, async_op=False)[source]

Reduces tensors across the whole parallel group. Only the process with rank dst receives the final result.

Note

The parallel_mode should be a member of ParallelMode. More details about ParallelMode can be found in parallel_mode.

Parameters
  • tensor (torch.Tensor) – Tensor to be reduced.

  • dst (int) – Destination rank.

  • parallel_mode (colossalai.context.ParallelMode) – Parallel group mode used in this communication.

  • async_op (bool, optional) – Whether operations are asynchronous.

Returns

The result of reduce, if async_op is set to False. A tuple of the reduce output and the async work handle, if async_op is set to True.

Return type

Union[tuple(torch.Tensor, work handle), torch.Tensor]
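
Example

A minimal sketch, assuming the ParallelMode.DATA group has been initialized. Only the destination rank holds the reduced value afterwards.

  import torch
  from torch.distributed import ReduceOp
  from colossalai.communication import reduce
  from colossalai.context import ParallelMode
  from colossalai.core import global_context as gpc

  loss = torch.tensor([1.0]).cuda()

  # the global rank of the group's first member acts as the destination
  dst = gpc.get_ranks_in_group(ParallelMode.DATA)[0]
  reduced = reduce(loss, dst=dst, parallel_mode=ParallelMode.DATA, op=ReduceOp.SUM)
  # `reduced` is meaningful only on rank `dst`; other ranks should ignore it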

colossalai.communication.send_forward(output_tensor, next_rank=None, scatter_gather_tensors=False)[source]

Sends the output tensor of this stage to the next stage in the pipeline.

Parameters
  • output_tensor (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent.

  • next_rank (int, optional) – The rank of the recipient of the tensor.
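
Example

A minimal sketch of a forward hand-off between adjacent pipeline stages, pairing send_forward with recv_forward (documented below). It assumes pipeline parallelism has been initialized, that all stages agree on the activation shape, and uses a torch.nn.Linear as a stand-in for the per-stage sub-model.

  import torch
  from colossalai.communication import send_forward, recv_forward
  from colossalai.context import ParallelMode
  from colossalai.core import global_context as gpc

  ACT_SHAPE = torch.Size((16, 1024))                  # activation shape agreed on by all stages
  stage_module = torch.nn.Linear(1024, 1024).cuda()   # stand-in for this stage's sub-model

  if gpc.is_first_rank(ParallelMode.PIPELINE):
      input_tensor = torch.randn(ACT_SHAPE).cuda()    # the first stage would read real data here
  else:
      input_tensor = recv_forward(ACT_SHAPE)          # blocks until the previous stage sends

  output_tensor = stage_module(input_tensor)

  if not gpc.is_last_rank(ParallelMode.PIPELINE):
      send_forward(output_tensor)                     # hand the activations to the next stage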

colossalai.communication.send_forward_recv_forward(output_tensor, input_tensor_shape, recv_prev=True, prev_rank=None, next_rank=None, dtype=torch.float32, scatter_gather_tensors=False)[source]

Batched communication operation. Sends the output tensor to the next stage in the pipeline, while receiving the output tensor of the previous stage as the input of this stage.

Parameters
  • output_tensor (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent.

  • input_tensor_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received.

Returns

The input tensor.

Return type

Union[torch.Tensor, List[torch.Tensor]]

colossalai.communication.send_forward_backward_recv_forward_backward(output_tensor, input_tensor_grad, input_tensor_shape, output_grad_shape, recv_prev=True, recv_next=True, prev_rank=None, next_rank=None, dtype=torch.float32, scatter_gather_tensors=False)[source]

Batched communication operation. Sends the output tensor to the next stage in the pipeline and the input gradient tensor to the previous stage, while receiving the gradient tensor from the next stage and the input tensor from the previous stage.

Parameters
  • output_tensor (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent to the next stage.

  • input_tensor_grad (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent to the previous stage.

  • input_tensor_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received from the previous stage.

  • output_grad_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received from the next stage.

Returns

(the input tensor, the input gradient tensor)

Return type

Tuple(Union[torch.Tensor, List[torch.Tensor]], Union[torch.Tensor, List[torch.Tensor]])

colossalai.communication.send_backward(input_tensor_grad, prev_rank=None, scatter_gather_tensors=False)[source]

Sends the gradient tensor to the previous stage in pipeline.

Parameters
  • input_tensor_grad (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent.

  • prev_rank (int, optional) – The rank of the recipient of the tensor.

colossalai.communication.send_backward_recv_backward(input_tensor_grad, output_grad_shape, recv_next=True, prev_rank=None, next_rank=None, dtype=torch.float32, scatter_gather_tensors=False)[source]

Batched communication operation. Sends the gradient tensor to the previous stage in the pipeline, while receiving the gradient tensor from the next stage in the pipeline as the input of this stage.

Parameters
  • input_tensor_grad (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent.

  • output_grad_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received.

Returns

The input gradient tensor.

Return type

Union[torch.Tensor, List[torch.Tensor]]

colossalai.communication.send_backward_recv_forward(input_tensor_grad, input_tensor_shape, recv_prev=True, prev_rank=None, dtype=torch.float32, scatter_gather_tensors=False)[source]

Batched communication operation. Sends the gradient tensor to the previous stage in the pipeline, while receiving the output tensor of the previous stage as the input of this stage.

Parameters
  • input_tensor_grad (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent.

  • input_tensor_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received.

Returns

The input tensor.

Return type

Union[torch.Tensor, List[torch.Tensor]]

colossalai.communication.send_forward_recv_backward(output_tensor, output_grad_shape, recv_next=True, next_rank=None, dtype=torch.float32, scatter_gather_tensors=False)[source]

Batched communication operation. Sends the output tensor to the next stage in the pipeline, while receiving the gradient tensor from the next stage as the input gradient tensor of this stage.

Parameters
  • output_tensor (Union[torch.Tensor, List[torch.Tensor]]) – Tensor to be sent.

  • output_grad_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received.

Returns

The input gradient tensor.

Return type

Union[torch.Tensor, List[torch.Tensor]]
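
Example

The batched helpers send_forward_recv_backward and send_backward_recv_forward are typically used together in the steady-state phase of a 1F1B pipeline schedule on an intermediate stage. The sketch below uses placeholder tensors for the forward output and the input gradient; in a real schedule they come from this stage's forward and backward passes.

  import torch
  from colossalai.communication import send_forward_recv_backward, send_backward_recv_forward

  ACT_SHAPE = torch.Size((16, 1024))

  output_tensor = torch.randn(ACT_SHAPE).cuda()       # placeholder: this stage's forward output
  input_tensor_grad = torch.randn(ACT_SHAPE).cuda()   # placeholder: gradient w.r.t. this stage's input

  # send activations downstream and, in the same batched call, receive the
  # gradient of a previously sent activation from the next stage
  output_grad = send_forward_recv_backward(output_tensor, ACT_SHAPE)

  # send the input gradient upstream and, in the same batched call, receive the
  # next microbatch's activations from the previous stage
  input_tensor = send_backward_recv_forward(input_tensor_grad, ACT_SHAPE)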

colossalai.communication.recv_backward(output_grad_shape, next_rank=None, dtype=torch.float32, scatter_gather_tensors=False)[source]

Receives the gradient tensor from the next stage in the pipeline as the input gradient tensor of this stage.

Parameters
  • output_grad_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received.

  • next_rank (int, optional) – The rank of the source of the tensor.

Returns

The input gradient tensor or gradient tensor list.

Return type

Union[torch.Tensor, List[torch.Tensor]]

colossalai.communication.recv_forward(input_tensor_shape, prev_rank=None, dtype=torch.float32, scatter_gather_tensors=False)[source]

Receives the forward output of the previous stage in the pipeline as the input tensor of this stage.

Parameters
  • input_tensor_shape (Union[torch.Size, List[torch.Size]]) – The shape of the tensor to be received.

  • prev_rank (int, optional) – The rank of the source of the tensor.

Returns

The input tensor or input tensor list.

Return type

Union[torch.Tensor, List[torch.Tensor]]

colossalai.communication.ring_forward(tensor_send_next, parallel_mode)[source]

Sends a tensor to the next member of the ring and receives a tensor from the previous member; the received tensor is returned.

Parameters
  • tensor_send_next (torch.Tensor) – Tensor to be sent to the next member.

  • parallel_mode (ParallelMode) – Parallel group mode used in this communication.

Returns

The tensor received from the previous member.

Return type

torch.Tensor

Note

The parallel_mode should be a member of ParallelMode. More details about ParallelMode can be found in parallel_mode.
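
Example

A minimal sketch of a full ring rotation, assuming a sequence-parallel group (ParallelMode.SEQUENCE) has been initialized; after world_size - 1 steps every rank has seen every other rank's block.

  import torch
  from colossalai.communication import ring_forward
  from colossalai.context import ParallelMode
  from colossalai.core import global_context as gpc

  block = torch.randn(4, 8).cuda()
  world_size = gpc.get_world_size(ParallelMode.SEQUENCE)

  for _ in range(world_size - 1):
      # send the current block to the next member, receive one from the previous
      block = ring_forward(block, ParallelMode.SEQUENCE)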

colossalai.communication.send_obj_meta(obj, need_meta=True, next_rank=None)[source]

Sends obj meta information before sending a specific obj. Since the recipient must know the shape of the obj in p2p communications, meta information of the obj should be sent before communications. This function synchronizes with recv_obj_meta().

Parameters
  • obj (Union[torch.Tensor, List[torch.Tensor]]) – obj to be sent.

  • need_meta (bool, optional) – If False, meta information won’t be sent.

  • next_rank (int) – The rank of the next member in pipeline parallel group.

Returns

False

Return type

bool

colossalai.communication.recv_obj_meta(obj_shape, prev_rank=None)[source]

Receives obj meta information before receiving a specific obj. Since the recipient must know the shape of the obj in p2p communications, meta information of the obj should be received before communications. This function synchronizes with send_obj_meta().

Parameters
  • obj_shape (Union[torch.Size, List[torch.Size]]) – The shape of the obj to be received.

  • prev_rank (int) – The rank of the source of the obj.

Returns

The shape of the obj to be received.

Return type

Union[torch.Size, List[torch.Size]]
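
Example

A hedged sketch of how the meta helpers bracket the p2p calls above when the receiving stage does not know the tensor shape in advance. It assumes an initialized pipeline group and that passing None for obj_shape makes recv_obj_meta read the shape from the sender, which is how the pipeline schedules use it.

  import torch
  from colossalai.communication import send_obj_meta, recv_obj_meta, send_forward, recv_forward
  from colossalai.context import ParallelMode
  from colossalai.core import global_context as gpc

  if not gpc.is_last_rank(ParallelMode.PIPELINE):
      # sending side: ship the meta information once, then the tensor itself
      need_meta = True
      output_tensor = torch.randn(16, 1024).cuda()
      need_meta = send_obj_meta(output_tensor, need_meta)   # False afterwards, so meta is sent only once
      send_forward(output_tensor)

  if not gpc.is_first_rank(ParallelMode.PIPELINE):
      # receiving side: learn the shape first, then receive the tensor with it
      input_shape = recv_obj_meta(None)
      input_tensor = recv_forward(input_shape)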