Getting Started with Phoenix++

Introduction

In this section, we will discuss the Phoenix++ wordcount example in detail. You will need a basic knowledge of C/C++ to follow along with these concepts. You can download a copy of the Phoenix++ word count example here. We will start by looking at the file word_count.cpp. At the top of the file, there are three structs we should pay attention to:

// a passage from the text; the input data to the Map-Reduce job
struct wc_string {
    char* data;
    uint64_t len;
};

// a single null-terminated word
struct wc_word {
    char* data;

    // necessary functions to use this as a key
    bool operator<(wc_word const& other) const {
        return strcmp(data, other.data) < 0;
    }
    bool operator==(wc_word const& other) const {
        return strcmp(data, other.data) == 0;
    }
};


// a hash for the word
struct wc_word_hash
{
    // FNV-1a hash for 64 bits
    size_t operator()(wc_word const& key) const
    {
        char* h = key.data;
        uint64_t v = 14695981039346656037ULL;
        while (*h != 0)
            v = (v ^ (size_t)(*(h++))) * 1099511628211ULL;
        return v;
    }
};

These three structs define the type of our input chunk (wc_string), our keys (wc_word), and the hash function used to aggregate common keys (wc_word_hash). The wc_string struct has a data pointer field (which points to the start of the chunk) and a len field indicating the size of the chunk. The wc_word struct contains only a pointer to the start of a word, along with definitions of how to compare two “words”. At this point, you may be asking yourself: how do we know where a word ends? Be patient; when we get to the main body of code, it will all become clear. The last struct contains only a definition for operator(), which takes a key of type wc_word as its single parameter. This is the FNV-1a variant of the Fowler-Noll-Vo hash function.

While other hash functions can be used, it is best just to leave this code alone.
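To make the roles of these structs concrete, here is a minimal standalone snippet (hypothetical; it is not part of word_count.cpp, and it assumes the three struct definitions above are in scope) that exercises the comparison and hash operators directly:

#include <cstdio>
#include <cstring>
#include <cstdint>

// wc_word and wc_word_hash as defined above

int main()
{
    char a[] = "HELLO", b[] = "HELLO";
    wc_word w1 = { a };
    wc_word w2 = { b };
    // operator== compares the pointed-to strings, not the pointers,
    // so two distinct buffers with equal contents are the same key
    printf("equal: %d\n", w1 == w2);
    // equal keys always hash to the same value, so they land in the
    // same bucket of the hash container
    printf("same hash: %d\n", wc_word_hash()(w1) == wc_word_hash()(w2));
    return 0;
}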

WordsMR: The word count class

For every application you write using Phoenix++, you will need to define a class for it. Let’s start by taking a look at the class header:

class WordsMR : public MapReduceSort<WordsMR, wc_string, wc_word, uint64_t,
                                     hash_container<wc_word, uint64_t, sum_combiner, wc_word_hash>
                                     >

The first thing to note about this definition is that WordsMR derives from the class MapReduceSort, which is defined in map_reduce.h. This is the primary beauty of Phoenix++: to write your own MapReduce programs, you simply overload the default functions defined in the base class. We instantiate the template with the following parameters (in order):

  • the implemented class type (Impl = WordsMR)
  • the input data type (D = wc_string)
  • the key type (K = wc_word)
  • the value type (V = uint64_t)
  • the definition of the hash container (hash_container<...>)

hash_container defines the parameters for the hash table used to aggregate (key, value) pairs. A full definition of the hash_container class can be found in container.h. Its template parameters are:

  • the key type (K = wc_word)
  • the value type (V = uint64_t)
  • the type of combiner to use (Combiner = sum_combiner)
  • the hash function to use (Hash = wc_word_hash)

Note the use of sum_combiner, an associative combiner implemented in Phoenix++: whenever multiple values arrive for the same key in our hash table, they are simply added together. This eliminates the need for a reduce function in our application! The other combiner, buffer_combiner, reflects typical MapReduce behavior by chaining all of a key’s values together into a list; a short sketch after the table below illustrates the difference. The functions shown below are the ones most commonly overloaded when creating a Phoenix++ MapReduce class:

Function Name   Description
map()           Defines the functionality for the map tasks. The default map()
                is empty; this function must be overloaded for every
                user-defined MapReduce class.
reduce()        Defines the functionality for the reduce tasks. By default,
                reduce() generates a list of (key, value) pairs from a given
                key and its list of values.
split()         Defines how the input data should be chunked. The default
                split() returns 0; this function must be overloaded for every
                user-defined MapReduce class.
locate()        Indicates where the input data should be read from. By
                default, locate() casts the input data to a void pointer.
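To see why sum_combiner removes the need for a reduce step, consider this conceptual sketch (plain STL, not Phoenix++ internals) of what an associative combiner does as (key, value) pairs are emitted:

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

int main()
{
    // emulate emitting (word, 1) for the stream: THE CAT THE
    const char* words[] = { "THE", "CAT", "THE" };

    // sum_combiner behavior: values for the same key are added as they
    // arrive, so the container only ever holds one integer per key
    std::map<std::string, uint64_t> summed;
    for (const char* w : words)
        summed[w] += 1;

    for (auto const& kv : summed)
        printf("%s -> %lu\n", kv.first.c_str(), (unsigned long)kv.second);
    // prints: CAT -> 1
    //         THE -> 2
    return 0;
}

A buffer_combiner, by contrast, would store the list {1, 1} under THE and leave the summing to a reduce function.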

For the word count application, the map(), locate() and split() functions are overloaded as public methods.

The class declares a number of member variables that will be initialized from user input:

char* data;
uint64_t data_size;
uint64_t chunk_size;
uint64_t splitter_pos;

We can see these values getting initialized in the constructor below.

explicit WordsMR(char* _data, uint64_t length, uint64_t _chunk_size) :
         data(_data), data_size(length), chunk_size(_chunk_size),
         splitter_pos(0) {}

The locate() function

The first function declared in the public scope of the class is locate():

    void* locate(data_type* str, uint64_t len) const
    {
        return str->data;
    }

The locate() function takes two parameters: a pointer to the input data type (data_type*), and a length (len). It returns a pointer to the start of readable data. In this case, it will be the data field of the wc_string struct, which is our input data type.

The map() function

The map() function is declared next. Its two parameters are the input data (passed by const reference) and the output data container (out). Recall that our input data type, wc_string, is a struct with two fields: data and len. The input represents a “chunk” of data from which we want to parse words, emitting a (key, value) pair that associates each word with the count 1:

    void map(data_type const& s, map_container& out) const
    {
        // convert the entire chunk to uppercase, in place
        for (uint64_t i = 0; i < s.len; i++)
        {
            s.data[i] = toupper(s.data[i]);
        }

        uint64_t i = 0;
        while(i < s.len)
        {
            // skip any non-letter characters
            while(i < s.len && (s.data[i] < 'A' || s.data[i] > 'Z'))
                i++;
            uint64_t start = i;
            // consume letters (and apostrophes) until the word ends
            while(i < s.len && ((s.data[i] >= 'A' && s.data[i] <= 'Z') || s.data[i] == '\''))
                i++;
            if(i > start)
            {
                // null-terminate the word in place and emit (word, 1)
                s.data[i] = 0;
                wc_word word = { s.data+start };
                emit_intermediate(out, word, 1);
            }
        }
    }

The first loop converts every character in the input chunk to uppercase, in place. The function toupper is declared in the standard header <cctype>. This ensures the word count application ignores case as it counts words.

The outer while loop does the majority of the work. It contains two inner while loops and an if statement.

  • The first inner while loop determines the starting point of a word. It skips any characters outside the range 'A' to 'Z'. As soon as i reaches the end of the chunk or lands on an alphabetic character, the loop stops, and that value of i is designated as the start of the word (start = i).
  • The second inner while loop determines the end of the word. It keeps incrementing i so long as i is less than the length of the chunk and the current character is alphabetic or an apostrophe (which keeps contractions such as DON'T intact). The variable i stops incrementing once we hit any other character or the end of the chunk.

The next few lines of code are crucial to understanding how the word count application works:

s.data[i] = 0;
wc_word word = { s.data+start };
emit_intermediate(out, word, 1);

Recall that 0 is the integer value of '\0', the null terminator, which marks where a C string ends. Writing it at position i terminates the word in place, overwriting the delimiter that followed it. We then define our key (wc_word word) as a pointer into the input chunk at offset start, and emit the word key with the value 1.

The above process repeats until all the valid words in the input chunk are consumed.
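The in-place trick is worth seeing in isolation. The following standalone sketch (a hypothetical buffer, not library code) mimics what map() does to the shared input: uppercase the chunk, then carve it into C strings by overwriting delimiters with null bytes:

#include <cctype>
#include <cstdio>

int main()
{
    char buf[] = "the cat";

    // uppercase in place, as the first loop in map() does
    for (char* p = buf; *p != 0; p++)
        *p = toupper((unsigned char)*p);

    // null-terminate the first word, as s.data[i] = 0 does; this
    // overwrites the space that separated the two words
    buf[3] = 0;

    printf("%s %s\n", buf, buf + 4);   // prints: THE CAT
    return 0;
}

No characters are ever copied: every wc_word key points directly into the original file buffer.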

The split() function

The split() function carves the input into chunks before they are handed to the map() function:

    /** wordcount split()
     *  Memory map the file and divide file on a word border i.e. a space.
     */
    int split(wc_string& out)
    {
        /* End of data reached, return FALSE. */
        if ((uint64_t)splitter_pos >= data_size)
        {
            return 0;
        }

        /* Determine the nominal end point. */
        uint64_t end = std::min(splitter_pos + chunk_size, data_size);

        /* Move end point to next word break */
        while(end < data_size && 
            data[end] != ' ' && data[end] != '\t' &&
            data[end] != '\r' && data[end] != '\n')
            end++;

        /* Set the start of the next data. */
        out.data = data + splitter_pos;
        out.len = end - splitter_pos;
        
        splitter_pos = end;

        /* Return true since the out data is valid. */
        return 1;
    }

    bool sort(keyval const& a, keyval const& b) const
    {
        return a.val < b.val || (a.val == b.val && strcmp(a.key.data, b.key.data) > 0);
    }
};

Keep in mind while reading this code that the entire input file sits in a single in-memory buffer shared by all worker threads. The function takes a single parameter: a reference to a wc_string object, our input data type. Recall that splitter_pos, data_size, and chunk_size are member variables of the class. splitter_pos tracks our current position in the file, data_size is the size of the entire file, and chunk_size is the target size of each chunk. (The sort() method at the end of the class orders the final (key, value) pairs by ascending count, breaking ties by reverse-alphabetical key; main() will exploit this ordering later.)

The first if statement checks whether our current position has reached the end of the file. If it has, we return 0, indicating that there is nothing more to split.

We want each chunk to be approximately the same size. We first determine a “nominal” end point, or position at which to cut the data:

uint64_t end = std::min(splitter_pos + chunk_size, data_size);

Obviously, this end point won’t always work: what if we land in the middle of a word? We therefore increment end until we hit a natural word boundary, which the split() function defines as a space, tab, carriage return, or newline character. This is achieved by the following code:

while(end < data_size &&
          data[end] != ' ' && data[end] != '\t' &&
          data[end] != '\r' && data[end] != '\n')
          end++;

Once we determine a valid end point, we populate the wc_string object passed in:

out.data = data + splitter_pos;
out.len = end - splitter_pos;

The starting point is set to the data pointer plus the current value of splitter_pos. The length is determined by subtracting splitter_pos from end.

Finally, we update splitter_pos to be the end point (end), and return 1 to indicate that we were able to successfully split the input.
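To make the chunking behavior concrete, here is a standalone sketch (not part of word_count.cpp) that re-implements the same boundary logic over a small in-memory string:

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>

int main()
{
    const char* data = "one two three four";
    uint64_t data_size = strlen(data);
    uint64_t chunk_size = 5;
    uint64_t pos = 0;    // plays the role of splitter_pos

    while (pos < data_size)
    {
        // nominal end point, then advance to the next word break
        uint64_t end = std::min(pos + chunk_size, data_size);
        while (end < data_size &&
               data[end] != ' ' && data[end] != '\t' &&
               data[end] != '\r' && data[end] != '\n')
            end++;

        printf("chunk: \"%.*s\"\n", (int)(end - pos), data + pos);
        pos = end;
    }
    return 0;
}

This prints the chunks "one two", " three", and " four". Note that a chunk may begin with the delimiter that ended the previous one; this is harmless, because map() skips non-letter characters.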

The main() function

A simplified version of the main() function is shown below. This version shows only the non-memory-mapped code; the memory-mapped version can be viewed here. Some timing code has also been trimmed for simplicity. The CHECK_ERROR macro, defined in stddefines.h, is a useful error-handling wrapper: it aborts the program when the condition passed to it is true.

int main(int argc, char *argv[]) 
{
    int fd;
    char * fdata;
    unsigned int disp_num;
    struct stat finfo;
    char * fname, * disp_num_str;
    struct timespec begin, end;

    get_time (begin);

    // Make sure a filename is specified
    if (argv[1] == NULL)
    {
        printf("USAGE: %s <filename> [Top # of results to display]\n", argv[0]);
        exit(1);
    }

    fname = argv[1];
    disp_num_str = argv[2];

    printf("Wordcount: Running...\n");

    // Read in the file
    CHECK_ERROR((fd = open(fname, O_RDONLY)) < 0);
    // Get the file info (for file length)
    CHECK_ERROR(fstat(fd, &finfo) < 0);

    uint64_t r = 0;

    fdata = (char *)malloc (finfo.st_size);
    CHECK_ERROR (fdata == NULL);
    while(r < (uint64_t)finfo.st_size)
        r += pread (fd, fdata + r, finfo.st_size, r);
    CHECK_ERROR (r != (uint64_t)finfo.st_size);

    
    // Get the number of results to display
    CHECK_ERROR((disp_num = (disp_num_str == NULL) ? 
      DEFAULT_DISP_NUM : atoi(disp_num_str)) <= 0);

    get_time (end);

    #ifdef TIMING
    print_time("initialize", begin, end);
    #endif

    printf("Wordcount: Calling MapReduce Scheduler Wordcount\n");
    get_time (begin);
    std::vector<WordsMR::keyval> result;    
    WordsMR mapReduce(fdata, finfo.st_size, 1024*1024);
    CHECK_ERROR( mapReduce.run(result) < 0);
    get_time (end);

    #ifdef TIMING
    print_time("library", begin, end);
    #endif
    printf("Wordcount: MapReduce Completed\n");

    get_time (begin);

    unsigned int dn = std::min(disp_num, (unsigned int)result.size());
    printf("\nWordcount: Results (TOP %d of %lu):\n", dn, result.size());
    uint64_t total = 0;
    for (size_t i = 0; i < dn; i++)
    {
        printf("%15s - %lu\n", result[result.size()-1-i].key.data, result[result.size()-1-i].val);
    }

    for(size_t i = 0; i < result.size(); i++)
    {
        total += result[i].val;
    }

    printf("Total: %lu\n", total);

    free (fdata);

    CHECK_ERROR(close(fd) < 0);

    get_time (end);

    #ifdef TIMING
    print_time("finalize", begin, end);
    #endif

    return 0;
}

We analyze this code in parts:

In the first part, we simply set up variables. Users must supply an input file, and may choose to specify the number of top results to display. Variables are declared to facilitate file reading and command-line parsing:

int fd;
char * fdata;
unsigned int disp_num;
struct stat finfo;
char * fname, * disp_num_str;
// Make sure a filename is specified
if (argv[1] == NULL)
{
    printf("USAGE: %s <filename> [Top # of results to display]\n", argv[0]);
    exit(1);
}
fname = argv[1];
disp_num_str = argv[2];

We next open the file for reading and get its size using the fstat function. We malloc() a block of memory that size and point fdata at it, then read the file into memory with repeated pread() calls until every byte has been read. We also check whether the user supplied the optional parameter capping the number of entries to display: disp_num is set to that value if present, and to the constant DEFAULT_DISP_NUM otherwise:

uint64_t r = 0;
fdata = (char *)malloc (finfo.st_size);
CHECK_ERROR (fdata == NULL);
while(r < (uint64_t)finfo.st_size)
       r += pread (fd, fdata + r, finfo.st_size, r);
CHECK_ERROR (r != (uint64_t)finfo.st_size);
// Get the number of results to display
CHECK_ERROR((disp_num = (disp_num_str == NULL) ?
             DEFAULT_DISP_NUM : atoi(disp_num_str)) <= 0);

Now the magic happens: we run our MapReduce job, which takes all of three lines. We first declare a vector to hold the results, then instantiate the MapReduce job with the line:

WordsMR mapReduce(fdata, finfo.st_size, 1024*1024);

Here, fdata binds to the data pointer in WordsMR, finfo.st_size binds to data_size, and chunk_size is set to 1024*1024 (1 MiB). The following line actually runs the job, aborting if run() reports failure:

CHECK_ERROR( mapReduce.run(result) < 0);

The final part of the code prints the top disp_num entries, from greatest to least count. Since run() leaves the results sorted in ascending order of count (thanks to the sort() method we saw earlier), it suffices to print from the back of the result vector. A second loop sums the counts to compute the total number of words found:

unsigned int dn = std::min(disp_num, (unsigned int)result.size());
printf("\nWordcount: Results (TOP %d of %lu):\n", dn, result.size());
uint64_t total = 0;
for (size_t i = 0; i < dn; i++)
{
     printf("%15s - %lu\n", result[result.size()-1-i].key.data, result[result.size()-1-i].val);
}
for (size_t i = 0; i < result.size(); i++)
{
        total += result[i].val;
}
printf("Total: %lu\n", total);

Finally, the fdata pointer is freed and we end the program:

free (fdata);
CHECK_ERROR(close(fd) < 0);
return 0;

Running the Code

We prepared a simplified version of the word count program, packaged in an archive called phoenix++-wc.tar.gz, which shows what a standalone Phoenix++ application looks like. Alternatively, you can access the official Phoenix++ release at this link. The following instructions assume that you downloaded the phoenix++-wc.tar.gz file.

After downloading the file, untar it with the following command:

tar -xzvf phoenix++-wc.tar.gz

Let’s look at this folder’s directory structure:

|-- data
|   |-- dickens.txt
|   |-- sherlock.txt
|-- Defines.mk
|-- docs
|   |-- 2011.phoenixplus.mapreduce.pdf
|-- include
|   |-- atomic.h
|   |-- combiner.h
|   |-- container.h
|   |-- locality.h
|   |-- map_reduce.h
|   |-- processor.h
|   |-- scheduler.h
|   |-- stddefines.h
|   |-- synch.h
|   |-- task_queue.h
|   |-- thread_pool.h
|-- lib
|-- Makefile
|-- README
|-- src
|   |-- Makefile
|   |-- task_queue.cpp
|   |-- thread_pool.cpp
|-- word_count
    |-- Makefile
    |-- README
    |-- word_count.cpp

The folder data contains some sample data files for you to play with. The file Defines.mk contains many of the compiler flags and other directives needed to compile our code. The docs folder contains the Phoenix++ paper that you can read. The include folder contains all the header files we need for our Phoenix++ application. The lib directory is currently empty; once we compile our code, it will contain the phoenix++ library file, libphoenix.a. The src folder contains the code needed to make the Phoenix++ library file. Lastly, our word count application is located in the directory word_count.

To compile the application, run the make command in the top-level phoenix++-wc directory:

make

Let’s run the application on the file dickens.txt. This file is 21MB, and contains the collective works of Charles Dickens. Run the application with the following command:

time -p ./word_count/word_count data/dickens.txt

This will show you the top 10 most frequent words detected in dickens.txt. To see detailed timing information, uncomment the line #define TIMING in include/stddefines.h.

Below you will find a series of exercises to explore the example further. Happy analyzing!

Exercises

Let’s explore the word_count.cpp file a bit further by modifying it slightly. Remember: every time you change this file, you must recompile your code using the make command!

  • Run the word count program to print the top 20 words, the top 50 words, and the top 100 words. How does the run-time change?
  • Use the setenv function to set the MR_NUMTHREADS environment variable to a user-specified number of threads inside main() in word_count.cpp (see the sketch after this list). Check the setenv documentation for more details.
  • Check the number of CPU cores on your machine by inspecting /proc/cpuinfo. Vary the number of threads from 1 to c, where c is the number of CPU cores. Using sherlock.txt, create files ranging in size from 100MB to 500MB in increments of 100MB. Calculate the speedup and the efficiency for each dataset at each thread count. Plot your results using Matplotlib.
  • Most of the words showing up are common short words, such as “and”, “the”, “of”, “in”, and “a”. Modify the map() function to emit only words that are five characters or longer. What are the top ten words now? How do Charles Dickens’s top 10 words differ from Arthur Conan Doyle’s?
  • Challenge: The words that are showing up are still those that largely reflect the grammar of an author’s writing. These are known as function words. Modify the map() function to exclude any words that are function words. A list of 321 common function words can be found at this link.
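For the threading exercise, here is one possible starting point (a hypothetical snippet to adapt, not part of the distributed code) for setting MR_NUMTHREADS inside main() before the MapReduce object is created:

#include <cstdio>
#include <cstdlib>

int main(int argc, char* argv[])
{
    if (argc > 1)
    {
        // the third argument (1) tells setenv to overwrite the
        // variable if it already exists in the environment
        setenv("MR_NUMTHREADS", argv[1], 1);
    }
    const char* v = getenv("MR_NUMTHREADS");
    printf("MR_NUMTHREADS=%s\n", v ? v : "(unset)");
    return 0;
}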