Patterns used when threads share data values

10. Shared Data Algorithm Strategy: Parallel-for-loop pattern needs non-shared, private variables

file: Vath_pth/10.private/private.C

Build inside 10.private directory:

make private

Execute on the command line inside 10.private directory:

./private

In this example, you will try a parallel for loop in which two variables (beg and end in the code) cannot be shared by all of the threads. They must instead be private to each thread, which means that each thread has its own copy. The outer loop is split into chunks, one per thread, and each thread executes the full inner loop for every element in its chunk, so each thread must keep track of its own chunk's beginning and end. Because beg and end are initially declared outside the thread function, at the beginning of the program, they are shared by all the threads by default, and the threads can overwrite one another's chunk bounds when they call ThreadRange.

/* private.C
 *
 * Using Victor Alessandrini's vath_pth library.
 *  ... illustrates why private variables are needed in parallel for loop
 *
 * Modeled from code provided by Joel Adams, Calvin College, November 2009.
 * Hannah Sonsalla, Macalester College, 2017.
 *
 * Usage: ./private
 *
 * Exercise:
 * - Run, noting that the sequential program produces correct results
 * - Comment out line A and uncomment section B in Main(), recompile/run and compare,
 *   program produces incorrect results
 * - Comment out line C and uncomment section D, recompile/run and compare,
 *   program produces correct results
 *
 */

#include <stdlib.h>
#include <stdio.h>
#include <SPool.h>
#include <pthread.h>

using namespace std;
SPool TH(4);
#define SIZE 100

int m[SIZE][SIZE];
int beg = 0, end = SIZE;     // Line C


void thread_fct(void *idp)  {

    /*
    int beg, end;
    beg = 0;               // Section D
    end = SIZE;
    */

    TH.ThreadRange(beg, end);

    for(int i=beg; i<end; i++)  {
        for(int j=0; j<SIZE; j++) {
            m[i][j] = 1;
        }
    }
}

void sequentialArrayFill(int n) {
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n; j++) {
            m[i][j] = 1;
        }
    }
}

int main(int argc, char **argv)  {
    int ok = 1;

    sequentialArrayFill(SIZE);       // Line A

    /*
    TH.Dispatch(thread_fct, NULL);    // Section B
    TH.WaitForIdle();
    */

    // test (without using threads)
    for(int i=0; i<SIZE; i++) {
        for(int j=0; j<SIZE; j++) {
            if ( m[i][j] != 1 ) {
                printf("Element [%d,%d] not set... \n", i, j);
                ok = 0;
            }
        }
    }

    if ( ok ) {
        printf("\nAll elements correctly set to 1\n\n");
    }

    return 0;
}

11. Race Condition: missing the mutual exclusion coordination pattern

file: Vath_pth/11.raceCondition/raceCondition.C

Build inside 11.raceCondition directory:

make raceCondition

Execute on the command line inside 11.raceCondition directory:

./raceCondition

When a variable must be shared by all the threads, as in the example below, a problem called a race condition can occur when the threads update that variable concurrently. Updating a memory location takes several underlying machine instructions (load the value, modify it, store it back), and the instructions of different threads can interleave so that one thread's update is lost. To get the correct result, each thread must complete the whole update as one indivisible (atomic) step before another thread begins its own. This is what ensures mutual exclusion between the threads when they update a shared variable.

There are two general ways to prevent a race condition. Atomic operations are lock-free and optimistic: the threads go ahead and execute in parallel, and if a conflicting update is detected, the operation must start over. Atomic operations may therefore perform redundant work. In contrast, the reduction used here is pessimistic: because a race condition could happen, it makes sure one never does by using mutex locks to enforce mutual exclusion. In Pthreads, there are no atomic services, so we will stick with lock-based reduction.

/* raceCondition.C
 *
 * Using Victor Alessandrini's vath_pth library.
 * ... illustrates a race condition when multiple threads read from and
 *     write to a shared variable.
 *
 * Modeled from code provided by Joel Adams, Calvin College, November 2009.
 * Hannah Sonsalla, Macalester College, 2017.
 *
 * Usage: ./raceCondition [numThreads]
 *
 * Exercise:
 *   - Compile and run 10 times; note that the sequential version always
 *     produces the correct balance: $1,000,000.00
 *   - To make parallel, comment out line A and uncomment section B,
 *     recompile and rerun multiple times, compare results
 *   - To fix parallel version, comment out line C, uncomment line D,
 *     change balance to RD.Data() in print statement,
 *     recompile and rerun, compare results
 */

#include <stdlib.h>
#include <stdio.h>
#include <SPool.h>
#include <pthread.h>
#include <Reduction.h>  // reduction

// global variables
SPool *TH;
int numThreads;
double balance = 0.0;
Reduction<double> RD;   // Reduction: accumulator of doubles
const int REPS = 1000000;

// -------------------
// Worker threads code
// -------------------
void thread_fct(void *idp)  {

    int beg, end;
    beg = 0;
    end = REPS;
    TH->ThreadRange(beg, end);

    for (int i = beg; i < end; i++)  {
        balance += 1.0;                    // C
        //RD.Accumulate(1.0);              // D
    }
}

// -------------------
// Sequential balance
// -------------------

void seqBalance(int n){
    for (int i = 0; i <n; i++) {
        balance += 1.0;
    }
}

int main(int argc, char **argv)  {

    if(argc==2) numThreads = atoi(argv[1]);
    else numThreads = 4;

    seqBalance(REPS);                         //A

    /*
    TH = new SPool(numThreads);
    TH->Dispatch(thread_fct, NULL);    // Section B
    TH->WaitForIdle();
    delete TH;
    */

    printf("\n After %d $1 deposits, your balance is $%0.2f\n\n", REPS, balance);
    return 0;
}

12. Mutual Exclusion Coordination Pattern: language difference

file: Vath_pth/12.languageDiff/languageDiff.C

Build inside 12.languageDiff directory:

make languageDiff

Execute on the command line inside 12.languageDiff directory:

./languageDiff

The following is a C++ code example to illustrate some language differences between C and C++.

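C: a printf call is a single function call, and its output is written as one unit (POSIX stdio functions lock the stream for the duration of each call)

C++: a statement such as cout << ... << endl is a chain of separate function calls, one per <<, so the pieces written by different threads can be interleaved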

A solution to the mixed output would be to implement a thread-safe cout class that uses critical sections and locks to give each thread exclusive access to stdout. We will not look further into this. Note: The Reduction utility class uses this same kind of locking. Try the exercises described in the code below.

/*
 * languageDiff.C
 *
 * Using Victor Alessandrini's vath_pth library.
 * ... language difference between C and C++
 *
 * Modeled from code provided by Joel Adams, Calvin College, November 2009.
 * Hannah Sonsalla, Macalester College, 2017.
 *
 * Usage: ./languageDiff [numThreads]
 *
 * Exercise:
 *  - Compile, run, note resulting output is correct.
 *  - Uncomment section A and comment out line B, recompile, rerun, note results.
 *
 */
#include <stdlib.h>
#include <stdio.h>
#include <SPool.h>
#include <pthread.h>
#include <iostream>   // cout

SPool *TH;
int numThreads;

using namespace std;

// -------------------
// Worker threads code
// -------------------
void thread_fct(void *idp)  {

    int rank = TH->GetRank();

    /*
    cout << "Hello from thread #" << rank     // Section A
             << " out of " << numThreads
             << " threads\n";
    */

    printf("Hello from thread #%d of %d\n", rank, numThreads);  // Line B
}

int main(int argc, char** argv) {
    cout << "\n";

    if(argc==2) numThreads = atoi(argv[1]);
    else numThreads = 4;

    TH = new SPool(numThreads);
    TH->Dispatch(thread_fct, NULL);
    TH->WaitForIdle();
    delete TH;

    cout << "\n";
    return 0;
}