Attempt to filter out as many documents (and as many fields from the documents) as possible at the beginning of your pipeline, before any "$project", "$group", or "$unwind" operations. Once the pipeline isn’t using the data directly from the collection, indexes can no longer be used to help filter and sort. The aggregation pipeline will attempt to reorder operations for you, if possible, so that it can use indexes.
MongoDB won’t allow a single aggregation to use more than a fraction of the system’s memory: if it calculates that an aggregation has used more than 20% of the memory, the aggregation will simply error out. Allowing output to be piped to a collection (which would minimize the amount of memory required) is planned for the future.
If you can quickly whittle down the result set size with a selective "$match", you can use the pipeline for real-time aggregations. As pipelines need to include more documents and become more complex, it is less likely that you’ll be able to get real-time results from them.
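As a minimal sketch of filtering early, here is a pipeline that puts a selective "$match" and a narrowing "$project" ahead of "$unwind" and "$group". It is written as a plain JavaScript array so the stage order can be checked outside the shell; in the mongo shell you would pass the same array to your collection's aggregate call. The field names status, author, and tags are hypothetical, and the comment about index use assumes an index on status exists.

```javascript
// Hypothetical pipeline: filter first so the $match can use an index on
// "status" (assumed to exist) before any stage transforms the documents.
const pipeline = [
  // 1. Selective filter: runs directly against the collection.
  { $match: { status: "published" } },
  // 2. Keep only the fields the later stages need.
  { $project: { author: 1, tags: 1 } },
  // 3. From here on, stages see transformed documents, so no index use.
  { $unwind: "$tags" },
  { $group: { _id: "$tags", authors: { $addToSet: "$author" } } },
];

// Hypothetical helper: list each stage's operator in order.
function stageNames(stages) {
  return stages.map((stage) => Object.keys(stage)[0]);
}

console.log(stageNames(pipeline)); // [ '$match', '$project', '$unwind', '$group' ]
```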
MapReduce
MapReduce is a powerful and flexible tool for aggregating data. It can solve some problems that are too complex to express using the aggregation framework’s query language.
MapReduce uses JavaScript as its “query language” so it can express arbitrarily complex logic. However, this power comes at a price: MapReduce tends to be fairly slow and should not be used for real-time data analysis.
MapReduce can be easily parallelized across multiple servers. It splits up a problem, sends chunks of it to different machines, and lets each machine solve its part of the problem. When all the machines are finished, they merge all the pieces of the solution back into a full solution.
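The split-and-merge idea can be sketched in plain JavaScript. This is a toy, single-process model: the "machines" are just array chunks, and countWords and mergeCounts are hypothetical helpers, not part of MongoDB. What it shows is that solving each chunk separately and merging the partial answers gives the same result as solving the whole problem at once.

```javascript
// Toy model of splitting a job across "machines" (here, array chunks).
// Everything runs in one process; the chunking only mimics distribution.
function countWords(docs) {
  // Each "machine" solves its part: count words in its own chunk.
  const partial = {};
  for (const doc of docs) {
    for (const word of doc.words) {
      partial[word] = (partial[word] || 0) + 1;
    }
  }
  return partial;
}

function mergeCounts(partials) {
  // Merge the partial answers back into a full solution.
  const total = {};
  for (const partial of partials) {
    for (const [word, n] of Object.entries(partial)) {
      total[word] = (total[word] || 0) + n;
    }
  }
  return total;
}

const docs = [
  { words: ["a", "b"] },
  { words: ["b", "c"] },
  { words: ["a"] },
  { words: ["c", "c"] },
];
// Two hypothetical machines, each handed half of the documents.
const chunks = [docs.slice(0, 2), docs.slice(2)];
const result = mergeCounts(chunks.map(countWords));
console.log(result); // { a: 2, b: 2, c: 3 }
```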
MapReduce has a couple of steps. It starts with the map step, which maps an operation onto every document in a collection. That operation could be either “do nothing” or “emit these keys with X values.” There is then an intermediary stage called the shuffle step: keys are grouped and lists of emitted values are created for each key. The reduce step takes this list of values and reduces it to a single element. This element is returned to the shuffle step until each key has a list containing a single value: the result.
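To see the three steps in one place, the sketch below is a hypothetical in-memory mapReduce helper, not MongoDB's implementation, and it reduces each key in a single pass. One assumption to flag: MongoDB provides emit as a global function, while this sketch passes emit to the map function as an argument to keep the helper self-contained.

```javascript
// Hypothetical in-memory model of MapReduce's map, shuffle, and reduce steps.
// Not MongoDB's implementation: everything runs in a single process.
function mapReduce(docs, map, reduce) {
  // Map step: run map once per document, with `this` bound to the document.
  const emitted = [];
  const emit = (key, value) => emitted.push([key, value]);
  for (const doc of docs) {
    map.call(doc, emit);
  }

  // Shuffle step: group the emitted values into one list per key.
  const groups = new Map();
  for (const [key, value] of emitted) {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  }

  // Reduce step: collapse each key's list of values into a single result.
  const results = {};
  for (const [key, values] of groups) {
    results[key] = reduce(key, values);
  }
  return results;
}

// Usage: count how often each word appears across two documents.
const docs = [{ words: ["a", "b"] }, { words: ["b"] }];
const result = mapReduce(
  docs,
  function (emit) {
    for (const w of this.words) emit(w, 1);
  },
  (key, values) => values.reduce((sum, v) => sum + v, 0)
);
console.log(result); // { a: 1, b: 2 }
```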
We’ll go through a couple of examples, because MapReduce is an incredibly useful and powerful, but also somewhat complex, tool.
Example 1: Finding All Keys in a Collection
Using MapReduce for this problem might be overkill, but it is a good way to get familiar with how MapReduce works. If you already understand MapReduce, feel free to skip ahead to the last part of this section, where we cover MongoDB-specific MapReduce considerations.
140 | Chapter 7: Aggregation
MongoDB assumes that your schema is dynamic, so it does not keep track of the keys in each document. The best way, in general, to find all the keys across all the documents in a collection is to use MapReduce. In this example, we’ll also get a count of how many times each key appears in the collection. This example doesn’t include keys for embedded documents, but it would be a simple addition to the map function to do so.
For the mapping step, we want to get every key of every document in the collection. The map function uses a special function to “return” values that we want to process later: emit. emit gives MapReduce a key (like the one used by group earlier) and a value. In this case, we emit a count of how many times a given key appeared in the document (once: {count : 1}). We want a separate count for each key, so we’ll call emit for every key in the document. this is a reference to the current document we are mapping:
> map = function() {
... for (var key in this) {
...     emit(key, {count : 1});
... }};
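We can check what this map function emits without a server by supplying our own emit. The emit stand-in and the sample document below are assumptions for illustration; in MongoDB, emit is provided for you.

```javascript
// Stand-in for the server-supplied emit(): record each call (an assumption).
const emitted = [];
function emit(key, value) {
  emitted.push({ key: key, value: value });
}

// The map function from the text: one {count : 1} per top-level key.
const map = function () {
  for (var key in this) {
    emit(key, { count: 1 });
  }
};

// Run map with `this` bound to a hypothetical sample document.
map.call({ _id: 1, a: "x", b: 2 });
console.log(emitted.map((e) => e.key)); // [ '_id', 'a', 'b' ]
```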
Now we have a ton of little {count : 1} documents floating around, each associated with a key from the collection. An array of one or more of these {count : 1} documents will be passed to the reduce function. The reduce function is passed two arguments: key, which is the first argument from emit, and an array of one or more {count : 1} documents that were emitted for that key:
> reduce = function(key, emits) {
... var total = 0;
... for (var i in emits) {
...     total += emits[i].count;
... }
... return {"count" : total};
... }
reduce must be able to be called repeatedly on results from either the map phase or previous reduce phases. Therefore, reduce must return a document that can be re-sent to reduce as an element of its second argument. For example, say we have the key x mapped to three documents: {count : 1, id : 1}, {count : 1, id : 2}, and {count : 1, id : 3}. (The ID keys are just for identification purposes.) MongoDB might call reduce in the following pattern:
> r1 = reduce("x", [{count : 1, id : 1}, {count : 1, id : 2}])
{count : 2}
> r2 = reduce("x", [{count : 1, id : 3}])
{count : 1}
> reduce("x", [r1, r2])
{count : 3}
You cannot depend on the second argument always holding one of the initial documents ({count : 1} in this case) or being a certain length. reduce should be able to be run on any combination of emit documents and reduce return values.
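This requirement is easy to test outside MongoDB: reducing all the values at once has to give the same answer as reducing them in pieces and re-reducing the partial results. The sketch below does that with the reduce function from this example (with total declared locally), and also shows a hypothetical badReduce that returns a bare number and therefore breaks when its output is fed back in.

```javascript
// The reduce function from the text, with `total` declared locally.
function reduce(key, emits) {
  var total = 0;
  for (var i in emits) {
    total += emits[i].count;
  }
  return { count: total };
}

// Emitted values for key "x" (the id fields only identify each document).
const values = [
  { count: 1, id: 1 },
  { count: 1, id: 2 },
  { count: 1, id: 3 },
];

// Reducing everything at once...
const allAtOnce = reduce("x", values);
// ...must match reducing in pieces and then re-reducing the partial results.
const r1 = reduce("x", values.slice(0, 2));
const r2 = reduce("x", values.slice(2));
const inPieces = reduce("x", [r1, r2]);
console.log(allAtOnce, inPieces); // { count: 3 } { count: 3 }

// Hypothetical counterexample: returning a bare number breaks re-reduction,
// because a number has no .count property (undefined turns the sum into NaN).
function badReduce(key, emits) {
  var total = 0;
  for (var i in emits) total += emits[i].count;
  return total;
}
const bad = badReduce("x", [
  badReduce("x", values.slice(0, 2)),
  badReduce("x", values.slice(2)),
]);
console.log(bad); // NaN
```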
Altogether, running this MapReduce on a collection named foo looks like this:
> mr = db.runCommand({"mapreduce" : "foo", "map" : map, "reduce" : reduce})
{
    "result" : "tmp.mr.mapreduce_1266787811_1",
    "timeMillis" : 12,
    "counts" : { "input" : 6, "emit" : 14, "output" : 5 },
    "ok" : true
}
The document MapReduce returns gives you a bunch of metainformation about the operation:
"result" : "tmp.mr.mapreduce_1266787811_1"
This is the name of the collection the MapReduce results were stored in. This is a temporary collection that will be deleted when the connection that did the MapReduce is closed. We will go over how to specify a nicer name and make the collection permanent in a later part of this chapter.
"timeMillis" : 12
How long the operation took, in milliseconds.
"counts" : { ... }
This embedded document is mostly used for debugging and contains three keys:
"input" : 6
The number of documents sent to the map function.
"emit" : 14
The number of times emit was called in the map function.
"output" : 5
The number of documents created in the result collection.
If we do a find on the resulting collection, we can see all the keys and their counts from our original collection:
> db[mr.result].find()
{ "_id" : "_id", "value" : { "count" : 6 } }
{ "_id" : "a", "value" : { "count" : 4 } }
{ "_id" : "b", "value" : { "count" : 2 } }
{ "_id" : "x", "value" : { "count" : 1 } }
{ "_id" : "y", "value" : { "count" : 1 } }
Each of the key values becomes an "_id", and the final result of the reduce step(s) becomes the "value".
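To tie the pieces together, here is an in-memory sketch that runs the same map and reduce over a stand-in for foo and produces both the {_id, value} documents and the counts. The six documents are hypothetical, chosen only so that the counts match the output above; the real contents of foo aren't shown, and runMapReduce is a hypothetical helper, not a MongoDB API.

```javascript
// Hypothetical stand-in for "foo": six documents whose keys are chosen
// so the counts match the output shown in the text.
const foo = [
  { _id: 1, a: 1, b: 1, x: 1 },
  { _id: 2, a: 1, b: 1, y: 1 },
  { _id: 3, a: 1 },
  { _id: 4, a: 1 },
  { _id: 5 },
  { _id: 6 },
];

// Stand-in for the server-supplied emit() (an assumption).
let emitted = [];
function emit(key, value) {
  emitted.push([key, value]);
}

const map = function () {
  for (var key in this) emit(key, { count: 1 });
};
const reduce = function (key, emits) {
  var total = 0;
  for (var i in emits) total += emits[i].count;
  return { count: total };
};

// Hypothetical runner that mirrors the result documents and "counts".
function runMapReduce(docs) {
  emitted = [];
  docs.forEach((doc) => map.call(doc));
  const groups = new Map();
  for (const [key, value] of emitted) {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  }
  // Each key becomes "_id"; the final reduce result becomes "value".
  const results = [];
  for (const [key, values] of groups) {
    results.push({ _id: key, value: reduce(key, values) });
  }
  return {
    results: results,
    counts: { input: docs.length, emit: emitted.length, output: results.length },
  };
}

const out = runMapReduce(foo);
console.log(out.counts); // { input: 6, emit: 14, output: 5 }
console.log(out.results[1]); // { _id: 'a', value: { count: 4 } }
```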
Example 2: Categorizing Web Pages
Suppose we have a site, such as reddit, where people can submit links to other pages. Submitters can tag a link as related to certain popular topics, e.g., “politics,” “geek,” or “icanhascheezburger.” We can use MapReduce to figure out which topics are the most popular, where popularity combines how recent a link is with how many votes it has received.
First, we need a map function that emits tags with a value based on the popularity and recency of a document:
map = function() {
    var recency = 1/(new Date() - this.date);
    var score = recency * this.score;
    for (var i in this.tags) {
        emit(this.tags[i], {"urls" : [this.url], "score" : score});
    }
};
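We can try the map function on a single made-up submission to see what it emits. The emit stand-in, the sample document, and its field values are assumptions; in this sketch the recency and score are computed once per document, since they don't depend on the tag.

```javascript
// Stand-in for the server-supplied emit(): record each call (an assumption).
const emitted = [];
function emit(key, value) {
  emitted.push({ key: key, value: value });
}

// The map function from the text, computing the document's score once.
const map = function () {
  var recency = 1 / (new Date() - this.date); // newer links -> larger recency
  var score = recency * this.score;
  for (var i in this.tags) {
    emit(this.tags[i], { urls: [this.url], score: score });
  }
};

// Hypothetical submission: posted one hour ago with a score of 10.
const page = {
  url: "http://example.com/a",
  tags: ["politics", "geek"],
  score: 10,
  date: new Date(Date.now() - 60 * 60 * 1000),
};
map.call(page);
console.log(emitted.map((e) => e.key)); // [ 'politics', 'geek' ]
```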
Now we need to reduce all of the emitted values for a tag into a single score for that tag:
reduce = function(key, emits) {
    var total = {urls : [], score : 0};
    for (var i in emits) {
        emits[i].urls.forEach(function(url) {
            total.urls.push(url);
        });
        total.score += emits[i].score;
    }
    return total;
};
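Because this reduce may also be called on its own output, it helps to check that re-reducing a partial result gives the same total. The emitted values below are hypothetical; the scores are chosen as exact binary fractions so the floating-point sums compare exactly.

```javascript
// The reduce function from the text, with the forEach call closed properly.
const reduce = function (key, emits) {
  var total = { urls: [], score: 0 };
  for (var i in emits) {
    emits[i].urls.forEach(function (url) {
      total.urls.push(url);
    });
    total.score += emits[i].score;
  }
  return total;
};

// Hypothetical emitted values for the tag "geek".
const emits = [
  { urls: ["http://example.com/a"], score: 0.5 },
  { urls: ["http://example.com/b"], score: 0.25 },
  { urls: ["http://example.com/c"], score: 0.25 },
];

const once = reduce("geek", emits);
// Re-reducing a partial result must give the same answer.
const twice = reduce("geek", [reduce("geek", emits.slice(0, 2)), emits[2]]);
console.log(once.score, once.urls.length); // 1 3
```

Since the returned document has the same shape as an emitted value ({urls, score}), it can be fed back into reduce, which is exactly what the re-reduce requirement calls for.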
The final collection will end up with a full list of URLs for each tag and a score showing how popular that particular tag is.