Perform Incremental Map-Reduce

Map-reduce operations can handle complex aggregation tasks. To perform map-reduce operations, MongoDB provides the mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.

If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.

To perform incremental map-reduce:

  • Run a map-reduce job over the current collection and output the result to a separate collection.
  • When you have more data to process, run subsequent map-reduce jobs with:
    • the query parameter that specifies conditions that match only the new documents.
    • the out parameter that specifies the reduce action to merge the new results into the existing output collection.

Consider the following example where you schedule a map-reduce operation on a sessions collection to run at the end of each day.

Data Setup

The sessions collection contains documents that log users' sessions each day, for example:

    db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
    db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
    db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
    db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

    db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
    db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
    db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
    db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

Initial Map-Reduce of Current Collection

Run the first map-reduce operation as follows:

  • Define the map function that maps the userid to an object that contains the fields userid, total_time, count, and avg_time:
    var mapFunction = function() {
        var key = this.userid;
        var value = {
            userid: this.userid,
            total_time: this.length,
            count: 1,
            avg_time: 0
        };

        emit( key, value );
    };
  • Define the corresponding reduce function with two arguments, key and values, to calculate the total time and the count. The key corresponds to the userid, and values is an array whose elements correspond to the individual objects mapped to that userid in the mapFunction.
    var reduceFunction = function(key, values) {

        var reducedObject = {
            userid: key,
            total_time: 0,
            count: 0,
            avg_time: 0
        };

        values.forEach( function(value) {
            reducedObject.total_time += value.total_time;
            reducedObject.count += value.count;
        } );

        return reducedObject;
    };
  • Define the finalize function with two arguments, key and reducedValue. The function modifies the reducedValue document to set the avg_time field to the total time divided by the count, and returns the modified document.
    var finalizeFunction = function (key, reducedValue) {

        if (reducedValue.count > 0)
            reducedValue.avg_time = reducedValue.total_time / reducedValue.count;

        return reducedValue;
    };
  • Perform map-reduce on the sessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection named session_stat. If the session_stat collection already exists, the operation will replace the contents:
    db.sessions.mapReduce( mapFunction,
                           reduceFunction,
                           {
                             out: "session_stat",
                             finalize: finalizeFunction
                           }
                         )
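
To see what this first run would be expected to write to session_stat, the map, reduce, and finalize steps can be traced in plain JavaScript without a MongoDB connection. This is only a sketch under stated assumptions: simulateMapReduce is a hypothetical helper (not a MongoDB API), the functions are renamed mapSession, reduceSessions, and finalizeSession, emit is passed in as an argument because plain JavaScript has no global emit(), and the ts field is omitted because none of the functions read it.

```javascript
// Sample documents from the Data Setup section (ts omitted; it is not used here).
const sessions = [
  { userid: "a", length: 95 },  { userid: "b", length: 110 },
  { userid: "c", length: 120 }, { userid: "d", length: 45 },
  { userid: "a", length: 105 }, { userid: "b", length: 120 },
  { userid: "c", length: 130 }, { userid: "d", length: 65 },
];

// Same logic as mapFunction, except that emit is an explicit parameter (assumption).
function mapSession(emit) {
  emit(this.userid, { userid: this.userid, total_time: this.length, count: 1, avg_time: 0 });
}

// Same logic as reduceFunction: sum total_time and count for one userid.
function reduceSessions(key, values) {
  const reduced = { userid: key, total_time: 0, count: 0, avg_time: 0 };
  values.forEach((value) => {
    reduced.total_time += value.total_time;
    reduced.count += value.count;
  });
  return reduced;
}

// Same logic as finalizeFunction: derive avg_time from the reduced totals.
function finalizeSession(key, reducedValue) {
  if (reducedValue.count > 0) {
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  }
  return reducedValue;
}

// Hypothetical helper: group emitted values by key, reduce each group, then finalize.
function simulateMapReduce(docs, map, reduce, finalize) {
  const groups = new Map();
  const emit = (key, value) => {
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(value);
  };
  // Map phase: each document is bound to `this`, as in the shell example.
  docs.forEach((doc) => map.call(doc, emit));

  const output = [];
  for (const [key, values] of groups) {
    // MongoDB does not call reduce for a key that has only one value.
    const reduced = values.length === 1 ? values[0] : reduce(key, values);
    output.push({ _id: key, value: finalize(key, reduced) });
  }
  return output;
}

const sessionStat = simulateMapReduce(sessions, mapSession, reduceSessions, finalizeSession);
console.log(JSON.stringify(sessionStat, null, 2));
```

Under these assumptions, the entry for userid "a" has total_time 200, count 2, and avg_time 100, since the two sample sessions for that user last 95 and 105.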

Subsequent Incremental Map-Reduce

Later, as the sessions collection grows, you can run additional map-reduce operations. For example, add new documents to the sessions collection:

    db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
    db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
    db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
    db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

At the end of the day, perform incremental map-reduce on the sessions collection, but use the query field to select only the new documents. Output the results to the session_stat collection, using the reduce action so that the new results are merged with the existing contents:

    db.sessions.mapReduce( mapFunction,
                           reduceFunction,
                           {
                             query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
                             out: { reduce: "session_stat" },
                             finalize: finalizeFunction
                           }
                         );
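
The effect of the reduce action can be traced in plain JavaScript as well. The sketch below is an illustration under assumptions, not MongoDB's implementation: mergeWithReduce is a hypothetical stand-in for out: { reduce: "session_stat" }, the starting Map holds the values computed by hand from the 2011-11-03 and 2011-11-04 sample documents, and newValues holds what the map function would emit for the four 2011-11-05 documents.

```javascript
// Assumed contents of session_stat after the initial run (computed from the sample data).
const sessionStat = new Map([
  ["a", { userid: "a", total_time: 200, count: 2, avg_time: 100 }],
  ["b", { userid: "b", total_time: 230, count: 2, avg_time: 115 }],
  ["c", { userid: "c", total_time: 250, count: 2, avg_time: 125 }],
  ["d", { userid: "d", total_time: 110, count: 2, avg_time: 55 }],
]);

// Values the map function would emit for the 2011-11-05 documents.
const newValues = [
  { userid: "a", total_time: 100, count: 1, avg_time: 0 },
  { userid: "b", total_time: 115, count: 1, avg_time: 0 },
  { userid: "c", total_time: 125, count: 1, avg_time: 0 },
  { userid: "d", total_time: 55, count: 1, avg_time: 0 },
];

// Same logic as reduceFunction: sum total_time and count, ignoring any stale avg_time.
function reduceSessions(key, values) {
  const reduced = { userid: key, total_time: 0, count: 0, avg_time: 0 };
  values.forEach((value) => {
    reduced.total_time += value.total_time;
    reduced.count += value.count;
  });
  return reduced;
}

// Same logic as finalizeFunction: recompute avg_time from the merged totals.
function finalizeSession(key, reducedValue) {
  if (reducedValue.count > 0) {
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  }
  return reducedValue;
}

// Hypothetical stand-in for the reduce output action: when a key already exists,
// reduce the stored value together with the new one, then finalize the result.
function mergeWithReduce(existing, incoming) {
  const merged = new Map(existing);
  for (const value of incoming) {
    const key = value.userid;
    const combined = merged.has(key)
      ? reduceSessions(key, [merged.get(key), value])
      : value;
    merged.set(key, finalizeSession(key, combined));
  }
  return merged;
}

const mergedStat = mergeWithReduce(sessionStat, newValues);
console.log(Object.fromEntries(mergedStat));
```

In this sketch the merge works because the reduce function only sums total_time and count and never reads avg_time, so a previously finalized value can be fed back into reduce safely. For userid "a" the merged entry has total_time 300, count 3, and avg_time 100, which is what a full run over all twelve sample documents would also give.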