Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Aggregation Framework
The MongoDB Aggregation Framework is available through ReactiveMongo.
- $addFields: Adds new fields to documents.
- $bucketAuto: Categorizes incoming documents into a specific number of groups, called buckets, based on a specified expression.
- $count: Counts the input documents.
- $facet: Creates multi-faceted aggregations.
- $filter: Selects a subset of an array to return based on the specified condition.
- $limit: Passes the first n documents unmodified to the pipeline, where n is the specified limit.
- $match: Filters the document stream to allow only matching documents.
- $project: Reshapes each document in the stream, such as by adding new fields or removing existing fields.
- $redact: Reshapes each document in the stream by restricting the content for each document based on information stored in the documents themselves.
- $skip: Skips the first n documents, where n is the specified skip number, and passes the remaining documents unmodified to the pipeline.
- $unwind: Deconstructs an array field from the input documents to output a document for each element.
- $group: Groups the input documents by a specified identifier expression and possibly applies some accumulators.
  - $sum: Sum of numerical values.
  - $avg: Average of numerical values.
  - $first: Value from the first document for each group.
  - $last: Value from the last document for each group.
  - $max: Highest expression value for each group.
  - $min: Lowest expression value for each group.
  - $push: Array of expression values for each group.
  - $addToSet: Array of unique expression values for each group.
  - $stdDevPop: Population standard deviation of the input values.
  - $stdDevSamp: Sample standard deviation of the input values.
- $sample: Randomly selects the specified number of documents from its input.
- $sort: Reorders the document stream by a specified sort key.
- $geoNear: Returns an ordered stream of documents based on the proximity to a geospatial point.
- $lookup: Performs a left outer join to another collection in the same database.
- $out: Writes the resulting documents of the aggregation pipeline to a collection.
- $indexStats: Returns statistics regarding the use of each index for the collection.
- $replaceRoot: Promotes a specified document to the top level and replaces all other fields.
- $search: Applies the Atlas Search.
- $slice: Returns a subset of an array.
Zip codes
Consider a zipcodes collection in MongoDB, with the following documents.
[
{ '_id': "10280", 'city': "NEW YORK", 'state': "NY",
'population': 19746227, 'location': {'lon':-74.016323, 'lat':40.710537} },
{ '_id': "72000", 'city': "LE MANS", 'state': "FR",
'population': 148169, 'location': {'lon':48.0077, 'lat':0.1984}},
{ '_id': "JP-13", 'city': "TOKYO", 'state': "JP",
'population': 13185502, 'location': {'lon':35.683333, 'lat':139.683333} },
{ '_id': "AO", 'city': "AOGASHIMA", 'state': "JP",
'population': 200, 'location': {'lon':32.457, 'lat':139.767} }
]
Distinct state
The distinct command is available to find the distinct values for a specified field across a single collection.
In the MongoDB shell, this command can be used to find the distinct states from the zipcodes collection, with results "NY", "FR", and "JP".
db.runCommand({ distinct: "zipcodes", key: "state" })
Using the ReactiveMongo API, it can be done with the corresponding collection operation.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.collection.BSONCollection
def distinctStates(col: BSONCollection)(implicit ec: ExecutionContext): Future[Set[String]] = col.distinct[String, Set]("state")
States with population above 10000000
It’s possible to determine the states for which the sum of the population of their cities is above 10000000, by grouping the documents by their state, then calculating for each group the sum of the population values, and finally keeping only the grouped documents whose population sum matches the filter “above 10000000”.
In the MongoDB shell, such an aggregation is written as below.
db.zipcodes.aggregate([
{ $group: { _id: "$state", totalPop: { $sum: "$population" } } },
{ $match: { totalPop: { $gte: 10000000 } } }
])
With ReactiveMongo, it can be done using .aggregatorContext.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.{ BSONDocument, BSONString }
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def populatedStates1(coll: BSONCollection): Future[List[BSONDocument]] = {
import coll.AggregationFramework.{ Group, Match, SumField }
coll.aggregatorContext[BSONDocument](
pipeline = List(
Group(BSONString("$state"))( "totalPop" -> SumField("population")),
Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L))))).
prepared.cursor.collect[List]()
}
The local import coll.AggregationFramework._ is required, and cannot be replaced by a global static import reactivemongo.api.bson.collection.BSONCollection.AggregationFramework._.
Then, when calling populatedStates1(theZipCodeCol), the asynchronous result will be as below.
[
{ "_id" -> "JP", "totalPop" -> 13185702 },
{ "_id" -> "NY", "totalPop" -> 19746227 }
]
Note that for the state “JP”, the population of Aogashima (200) and of Tokyo (13185502) have been summed.
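The expected totals can be checked without a database. The following is a minimal sketch that reproduces the $group, $sum and $match steps on plain in-memory Scala collections; it does not use ReactiveMongo, and the names PopulatedStatesModel, ZipCode and populatedStates are illustrative assumptions.

```scala
object PopulatedStatesModel {
  // Same four documents as the zipcodes collection (only the fields used here)
  final case class ZipCode(city: String, state: String, population: Long)

  val zipcodes: List[ZipCode] = List(
    ZipCode("NEW YORK", "NY", 19746227L),
    ZipCode("LE MANS", "FR", 148169L),
    ZipCode("TOKYO", "JP", 13185502L),
    ZipCode("AOGASHIMA", "JP", 200L))

  // Equivalent of $group by state with $sum,
  // followed by $match on totalPop >= threshold
  def populatedStates(threshold: Long): Map[String, Long] =
    zipcodes.groupBy(_.state).map {
      case (state, cities) => state -> cities.map(_.population).sum
    }.filter { case (_, totalPop) => totalPop >= threshold }
}
```

Calling PopulatedStatesModel.populatedStates(10000000L) keeps only the "JP" and "NY" groups, as "FR" stays below the threshold.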
If the goal is only to count the populated states, the $count stage can be used.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader, BSONString }
import reactivemongo.api.bson.collection.BSONCollection
def countPopulatedStates1(coll: BSONCollection): Future[Int] = {
implicit val countReader = BSONDocumentReader.from[Int] { doc =>
doc.getAsTry[Int]("popCount")
}
coll.aggregateWith[Int]() { framework =>
import framework.{ Count, Group, Match, SumField }
List(
Group(BSONString("$state"))("totalPop" -> SumField("population")),
Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L))),
Count("popCount"))
}.head
}
As for the other commands in ReactiveMongo, it’s possible to return the aggregation result as custom types (see BSON readers) rather than generic documents, for example considering a class State as below.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
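// The aggregated documents have the fields _id and totalPop,
// so they are mapped to the class properties using @Key.
case class State(
  @Macros.Annotations.Key("_id") name: String,
  @Macros.Annotations.Key("totalPop") population: Long)
implicit val reader: BSONDocumentReader[State] = Macros.reader[State]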
def populatedStates2(coll: BSONCollection): Future[List[State]] = {
import coll.AggregationFramework.{ Group, Match, SumField }
coll.aggregatorContext[State]( // <--
pipeline = List(
Group(BSONString("$state"))( "totalPop" -> SumField("population")),
Match(BSONDocument(
"totalPop" -> BSONDocument("$gte" -> 10000000L))))).
prepared.cursor.collect[List]()
}
Using cursor:
The alternative .aggregateWith builder can be used to process the aggregation result with a Cursor.
import scala.concurrent.ExecutionContext
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def populatedStatesCursor(cities: BSONCollection)(implicit ec: ExecutionContext): Cursor[BSONDocument] =
cities.aggregateWith[BSONDocument]() { framework =>
import framework.{ Group, Match, SumField }
List(
Group(BSONString("$state"))("totalPop" -> SumField("population")),
Match(document("totalPop" -> document("$gte" -> 10000000L)))
)
}
The GroupField operator can be used instead of the Group one, to simply work with a single field.
In the previous example, the dependent import myCol.AggregationFramework._ is replaced by a simpler import using the instance of the aggregation framework provided by the builder: import framework.{ Group, Match, SumField }
Most populated city per state
The $max accumulator can be used to get the population of the most populated city per state.
In the MongoDB shell, it would be executed as follows.
db.zipcodes.aggregate([
{ $group: { _id: "$state", maxPop: { $max: "$population" } } }
])
It will return a result as below.
[
{ _id: "JP", maxPop: 13185502 },
{ _id: "FR", maxPop: 148169 },
{ _id: "NY", maxPop: 19746227 }
]
Using ReactiveMongo:
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def mostPopulated(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import cities.AggregationFramework.{ Group, MaxField }
cities.aggregatorContext[BSONDocument](
pipeline = List(Group(BSONString("$state"))(
"maxPop" -> MaxField("population")
))).prepared.cursor.collect[List]()
}
Similarly, the $min accumulator can be used to get the least populated cities.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def leastPopulated(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import cities.AggregationFramework.{ Group, MinField }
cities.aggregatorContext[BSONDocument](
pipeline = List(Group(BSONString("$state"))(
"minPop" -> MinField("population")
))).prepared.cursor.collect[List]()
}
The Min and the Max operators can be used instead of MinField and MaxField, to use expressions in place of single fields.
Gather the city names per state as a simple array
The $push accumulator can be used to gather some fields, so there is a computed array for each group.
In the MongoDB shell, it can be done as below.
db.zipcodes.aggregate([
{ $group: { _id: "$state", cities: { $push: "$city" } } }
])
It will return the aggregation results:
[
{ _id: "JP", cities: [ "TOKYO", "AOGASHIMA" ] },
{ _id: "FR", cities: [ "LE MANS" ] },
{ _id: "NY", cities: [ "NEW YORK" ] }
]
Using ReactiveMongo:
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def citiesPerState1(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import cities.AggregationFramework.{ Group, PushField }
cities.aggregatorContext[BSONDocument](
pipeline = List(Group(BSONString("$state"))(
"cities" -> PushField("city")))).
prepared.cursor.collect[List]()
}
Similarly, the $addToSet accumulator can be applied to collect all the unique values in the array for each group (here the city names are already unique, so it’s equivalent to $push).
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def citiesPerState2(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
  import cities.AggregationFramework.{ Group, AddFieldToSet }
  // The pipeline is passed as a list, as in the other examples
  cities.aggregatorContext[BSONDocument](
    pipeline = List(Group(BSONString("$state"))(
      "cities" -> AddFieldToSet("city")))).
    prepared.cursor.
    collect[List](-1, Cursor.FailOnError[List[BSONDocument]]())
}
The AddToSet and the Push operators can be used instead of AddFieldToSet and PushField, to use expressions in place of single fields.
Average city population by state
The $avg accumulator can be used to find the average population of the cities by state.
In the MongoDB shell, it can be done as follows.
db.zipcodes.aggregate([
{ $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$population" } } },
{ $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } }
])
- Group the documents by the combination of city and state, to get intermediate documents of the form { "_id" : { "state" : "NY", "city" : "NEW YORK" }, "pop" : 19746227 }.
- Group the intermediate documents by the _id.state field (i.e. the state field inside the _id document), and get the average population of each group ($avg: "$pop").
Using ReactiveMongo, it can be written as below.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.{ BSONDocument, BSONString }
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def avgPopByState(col: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import col.AggregationFramework.{ AvgField, Group, SumField }
col.aggregatorContext[BSONDocument](pipeline = List(
Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
"pop" -> SumField("population")),
Group(BSONString("$_id.state"))("avgCityPop" -> AvgField("pop")))).
prepared.cursor.collect[List]()
}
The Avg operator can be used instead of AvgField, to use an expression in place of a single field.
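To make the two grouping stages concrete, here is a minimal sketch that applies them to the four sample documents using plain in-memory Scala collections. It is not ReactiveMongo code; AvgPopulationModel and its members are hypothetical names used only for illustration.

```scala
object AvgPopulationModel {
  final case class ZipCode(city: String, state: String, population: Long)

  val zipcodes: List[ZipCode] = List(
    ZipCode("NEW YORK", "NY", 19746227L),
    ZipCode("LE MANS", "FR", 148169L),
    ZipCode("TOKYO", "JP", 13185502L),
    ZipCode("AOGASHIMA", "JP", 200L))

  // Stage 1: group by (state, city) and sum the population of each city
  val popByCity: Map[(String, String), Long] =
    zipcodes.groupBy(z => (z.state, z.city)).map {
      case (key, docs) => key -> docs.map(_.population).sum
    }

  // Stage 2: group the intermediate results by state
  // and average the city populations
  val avgCityPop: Map[String, Double] =
    popByCity.groupBy { case ((state, _), _) => state }.map {
      case (state, cities) =>
        state -> cities.values.sum.toDouble / cities.size
    }
}
```

With this data, only "JP" has more than one city, so its average is (13185502 + 200) / 2.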
Largest and smallest cities by state
Aggregating the documents can be used to find the largest and the smallest cities for each state:
db.zipcodes.aggregate([
{ $group:
{
_id: { state: "$state", city: "$city" },
pop: { $sum: "$population" }
}
},
{ $sort: { pop: 1 } },
{ $group:
{
_id : "$_id.state",
biggestCity: { $last: "$_id.city" },
biggestPop: { $last: "$pop" },
smallestCity: { $first: "$_id.city" },
smallestPop: { $first: "$pop" }
}
},
// the following $project is optional, and
// modifies the output format.
{ $project:
{ _id: 0,
state: "$_id",
biggestCity: { name: "$biggestCity", pop: "$biggestPop" },
smallestCity: { name: "$smallestCity", pop: "$smallestPop" }
}
}
])
A ReactiveMongo function can be written as below.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.{ BSONDocument, BSONString, Macros }
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
case class City(name: String, population: Long)
object City {
implicit val reader = Macros.reader[City]
}
case class StateStats(state: String, biggestCity: City, smallestCity: City)
object StateStats {
implicit val reader = Macros.reader[StateStats]
}
def stateStats(col: BSONCollection): Future[List[StateStats]] = {
import col.AggregationFramework.{
Ascending, FirstField, Group, LastField, Project, Sort, SumField
}
col.aggregatorContext[StateStats](pipeline = List(
Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
"pop" -> SumField("population")),
Sort(Ascending("pop")), // sort on the sum computed by the first group
Group(BSONString("$_id.state"))(
"biggestCity" -> LastField("_id.city"),
"biggestPop" -> LastField("pop"),
"smallestCity" -> FirstField("_id.city"),
"smallestPop" -> FirstField("pop")),
Project(BSONDocument("_id" -> 0, "state" -> "$_id",
"biggestCity" -> BSONDocument("name" -> "$biggestCity",
"population" -> "$biggestPop"),
"smallestCity" -> BSONDocument("name" -> "$smallestCity",
"population" -> "$smallestPop"))))).
prepared.cursor.
collect[List](-1, Cursor.FailOnError[List[StateStats]]())
}
This function would return statistics like the following.
List(
StateStats(state = "NY",
biggestCity = City(name = "NEW YORK", population = 19746227L),
smallestCity = City(name = "NEW YORK", population = 19746227L)),
StateStats(state = "FR",
biggestCity = City(name = "LE MANS", population = 148169L),
smallestCity = City(name = "LE MANS", population = 148169L)),
StateStats(state = "JP",
biggestCity = City(name = "TOKYO", population = 13185502L),
smallestCity = City(name = "AOGASHIMA", population = 200L)))
The First and the Last operators can be used instead of FirstField and LastField, to use expressions in place of single fields.
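The sort-then-group idea can be pictured on in-memory data: once the cities are sorted by ascending population, the first element of each state group is the smallest city and the last one is the largest. The following is a minimal sketch with plain Scala collections, not ReactiveMongo code; StateStatsModel is a hypothetical name, and the per-city sum is skipped because each city appears only once in the sample data.

```scala
object StateStatsModel {
  final case class City(name: String, population: Long)

  final case class StateStats(
    state: String, biggestCity: City, smallestCity: City)

  // (state, city) pairs taken from the sample zipcodes documents
  val cities: List[(String, City)] = List(
    "NY" -> City("NEW YORK", 19746227L),
    "FR" -> City("LE MANS", 148169L),
    "JP" -> City("TOKYO", 13185502L),
    "JP" -> City("AOGASHIMA", 200L))

  // Equivalent of $sort (ascending), then $group with $first and $last;
  // groupBy keeps the sorted order inside each group
  val stats: Map[String, StateStats] =
    cities.sortBy(_._2.population).groupBy(_._1).map {
      case (state, sorted) =>
        state -> StateStats(
          state,
          biggestCity = sorted.last._2,
          smallestCity = sorted.head._2)
    }
}
```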
The $limit or the $skip stages can be used to consider only some states:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def paginatedStats(col: BSONCollection, max: Int, offset: Int = 0): Future[List[StateStats]] = {
  import col.AggregationFramework.{
    Ascending, FirstField, Group, LastField, Limit,
    Project, Skip, Sort, SumField
  }
  col.aggregatorContext[StateStats](pipeline = List(
    Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
      "pop" -> SumField("population")),
    Sort(Ascending("pop")), // sort on the sum computed by the first group
    Group(BSONString("$_id.state"))(
      "biggestCity" -> LastField("_id.city"),
      "biggestPop" -> LastField("pop"),
      "smallestCity" -> FirstField("_id.city"),
      "smallestPop" -> FirstField("pop")),
    // Skip and Limit are applied once the documents are grouped by state
    Skip(offset), // <-- skip some states if offset > 0
    Limit(max), // <-- limit the state groups
    Project(BSONDocument("_id" -> 0, "state" -> "$_id",
      "biggestCity" -> BSONDocument("name" -> "$biggestCity",
        "population" -> "$biggestPop"),
      "smallestCity" -> BSONDocument("name" -> "$smallestCity",
        "population" -> "$smallestPop"))))).
    prepared.cursor.collect[List]()
}
Standard deviation of the Japanese cities
The group accumulators $stdDevPop and $stdDevSamp can be used to find the standard deviation of the population of the Japanese cities.
In the MongoDB shell, it can be done as follows.
db.zipcodes.aggregate([
{ $group:
{
_id: "$state",
popDev: { $stdDevPop: "$population" }
}
},
{ $match: { _id: "JP" } }
])
It will find the result:
{ _id: "JP", popDev: 6592651 }
It can be done with ReactiveMongo as below.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONCollection
def populationStdDeviation(cities: BSONCollection)(implicit ec: ExecutionContext): Future[Option[BSONDocument]] = {
import cities.AggregationFramework.{ StdDevPopField, Group, Match }
cities.aggregatorContext[BSONDocument](pipeline = List(
Group(BSONString("$state"))("popDev" -> StdDevPopField("population")),
Match(document("_id" -> "JP")))).prepared.cursor.headOption
}
def populationSampleDeviation(cities: BSONCollection)(implicit ec: ExecutionContext): Future[Option[BSONDocument]] = {
import cities.AggregationFramework.{ StdDevSampField, Group, Match }
cities.aggregatorContext[BSONDocument](pipeline = List(
Group(BSONString("$state"))("popDev" -> StdDevSampField("population")),
Match(document("_id" -> "JP")))).prepared.cursor.headOption
}
The StdDevPop and the StdDevSamp operators can be used instead of StdDevPopField and StdDevSampField, to use expressions in place of single fields.
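The difference between the two accumulators is only the divisor: $stdDevPop divides the sum of squared deviations by n, while $stdDevSamp divides it by n - 1. A minimal sketch in plain Scala (not ReactiveMongo code; StdDevModel and its functions are hypothetical names) applied to the two Japanese cities:

```scala
object StdDevModel {
  // Sum of the squared deviations from the mean
  private def squaredDeviations(values: Seq[Double]): Double = {
    val mean = values.sum / values.size
    values.map { v => (v - mean) * (v - mean) }.sum
  }

  // Population standard deviation, as $stdDevPop: divide by n
  def stdDevPop(values: Seq[Double]): Double =
    math.sqrt(squaredDeviations(values) / values.size)

  // Sample standard deviation, as $stdDevSamp: divide by n - 1
  // (assumes at least two values)
  def stdDevSamp(values: Seq[Double]): Double =
    math.sqrt(squaredDeviations(values) / (values.size - 1))

  // Populations of TOKYO and AOGASHIMA
  val japan: Seq[Double] = Seq(13185502d, 200d)
}
```

StdDevModel.stdDevPop(StdDevModel.japan) gives the popDev value 6592651 shown in the shell result; with only two values, the sample deviation is larger by a factor of the square root of 2.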
Find documents using text indexing
Consider the following text index, maintained for the fields city and state of the zipcodes collection.
db.zipcodes.createIndex({ city: "text", state: "text" })
Then it’s possible to find documents using the $text operator, and the results can also be sorted according to the text scores.
For example, to find the documents matching the text "JP" and sort them according to the text score, the following query can be executed in the MongoDB shell.
db.zipcodes.aggregate([
{ $match: { $text: { $search: "JP" } } },
{ $sort: { score: { $meta: "textScore" } } }
])
A ReactiveMongo function can be written as below.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def textFind(coll: BSONCollection): Future[List[BSONDocument]] = {
import coll.AggregationFramework.{ Match, MetadataSort, Sort, TextScore }
val pipeline = List(
Match(BSONDocument("$text" -> BSONDocument("$search" -> "JP"))),
Sort(MetadataSort("score", TextScore)))
coll.aggregatorContext[BSONDocument](pipeline).
prepared.cursor.collect[List]()
}
This will return the sorted documents for the cities TOKYO and AOGASHIMA.
Random sample
The $sample aggregation stage can be used (since MongoDB 3.2) to randomly select documents.
In the MongoDB shell, it can be used as follows to fetch a sample of 3 random documents.
db.zipcodes.aggregate([
{ $sample: { size: 3 } }
])
With ReactiveMongo, the $sample operator can be used as follows.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def randomZipCodes(coll: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import coll.AggregationFramework.Sample
coll.aggregatorContext[BSONDocument](List(Sample(3))).
prepared.cursor.collect[List]()
}
Population buckets
Since MongoDB 3.4, the $bucketAuto stage can be used in the MongoDB shell to group the cities according to their population, into an expected number of groups, so-called ‘buckets’ (2 below).
db.zipcodes.aggregate([
{ $bucketAuto: { groupBy: '$population', buckets: 2 } }
])
Such an aggregation will return the following results.
{ "_id" : { "min" : 200, "max" : 13185502 }, "count" : 2 }
{ "_id" : { "min" : 13185502, "max" : 19746227 }, "count" : 2 }
The $bucketAuto stage can be used in ReactiveMongo as below.
import scala.concurrent.ExecutionContext
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def populationBuckets(zipcodes: BSONCollection)(implicit ec: ExecutionContext) =
zipcodes.aggregateWith[BSONDocument]() { framework =>
import framework.BucketAuto
List(BucketAuto(BSONString(f"$$population"), 2, None)())
}.collect[Set](Int.MaxValue, Cursor.FailOnError[Set[BSONDocument]]())
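To see where the bucket boundaries shown above come from, the following is a deliberately simplified in-memory model in plain Scala. It is an assumption-laden sketch, not the MongoDB algorithm: it only sorts the values and splits them into chunks of equal size, ignoring the handling of duplicate values and the granularity option. BucketAutoModel, Bucket and bucketAuto are hypothetical names.

```scala
object BucketAutoModel {
  final case class Bucket(min: Long, max: Long, count: Int)

  // Simplified model: sort the values, split them into `buckets` chunks,
  // and derive the boundaries of each chunk
  def bucketAuto(values: List[Long], buckets: Int): List[Bucket] = {
    require(values.nonEmpty && buckets > 0, "values and buckets are required")

    val sorted = values.sorted
    val size = math.ceil(sorted.size.toDouble / buckets).toInt
    val chunks = sorted.grouped(size).toList

    chunks.zipWithIndex.map {
      case (chunk, i) =>
        // The upper bound is the lower bound of the next bucket (exclusive),
        // except for the last bucket, where it is the inclusive maximum
        val max = chunks.lift(i + 1).map(_.head).getOrElse(chunk.last)

        Bucket(chunk.head, max, chunk.size)
    }
  }
}
```

Applied to the four sample populations with 2 buckets, this model yields the same min, max and count values as the shell result.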
Places
Consider a collection of different kinds of places (e.g. Central Park), with their locations indexed using 2dsphere.
This can be set up with the MongoDB shell as follows.
db.places.createIndex({'loc':"2dsphere"});
db.places.insert({
"type": "public",
"loc": {
"type": "Point", "coordinates": [-73.97, 40.77]
},
"name": "Central Park",
"category": "Parks"
});
db.places.insert({
"type": "public",
"loc": {
"type": "Point", "coordinates": [-73.88, 40.78]
},
"name": "La Guardia Airport",
"category": "Airport"
});
The $geoNear aggregation stage can be used on the collection to find the places near the geospatial coordinates [ -73.9667, 40.78 ], at a distance between 1 km (1000 meters) and 5 km (5000 meters).
db.places.aggregate([{
$geoNear: {
near: { type: "Point", coordinates: [ -73.9667, 40.78 ] },
distanceField: "dist.calculated",
minDistance: 1000,
maxDistance: 5000,
query: { type: "public" },
includeLocs: "dist.loc",
num: 5,
spherical: true
}
}])
The results will be of the following form:
{
"type": "public",
"loc": {
"type": "Point",
"coordinates": [ -73.97, 40.77 ]
},
"name": "Central Park",
"category": "Parks",
"dist": {
"calculated": 1147.4220523120696,
"loc": {
"type": "Point",
"coordinates": [ -73.97, 40.77 ]
}
}
}
It can be done with ReactiveMongo as follows.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.{ array, document, Macros }
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
case class GeoPoint(coordinates: List[Double])
case class GeoDistance(calculated: Double, loc: GeoPoint)
case class GeoPlace(
loc: GeoPoint,
name: String,
category: String,
dist: GeoDistance
)
object GeoPlace {
implicit val pointReader = Macros.reader[GeoPoint]
implicit val distanceReader = Macros.reader[GeoDistance]
implicit val placeReader = Macros.reader[GeoPlace]
}
def placeArround(places: BSONCollection)(implicit ec: ExecutionContext): Future[List[GeoPlace]] = {
import places.AggregationFramework.GeoNear
places.aggregatorContext[GeoPlace](List(GeoNear(document(
"type" -> "Point",
"coordinates" -> array(-73.9667, 40.78)
), distanceField = Some("dist.calculated"),
minDistance = Some(1000),
maxDistance = Some(5000),
query = Some(document("type" -> "public")),
includeLocs = Some("dist.loc"),
limit = Some(5),
spherical = true))).prepared.cursor.collect[List]()
}
Forecast
Consider a collection of forecasts with the following document.
{
_id: 1,
title: "123 Department Report",
tags: [ "G", "STLW" ],
year: 2014,
subsections: [
{
subtitle: "Section 1: Overview",
tags: [ "SI", "G" ],
content: "Section 1: This is the content of section 1."
},
{
subtitle: "Section 2: Analysis",
tags: [ "STLW" ],
content: "Section 2: This is the content of section 2."
},
{
subtitle: "Section 3: Budgeting",
tags: [ "TK" ],
content: {
text: "Section 3: This is the content of section3.",
tags: [ "HCS" ]
}
}
]
}
Using the $redact stage, the MongoDB aggregation can be used to restrict the contents of the documents. It can be done in the MongoDB shell as follows:
db.forecasts.aggregate([
{ $match: { year: 2014 } },
{
$redact: {
$cond: {
if: { $gt: [ { $size: {
$setIntersection: [ "$tags", [ "STLW", "G" ] ] } }, 0 ]
},
then: "$$DESCEND",
else: "$$PRUNE"
}
}
}
])
The corresponding result is a redacted document.
{
"_id" : 1,
"title" : "123 Department Report",
"tags" : [ "G", "STLW" ],
"year" : 2014,
"subsections" : [
{
"subtitle" : "Section 1: Overview",
"tags" : [ "SI", "G" ],
"content" : "Section 1: This is the content of section 1."
},
{
"subtitle" : "Section 2: Analysis",
"tags" : [ "STLW" ],
"content" : "Section 2: This is the content of section 2."
}
]
}
With ReactiveMongo, the aggregation framework can perform a similar redaction.
import scala.concurrent.ExecutionContext
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def redactForecasts(forecasts: BSONCollection)(implicit ec: ExecutionContext) = {
import forecasts.AggregationFramework.{ Match, Redact }
forecasts.aggregatorContext[BSONDocument](pipeline = List(
Match(document("year" -> 2014)),
Redact(document("$cond" -> document(
"if" -> document(
"$gt" -> array(document(
"$size" -> document("$setIntersection" -> array(
"$tags", array("STLW", "G")
))
), 0)
),
"then" -> "$$DESCEND",
"else" -> "$$PRUNE"
))))).prepared.cursor.collect[List]()
}
Inventory
Consider an inventory collection, with the following document.
{ "_id" : 1, "item" : "ABC1", "sizes": [ "S", "M", "L"] }
The $unwind stage can be used as below in the MongoDB shell, to return a document for each size.
db.inventory.aggregate( [ { $unwind : "$sizes" } ] )
It will return results as below.
{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }
With ReactiveMongo, it can be done using UnwindField.
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def sized(inventory: BSONCollection) = {
import inventory.AggregationFramework.UnwindField
inventory.aggregatorContext[BSONDocument](List(UnwindField("sizes"))).
prepared.cursor.collect[List]()
}
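The effect of $unwind is comparable to a flatMap over the array field: each element yields a copy of the parent document. A minimal sketch on in-memory data in plain Scala (not ReactiveMongo code; UnwindModel, Item and Unwound are hypothetical names):

```scala
object UnwindModel {
  final case class Item(id: Int, item: String, sizes: List[String])

  // One output document per element of the sizes array
  final case class Unwound(id: Int, item: String, size: String)

  def unwind(items: List[Item]): List[Unwound] =
    items.flatMap { i =>
      i.sizes.map { size => Unwound(i.id, i.item, size) }
    }
}
```

For the sample inventory document, this produces three results, one for each of the sizes "S", "M" and "L". In this model, a document with an empty sizes list produces no output at all.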
If there is a second prices collection, with the following document.
{ "_id" : 10, "item" : "ABC1", "price" : 12.34 }
Then it can be joined using the MongoDB shell, with the $lookup stage.
db.inventory.aggregate([{
$lookup: {
from: "prices",
localField: "item",
foreignField: "item",
as: "prices"
}
}])
It results in the following document.
{
"_id" : 1,
"item" : "ABC1",
"sizes": [ "S", "M", "L" ],
"prices": [
{ "_id" : 10, "item" : "ABC1", "price" : 12.34 }
]
}
It can be done with ReactiveMongo using the Lookup operator.
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def priced(inventory: BSONCollection, prices: BSONCollection) = {
  import inventory.AggregationFramework.Lookup
  // As in the shell example, the inventory documents are aggregated,
  // and joined with the prices having the same item
  inventory.aggregatorContext[BSONDocument](
    pipeline = List(Lookup(prices.name, "item", "item", "prices"))
  ).prepared.cursor.collect[List]()
}
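The left outer join performed by $lookup can be pictured on in-memory data: every inventory document is kept, and receives the (possibly empty) list of matching price documents. The following is a minimal sketch in plain Scala, not ReactiveMongo code; LookupModel and its types are hypothetical names, and the second inventory item "XYZ9" is made up to show the case without any match.

```scala
object LookupModel {
  final case class Item(id: Int, item: String, sizes: List[String])
  final case class Price(id: Int, item: String, price: Double)

  final case class PricedItem(
    id: Int, item: String, sizes: List[String], prices: List[Price])

  // Left outer join on the item field: an inventory item
  // without any matching price is kept, with an empty prices list
  def lookup(inventory: List[Item], prices: List[Price]): List[PricedItem] =
    inventory.map { i =>
      PricedItem(i.id, i.item, i.sizes, prices.filter(_.item == i.item))
    }
}
```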
The $facet stage allows creating multi-faceted aggregations, which characterize data across multiple dimensions (or facets); e.g.
db.inventory.aggregate([{
  $facet: {
    'foo': [
      { '$unwind': '$sizes' },
      { '$count': 'c' }
    ],
    'lorem': [
      { '$match': { 'item': 'ABC1' } }
    ]
  }
}])
Such an aggregation can be executed with ReactiveMongo as below.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.Cursor
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def useFacetAgg(inventory: BSONCollection)(
  implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
  import inventory.AggregationFramework.{ Count, Facet, Match, UnwindField }
  // Each facet is a named sub-pipeline; MongoDB does not accept
  // the $out stage inside a facet, so a $match is used for the second one
  val facet = Facet(Seq(
    "foo" -> List(UnwindField("sizes"), Count("c")),
    "lorem" -> List(Match(BSONDocument("item" -> "ABC1")))))
  inventory.aggregatorContext[BSONDocument](pipeline = List(facet)).
    prepared.cursor.collect[List]()
}
Book library
Consider a books collection that contains the following documents.
{ "_id" : 8751, "title" : "The Banquet", "author" : "Dante", "copies" : 2 }
{ "_id" : 8752, "title" : "Divine Comedy", "author" : "Dante", "copies" : 1 }
{ "_id" : 8645, "title" : "Eclogues", "author" : "Dante", "copies" : 2 }
{ "_id" : 7000, "title" : "The Odyssey", "author" : "Homer", "copies" : 10 }
{ "_id" : 7020, "title" : "Iliad", "author" : "Homer", "copies" : 10 }
Then its documents can be aggregated and written to another collection, using the $out stage.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.{ BSONDocument, BSONString }
import reactivemongo.api.bson.collection.BSONCollection
def outputBooks(books: BSONCollection, outColl: String): Future[Unit] = {
import books.AggregationFramework.{ Ascending, Group, PushField, Out, Sort }
books.aggregatorContext[BSONDocument](pipeline = List(
Sort(Ascending("title")),
Group(BSONString(f"$$author"))("books" -> PushField("title")),
Out(outColl))).
// $out returns no document, so headOption is used rather than head
prepared.cursor.headOption.map(_ => {})
}
For the current example, the result collection will contain the following documents.
{ "_id" : "Homer", "books" : [ "Iliad", "The Odyssey" ] }
{ "_id" : "Dante", "books" : [ "Divine Comedy", "Eclogues", "The Banquet" ] }
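The content of the output collection can be checked against an in-memory model: sorting by title first means that the titles pushed into each author group stay in alphabetical order. A minimal sketch in plain Scala (not ReactiveMongo code; BooksModel is a hypothetical name):

```scala
object BooksModel {
  final case class Book(id: Int, title: String, author: String, copies: Int)

  val books: List[Book] = List(
    Book(8751, "The Banquet", "Dante", 2),
    Book(8752, "Divine Comedy", "Dante", 1),
    Book(8645, "Eclogues", "Dante", 2),
    Book(7000, "The Odyssey", "Homer", 10),
    Book(7020, "Iliad", "Homer", 10))

  // Equivalent of $sort on title, then $group by author with $push;
  // groupBy keeps the sorted order inside each group
  val titlesByAuthor: Map[String, List[String]] =
    books.sortBy(_.title).groupBy(_.author).map {
      case (author, sorted) => author -> sorted.map(_.title)
    }
}
```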
Fruits
The $replaceRoot stage promotes a specified document to the top level and replaces all other fields.
Consider a collection of fruits as below.
{
"_id" : 1,
"fruit" : [ "apples", "oranges" ],
"in_stock" : { "oranges" : 20, "apples" : 60 },
"on_order" : { "oranges" : 35, "apples" : 75 }
}
{
"_id" : 2,
"vegetables" : [ "beets", "yams" ],
"in_stock" : { "beets" : 130, "yams" : 200 },
"on_order" : { "beets" : 90, "yams" : 145 }
}
The stage can be used in an aggregation with the MongoDB shell:
db.fruits.aggregate([
{ $replaceRoot: { newRoot: "$in_stock" } }
])
It will return the following documents:
{ "oranges": 20, "apples": 60 }
{ "beets": 130, "yams": 200 }
It can be done using ReactiveMongo.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def replaceRootTest(fruits: BSONCollection): Future[Option[BSONDocument]] = {
fruits.aggregateWith[BSONDocument]() { framework =>
import framework._
List(ReplaceRootField("in_stock"))
}.headOption
}
Sales
Consider a collection of sales as below.
{
_id: 0,
items: [
{ item_id: 43, quantity: 2, price: 10 },
{ item_id: 2, quantity: 1, price: 240 }
]
}
{
_id: 1,
items: [
{ item_id: 23, quantity: 3, price: 110 },
{ item_id: 103, quantity: 4, price: 5 },
{ item_id: 38, quantity: 1, price: 300 }
]
}
{
_id: 2,
items: [
{ item_id: 4, quantity: 1, price: 23 }
]
}
Using the $project aggregation stage and the $filter operator (since MongoDB 3.2), it’s possible in the MongoDB shell to filter the items array so that it only includes the elements that have a price greater than or equal to 100:
db.sales.aggregate([ {
$project: {
items: {
$filter: {
input: "$items",
as: "item",
cond: { $gte: [ "$$item.price", 100 ] }
}
}
}
} ])
The same can be done using ReactiveMongo:
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.{ BSONArray, BSONDocument, BSONString }
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def salesWithItemGreaterThanHundered(sales: BSONCollection) =
sales.aggregateWith[BSONDocument]() { framework =>
import framework._
List(
Project(BSONDocument("items" -> Filter(
input = BSONString(f"$$items"),
as = "item",
cond = BSONDocument(
f"$$gte" -> BSONArray(f"$$$$item.price", 100))))),
Sort(Ascending("_id")))
}.collect[List]()
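The expected outcome of the $filter projection can be worked out on in-memory data: every sale is kept, but its items array only retains the elements satisfying the condition. A minimal sketch in plain Scala (not ReactiveMongo code; SalesFilterModel and its types are hypothetical names):

```scala
object SalesFilterModel {
  final case class Item(itemId: Int, quantity: Int, price: Int)
  final case class Sale(id: Int, items: List[Item])

  val sales: List[Sale] = List(
    Sale(0, List(Item(43, 2, 10), Item(2, 1, 240))),
    Sale(1, List(Item(23, 3, 110), Item(103, 4, 5), Item(38, 1, 300))),
    Sale(2, List(Item(4, 1, 23))))

  // Equivalent of $project with $filter on price >= 100:
  // the sale itself is kept even when no item matches
  val expensive: List[Sale] =
    sales.map { s => s.copy(items = s.items.filter(_.price >= 100)) }
}
```

The sale with _id 2 is still returned, with an empty items array, since $filter only acts on the array content.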
Database indexes aggregation
The $indexStats stage returns statistics regarding the use of each index for a collection.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def aggregateIndexes(coll: BSONCollection) = {
import coll.AggregationFramework.{
Ascending, IndexStats, IndexStatsResult, Sort
}
val result: Future[List[IndexStatsResult]] =
coll.aggregatorContext[IndexStatsResult](
pipeline = List(IndexStats, Sort(Ascending("name")))).
prepared.cursor.collect[List]()
result
}
Students
Considering the following collection of students:
{
_id: 1,
student: "Maya",
homework: [ 10, 5, 10 ],
quiz: [ 10, 8 ],
extraCredit: 0
}
{
_id: 2,
student: "Ryan",
homework: [ 5, 6, 5 ],
quiz: [ 8, 8 ],
extraCredit: 8
}
Then it’s possible to sum the homework
and quiz
arrays using the $addFields
stage, as below in the MongoShell.
db.scores.aggregate([
{
$addFields: {
totalHomework: { $sum: "$homework" } ,
totalQuiz: { $sum: "$quiz" }
}
},
{
$addFields: { totalScore:
{ $add: [ "$totalHomework", "$totalQuiz", "$extraCredit" ] } }
}
])
It can be done using ReactiveMongo:
import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONCollection
def sumHomeworkQuizz(students: BSONCollection) =
  students.aggregateWith[BSONDocument]() { framework =>
    import framework.AddFields
    List(
      // First stage: sum each array
      AddFields(document(
        "totalHomework" -> document(f"$$sum" -> f"$$homework"),
        "totalQuiz" -> document(f"$$sum" -> f"$$quiz"))),
      // Second stage: add the two totals and the extra credit
      AddFields(document(
        "totalScore" -> document(f"$$add" -> array(
          f"$$totalHomework", f"$$totalQuiz", f"$$extraCredit")))))
  }
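As a cross-check of what the two $addFields stages compute, here is a minimal sketch that performs the same arithmetic on plain Scala values, without any database. The StudentScoresSketch object, its case classes and the addTotals function are hypothetical names used only for this illustration.

```scala
object StudentScoresSketch {
  // In-memory model of the student documents shown above (assumption: not a driver type)
  final case class Student(
    id: Int,
    student: String,
    homework: List[Int],
    quiz: List[Int],
    extraCredit: Int)

  final case class Scored(
    student: String,
    totalHomework: Int,
    totalQuiz: Int,
    totalScore: Int)

  val students: List[Student] = List(
    Student(1, "Maya", List(10, 5, 10), List(10, 8), 0),
    Student(2, "Ryan", List(5, 6, 5), List(8, 8), 8))

  def addTotals(s: Student): Scored = {
    // First $addFields stage: sum each array
    val totalHomework = s.homework.sum
    val totalQuiz = s.quiz.sum

    // Second $addFields stage: add both totals and the extra credit
    Scored(
      s.student,
      totalHomework,
      totalQuiz,
      totalHomework + totalQuiz + s.extraCredit)
  }

  def main(args: Array[String]): Unit =
    students.map(addTotals).foreach(println)
}
```

For the sample data this gives a totalScore of 43 for Maya (25 + 18 + 0) and 40 for Ryan (16 + 16 + 8).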
Users
Consider the following user collection.
{ "_id" : 1, "name" : "dave123", favorites: [ "chocolate", "cake", "butter", "apples" ] }
{ "_id" : 2, "name" : "li", favorites: [ "apples", "pudding", "pie" ] }
{ "_id" : 3, "name" : "ahn", favorites: [ "pears", "pecans", "chocolate", "cherries" ] }
{ "_id" : 4, "name" : "ty", favorites: [ "ice cream" ] }
The favorites of each user can be sliced to keep only the first 3 elements of the array:
db.users.aggregate([
{ $project: { name: 1, favorites: { $slice: [ "$favorites", 3 ] } } }
])
It’s also supported by ReactiveMongo as below.
import scala.concurrent.ExecutionContext
import reactivemongo.api.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.collection.BSONCollection
def sliceFavorites(coll: BSONCollection)(implicit ec: ExecutionContext) =
coll.aggregateWith[BSONDocument]() { framework =>
import framework.{ Project, Slice }
List(Project(BSONDocument(
"name" -> 1,
"favorites" -> Slice(
array = BSONString(f"$$favorites"),
n = BSONInteger(3)))))
}.collect[Seq](4, Cursor.FailOnError[Seq[BSONDocument]]())
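The effect of $slice with a positive count can be pictured with a minimal in-memory sketch, shown here on plain Scala lists rather than through the driver. The SliceFavoritesSketch object, the User case class and the sliceFavorites function below are hypothetical names for this illustration, and the sketch only covers the positive count used above.

```scala
object SliceFavoritesSketch {
  // In-memory model of the user documents shown above (assumption: not a driver type)
  final case class User(id: Int, name: String, favorites: List[String])

  val users: List[User] = List(
    User(1, "dave123", List("chocolate", "cake", "butter", "apples")),
    User(2, "li", List("apples", "pudding", "pie")),
    User(3, "ahn", List("pears", "pecans", "chocolate", "cherries")),
    User(4, "ty", List("ice cream")))

  // Mirrors { $slice: [ "$favorites", n ] } for a positive n:
  // keep at most the first n elements of the array
  def sliceFavorites(user: User, n: Int = 3): User =
    user.copy(favorites = user.favorites.take(n))

  def main(args: Array[String]): Unit =
    users.map(u => sliceFavorites(u)).foreach(println)
}
```

Users with 3 favorites or fewer (li and ty) are left unchanged, while dave123 and ahn each lose their fourth favorite.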
Change stream
Since MongoDB 3.6, it’s possible to watch the changes applied on a collection.
Using ReactiveMongo, a pipeline can be used to aggregate the change stream.
import reactivemongo.api.Cursor
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def filteredWatch(
coll: BSONCollection,
filter: BSONDocument): Cursor[BSONDocument] = {
import coll.AggregationFramework.{ Match, PipelineOperator }
coll.watch[BSONDocument](
pipeline = List[PipelineOperator](Match(filter))).
cursor[Cursor.WithOps]
}
Atlas Search
When using MongoDB Atlas, the specific aggregation stage $search
is available to apply advanced search features.
For example with a fruit
collection on MongoDB Atlas with the following documents:
{
"_id" : 1,
"type" : "apple",
"description" : "Apples come in several varieties, including Fuji, Granny Smith, and Honeycrisp."
},
{
"_id" : 2,
"type" : "banana",
"description" : "Bananas are usually sold in bunches of five or six."
}
In the MongoDB shell, the search features can be used with a term query as below.
db.fruit.aggregate([
{
$search: {
"term": {
"query": "s*l*",
"path": "description",
"wildcard": true
}
}
}
])
In ReactiveMongo, such a search aggregation can be applied using the corresponding API.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.Cursor
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def foo(col: BSONCollection)(
implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import col.AggregationFramework.AtlasSearch, AtlasSearch.Term
col.aggregatorContext[BSONDocument](pipeline = List(AtlasSearch(Term(
path = "description",
query = "s*l*",
modifier = Some(Term.Wildcard) // wildcard: true
)))).prepared.cursor.collect[List]()
}
See also:
- The operators available to define an aggregation pipeline are documented in the API reference.
- The Aggregation Framework tests