What is Re-reduce in MongoDB Map-Reduce?

In my previous post on Map-Reduce, we looked at MongoDB’s Map-Reduce functionality using a simple sample. In this post, I’m going to explain what Re-reduce is and why you need to know about it when you write your reduce function. I recommend going through the previous post before reading this one, because I’m going to use the same sample to explain some concepts here.

In Map-Reduce, the map function produces a set of key-value pairs in which the same key can appear many times. For example, in the word count sample from the previous post, 25 occurrences of the word “from” result in 25 key-value pairs like ({word:from}, {count:1}) emitted from the map phase. After all map tasks are completed, the Map-Reduce framework has to shuffle and sort the key-value pairs produced by all map tasks. This groups all values under a particular key and produces an array of values. In the standard Map-Reduce model, the reducer receives a particular key with an array that contains all values emitted for that key by the map phase. In other words, the reducer is called only once for a particular key.

// array containing 25 values
reduce("from", [{count:1}, {count:1}, {count:1}, ...])

If we could guarantee that, we could simply write the reduce function given in the previous post as follows.

function reduce(key, counts) {
    // we assume all values under this key are contained in counts
    return { count:counts.length };
}

However, in MongoDB Map-Reduce we can’t guarantee the above condition. When a particular key has a large number of values, MongoDB may split the set of values into parts and call the reduce function for the same key several times, passing the output of an earlier call back in as one of the input values. This is called Re-reduce.

Let’s consider the same example of the word “from” again under Re-reduce. Let’s assume MongoDB executes the reduce function 3 times, each time selecting a subset of the values available at that point. Assume the reduce function is first called with 10 {count:1} values, which returns {count:10}.

// array containing 10 values
reduce("from", [{count:1}, {count:1}, {count:1}, ...])

Now for the key “from”, we have 15 {count:1} values and 1 {count:10} value. Then the second reduction will be called on a subset of these 16 values. Assume it’s called for 8 {count:1} values. That will return {count:8}.

// array containing 8 values
reduce("from", [{count:1}, {count:1}, {count:1}, ...])

Finally, the third reduction will get an array like [{count:10}, {count:8}, {count:1}, {count:1}, …] which contains the results of the previous 2 reductions and the remaining 7 {count:1} values.

// array containing 9 values
reduce("from", [{count:10}, {count:8}, {count:1}, {count:1}, {count:1}, ...])

So the output of the third reduction will be {count:25}. Note that in the above example, the output of the first reduction went into the third reduction. That is not required; it might have gone into the second reduction step instead.
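The three steps above can be replayed in plain JavaScript outside MongoDB. This is only a sketch: the `ones` helper is a hypothetical name, the batch sizes 10, 8 and 7 are the ones assumed in the walkthrough, and the reduce function is the summing one from the word count sample. MongoDB decides the actual batches itself.

```javascript
// Summing reduce function from the word count sample.
function reduce(key, counts) {
    var cnt = 0;
    for (var i = 0; i < counts.length; i++) {
        cnt = cnt + counts[i].count;
    }
    return { count: cnt };
}

// Hypothetical helper: builds an array of n {count:1} values.
function ones(n) {
    var values = [];
    for (var i = 0; i < n; i++) {
        values.push({ count: 1 });
    }
    return values;
}

// Assumed batches from the walkthrough: 10 values, then 8 values,
// then both partial results together with the remaining 7 values.
var first = reduce("from", ones(10));
var second = reduce("from", ones(8));
var third = reduce("from", [first, second].concat(ones(7)));

console.log(first.count, second.count, third.count); // 10 8 25
```

Because each partial result carries its own count, the final total stays 25 no matter how the 25 values are split across calls.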

Now you can see why we can’t implement the reduce function as given above (using “counts.length”) if we are using MongoDB Map-Reduce. We always have to keep Re-reduce in mind when implementing the reduce function: its output must be usable as one of its own input values.
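To see the failure concretely, the same three calls can be replayed with the length-based reduce function. This is a sketch in plain JavaScript, not something MongoDB runs this way: `naiveReduce` and `ones` are hypothetical names, and the batches are the assumed ones from the example above.

```javascript
// Length-based reduce: only correct if it is called once per key.
function naiveReduce(key, counts) {
    return { count: counts.length };
}

// Hypothetical helper: builds an array of n {count:1} values.
function ones(n) {
    var values = [];
    for (var i = 0; i < n; i++) {
        values.push({ count: 1 });
    }
    return values;
}

var first = naiveReduce("from", ones(10));   // { count: 10 }
var second = naiveReduce("from", ones(8));   // { count: 8 }
// The third call receives 2 partial results plus 7 raw values: 9 elements.
var third = naiveReduce("from", [first, second].concat(ones(7)));

console.log(third.count); // 9
```

The third call counts array elements, so the partial results {count:10} and {count:8} are each counted as one and the total comes out as 9 instead of 25.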

MongoDB Map-Reduce Sample

In my previous post, we discussed how to write a very simple Java client to read data from and write data to a MongoDB database. In this post, we are going to see how to use the built-in Map-Reduce functionality in MongoDB to perform a word counting task. If you are new to Map-Reduce, please go through the Google Map-Reduce paper to see how it works. To follow this post properly, please go through the previous post as well, because we are going to apply Map-Reduce to the same collection created in that post.

In our previous example, we created a “book” collection inside our “sample” database in MongoDB. Then we inserted 3 pages into the “book” collection as 3 separate documents. Now we are going to apply Map-Reduce on the “book” collection to get a count of each individual word contained in all three pages. In MongoDB Map-Reduce, we write our map and reduce functions in JavaScript. Following is the map function we are going to use for our purpose.

function map() {
    // get content of the current document
    var cnt = this.content;
    // split the content into an array of words using a regular expression
    var words = cnt.match(/\w+/g);
    // if there are no words, return
    if (words == null) {
        return;
    }
    // for each word, output {word, count} pair
    for (var i = 0; i < words.length; i++) {
        emit({ word:words[i] }, { count:1 });
    }
}

MongoDB will apply this map function to every document in the given collection. The format of the documents contained in our “book” collection is as follows.

{ "_id" : ObjectId("519f6c1f44ae9aea2881672a"), "pageId" : "page1", "content" : "your page1 content" }

In the above map function, the “this” keyword always refers to the document on which the function is applied. Therefore, “this.content” will return the content of the page. Then we split the content into words and emit a count of 1 for each word found in the page. For example, if the word “from” appeared 10 times in the current page, there will be 10 ({word:from}, {count:1}) key-value pairs emitted. Likewise, the map function will be applied to all 3 documents in our “book” collection before the reduce phase starts.
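One way to try the map function outside MongoDB is to stub “emit” and bind “this” manually. The sketch below does that in plain JavaScript; the `emitted` array, the stub `emit` function, and the sample page content are assumptions made for illustration and are not part of MongoDB.

```javascript
// Stub for MongoDB's emit(): collects the emitted pairs in an array.
var emitted = [];
function emit(key, value) {
    emitted.push({ key: key, value: value });
}

// The map function from the sample.
function map() {
    var cnt = this.content;
    var words = cnt.match(/\w+/g);
    if (words == null) {
        return;
    }
    for (var i = 0; i < words.length; i++) {
        emit({ word: words[i] }, { count: 1 });
    }
}

// Made-up document in the same format as the "book" collection.
var page = { pageId: "page1", content: "far from home, far from here" };

// call() makes "this" refer to the document, as MongoDB does.
map.call(page);

console.log(emitted.length); // 6
console.log(JSON.stringify(emitted[1])); // {"key":{"word":"from"},"value":{"count":1}}
```

Note that “from” is emitted twice here, once per occurrence; nothing is summed until the reduce phase.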

Following is the reduce function we are going to use.

function reduce(key, counts) {
    var cnt = 0;
    // loop through all count values
    for (var i = 0; i < counts.length; i++) {
        // add current count to total
        cnt = cnt + counts[i].count;
    }
    // return total count
    return { count:cnt };
}

In Map-Reduce, the reduce function gets the values for a particular key as an array. For example, if we found the word “from” 25 times in all 3 pages, the reduce function will be called with the key “from” and the value “[{count:1}, {count:1}, {count:1}, …]”. This array will contain 25 elements. So in the reduce function, we add up the counts to calculate the total number of occurrences of a particular word. Summing the counts, rather than counting the array elements, also gives the right total if MongoDB calls the reduce function more than once for the same key.
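The grouping step and the reduce call can be sketched in plain JavaScript as well. MongoDB performs the grouping internally, so the `groupByWord` helper and the sample pairs below are hypothetical and only illustrate the idea.

```javascript
// The reduce function from the sample.
function reduce(key, counts) {
    var cnt = 0;
    for (var i = 0; i < counts.length; i++) {
        cnt = cnt + counts[i].count;
    }
    return { count: cnt };
}

// Hypothetical helper: collects the emitted values under each word.
function groupByWord(pairs) {
    var groups = {};
    for (var i = 0; i < pairs.length; i++) {
        var word = pairs[i].key.word;
        if (!Object.prototype.hasOwnProperty.call(groups, word)) {
            groups[word] = [];
        }
        groups[word].push(pairs[i].value);
    }
    return groups;
}

// Made-up output of the map phase.
var pairs = [
    { key: { word: "far" },  value: { count: 1 } },
    { key: { word: "from" }, value: { count: 1 } },
    { key: { word: "home" }, value: { count: 1 } },
    { key: { word: "far" },  value: { count: 1 } },
    { key: { word: "from" }, value: { count: 1 } }
];

var groups = groupByWord(pairs);
var result = {};
for (var word in groups) {
    result[word] = reduce({ word: word }, groups[word]);
}

console.log(JSON.stringify(result));
// {"far":{"count":2},"from":{"count":2},"home":{"count":1}}
```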

Now we have our map and reduce functions. Let’s see how to write a simple Java code to execute Map-Reduce on our “book” collection.

package sample.mongo;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MapReduceCommand;
import com.mongodb.MongoClient;

import java.io.IOException;
import java.io.InputStream;

public class WordCount {

    public static void main(String[] args) {
        try {
            // create a MongoClient by connecting to the MongoDB instance in localhost
            MongoClient mongoClient = new MongoClient("localhost", 27017);
            // access the db named "sample"
            DB db = mongoClient.getDB("sample");
            // access the input collection
            DBCollection collection = db.getCollection("book");
            // read Map file
            String map = readFile("wc_map.js");
            // read Reduce file
            String reduce = readFile("wc_reduce.js");
            // execute MapReduce on the input collection and direct the result to "wordcounts" collection
            collection.mapReduce(map, reduce, "wordcounts", MapReduceCommand.OutputType.REPLACE, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the specified file from classpath
     */
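    private static String readFile(String fileName) throws IOException {
        // get the input stream; try-with-resources closes it when we are done
        try (InputStream fileStream = WordCount.class.getResourceAsStream("/" + fileName)) {
            if (fileStream == null) {
                throw new IOException("File not found on classpath: " + fileName);
            }
            // a single read() call may return only part of the file, so read until end of stream
            java.io.ByteArrayOutputStream content = new java.io.ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int size;
            while ((size = fileStream.read(buffer)) != -1) {
                content.write(buffer, 0, size);
            }
            // decode the collected bytes (assuming the files are UTF-8 encoded)
            return content.toString("UTF-8");
        }
    }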
}

In order to execute the above code, make sure you have the “wc_map.js” and “wc_reduce.js” files on your project classpath, containing the above map and reduce functions. In the above Java code, we first connect to our MongoDB database and get a reference to the “book” collection. Then we read our map and reduce functions from the classpath as Strings. Finally, we execute the “mapReduce()” method on our input collection. This will apply our map and reduce functions on the “book” collection and store the output in a new collection called “wordcounts”. If a “wordcounts” collection already exists, it will be replaced by the new one. If you need more details on the “mapReduce()” method, please have a look at the documentation and the Javadoc.

Finally, let’s log into our MongoDB console and look at the output collection “wordcounts”.

isuru@isuru-w520:~$ mongo
MongoDB shell version: 2.0.4
connecting to: test
> use sample
switched to db sample
>
>
> db.wordcounts.find()
{ "_id" : { "word" : "1930s" }, "value" : { "count" : 1 } }
{ "_id" : { "word" : "A" }, "value" : { "count" : 5 } }
{ "_id" : { "word" : "After" }, "value" : { "count" : 3 } }
...
>

That’s it. Here we had a look at a very basic Map-Reduce sample using MongoDB. Map-Reduce can be used to perform more complex tasks efficiently. If you are interested, you can have a look at some more MongoDB Map-Reduce samples here.