WebMapReduce in various languages

The following subsections include the word count mapper and reducer implementations for WMR in several languages:

Scheme

Word count for WMR in the Scheme language (the spec is found in Wmr_scm.pdf)

mapper

;; Emit the pair (word, "1") for every string in lis, front to back.
;; Returns #t once the list is exhausted.
(define (helper lis) ; lis: a list of strings
  (if (null? lis)
      #t
      (begin
        ;; emit the first string, then recur on the remainder
        (wmr-emit (car lis) "1")
        (helper (cdr lis)))))

;; WMR mapper for word count: break the key apart with wmr-split and
;; emit each resulting piece with the count "1".  value is not used.
(define (mapper key value)
  (let ((pieces (wmr-split key)))
    (helper pieces)))

reducer

;; Add the remaining values of iter onto the running subtotal ct and,
;; once iter has no more values, emit key with the total as a string.
;;   key  -- the key being reduced
;;   iter -- a key,value iterator in its current state
;;   ct   -- subtotal so far
(define (loop key iter ct)
  (if (iter 'has-next)
      ;; consume one value, convert it to a number, and keep going
      (loop key iter (+ ct (string->number (iter 'get-next))))
      ;; no more input values to add
      (wmr-emit key (number->string ct))))

;; WMR reducer for word count: total the values for key, starting the
;; running subtotal at 0.
(define (reducer key iter)
  (loop key iter 0))

Note

For this WMR interface for Scheme (see spec for details):

  • As indicated before, the mapper and reducer in this Scheme interface are functions.
  • Scheme's built-in string manipulation functions are rudimentary, so a library function for splitting strings (wmr-split in the mapper above) is provided for this interface; it allows one to specify delimiters by a regular-expression pattern. Type conversion is provided in Scheme through convenient (though long-named) functions number->string and string->number.
  • We use Scheme-style objects as implemented at St. Olaf for the iterator for a reducer, as described above.

C++

Word count for WMR in the C++ language (C++-style iterators; the spec is found in Wmr_cpp.pdf)

mapper

// Word count mapper: emits (word, "1") for every piece of the key that
// Wmr::split produces when splitting on a single space.
class Mapper
{
public:
    // key   - text to split into words
    // value - unused by this mapper
    void mapper(string key, string value)
    {
        char delim = ' ';
        // vector is a class template, so its element type must be spelled
        // out; the pieces are passed to Wmr::emit as strings below.
        vector<string> splits = Wmr::split(key, delim);

        for (vector<string>::size_type i = 0; i < splits.size(); ++i)
        {
            Wmr::emit(splits.at(i), "1");
        }
    }
};

reducer

// Word count reducer: totals the values received for one key and emits
// the key together with that total.
class Reducer
{
public:
    // key  - the key being reduced
    // iter - C++-style iterator over the values for key
    void reducer(string key, WmrIterator iter)
    {
        long total = 0;
        while (iter != WmrIterator::end())
        {
            // read the current value and advance in a single expression
            total = total + Wmr::strToLong(*iter++);
        }

        Wmr::emit(key, Wmr::longToStr(total));
    }
};

Note

For this WMR interface for C++ (see spec for details):
  • The mapper and reducer are methods of classes Mapper and Reducer, respectively.
  • Strings are split using the method Wmr::split() of a (predefined) library class Wmr. Rather than splitting on arbitrary regular expressions, the (required) second argument of Wmr::split() is a string of characters, any one of which counts as a delimiter. Type conversion between numbers and strings is not convenient in C++, so helper methods are provided.
  • C++-style iterators are used in the reducer method. In this style of iterator, operator* delivers the current value, operator++ is used to advance to the next value, and the end of an iterator is detected by comparing that iterator for equality with the special iterator value returned by WmrIterator::end().

Java

Word count for WMR in the Java language (Java-style iterators; the spec is found in Wmr_java.pdf)

mapper

/**
 * Mapper for word count.
 *
 * <p>Treats the key as text and emits the pair (word, "1") for every
 * whitespace-separated word in it.
 */
class Mapper {
  /**
   * Emits one (word, "1") pair per word found in {@code key}.
   *
   * @param key   the text to break into words
   * @param value unused by this mapper
   */
  public void mapper(String key, String value) {
    // Split on runs of whitespace rather than on a single space, so that
    // repeated spaces and tabs do not produce empty "words".
    for (String word : key.split("\\s+")) {
      // Leading whitespace (or an empty key) still yields one empty first
      // token; skip it instead of counting it as a word.
      if (!word.isEmpty()) {
        Wmr.emit(word, "1");
      }
    }
  }

}

reducer

/**
 * Reducer for word count.
 */
class Reducer {
  /**
   * Adds up the counts received for {@code key} and emits (key, total).
   *
   * @param key  the key being reduced
   * @param iter iterator over the values for {@code key}, each a decimal string
   * @throws NumberFormatException if a value cannot be parsed as an integer
   */
  public void reducer(String key, WmrIterator iter) {
    // Accumulate in a long: an int total silently wraps past 2^31 - 1.
    long sum = 0;
    while (iter.hasNext()) {
      sum += Long.parseLong(iter.next());
    }
    Wmr.emit(key, Long.toString(sum));
  }

}

Note

For this WMR interface for Java (see spec for details):

  • The mapper and reducer are again methods of classes Mapper and Reducer, respectively, as for C++.
  • Java provides useful string manipulation methods. Type conversion is provided in the Java libraries, but is inconvenient.
  • Java style iterators are used for the reducer. These have methods hasNext() which returns false when no new values exist in an iterator, and next() which returns the next unseen value and advances that iterator.

Python

Word count for WMR in the Python3 language (Python3-style iterators; the spec is found in Wmr_jpy3.pdf)

mapper

def mapper(key, value):
    """Emit (word, '1') for each whitespace-separated word in key.

    value is not used.
    """
    for token in key.split():
        Wmr.emit(token, '1')

reducer

def reducer(key, iter):
    """Total the integer strings produced by iter and emit (key, total).

    The total is emitted as a string.
    """
    total = sum(int(count) for count in iter)
    Wmr.emit(key, str(total))

Note

Notes for this WMR interface for Python3 (see spec for details):

  • The mapper and reducer for this interface are functions, as was the case for Scheme.
  • Python provides many useful string manipulation methods for string objects, as well as convenient type conversion functions int() and str().
  • The reducer uses a Python-style iterator, which may be used conveniently in a for loop construct.

Comparison

For comparison, here is an implementation of word count mapper and reducer for Java using Hadoop map-reduce directly, without using WMR.

// Java WordCount for Hadoop
// Based on Hadoop documentation

package wc;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("WordCount");
    
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    
    JobClient.runJob(conf);
  }

  public static class Map extends MapReduceBase 
      implements Mapper {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, 
                    OutputCollector output, 
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase 
      implements Reducer {
    public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }

      output.collect(key, new IntWritable(sum));
    }
  }

}