Collective Communication

Independent, often distributed processes frequently need to communicate as a group, usually to share data either before or after the computations that each process performs on its own. The following examples illustrate simple forms of these collective communication patterns.

12. Collective Communication: Reduction

file: patternlets/MPI/12.reduction/reduction.c

Build inside 12.reduction directory:

make reduction

Execute on the command line inside 12.reduction directory:

mpirun -np <number of processes> ./reduction

Once processes have performed independent concurrent computations, possibly on some portion of decomposed data, it is quite common to reduce their individual results to a single value. In this example, each process computes the square of its rank plus one, and those values are reduced to a sum and to a maximum at process 0. MPI provides built-in reduction operations, indicated by MPI_SUM and MPI_MAX in the following code. With four processes, the reduction looks like this:

../_images/Reduction.png
/* reduction.c
* ... illustrates the use of MPI_Reduce()...
* Joel Adams, Calvin College, November 2009.
*
* Usage: mpirun -np N ./reduction
*
* Exercise:
* - Compile and run, varying N: 4, 6, 8, 10.
* - Explain behavior of MPI_Reduce().
*/

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv) {
    int numProcs = -1, myRank = -1, square = -1, max = -1, sum = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    square = (myRank+1) * (myRank+1);

    printf("Process %d computed %d\n", myRank, square);

    MPI_Reduce(&square, &sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);

    MPI_Reduce(&square, &max, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);

    if (myRank == 0) {
        printf("\nThe sum of the squares is %d\n\n", sum);
        printf("The max of the squares is %d\n\n", max);
    }

    MPI_Finalize();

    return 0;
}
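
To check the output of reduction.c by hand, it can help to model the two reductions serially. The sketch below does not use MPI at all: `modelReduction` is a hypothetical helper, introduced here only for illustration, that loops over the ranks and combines their squares in the way MPI_SUM and MPI_MAX are expected to deliver them to the root.

```c
#include <assert.h>
#include <limits.h>

/* Serial model of the two MPI_Reduce() calls in reduction.c.
 * Hypothetical helper, not part of MPI: it visits ranks 0..numProcs-1,
 * recomputes each rank's square, and combines the values the way
 * MPI_SUM and MPI_MAX would for the root process. */
void modelReduction(int numProcs, int* sum, int* max) {
    assert(numProcs > 0);                       // need at least one process
    *sum = 0;
    *max = INT_MIN;
    for (int rank = 0; rank < numProcs; rank++) {
        int square = (rank + 1) * (rank + 1);   // same formula as reduction.c
        *sum += square;                         // what MPI_SUM accumulates
        if (square > *max) {                    // what MPI_MAX keeps
            *max = square;
        }
    }
}
```

For example, calling `modelReduction(4, &sum, &max)` leaves 30 in `sum` (1 + 4 + 9 + 16) and 16 in `max`, which is what process 0 should print when the MPI program is run with four processes.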

13. Collective Communication: Reduction

file: patternlets/MPI/13.reduction2/reduction2.c

Build inside 13.reduction2 directory:

make reduction2

Execute on the command line inside 13.reduction2 directory:

mpirun -np <number of processes> ./reduction2

Here is a second reduction example using arrays of data.

To do:

Can you explain the reduction, MPI_Reduce, in terms of srcArr and destArr?

/* reduction2.c
 * ... illustrates the use of MPI_Reduce() using arrays...
 * Joel Adams, Calvin College, January 2015.
 *
 * Usage: mpirun -np 4 ./reduction2
 *
 * Exercise:
 * - Compile and run, comparing output to source code.
 * - Uncomment the 'commented out' call to printArray.
 * - Save, recompile, rerun, comparing output to source code.
 * - Explain behavior of MPI_Reduce() in terms of
 *     srcArr and destArr.
 */

#include <mpi.h>
#include <stdio.h>

#define ARRAY_SIZE 5

void printArray(int id, char* arrayName, int* array, int SIZE);

int main(int argc, char** argv) {
    int myRank = -1;
    int srcArr[ARRAY_SIZE] = {0};
    int destArr[ARRAY_SIZE] = {0};

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    if (myRank == 0) {
        printf("\nBefore reduction: ");
        printArray(myRank, "destArr", destArr, ARRAY_SIZE);
    }

    for (int i = 0; i < ARRAY_SIZE; i++) {
        srcArr[i] = myRank * i;
    }

    printArray(myRank, "srcArr", srcArr, ARRAY_SIZE);

    MPI_Reduce(srcArr, destArr, ARRAY_SIZE, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);

    if (myRank == 0) {
        printf("\nAfter reduction:  ");
        printArray(myRank, "destArr", destArr, ARRAY_SIZE);
        printf("\n");
    }

    MPI_Finalize();

    return 0;
}

/* utility to display an array
 * params: id, the rank of the current process
 *         arrayName, the name of the array being displayed
 *         array, the array being displayed
 *         SIZE, the number of items in array.
 * postcondition:
 *         the id, name, and items in array have been printed to stdout.
 */
void printArray(int id, char* arrayName, int * array, int SIZE) {
    printf("Process %d, %s: [", id, arrayName);
    for (int i = 0; i < SIZE; i++) {
        printf("%3d", array[i]);
        if (i < SIZE-1) printf(",");
    }
    printf("]\n");
}
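
When the buffers are arrays, the reduction is applied element by element: position i of destArr combines position i of every process's srcArr. The following is a minimal serial sketch of that behavior, with no MPI calls. The names `srcValue`, `opSum`, `opMax`, and `modelArrayReduce` are hypothetical and exist only in this illustration; `opSum` and `opMax` stand in for MPI_SUM and MPI_MAX.

```c
#include <assert.h>

#define ARRAY_SIZE 5

/* Value that reduction2.c stores in srcArr[i] on the given rank. */
int srcValue(int rank, int i) { return rank * i; }

/* Hypothetical combining functions standing in for MPI_SUM and MPI_MAX. */
int opSum(int a, int b) { return a + b; }
int opMax(int a, int b) { return a > b ? a : b; }

/* Serial model of an array MPI_Reduce(): for each index i, combine the
 * i-th element contributed by every rank and store the result in destArr[i]. */
void modelArrayReduce(int numProcs, int (*op)(int, int), int* destArr) {
    assert(numProcs > 0);                        // need at least one process
    for (int i = 0; i < ARRAY_SIZE; i++) {
        int acc = srcValue(0, i);                // start with rank 0's element
        for (int rank = 1; rank < numProcs; rank++) {
            acc = op(acc, srcValue(rank, i));    // fold in each other rank
        }
        destArr[i] = acc;
    }
}
```

With four processes, `modelArrayReduce(4, opSum, destArr)` fills destArr with 0, 6, 12, 18, 24, and passing `opMax` instead gives 0, 3, 6, 9, 12. Swapping the combining function is the serial analogue of changing the operation argument of MPI_Reduce.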

Further Exploration:

This useful MPI tutorial explains other reduction operations that can be performed. You could use the above code or the previous examples to experiment with some of these.

14. Collective communication: Scatter for message-passing data decomposition

file: patternlets/MPI/14.scatter/scatter.c

Build inside 14.scatter directory:

make scatter

Execute on the command line inside 14.scatter directory:

mpirun -np <number of processes> ./scatter

If processes can independently work on portions of a larger data array using the geometric data decomposition pattern, the scatter pattern can be used to ensure that each process receives a copy of its portion of the array. Process 0 gets the first chunk, process 1 gets the second chunk, and so on until the entire array has been distributed.

../_images/Scatter.png

To do:

What previous data decomposition pattern is this similar to?

/* scatter.c
 * ... illustrates the use of MPI_Scatter()...
 * Joel Adams, Calvin College, November 2009.
 *
 * Usage: mpirun -np N ./scatter
 *
 * Exercise:
 * - Compile and run, varying N: 1, 2, 4, 8
 * - Trace execution through source code.
 * - Explain behavior/effect of MPI_Scatter().
 */

#include <mpi.h>      // MPI
#include <stdio.h>    // printf(), etc.
#include <stdlib.h>   // malloc()

void print(int id, char* arrName, int* arr, int arrSize);

int main(int argc, char** argv) {
    const int MAX = 8;
    int* arrSend = NULL;
    int* arrRcv = NULL;
    int numProcs = -1, myRank = -1, numSent = -1;

    MPI_Init(&argc, &argv);                            // initialize
    MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    if (myRank == 0) {                                 // master process:
        arrSend = (int*) malloc( MAX * sizeof(int) );  //  allocate array1
        for (int i = 0; i < MAX; i++) {                //  load with values
            arrSend[i] = (i+1) * 11;
        }
        print(myRank, "arrSend", arrSend, MAX);        //  display array1
    }
     
    numSent = MAX / numProcs;                          // all processes:
    arrRcv = (int*) malloc( numSent * sizeof(int) );   //  allocate array2

    MPI_Scatter(arrSend, numSent, MPI_INT, arrRcv,     //  scatter array1 
                 numSent, MPI_INT, 0, MPI_COMM_WORLD); //   into array2

    print(myRank, "arrRcv", arrRcv, numSent);          // display array2

    free(arrSend);                                     // clean up
    free(arrRcv);
    MPI_Finalize();
    return 0;
}

void print(int id, char* arrName, int* arr, int arrSize) {
    printf("Process %d, %s: ", id, arrName);
    for (int i = 0; i < arrSize; i++) {
        printf(" %d", arr[i]);
    }
    printf("\n");
}
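
The chunking performed by scatter.c can be modeled serially to see which elements each rank receives. The sketch below uses no MPI; `modelScatter` is a hypothetical helper written for this illustration. It uses the same integer division as scatter.c, so when the array length is not a multiple of the number of processes, the trailing elements are not handed to any process.

```c
#include <assert.h>

/* Serial model of the MPI_Scatter() call in scatter.c.
 * Hypothetical helper, not part of MPI: copies into arrRcv the chunk of
 * arrSend that the given rank would receive and returns the chunk length.
 * arrRcv must have room for at least max / numProcs elements. */
int modelScatter(const int* arrSend, int max, int numProcs,
                 int rank, int* arrRcv) {
    assert(numProcs > 0 && rank >= 0 && rank < numProcs);
    int numSent = max / numProcs;                 // integer division, as in scatter.c
    for (int i = 0; i < numSent; i++) {
        arrRcv[i] = arrSend[rank * numSent + i];  // rank r starts at offset r * numSent
    }
    return numSent;
}
```

With the eight values 11, 22, ..., 88 and four processes, rank 1 receives 33 and 44. With three processes, each rank receives two values, and 77 and 88 are left undistributed, which is one reason the exercise suggests process counts that divide 8 evenly.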

15. Collective communication: Gather for message-passing data decomposition

file: patternlets/MPI/15.gather/gather.c

Build inside 15.gather directory:

make gather

Execute on the command line inside 15.gather directory:

mpirun -np <number of processes> ./gather

If processes can independently work on portions of a larger data array using the geometric data decomposition pattern, the gather pattern can be used to have each process send a copy of its portion of the array back to the root (master) process. Thus, gather is the reverse of scatter. Here is the idea:

../_images/Gather.png

To do:

Find documentation for the MPI function MPI_Gather. Make sure that you know what each parameter is for. Why are the second and fifth parameters (the send count and the receive count) in our example both SIZE? Can you explain what this means in terms of MPI_Gather?

/* gather.c
 * ... illustrates the use of MPI_Gather()...
 * Joel Adams, Calvin College, November 2009.
 *
 * Usage: mpirun -np N ./gather
 *
 * Exercise:
 * - Compile and run, varying N: 1, 2, 4, 8.
 * - Trace execution through source.
 * - Explain behavior of MPI_Gather().
 */

#include <mpi.h>       // MPI
#include <stdio.h>     // printf()
#include <stdlib.h>    // malloc()

void print(int id, char* arrName, int* arr, int arrSize);

#define SIZE 3

int main(int argc, char** argv) {
   int  computeArray[SIZE];                          // array1
   int* gatherArray = NULL;                          // array2
   int  numProcs = -1, myRank = -1,
        totalGatheredVals = -1;

   MPI_Init(&argc, &argv);                           // initialize
   MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
   MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
                                                     // all processes:
   for (int i = 0; i < SIZE; i++) {                  //  load array1 with
      computeArray[i] = myRank * 10 + i;             //   3 distinct values
   }

   print(myRank, "computeArray", computeArray,       //  show array1
           SIZE);

   if (myRank == 0) {                                // master:
      totalGatheredVals = SIZE * numProcs;           //  allocate array2
      gatherArray = (int*) malloc( totalGatheredVals * sizeof(int) );
   }

   MPI_Gather(computeArray, SIZE, MPI_INT,           //  gather array1 vals
               gatherArray, SIZE, MPI_INT,           //   into array2
               0, MPI_COMM_WORLD);                   //   at master process

   if (myRank == 0) {                                // master process:
      print(myRank, "gatherArray",                   //  show array2
             gatherArray, totalGatheredVals);
      free(gatherArray);                             // clean up
   }


   MPI_Finalize();
   return 0;
}

void print(int id, char* arrName, int* arr, int arrSize) {
    printf("Process %d, %s: ", id, arrName);
    for (int i = 0; i < arrSize; i++) {
        printf(" %d", arr[i]);
    }
    printf("\n");
}
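
The layout of gatherArray at the root can also be modeled serially. The sketch below uses no MPI; `modelGather` is a hypothetical helper for this illustration. It places the SIZE values from each rank at offset rank * SIZE, which is the rank-ordered layout MPI_Gather produces at the root.

```c
#include <assert.h>

#define SIZE 3

/* Serial model of the MPI_Gather() call in gather.c.
 * Hypothetical helper, not part of MPI: fills gatherArray, which must hold
 * SIZE * numProcs ints, with the values each rank computes in gather.c,
 * stored in rank order. */
void modelGather(int numProcs, int* gatherArray) {
    assert(numProcs > 0);                          // need at least one process
    for (int rank = 0; rank < numProcs; rank++) {
        for (int i = 0; i < SIZE; i++) {
            // rank's computeArray[i] lands at offset rank * SIZE + i
            gatherArray[rank * SIZE + i] = rank * 10 + i;
        }
    }
}
```

With two processes, the modeled gatherArray holds 0, 1, 2, 10, 11, 12: the root receives SIZE values from each process, so the buffer needs room for SIZE * numProcs values in total.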