
MPI point-to-point communication

Sending and receiving of messages by processes is the basic MPI communication mechanism.

Sending messages

The MPI_Send function is used to send data to another process. This function has the following signature:

int MPI_Send(const void *buf, int count, MPI_Datatype datatype, 
             int dest, int tag, MPI_Comm comm)
Parameter   Description
buf         a pointer to the data to send
count       the number of elements to send from the buffer
datatype    the data type of the elements in the buffer
dest        the rank of the recipient process
tag         a tag (user's choice) to identify the communication
comm        the communicator in which the communication takes place

In plain English: a buffer consisting of count successive entries of the type indicated by datatype, starting with the entry at address buf, is sent to the process with rank dest in the communicator comm. The message is identified by tag.

The basic datatypes that can be specified correspond to the basic datatypes of the C programming language. The MPI basic datatypes and their corresponding C types are listed in the table below.

MPI Datatype      C equivalent        MPI Datatype              C equivalent
MPI_CHAR          char                MPI_UNSIGNED_CHAR         unsigned char
MPI_INT           int                 MPI_UNSIGNED              unsigned int
MPI_LONG          long int            MPI_UNSIGNED_LONG         unsigned long int
MPI_LONG_LONG     long long int       MPI_UNSIGNED_LONG_LONG    unsigned long long int
MPI_FLOAT         float               MPI_DOUBLE                double

As an example, to send an array of 16 doubles to rank 1 using tag 99 and the MPI_COMM_WORLD communicator, the call to MPI_Send is

MPI_Send(buf, 16, MPI_DOUBLE, 1, 99, MPI_COMM_WORLD);

Receiving messages

The MPI_Recv function is used to receive data from another process. This function has the following signature:

int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, 
             int tag, MPI_Comm comm, MPI_Status *status)
Parameter   Description
buf         a pointer to where the data should be received
count       the maximum number of elements the receive buffer can hold
datatype    the data type of the elements of the receive buffer
source      the rank of the sender process
tag         a tag (user's choice) to identify the communication
comm        the communicator in which the communication takes place
status      a pointer to an object representing the status of the receive operation

In plain English: up to count successive entries of the type indicated by datatype can be received in the buffer starting at address buf. The message should come from the process with rank source in the communicator comm and should carry the tag tag.

As an example, to receive an array of 16 doubles from rank 0 using tag 99 and the MPI_COMM_WORLD communicator, the call to MPI_Recv is

MPI_Recv(buf, 16, MPI_DOUBLE, 0, 99, MPI_COMM_WORLD, &status);

The MPI_Recv function will block until a message is received. This means that as long as no matching message arrives, the program cannot progress, so you have to make sure that for every call to MPI_Recv there is a matching MPI_Send, or your program will deadlock.
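
As a minimal illustration (this sketch is ours, not one of the course examples), the following program deadlocks when run with two processes, because both ranks post a receive before any send has been issued:

#include <mpi.h>

int main(int argc, char** argv) {
  double buf[16] = {0};
  int world_rank;
  MPI_Status status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

  int other = (world_rank == 0) ? 1 : 0;  /* rank of the peer process */

  /* Both ranks block here, each waiting for a message the other never sends. */
  MPI_Recv(buf, 16, MPI_DOUBLE, other, 0, MPI_COMM_WORLD, &status);
  MPI_Send(buf, 16, MPI_DOUBLE, other, 0, MPI_COMM_WORLD);  /* never reached */

  MPI_Finalize();
  return 0;
}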

Send and receive example

To illustrate the process of communication with MPI_Send and MPI_Recv, we will consider a simple example where two processes are launched. For each of these processes, a buffer that can hold up to 20 elements of type char is allocated. Then,

  • the first process, of rank 0, fills the buffer and sends it to the process with rank 1 using MPI_Send,
  • the second process calls MPI_Recv to receive the buffer from the first process (rank 0),
  • the second process prints the buffer.

The code described above is implemented in the following program:

Source code for this example

mpi_hello_msg.c
#include <string.h>
#include <stdio.h>

#include <mpi.h>

int main(int argc, char** argv) {
  char message[20];
  MPI_Status status;

  MPI_Init(&argc, &argv);

  int world_rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

  if (world_rank == 0) {
    strcpy(message,"Hello, there");
    MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);

  } else if (world_rank == 1) {
    MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
    printf("Rank %d received: %s\n", world_rank, message);
  }

  MPI_Finalize();

  return 0;
}

This code can be compiled and run by using the following commands.

 $ module load OpenMPI
 $ mpicc -o mpi_hello_msg mpi_hello_msg.c
 $ mpirun -np 2 ./mpi_hello_msg
Rank 1 received: Hello, there

Message matching

An MPI message consists of two parts:

  • the data part
  • information about the message

The second part is called the envelope and contains the source, the destination, the tag, and the communicator. The source is determined automatically by identifying the sender, while the destination, tag, and communicator are provided as parameters of the MPI_Send call.

In order for a message to be received by a process, all the elements of the envelope need to match the MPI_Recv call:

  • the source parameter of MPI_Recv should match the source in the envelope
  • the process that called MPI_Recv should match the destination in the envelope
  • the tag parameter of MPI_Recv should match the tag in the envelope
  • the communicator passed to MPI_Recv should match the communicator in the envelope
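
On the receiving side, the envelope information of a message that has actually been received can be inspected through the MPI_Status object. As a small sketch (building on the earlier example and using the names introduced there):

MPI_Status status;
MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);

int count;
MPI_Get_count(&status, MPI_CHAR, &count);  /* number of elements actually received */
printf("source = %d, tag = %d, count = %d\n",
       status.MPI_SOURCE, status.MPI_TAG, count);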

For example, consider this change in the previous example code, where the value of the tag on the sender side is changed from 99 to 89.

if (world_rank == 0) {
  strcpy(message,"Hello, there");
-  MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);
+  MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);

} else if (world_rank == 1) {
  MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
  printf("Rank %d received: %s\n", world_rank, message);
}

If we run the code with this change of tag on the sender side, no output is produced and the processes keep running until we cancel the execution: we are in a deadlock. The reason for this deadlock is that the process of rank 1 is waiting for a message with tag = 99, but on the sender side (rank 0) the message was sent with tag = 89. As a result, rank 1 blocks, waiting for a message that was never sent.

Another example to illustrate how messages are matched is the following:

if (world_rank == 0) {
+  strcpy(message,"I'm rank 0");
+  MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);
+
  strcpy(message,"Hello, there");
  MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);

} else if (world_rank == 1) {
  MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
  printf("Rank %d received: %s\n", world_rank, message);
+
+  MPI_Recv(message, 20, MPI_CHAR, 0, 89, MPI_COMM_WORLD, &status);
+  printf("Rank %d received: %s\n", world_rank, message);
}

The order of the tags on the receiver side (99, then 89) is the reverse of the order on the sender side (89, then 99). As a result, when we run this code, we obtain the following output

 $ mpirun -np 2 ./mpi_hello_msg
Rank 1 received: Hello, there
Rank 1 received: I'm rank 0

The messages have been matched by their tags. Even though, on the sender side, the message with tag 89 was issued first, it is not the first to be received. Instead, the second message is received first because its tag (99) matches the tag in the first call to MPI_Recv on the receiver side.

However, in some situations, the last example will cause a deadlock, because MPI uses different communication protocols depending on the size of the message.

MPI communication protocols

The standard specifies that MPI_Send returns when it is safe to modify the send buffer. However, this does not mean that the message has reached its destination (the standard doesn't require it). In fact, depending on the message size, most MPI implementations use different communication protocols. With one of these protocols, the eager protocol, MPI_Send returns before the message arrives at its destination.

With the eager protocol, when MPI_Send is called, the MPI implementation will store a copy of the data for the message in a temporary internal buffer and return. Subsequently, when a matching receive is issued, MPI will transfer the data from this temporary buffer to the receiver.

Figure: the MPI eager send communication protocol

Another protocol is the rendezvous protocol. In this protocol, the sender first asks the receiver whether it is ready to receive the data, i.e., whether it has a buffer to store the data. The data transfer takes place only once the receiver has confirmed that it is ready to receive the data.

Figure: the MPI rendezvous communication protocol

For small to medium message sizes, most MPI implementations will use the eager protocol. However, for large messages, where a temporary buffer would consume too much memory, MPI will use the rendezvous protocol.

MPI_Recv is always blocking

MPI_Recv returns when the receive buffer has been populated with the data. As a consequence, it is always blocking: there is no way to receive the data before it has been sent.

A way to force the use of the rendezvous protocol is to use the MPI_Ssend function in place of MPI_Send.

MPI_Ssend stands for synchronous send and has the same signature as the standard send:

int MPI_Ssend(const void *buf, int count, MPI_Datatype datatype, 
              int dest, int tag, MPI_Comm comm)

If we convert the MPI_Send calls of the last example to MPI_Ssend:

if (world_rank == 0) {
  strcpy(message,"I'm rank 0");
- MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);
+ MPI_Ssend(message, strlen(message)+1, MPI_CHAR, 1, 89, MPI_COMM_WORLD);

  strcpy(message,"Hello, there");
- MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);
+ MPI_Ssend(message, strlen(message)+1, MPI_CHAR, 1, 99, MPI_COMM_WORLD);

} else if (world_rank == 1) {
  MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
  printf("Rank %d received: %s\n", world_rank, message);

  MPI_Recv(message, 20, MPI_CHAR, 0, 89, MPI_COMM_WORLD, &status);
  printf("Rank %d received: %s\n", world_rank, message);
}

the result is a deadlock. The reason for this deadlock is that

  • Sender: issues a send with tag = 89 and blocks, waiting for a matching receive to be posted
  • Receiver: issues a receive for a message with tag = 99 and waits for this message

Because the sender waits for a matching receive before returning, the second send, with tag = 99, is never issued, so the receiving side cannot progress. The same is true for the sender side: the second receive, with tag = 89, is never reached, so the first send can never complete.

The reason the code was able to progress when we used MPI_Send is that MPI was using the eager protocol, so the second send was reached, allowing the application to progress.
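
One way to remove this deadlock (a sketch of ours, not part of the course code) is to make the order of the receives on rank 1 mirror the order of the sends on rank 0:

} else if (world_rank == 1) {
  /* Receive in the same order as the messages are sent: tag 89 first, then tag 99. */
  MPI_Recv(message, 20, MPI_CHAR, 0, 89, MPI_COMM_WORLD, &status);
  printf("Rank %d received: %s\n", world_rank, message);

  MPI_Recv(message, 20, MPI_CHAR, 0, 99, MPI_COMM_WORLD, &status);
  printf("Rank %d received: %s\n", world_rank, message);
}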

This illustrates the importance of issuing the send and receive calls in the right order as well as the importance of the tag. This is a common problem faced by MPI beginners. Initially, when testing their code with smaller problem sizes, it often functions correctly due to the use of the eager protocol. However, as they apply the same code to larger problem sizes, MPI switches to the rendezvous protocol, resulting in a deadlock situation.

Work-sharing with MPI

Up to this point, we've focused on communication aspects without delving into the methods for achieving parallelization, or work-sharing. The fundamental approach to attaining parallelism with MPI involves breaking down the global problem into smaller sub-problems and then distributing these subproblems among the individual ranks.

To illustrate how it can be done, we will consider a simple application that sums integers from 1 to N:

$$ sum = \sum_{i = 1}^N i $$

Source code for this example

intsum_serial.c
#include <stdio.h>

int main(int argc, char** argv) {
  const unsigned int N = 10000000;

  unsigned long sum = 0;
  for (unsigned int i = 1; i <= N; ++i) {
    sum += i;
  }

  printf("The sum from 1 to %d is %lu.\n", N, sum);

  return 0;
}
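
Since $\sum_{i=1}^{N} i = N(N+1)/2$, the expected result for $N = 10000000$ is $50000005000000$, which can be used to check the output of both this serial program and the parallel version below.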

We can subdivide the problem by assigning a range of values to each rank. For that, we define the starting value ($start_i$) for a given process with rank $rank_i$ as

$$ start_i = \frac{N\cdot rank_i}{worldsize} + 1 $$

Note that we added $+ 1$ to the start because we want to sum all the integers from 1 to N.

Similarly, we can define the end value ($end_i$) for a given process with rank $rank_i$ as

$$ end_i = \frac{N\cdot (rank_i + 1)}{worldsize} $$
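
For example, with $N = 10$ and 4 processes, and using integer (truncating) division as in the code below, these formulas give the ranges 1 to 2, 3 to 5, 6 to 7, and 8 to 10 for ranks 0 to 3, so every integer from 1 to $N$ is assigned to exactly one rank.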

After obtaining the range of values, each rank can calculate its "local" sum. Upon completing this local computation, we transmit the local sum to the process with rank 0, which will handle the final summation.

Source code for this example

intsum_mpi.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char* argv[]) {
  const unsigned int N = 10000000;

  int rank, world_size;
  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  if (rank == 0) {  
    printf("Running with %d processes.\n", world_size);
  }

  unsigned int startidx = (N * rank / world_size) + 1;
  unsigned int   endidx = N * (rank+1) / world_size;

  unsigned long local_sum = 0;
  for (unsigned int i = startidx; i <= endidx; ++i)
    local_sum += i;

  printf("Process with rank %2d has local sum: %lu\n", rank, local_sum);

  if (rank > 0) {
    MPI_Send(&local_sum, 1, MPI_UNSIGNED_LONG, 0, 1, MPI_COMM_WORLD);
  } 

  if (rank == 0) {
    unsigned long remote_sum;
    for(int src = 1; src < world_size; ++src) {
      MPI_Recv(&remote_sum, 1, MPI_UNSIGNED_LONG, src, 1, 
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);

      local_sum += remote_sum;
    }

    printf("\nThe sum from 1 to %d is %lu.\n", N, local_sum);
  }

  MPI_Finalize();

  return 0;
}

Note that we used MPI_STATUS_IGNORE in the MPI_Recv call. This tells MPI not to fill in an MPI_Status object, which, in our case, saves some time as we have no use for it.

There is a better way to perform the final sum

Later on, we'll discuss a more efficient approach for performing the final sum than using multiple receives (through collective communication). The purpose of presenting this pattern here is to demonstrate how work-sharing can be accomplished with MPI. It's worth noting that this approach is not necessarily the optimal implementation.

Alternative implementation for the communication

Since the order of message reception doesn't affect correctness, we can consider an alternative communication approach and use the MPI_ANY_SOURCE wildcard for the source parameter of the MPI_Recv call. This instructs MPI to accept a matching message from any sender, so the messages can be received in whatever order they arrive.

if (rank > 0) {
  MPI_Send(&local_sum, 1, MPI_UNSIGNED_LONG, 0, 1, MPI_COMM_WORLD);
} 

if (rank == 0) {
  unsigned long remote_sum;
  for(int src = 1; src < world_size; ++src) {
    MPI_Recv(&remote_sum, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
           1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    local_sum += remote_sum;
  }
}

Diffusion in one dimension

For our next example, we will consider the diffusion equation in one dimension which reads

$$ \frac{\partial u}{\partial t} = \alpha \frac{\partial^2 u}{\partial x^2} $$

where $u(x,t)$ is the unknown function to be solved for, $x$ is a coordinate in space, and $t$ is time. The coefficient $\alpha$ is the diffusion coefficient, which determines how fast $u$ changes in time. To discretize this problem in time and space, we first evaluate the equation at grid point $x_i$ and time $t_n$:

$$ \frac{\partial}{\partial t}u(x_i, t_n) = \alpha \frac{\partial^2}{\partial x^2}u(x_i, t_n) $$

After applying a forward difference in time and a central difference in space to the equation above we obtain

$$ \frac{u_i^{n+1} - u_i^n}{\Delta t} = \alpha \frac{u_{i+1}^n - 2 u_i^n + u_{i-1}^{n}}{\Delta x^2} $$

so that the values on the one-dimensional grid ($u_i^{n+1}$) for the next time step ($t_{n+1}$) can be computed from the values obtained at the previous time step ($t_n$) using

$$ u_i^{n+1} = u_i^n + \alpha\frac{\Delta t}{\Delta x^2}(u_{i+1}^n - 2 u_i^n + u_{i-1}^{n}) $$

From the equation above, we see that to compute an updated value on the grid, we need the value from the previous step at the same location on the grid ($u_i^n$) as well as the values of the neighbors ($u_{i+1}^n$ and $u_{i-1}^{n}$).
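
As a sketch of how this update maps to code (our illustration: uold holds the values at time step $n$, unew the values at time step $n+1$, and coef stands for $\alpha\,\Delta t/\Delta x^2$):

/* One explicit time step on a serial grid of NPOINTS values,
 * updating the interior points only. */
for (int i = 1; i < NPOINTS - 1; i++) {
  unew[i] = uold[i] + coef * (uold[i+1] - 2.0 * uold[i] + uold[i-1]);
}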

The domain can be divided into chunks that are computed on different processes. However, since computing the value of each point requires the values of the neighbors, the computation is not embarrassingly parallel: the points at the borders of a chunk require the values of points from the neighboring chunks. This can be implemented by adding extra cells, called "ghost cells", to each chunk; these cells are updated with values coming from the neighbors using MPI communication.

Figure: 1D diffusion ghost cell pattern

To compute the chunk start and end values, we can use a similar approach to the one used for the sum of integers example. Once we have computed these values, we can determine the size of the chunk and allocate memory to hold the values for the previous (uold) and current time step (unew).

my_start = NPOINTS * my_rank / world_size;
my_end = NPOINTS * (my_rank + 1) / world_size;
my_size = my_end - my_start;

uold = malloc(sizeof(double)*(my_size+2));
unew = malloc(sizeof(double)*(my_size+2));

Note that we allocated two extra cells in our call to malloc. These extra cells are the ghost cells used to store the values we require from the neighbors.

The neighbors' ranks are defined as our own rank minus one for the left neighbor and plus one for the right neighbor.

left_rank =  my_rank > 0            ? my_rank - 1 : MPI_PROC_NULL;
right_rank = my_rank < world_size-1 ? my_rank + 1 : MPI_PROC_NULL;

Here, MPI_PROC_NULL is used to represent the non-existent neighbors at the start and end of the domain. It is a constant that represents a dummy MPI process rank: when it is passed to a send or receive operation, the operation is guaranteed to succeed and to return as soon as possible.

Now that we have defined the ranks of the neighbors, we can implement the update of the ghost cells at every time step as

MPI_Send(&uold[my_size],  1, MPI_DOUBLE, right_rank, 0, MPI_COMM_WORLD);
MPI_Recv(&uold[0], 1, MPI_DOUBLE, left_rank, 0, MPI_COMM_WORLD, 
           MPI_STATUS_IGNORE);

MPI_Send(&uold[1], 1, MPI_DOUBLE, left_rank, 1, MPI_COMM_WORLD);
MPI_Recv(&uold[my_size+1], 1, MPI_DOUBLE, right_rank, 1, MPI_COMM_WORLD, 
           MPI_STATUS_IGNORE);

where we first send the value at the end of the process subdomain to the right neighbor and receive the value from the left neighbor to update the ghost cell at the beginning of the subdomain. In a second communication step, we do the opposite: we send to the left neighbor and update the ghost cell at the end of the subdomain by receiving the value from the right neighbor.
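
As an aside (a sketch of ours, not part of the course code), the same exchange can also be written with MPI_Sendrecv, which posts the send and the receive in a single call and lets MPI handle the matching of the two operations:

/* Sketch: exchange the ghost cells with MPI_Sendrecv, which combines
 * a send and a receive and does not depend on the order of the calls. */
MPI_Sendrecv(&uold[my_size], 1, MPI_DOUBLE, right_rank, 0,
             &uold[0],       1, MPI_DOUBLE, left_rank,  0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Sendrecv(&uold[1],         1, MPI_DOUBLE, left_rank,  1,
             &uold[my_size+1], 1, MPI_DOUBLE, right_rank, 1,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);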

The code where everything is put together is presented below.

Source code for this example and gnuplot input for visualization

diffusion1d_mpi.c
my_start = NPOINTS * my_rank / world_size;
my_end = NPOINTS * (my_rank + 1) / world_size;
my_size = my_end - my_start;

left_rank =  my_rank > 0            ? my_rank - 1 : MPI_PROC_NULL;
right_rank = my_rank < world_size-1 ? my_rank + 1 : MPI_PROC_NULL;

uold = malloc(sizeof(double)*(my_size+2));
unew = malloc(sizeof(double)*(my_size+2));

// Initial conditions
for(int i = 1; i <= my_size; i++) {
  x = dx * ((double)(i+my_start) - 0.5) - 0.5 * XLEN;
  uold[i] = 0.5 * cos(2.0 * M_PI * x / XLEN) + 0.5;
}

if (my_rank == 0) {
  uold[0] = 0.0;
  unew[0] = 0.0;
}

if (my_rank == world_size-1) {
  uold[my_size+1] = 0.0;
  unew[my_size+1] = 0.0;
}

for (int iter = 1; iter <= NITERS; iter++) {
  MPI_Send(&uold[my_size],  1, MPI_DOUBLE, right_rank, 0, MPI_COMM_WORLD);
  MPI_Recv(&uold[0], 1, MPI_DOUBLE, left_rank, 0, MPI_COMM_WORLD, 
             MPI_STATUS_IGNORE);

  MPI_Send(&uold[1], 1, MPI_DOUBLE, left_rank, 1, MPI_COMM_WORLD);
  MPI_Recv(&uold[my_size+1], 1, MPI_DOUBLE, right_rank, 1, MPI_COMM_WORLD, 
             MPI_STATUS_IGNORE);

  for (int i = 1; i <= my_size; i++) {
    unew[i] = uold[i] + DIFF_COEF * dtdx2
                      * (uold[i+1] - 2.0 * uold[i] + uold[i-1]);
  }

  t += dt;

  double *tmpptr = uold;
  uold = unew;
  unew = tmpptr;
}