The following subsections include the word count mapper and reducer implementations for WMR in several languages:
Word count for WMR in the Scheme language (the spec is found in Wmr_scm.pdf)
;; Emit every string in lis as a key with the count "1".
;; Returns #t once the whole list has been consumed.
(define helper
  (lambda (lis)
    (if (null? lis)
        #t
        (begin
          (wmr-emit (car lis) "1")
          (helper (cdr lis))))))
;; Map phase: split the input line into words and emit each with "1".
(define (mapper key value)
  (helper (wmr-split key)))
;; Accumulate the remaining values of iter (strings holding numbers)
;; into the running subtotal ct; when the iterator is exhausted,
;; emit the key together with the final total.
(define (loop key iter ct)
  (if (iter 'has-next)
      (loop key iter (+ ct (string->number (iter 'get-next))))
      (wmr-emit key (number->string ct))))
;; Reduce phase: total the counts for key, starting the subtotal at 0.
(define (reducer key iter)
  (loop key iter 0))
Note
For this WMR interface for Scheme (see spec for details):
Word count for WMR in the C++ language (C++-style iterators; the spec is found in Wmr_cpp.pdf)
// Mapper for word count: emits each space-delimited word of the input
// line (passed as 'key') together with the count "1".
class Mapper
{
public:
    void mapper(string key, string value)
    {
        char delim = ' ';
        // Element type made explicit: the original 'vector splits = ...'
        // only compiles via C++17 class template argument deduction and
        // is ill-formed on earlier standards.
        vector<string> splits = Wmr::split(key, delim);
        for (unsigned int i = 0; i < splits.size(); ++i)
        {
            Wmr::emit(splits.at(i), "1");
        }
    }
};
// Reducer for word count: totals the string-encoded counts supplied by
// the iterator and emits the word with its final count.
class Reducer
{
public:
    void reducer(string key, WmrIterator iter)
    {
        long total = 0;
        for (; iter != WmrIterator::end(); )
        {
            total += Wmr::strToLong(*iter++);
        }
        Wmr::emit(key, Wmr::longToStr(total));
    }
};
Note
For this WMR interface for C++ (see spec for details):
Word count for WMR in the Java language (Java-style iterators; the spec is found in Wmr_java.pdf)
/* Mapper for word count: emits each space-delimited word of the input
   line (passed as key) with the count "1". */
class Mapper {
    public void mapper(String key, String value) {
        String[] words = key.split(" ");
        for (String word : words) {
            Wmr.emit(word, "1");
        }
    }
}
/* Reducer for word count: totals the string-encoded counts for a word
   and emits the word with its final count. */
class Reducer {
    public void reducer(String key, WmrIterator iter) {
        int total = 0;
        while (iter.hasNext()) {
            total += Integer.parseInt(iter.next());
        }
        Wmr.emit(key, Integer.toString(total));
    }
}
Note
For this WMR interface for Java (see spec for details):
Word count for WMR in the Python3 language (Python3-style iterators; the spec is found in Wmr_jpy3.pdf)
def mapper(key, value):
    """Emit each whitespace-delimited word of the line with count '1'."""
    for word in key.split():
        Wmr.emit(word, '1')
def reducer(key, iter):
    """Total the string-encoded counts for key and emit the sum."""
    # 'total' avoids shadowing the built-in sum(); behavior is unchanged.
    total = 0
    for count in iter:
        total += int(count)
    Wmr.emit(key, str(total))
Note
Notes for this WMR interface for Python3 (see spec for details):
For comparison, here is an implementation of word count mapper and reducer for Java using Hadoop map-reduce directly, without using WMR.
// Java WordCount for Hadoop
// Based on Hadoop documentation
package wc;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
/**
 * Word count using Hadoop's classic (org.apache.hadoop.mapred) API.
 * args[0] is the input path, args[1] the output path.
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        // The reducer is associative/commutative, so it doubles as a combiner.
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }

    /** Emits (word, 1) for each whitespace-delimited token of a line. */
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    /** Sums the counts for a word and emits (word, total). */
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        // Generic parameters added: with the raw 'Iterator values' of the
        // original, values.next() returns Object and '.get()' does not
        // compile; the old mapred API requires the typed interfaces.
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}