$merge (aggregation)

Definition

Note

The following page discusses the $merge stage, which outputs the aggregation pipeline results to a collection. For a discussion of the $mergeObjects operator, which merges documents into a single document, see $mergeObjects instead.

  • $merge

New in version 4.2.

Writes the results of the aggregation pipeline to a specified collection. The $merge stage must be the last stage in the pipeline.

The $merge stage:

  • Can output to a collection in the same or different database.
  • Creates a new collection if the output collection does not already exist.
  • Can incorporate results (insert new documents, merge documents, replace documents, keep existing documents, fail the operation, process documents with a custom update pipeline) into an existing collection.
  • Can output to a sharded collection. The input collection can also be sharded.

For a comparison with the $out stage, which also outputs the aggregation results to a collection, see Comparison with $out.

Syntax

$merge has the following syntax:

    { $merge: {
         into: <collection> -or- { db: <db>, coll: <collection> },
         on: <identifier field> -or- [ <identifier field1>, ...],  // Optional
         let: <variables>,                                         // Optional
         whenMatched: <replace|keepExisting|merge|fail|pipeline>,  // Optional
         whenNotMatched: <insert|discard|fail>                     // Optional
    } }

For example:

    { $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }

If using all default options for $merge (including writing to a collection in the same database), you can use the simplified form:

    { $merge: <collection> } // Output collection is in the same database
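As a rough illustration of how the simplified form relates to the full syntax, the sketch below expands either form into an explicit specification. The expandMergeSpec helper is hypothetical (it is not a MongoDB API) and runs as plain JavaScript. It fills in only the whenMatched and whenNotMatched defaults documented on this page, because the default for on depends on the output collection.

```javascript
// Hypothetical helper, not part of MongoDB: expands a $merge specification
// into an explicit form using the documented defaults.
function expandMergeSpec(spec) {
  // The simplified form is just the output collection name.
  const explicit = typeof spec === "string" ? { into: spec } : { ...spec };
  return {
    whenMatched: "merge",      // default action for matching documents
    whenNotMatched: "insert",  // default action for non-matching documents
    ...explicit,               // options given by the caller override the defaults
  };
}

console.log(expandMergeSpec("myOutput"));
console.log(expandMergeSpec({ into: "myOutput", whenMatched: "replace" }));
```

The first call shows what { $merge: "myOutput" } stands for; the second shows that an explicitly set option is kept while the remaining one falls back to its default.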

The $merge takes a document with the following fields:

into

The output collection. Specify either:

  • The collection name as a string to output to a collection in the same database where the aggregation is run. For example: into: "myOutput"
  • The database and collection name in a document to output to a collection in the specified database. For example: into: { db:"myDB", coll:"myOutput" }

Note

  • If the output collection does not exist, $merge creates the collection:
    • For a replica set or a standalone, if the output database does not exist, $merge also creates the database.
    • For a sharded cluster, the specified output database must already exist.
  • The output collection cannot be the same collection as the collection being aggregated.
  • The output collection cannot appear in any other stages of the pipeline.
  • The output collection can be a sharded collection.

on

Optional. Field or fields that act as a unique identifier for a document. The identifier determines if a results document matches an already existing document in the output collection. Specify either:

  • A single field name as a string. For example: on: "_id"
  • A combination of fields in an array. For example: on: [ "date", "customerId" ]
    The order of the fields in the array does not matter, and you cannot specify the same field multiple times.

For the specified field or fields:

  • The aggregation results documents must contain the field(s) specified in the on, unless the on field is the _id field. If the _id field is missing from a results document, MongoDB adds it automatically.
  • The specified field or fields cannot contain a null or an array value.

$merge requires a unique index with keys that correspond to the on identifier fields. Although the order of the index key specification does not matter, the unique index must only contain the on fields as its keys.

  • The index must also have the same collation as the aggregation's collation.
  • The unique index can be a sparse index.
  • For output collections that already exist, the corresponding index must already exist.

The default value for on depends on the output collection:

  • If the output collection does not exist, the on identifier must be and defaults to the _id field. The corresponding unique _id index is automatically created.

    Tip

    To use a different on identifier field(s) for a collection that does not exist, you can create the collection first by creating a unique index on the desired field(s). See the section on non-existent output collection for an example.

  • If the existing output collection is unsharded, the on identifier defaults to the _id field.
  • If the existing output collection is a sharded collection, the on identifier defaults to all the shard key fields and the _id field. If specifying a different on identifier, the on must contain all the shard key fields.

whenMatched

Optional. The behavior of $merge if a result document and an existing document in the collection have the same value for the specified on field(s).

You can specify either:

  • One of the pre-defined action strings:

    • "replace": Replace the existing document in the output collection with the matching results document.

      When performing a replace, the replacement document cannot result in a modification of the _id value or, if the output collection is sharded, the shard key value. Otherwise, the operation results in an error.

      Tip

      To avoid this error when the on field does not include the _id field, remove the _id field from the aggregation results, for example with a preceding $unset stage.

    • "keepExisting": Keep the existing document in the output collection.

    • "merge" (Default): Merge the matching documents (similar to the $mergeObjects operator).

      • If the results document contains fields not in the existing document, add these new fields to the existing document.
      • If the results document contains fields in the existing document, replace the existing field values with those from the results document.

      For example, if the output collection has the document:

          { _id: 1, a: 1, b: 1 }

      And the aggregation results has the document:

          { _id: 1, b: 5, z: 1 }

      Then, the merged document is:

          { _id: 1, a: 1, b: 5, z: 1 }

      When performing a merge, the merged document cannot result in a modification of the _id value or, if the output collection is sharded, the shard key value. Otherwise, the operation results in an error.

      Tip

      To avoid this error when the on field does not include the _id field, remove the _id field from the aggregation results, for example with a preceding $unset stage.

    • "fail": Stop and fail the aggregation operation. Any changes to the output collection from previous documents are not reverted.

  • An aggregation pipeline to update the document in the collection.

        [ <stage1>, <stage2> … ]

    The pipeline can only consist of the following stages:

    • $addFields and its alias $set
    • $project and its alias $unset
    • $replaceRoot and its alias $replaceWith

    The pipeline cannot modify the on field's value. For example, if you are matching on the field month, the pipeline cannot modify the month field.

    The whenMatched pipeline can directly access the fields of the existing documents in the output collection using $<field>.

    To access the fields from the aggregation results documents, use either:

    • The built-in $$new variable to access the field, i.e. $$new.<field>. The $$new variable is only available if the let specification is omitted.
    • The user-defined variables in the let field, i.e. $$<uservariable>.<field>.

let

Optional. Specifies variables accessible for use in the whenMatched pipeline.

Specify a document with the variable name and value expression:

    { <var_1>: <expression>, …, <var_n>: <expression> }

If unspecified, defaults to { new: "$$ROOT" }; i.e. the whenMatched pipeline can access the $$new variable.

To access the let variables in the whenMatched pipeline, use the double dollar sign ($$) prefix and the variable name: $$<variable>.

whenNotMatched

Optional. The behavior of $merge if a result document does not match an existing document in the output collection.

You can specify one of the pre-defined action strings:

  • "insert" (Default): Insert the document into the output collection.
  • "discard": Discard the document; i.e. $merge does not insert the document into the output collection.
  • "fail": Stop and fail the aggregation operation. Any changes to the output collection from previous documents are not reverted.
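
The action strings for whenMatched and whenNotMatched can be pictured with a small in-memory model. The sketch below is plain JavaScript, not MongoDB's implementation: mergeInto is a hypothetical function, an array stands in for the output collection, and the model ignores indexes, the _id and shard key restrictions, and custom pipelines.

```javascript
// In-memory model of the $merge action strings (illustrative only).
// `output` is an array standing in for the output collection.
function mergeInto(output, results, options = {}) {
  const { on = ["_id"], whenMatched = "merge", whenNotMatched = "insert" } = options;
  // Two documents match when all `on` fields have equal values.
  const key = (doc) => JSON.stringify(on.map((field) => doc[field]));
  for (const doc of results) {
    const i = output.findIndex((existing) => key(existing) === key(doc));
    if (i === -1) {
      if (whenNotMatched === "fail") throw new Error("no match for " + key(doc));
      if (whenNotMatched === "insert") output.push({ ...doc });
      // "discard": skip the document
    } else {
      if (whenMatched === "fail") throw new Error("match found for " + key(doc));
      if (whenMatched === "replace") output[i] = { ...doc };
      // "merge": fields from the results document overwrite existing fields
      if (whenMatched === "merge") output[i] = { ...output[i], ...doc };
      // "keepExisting": leave the existing document untouched
    }
  }
  return output;
}

const coll = [{ _id: 1, a: 1, b: 1 }];
mergeInto(coll, [{ _id: 1, b: 5, z: 1 }, { _id: 2, b: 7 }]);
console.log(coll); // _id 1 is merged into { _id: 1, a: 1, b: 5, z: 1 }; _id 2 is inserted
```

Because the model throws in the middle of the loop, documents written before a "fail" stay in the array, which mirrors the note that earlier changes are not reverted.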

Considerations

_id Field Generation

If the _id field is not present in a document from the aggregation pipeline results, the $merge stage generates it automatically.

For example, in the following aggregation pipeline, $project excludes the _id field from the documents passed into $merge. When $merge writes these documents to the "newCollection", $merge generates a new _id field and value.

    db.sales.aggregate( [
       { $project: { _id: 0 } },
       { $merge : { into : "newCollection" } }
    ] )

Create a New Collection if Output Collection is Non-Existent

The $merge operation creates a new collection if the specified output collection does not exist.

  • The output collection is created when $merge writes the first document into the collection and is immediately visible.
  • If the aggregation fails, any writes completed by the $merge before the error will not be rolled back.

Note

For a replica set or a standalone, if the output database does not exist, $merge also creates the database.

For a sharded cluster, the specified output database must already exist.

If the output collection does not exist, $merge requires the on identifier to be the _id field. To use a different on field value for a collection that does not exist, you can create the collection first by creating a unique index on the desired field(s). For example, if the output collection newDailySales201905 does not exist and you want to specify the salesDate field as the on identifier:

    db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )

    db.sales.aggregate( [
       { $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
       { $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
       { $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
       { $merge : { into : "newDailySales201905", on: "salesDate" } }
    ] )

Output to a Sharded Collection

The $merge stage can output to a sharded collection. When the output collection is sharded, $merge uses the _id field and all the shard key fields as the default on identifier. If you override the default, the on identifier must include all the shard key fields:

    { $merge: {
         into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
         on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ],       // Shard key fields and any additional fields
         let: <variables>,                                         // Optional
         whenMatched: <replace|keepExisting|merge|fail|pipeline>,  // Optional
         whenNotMatched: <insert|discard|fail>                     // Optional
    } }

For example, in a database that has sharding enabled, use the sh.shardCollection() method to create a new sharded collection newrestaurants with the postcode field as the shard key.

    sh.enableSharding("exampledb");   // Sharding must be enabled in the database
    sh.shardCollection(
       "exampledb.newrestaurants",    // Namespace of the collection to shard
       { postcode: 1 },               // Shard key
    );

The newrestaurants collection will contain documents with information on new restaurant openings by month (date field) and postcode (shard key); specifically, the on identifier is ["date", "postcode"] (the ordering of the fields does not matter). Because $merge requires a unique index with keys that correspond to the on identifier fields, create the unique index (the ordering of the fields does not matter): [1]

    use exampledb
    db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )

With the sharded collection newrestaurants and the unique index created, you can use $merge to output the aggregation results to this collection, matching on [ "date", "postcode" ] as in the following example:

    use exampledb

    db.openings.aggregate([
        { $group: {
            _id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
            restaurants: { $push: "$restaurantName" } } },
        { $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
        { $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
    ])

[1] The sh.shardCollection() method can also create a unique index on the shard key when passed the { unique: true } option if: the shard key is range-based; the collection is empty; and a unique index on the shard key doesn't already exist. In the example above, because the on identifier is the shard key and another field, a separate operation to create the corresponding index is required.
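
The rule that a custom on identifier must contain every shard key field can be checked before running the pipeline. The sketch below is a plain JavaScript illustration: checkOnForShardedOutput is a hypothetical helper, not a MongoDB or mongosh function, and the caller is assumed to already know the shard key fields of the output collection.

```javascript
// Hypothetical pre-flight check (not a MongoDB API): verifies that an `on`
// specification lists each field once and covers all shard key fields.
function checkOnForShardedOutput(on, shardKeyFields) {
  const onFields = Array.isArray(on) ? on : [on];
  if (new Set(onFields).size !== onFields.length) {
    throw new Error("on cannot specify the same field multiple times");
  }
  // The order of the fields does not matter, so only membership is tested.
  const missing = shardKeyFields.filter((field) => !onFields.includes(field));
  return { ok: missing.length === 0, missing };
}

// Matches the example above: shard key { postcode: 1 }, on: [ "date", "postcode" ]
console.log(checkOnForShardedOutput(["date", "postcode"], ["postcode"]));
// An identifier that leaves out the shard key is reported as incomplete
console.log(checkOnForShardedOutput("date", ["postcode"]));
```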

Replace Documents ($merge) vs Replace Collection ($out)

$merge can replace an existing document in the output collection if the aggregation results contain a document or documents that match based on the on specification. As such, $merge can replace all documents in the existing collection if the aggregation results include matching documents for all existing documents in the collection and you specify "replace" for whenMatched.

However, to replace an existing collection regardless of the aggregation results, use $out instead.
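
The difference can be sketched with arrays standing in for collections. This is a simplified plain JavaScript model, not MongoDB code: mergeReplace imitates $merge with whenMatched: "replace" and whenNotMatched: "insert" matched on _id, and out imitates $out. Both function names are made up for the illustration.

```javascript
const existing = [{ _id: 1, v: "old" }, { _id: 2, v: "old" }];
const results = [{ _id: 1, v: "new" }, { _id: 3, v: "new" }];

// Model of $merge (replace / insert): only matching documents are replaced;
// documents without a match in the results stay in the collection.
function mergeReplace(output, docs) {
  const byId = new Map(output.map((doc) => [doc._id, doc]));
  for (const doc of docs) byId.set(doc._id, doc);
  return [...byId.values()];
}

// Model of $out: the whole collection is replaced by the results.
function out(_output, docs) {
  return [...docs];
}

console.log(mergeReplace(existing, results)); // _id 1 replaced, _id 2 kept, _id 3 inserted
console.log(out(existing, results));          // only _id 1 and _id 3 remain
```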

Existing Documents and _id and Shard Key Values

The $merge stage errors if it results in a change to an existing document's _id value.

Tip

To avoid this error when the on field does not include the _id field, remove the _id field from the aggregation results, for example with a preceding $unset stage.

Additionally, for a sharded collection, $merge also errors if it results in a change to the shard key value of an existing document.

Any writes completed by the $merge before the error will not be rolled back.

Unique Index Constraints

If the unique index used by $merge for on field(s) is dropped mid-aggregation, there is no guarantee that the aggregation will be killed. If the aggregation continues, there is no guarantee that documents do not have duplicate on field values.

If the $merge attempts to write a document that violates any unique index on the output collection, the operation errors; for example:

  • Insert a non-matching document that violates a unique index other than the index on the on field(s).
  • Fail if there is a matching document in the collection. Specifically, the operation attempts to insert the matching document, which violates the unique index on the on field(s).
  • Replace an existing document with a new document that violates a unique index other than the index on the on field(s).
  • Merge the matching documents, resulting in a document that violates a unique index other than the index on the on field(s).

Comparison with $out

Restrictions

  • Output collection:
    • The output collection cannot be the same collection as the collection being aggregated.
    • The output collection cannot be a collection that appears in any other stages of the pipeline. For example, the aggregation pipeline cannot contain a $lookup stage or a $graphLookup stage that refers to the output collection.
  • Transactions: An aggregation pipeline cannot use $merge inside a transaction.
  • View definition (separate from materialized views): A view definition cannot include the $merge stage. If the view definition includes a nested pipeline (e.g. the view definition includes a $facet stage), this $merge stage restriction applies to the nested pipelines as well.
  • $lookup stage: The $lookup stage's nested pipeline cannot include the $merge stage.
  • $facet stage: The $facet stage's nested pipeline cannot include the $merge stage.
  • "linearizable" read concern: The $merge stage cannot be used in conjunction with read concern "linearizable". That is, if you specify "linearizable" read concern for db.collection.aggregate(), you cannot include the $merge stage in the pipeline.

Examples

On-Demand Materialized View: Initial Creation

If the output collection does not exist, $merge creates the collection.

For example, a collection named salaries in the zoo database is populated with the employee salary and department history:

    db.getSiblingDB("zoo").salaries.insertMany([
       { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
       { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
       { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
       { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
       { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
       { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
       { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
       { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
       { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
       { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
    ])

You can use the $group and $merge stages to initially create a collection named budgets (in the reporting database) from the data currently in the salaries collection:

Note

For a replica set or a standalone deployment, if the output database does not exist, $merge also creates the database.

For a sharded cluster deployment, the specified output database must already exist.

    db.getSiblingDB("zoo").salaries.aggregate( [
       { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
       { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
    ] )

  • The $group stage groups the salaries by the fiscal_year and dept.
  • The $merge stage writes the output of the preceding $group stage to the budgets collection in the reporting database.

To view the documents in the new budgets collection:

    db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

The budgets collection contains the following documents:

    { "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
    { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
    { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
    { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
    { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
    { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
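
To cross-check the totals above without a database, the $group step can be reproduced in plain JavaScript. The sketch below is only an arithmetic check, not how MongoDB evaluates the stage; groupSalaries is a hypothetical helper and the _id and employee fields of the sample documents are omitted for brevity.

```javascript
const salaries = [
  { dept: "A", salary: 100000, fiscal_year: 2017 },
  { dept: "A", salary: 120000, fiscal_year: 2017 },
  { dept: "Z", salary: 115000, fiscal_year: 2017 },
  { dept: "A", salary: 115000, fiscal_year: 2018 },
  { dept: "Z", salary: 145000, fiscal_year: 2018 },
  { dept: "Z", salary: 135000, fiscal_year: 2018 },
  { dept: "A", salary: 100000, fiscal_year: 2018 },
  { dept: "A", salary: 125000, fiscal_year: 2019 },
  { dept: "Z", salary: 160000, fiscal_year: 2019 },
  { dept: "Z", salary: 150000, fiscal_year: 2019 },
];

// Stand-in for:
// { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } }
function groupSalaries(docs) {
  const totals = new Map();
  for (const { fiscal_year, dept, salary } of docs) {
    const key = JSON.stringify({ fiscal_year, dept });
    totals.set(key, (totals.get(key) || 0) + salary);
  }
  return [...totals].map(([key, sum]) => ({ _id: JSON.parse(key), salaries: sum }));
}

const budgets = groupSalaries(salaries);
console.log(budgets);
```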

See also

On-Demand Materialized Views

On-Demand Materialized View: Update/Replace Data

The following example refers to the collections from the previous example.

The example salaries collection contains the employee salary and department history:

    { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
    { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
    { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
    { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
    { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
    { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
    { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
    { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
    { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
    { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

The example budgets collection contains the cumulative yearly budgets:

    { "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
    { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
    { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
    { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
    { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
    { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

During the current fiscal year (2019 in this example), new employees are added to the salaries collection and new head counts are pre-allocated for the next year:

    db.getSiblingDB("zoo").salaries.insertMany([
       { "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
       { "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
       { "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
       { "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
    ])

To update the budgets collection to reflect the new salary information, the following aggregation pipeline uses:

  • $match stage to find all documents with fiscal_year greater than or equal to 2019.
  • $group stage to group the salaries by the fiscal_year and dept.
  • $merge to write the result set to the budgets collection, replacing documents with the same _id value (in this example, a document with the fiscal year and dept). For documents that do not have matches in the collection, $merge inserts the new documents.

    db.getSiblingDB("zoo").salaries.aggregate( [
       { $match : { fiscal_year: { $gte : 2019 } } },
       { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
       { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
    ] )

After the aggregation is run, view the documents in the budgets collection:

    db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

The budgets collection incorporates the new salary data for fiscal year 2019 and adds new documents for fiscal year 2020:

    { "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
    { "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
    { "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
    { "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
    { "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
    { "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
    { "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }

See also

On-Demand Materialized Views

Only Insert New Data

To ensure that the $merge does not overwrite existing data in the collection, set whenMatched to keepExisting or fail.

The example salaries collection in the zoo database contains the employee salary and department history:

    { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
    { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
    { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
    { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
    { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
    { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
    { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
    { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
    { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
    { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }

A collection orgArchive in the reporting database contains historical departmental organization records for the past fiscal years. Archived records should not be modified.

    { "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
    { "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
    { "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
    { "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }

The orgArchive collection has a unique compound index on the fiscal_year and dept fields; i.e. there should be at most one record for the same fiscal year and department combination:

    db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )

At the end of the current fiscal year (2019 in this example), the salaries collection contains the following documents:

    { "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 }
    { "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 }
    { "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 }
    { "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 }
    { "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 }
    { "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 }
    { "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 }
    { "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 }
    { "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 }
    { "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 }
    { "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 }
    { "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 }
    { "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
    { "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }

To update the orgArchive collection to include the fiscal year 2019 that has just ended, the following aggregation pipeline uses:

  • $match stage to find all documents with fiscal_year equal to 2019.
  • $group stage to group the employees by the fiscal_year and dept.
  • $project stage to suppress the _id field and add separate dept and fiscal_year fields. When the documents are passed to $merge, $merge automatically generates a new _id field for the documents.
  • $merge to write the result set to orgArchive.

The $merge stage matches documents on the dept and fiscal_year fields and fails when matched. That is, if a document already exists for the same department and fiscal year, the $merge errors.

    db.getSiblingDB("zoo").salaries.aggregate( [
        { $match: { fiscal_year: 2019 }},
        { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
        { $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
        { $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
    ] )

After the operation, the orgArchive collection contains the following documents:

    { "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
    { "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
    { "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ant", "Zebra" ], "dept" : "A", "fiscal_year" : 2019 }
    { "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
    { "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
    { "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bee", "Cat", "Wren" ], "dept" : "Z", "fiscal_year" : 2019 }

If the orgArchive collection already contained a document for 2019 for department "A" and/or "Z", the aggregation fails because of the duplicate key error. However, any document inserted before the error will not be rolled back.

If you specify keepExisting for the matching document, the aggregation does not affect the matching document and does not fail with a duplicate key error. Similarly, if you specify replace, the operation would not fail; however, the operation would replace the existing document.

Merge Results from Multiple Collections

By default, if a document in the aggregation results matches a document in the collection, the $merge stage merges the documents.

An example collection purchaseorders is populated with the purchase order information by quarter and regions:

    db.purchaseorders.insertMany( [
       { _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
       { _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
       { _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
       { _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
       { _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
       { _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
    ] )

Another example collection reportedsales is populated with the reported sales information by quarter and regions:

    db.reportedsales.insertMany( [
       { _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
       { _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
       { _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
       { _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
    ] )

Assume that, for reporting purposes, you want to view the data by quarter in the following format:

    { "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
    { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

You can use $merge to merge in results from the purchaseorders collection and the reportedsales collection to create a new collection quarterlyreport.

To create the quarterlyreport collection, you can use the following pipeline:

    db.purchaseorders.aggregate( [
       { $group: { _id: "$quarter", purchased: { $sum: "$qty" } } },  // group purchase orders by quarter
       { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
    ])

  • First stage: The $group stage groups by the quarter and uses $sum to add the qty fields into a new purchased field. For example:

        { "_id" : "2019Q2", "purchased" : 1700 }
        { "_id" : "2019Q1", "purchased" : 1200 }

  • Second stage: The $merge stage writes the documents to the quarterlyreport collection in the same database. If the stage finds an existing document in the collection that matches on the _id field, the stage merges the matching documents. Otherwise, the stage inserts the document. For the initial creation, no documents should match.

To view the documents in the collection, run the following operation:

    db.quarterlyreport.find().sort( { _id: 1 } )

The collection contains the following documents:

    { "_id" : "2019Q1", "purchased" : 1200 }
    { "_id" : "2019Q2", "purchased" : 1700 }

Similarly, run the following aggregation pipeline against the reportedsales collection to merge the sales results into the quarterlyreport collection.

    db.reportedsales.aggregate( [
       { $group: { _id: "$quarter", sales: { $sum: "$qty" } } },  // group sales by quarter
       { $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
    ])

  • First stage: The $group stage groups by the quarter and uses $sum to add the qty fields into a new sales field. For example:

        { "_id" : "2019Q2", "sales" : 500 }
        { "_id" : "2019Q1", "sales" : 1950 }

  • Second stage: The $merge stage writes the documents to the quarterlyreport collection in the same database. If the stage finds an existing document in the collection that matches on the _id field (the quarter), the stage merges the matching documents. Otherwise, the stage inserts the document.

To view the documents in the quarterlyreport collection after the data has been merged, run the following operation:

    db.quarterlyreport.find().sort( { _id: 1 } )

The collection contains the following documents:

    { "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
    { "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
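
The two-pass build of quarterlyreport can be traced in plain JavaScript to see why the first pass yields only the purchased field and the second pass adds sales. This is an in-memory approximation, not MongoDB code: sumByQuarter and mergeById are hypothetical helpers, only the quarter and qty fields of the sample documents are kept, and the field order inside the merged objects may differ from what the server returns.

```javascript
const purchaseorders = [
  { quarter: "2019Q1", qty: 200 }, { quarter: "2019Q1", qty: 300 }, { quarter: "2019Q1", qty: 700 },
  { quarter: "2019Q2", qty: 300 }, { quarter: "2019Q2", qty: 1000 }, { quarter: "2019Q2", qty: 400 },
];
const reportedsales = [
  { quarter: "2019Q1", qty: 400 }, { quarter: "2019Q1", qty: 550 }, { quarter: "2019Q1", qty: 1000 },
  { quarter: "2019Q2", qty: 500 },
];

// Stand-in for { $group: { _id: "$quarter", <field>: { $sum: "$qty" } } }
function sumByQuarter(docs, field) {
  const sums = {};
  for (const { quarter, qty } of docs) sums[quarter] = (sums[quarter] || 0) + qty;
  return Object.entries(sums).map(([quarter, total]) => ({ _id: quarter, [field]: total }));
}

// Stand-in for $merge with on: "_id", whenMatched: "merge", whenNotMatched: "insert";
// the report is keyed by _id so a match is a simple property lookup.
function mergeById(report, results) {
  for (const doc of results) {
    report[doc._id] = { ...report[doc._id], ...doc };
  }
  return report;
}

const quarterlyreport = {};
mergeById(quarterlyreport, sumByQuarter(purchaseorders, "purchased"));
console.log(Object.values(quarterlyreport)); // only "purchased" totals so far
mergeById(quarterlyreport, sumByQuarter(reportedsales, "sales"));
console.log(Object.values(quarterlyreport)); // "purchased" and "sales" per quarter
```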

Use the Pipeline to Customize the Merge

The $merge stage can use a custom update pipeline when documents match. The whenMatched pipeline can have the following stages:

  • $addFields and its alias $set
  • $project and its alias $unset
  • $replaceRoot and its alias $replaceWith

An example collection votes is populated with the daily vote tally. Create the collection with the following documents:

    db.votes.insertMany([
       { date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
       { date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
       { date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
       { date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
       { date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
       { date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
    ])

Another example collection monthlytotals has the up-to-date monthly vote totals. Create the collection with the following document:

    db.monthlytotals.insertOne( { "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 } )

At the end of each day, that day's votes are inserted into the votes collection:

    db.votes.insertOne(
       { date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
    )

You can use $merge with a custom pipeline to update the existing document in the collection monthlytotals:

    db.votes.aggregate([
       { $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
       { $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
       { $merge: {
             into: "monthlytotals",
             on: "_id",
             whenMatched: [
                { $addFields: {
                   thumbsup: { $add: [ "$thumbsup", "$$new.thumbsup" ] },
                   thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
                } } ],
             whenNotMatched: "insert"
       } }
    ])

  • First stage: The $match stage finds the specific day's votes. For example:

        { "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }

  • Second stage: The $project stage sets the _id field to a year-month string. For example:

        { "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }

  • Third stage: The $merge stage writes the documents to the monthlytotals collection in the same database. If the stage finds an existing document in the collection that matches on the _id field, the stage uses a pipeline to add the thumbsup votes and the thumbsdown votes.

    • This pipeline cannot directly access the fields from the results document. To access the thumbsup field and the thumbsdown field in the results document, the pipeline uses the $$new variable; i.e. $$new.thumbsup and $$new.thumbsdown.
    • This pipeline can directly access the thumbsup field and the thumbsdown field in the existing document in the collection; i.e. $thumbsup and $thumbsdown.

    The resulting document replaces the existing document.

To view documents in the monthlytotals collection after the merge operation, run the following operation:

    db.monthlytotals.find()

The collection contains the following document:

    { "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }
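
The arithmetic performed by the whenMatched pipeline can be mirrored in plain JavaScript. In the sketch below, addVotes is a hypothetical stand-in for the $addFields stage: its first argument plays the role of the existing monthlytotals document (accessed as $thumbsup and $thumbsdown in the pipeline) and its second argument plays the role of $$new. It is an illustration of the calculation only, not MongoDB code.

```javascript
// Stand-in for the whenMatched pipeline's $addFields stage.
// `existing` corresponds to the document already in monthlytotals,
// `incoming` to the results document exposed as $$new.
function addVotes(existing, incoming) {
  return {
    ...existing,
    thumbsup: existing.thumbsup + incoming.thumbsup,
    thumbsdown: existing.thumbsdown + incoming.thumbsdown,
  };
}

const monthlyTotal = { _id: "2019-05", thumbsup: 26, thumbsdown: 31 };
const projectedDay = { _id: "2019-05", thumbsup: 14, thumbsdown: 10 };

console.log(addVotes(monthlyTotal, projectedDay));
// thumbsup: 26 + 14 = 40, thumbsdown: 31 + 10 = 41
```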