Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Aggregation Framework
The MongoDB Aggregation Framework is available through ReactiveMongo.
$project
: Reshapes each document in the stream, such as by adding new fields or removing existing fields (API).
$match
: Filters the document stream to allow only matching documents (API).
$redact
: Reshapes each document in the stream by restricting the content for each document, based on information stored in the documents themselves (API).
$limit
: Passes the first n documents unmodified to the pipeline, where n is the specified limit (API).
$skip
: Skips the first n documents, where n is the specified skip number, and passes the remaining documents unmodified to the pipeline (API).
$unwind
: Deconstructs an array field from the input documents to output a document for each element (API).
$group
: Groups the input documents by a specified identifier expression and possibly applies some accumulators (API). The available accumulators include:
- $sum: Sum of numerical values.
- $avg: Average of numerical values.
- $first: Value from the first document for each group.
- $last: Value from the last document for each group.
- $max: Highest expression value for each group.
- $min: Lowest expression value for each group.
- $push: Array of expression values for each group.
- $addToSet: Array of unique expression values for each group.
- $stdDevPop: Population standard deviation of the input values.
- $stdDevSamp: Sample standard deviation of the input values.
$sample
: Randomly selects the specified number of documents from its input (API).
$sort
: Reorders the document stream by a specified sort key (API).
$geoNear
: Returns an ordered stream of documents based on the proximity to a geospatial point (API).
$lookup
: Performs a left outer join to another collection in the same database (API).
$out
: Writes the resulting documents of the aggregation pipeline to a collection (API).
$indexStats
: Returns statistics regarding the use of each index for the collection (API).
Zip codes example
Consider a zipcodes collection in MongoDB, with the following documents.
[
{ '_id': "10280", 'city': "NEW YORK", 'state': "NY",
'population': 19746227, 'location': {'lon':-74.016323, 'lat':40.710537} },
{ '_id': "72000", 'city': "LE MANS", 'state': "FR",
'population': 148169, 'location': {'lon':48.0077, 'lat':0.1984}},
{ '_id': "JP-13", 'city': "TOKYO", 'state': "JP",
'population': 13185502, 'location': {'lon':35.683333, 'lat':139.683333} },
{ '_id': "AO", 'city': "AOGASHIMA", 'state': "JP",
'population': 200, 'location': {'lon':32.457, 'lat':139.767} }
]
Distinct state
The distinct command is available to find the distinct values for a specified field across a single collection.
In the MongoDB shell, this command can be used to find the distinct states from the zipcodes collection, with the results "NY", "FR" and "JP".
db.runCommand({ distinct: "zipcodes", key: "state" })
Using the ReactiveMongo API, it can be done with the corresponding collection operation.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def distinctStates(col: BSONCollection)(implicit ec: ExecutionContext): Future[Set[String]] = col.distinct[String, Set]("state")
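A query selector can also be given, to only consider the matching documents (a minimal sketch, assuming the distinct operation accepts an optional selector as in ReactiveMongo 0.12; the function name distinctJapaneseCities is just for illustration).
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
// Only the documents matching the selector are considered,
// so here only the cities of the Japanese state are returned.
def distinctJapaneseCities(col: BSONCollection)(implicit ec: ExecutionContext): Future[Set[String]] =
  col.distinct[String, Set]("city", Some(BSONDocument("state" -> "JP")))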
States with population above 10000000
It’s possible to determine the states for which the sum of the population of their cities is above 10000000: group the documents by state, compute the sum of the population values for each group, and finally keep only the groups whose population sum matches the filter “above 10000000”.
In the MongoDB shell, such an aggregation is written as below (see the example).
db.zipcodes.aggregate([
{ $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
{ $match: { totalPop: { $gte: 10000000 } } }
])
With ReactiveMongo, it can be done using the .aggregate
operation.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.{ BSONDocument, BSONString }
import reactivemongo.api.collections.bson.BSONCollection
def populatedStates(col: BSONCollection): Future[List[BSONDocument]] = {
import col.BatchCommands.AggregationFramework.{
AggregationResult, Group, Match, SumField
}
val res: Future[AggregationResult] = col.aggregate(
Group(BSONString("$state"))( "totalPop" -> SumField("population")),
List(Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L)))))
res.map(_.documents)
}
The local import col.BatchCommands.AggregationFramework._ is required, and cannot be replaced by a global static import such as reactivemongo.api.collections.bson.BSONCollection.BatchCommands.AggregationFramework._. The type col.BatchCommands.AggregationFramework.AggregationResult is a dependent one, used for the intermediary MongoDB result, and must not be exposed as a public return type in your application/API.
Then, when calling populatedStates(theZipCodeCol), the asynchronous result will be as below.
[
{ "_id" -> "JP", "totalPop" -> 13185702 },
{ "_id" -> "NY", "totalPop" -> 19746227 }
]
Note that for the state “JP”, the population of Aogashima (200) and of Tokyo (13185502) have been summed.
As for the other commands in ReactiveMongo, it’s possible to return the aggregation result as custom types (see BSON readers), rather than generic documents, for example considering a class State
as below.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.Macros
import reactivemongo.api.collections.bson.BSONCollection
case class State(name: String, population: Long)
implicit val reader = Macros.reader[State]
def aggregate(col: BSONCollection): Future[col.BatchCommands.AggregationFramework.AggregationResult] = ???
def states(col: BSONCollection): Future[List[State]] =
aggregate(col).map(_.result[State])
Using a cursor:
The alternative .aggregateWith builder can be used to process the aggregation result with a Cursor.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.collections.bson.BSONCollection
def populatedStatesCursor(cities: BSONCollection)(implicit ec: ExecutionContext): Cursor[BSONDocument] =
cities.aggregateWith[BSONDocument]() { framework =>
import framework.{ Group, Match, SumField }
Group(BSONString("$state"))("totalPop" -> SumField("population")) -> List(
Match(document("totalPop" -> document("$gte" -> 10000000L)))
)
}
The GroupField operator can be used instead of the Group one, to simply work with a single field (see the sketch after these notes).
In the previous example, the dependent import myCol.BatchCommands.AggregationFramework._ is replaced by a simpler import, using the instance of the aggregation framework provided by the builder: import framework.{ Group, Match, SumField }.
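For illustration, here is a minimal sketch of the previous aggregation rewritten with GroupField through the aggregateWith builder (the function name populatedStatesByField is just an example).
import scala.concurrent.ExecutionContext
import reactivemongo.bson.BSONDocument
import reactivemongo.api.Cursor
import reactivemongo.api.collections.bson.BSONCollection
def populatedStatesByField(cities: BSONCollection)(implicit ec: ExecutionContext): Cursor[BSONDocument] =
  cities.aggregateWith[BSONDocument]() { framework =>
    import framework.{ GroupField, Match, SumField }
    // GroupField("state") is equivalent to Group(BSONString("$state"))
    GroupField("state")("totalPop" -> SumField("population")) -> List(
      Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L))))
  }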
Most populated city per state
The $max accumulator can be used to get the most populated city per state.
In the MongoDB shell, it would be executed as follows.
db.zipcodes.aggregate([
{ $group: { _id: "$state", maxPop: { $max: "$population" } } }
])
It will return a result as below.
[
{ _id: "JP", maxPop: 13185502 },
{ _id: "FR", maxPop: 148169 }
{ _id: "NY", maxPop: 19746227 }
]
Using ReactiveMongo:
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def mostPopulated(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import cities.BatchCommands.AggregationFramework
import AggregationFramework.{ Group, MaxField }
cities.aggregate(Group(BSONString("$state"))(
"maxPop" -> MaxField("population")
)).map(_.firstBatch)
}
Similarly, the $min
accumulator can be used to get the least populated cities.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def leastPopulated(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import cities.BatchCommands.AggregationFramework
import AggregationFramework.{ Group, MinField }
cities.aggregate(Group(BSONString("$state"))(
"minPop" -> MinField("population")
)).map(_.firstBatch)
}
The Min and the Max operators can be used instead of MinField and MaxField, to use expressions in place of single fields.
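For example, the previous aggregation can be sketched using Max with a BSON expression; here the expression is simply the $population field path, which is equivalent to MaxField("population") (the function name mostPopulatedExpr is just for illustration).
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def mostPopulatedExpr(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
  import cities.BatchCommands.AggregationFramework.{ Group, Max }
  cities.aggregate(Group(BSONString("$state"))(
    // Max is given a BSON expression instead of a plain field name
    "maxPop" -> Max(BSONString("$population"))
  )).map(_.firstBatch)
}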
Gather the city names per state as a simple array
The $push
accumulator can be used to gather some fields, so there is a computed array for each group.
In the MongoDB shell, it can be done as below.
db.zipcodes.aggregate([
{ $group: { _id: "$state", cities: { $push: "$city" } } }
])
It will return the aggregation results:
[
{ _id: "JP", cities: [ "TOKYO", "AOGASHIMA" ] },
{ _id: "FR", cities: [ "LE MANS" ] },
{ _id: "NY", cities: [ "NEW YORK" ] }
]
With ReactiveMongo, the PushField accumulator can be used as follows.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def citiesPerState1(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import cities.BatchCommands.AggregationFramework.{ Group, PushField }
cities.aggregate(Group(BSONString("$state"))(
"cities" -> PushField("city"))).map(_.firstBatch)
}
Similarly, the $addToSet accumulator can be applied to collect all the unique values in the array for each group (here it's equivalent to $push).
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def citiesPerState2(cities: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import cities.BatchCommands.AggregationFramework.{ Group, AddFieldToSet }
cities.aggregate(Group(BSONString("$state"))(
"cities" -> AddFieldToSet("city"))).map(_.firstBatch)
}
The AddToSet and the Push operators can be used instead of AddFieldToSet and PushField, to use expressions in place of single fields.
Average city population by state
The accumulator $avg
can be used to find the average population of the cities by state.
In the MongoDB shell, it can be done as follows.
db.zipcodes.aggregate([
{ $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$population" } } },
{ $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } }
])
- Group the documents by the combination of city and state, to get intermediate documents of the form { "_id" : { "state" : "NY", "city" : "NEW YORK" }, "pop" : 19746227 }.
- Group the intermediate documents by the _id.state field (i.e. the state field inside the _id document), and get the average of the population of each group ($avg: "$pop").
Using ReactiveMongo, it can be written as below.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.{ BSONDocument, BSONString }
import reactivemongo.api.collections.bson.BSONCollection
def avgPopByState(col: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import col.BatchCommands.AggregationFramework.{
AggregationResult, AvgField, Group, SumField
}
col.aggregate(Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
"pop" -> SumField("population")),
List(Group(BSONString("$_id.state"))("avgCityPop" -> AvgField("pop")))).
map(_.documents)
}
The Avg operator can be used instead of AvgField, to use an expression in place of a single field.
Largest and smallest cities by state
Aggregating the documents can be used to find the largest and the smallest cities for each state:
db.zipcodes.aggregate([
{ $group:
{
_id: { state: "$state", city: "$city" },
pop: { $sum: "$population" }
}
},
{ $sort: { pop: 1 } },
{ $group:
{
_id : "$_id.state",
biggestCity: { $last: "$_id.city" },
biggestPop: { $last: "$pop" },
smallestCity: { $first: "$_id.city" },
smallestPop: { $first: "$pop" }
}
},
// the following $project is optional, and
// modifies the output format.
{ $project:
{ _id: 0,
state: "$_id",
biggestCity: { name: "$biggestCity", pop: "$biggestPop" },
smallestCity: { name: "$smallestCity", pop: "$smallestPop" }
}
}
])
A ReactiveMongo function can be written as below.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.{ BSONDocument, BSONString, Macros }
import reactivemongo.api.collections.bson.BSONCollection
case class City(name: String, population: Long)
object City {
implicit val reader = Macros.reader[City]
}
case class StateStats(state: String, biggestCity: City, smallestCity: City)
object StateStats {
implicit val reader = Macros.reader[StateStats]
}
def stateStats(col: BSONCollection): Future[List[StateStats]] = {
import col.BatchCommands.AggregationFramework.{
AggregationResult, Ascending, FirstField, Group, LastField,
Project, Sort, SumField
}
col.aggregate(Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
"pop" -> SumField("population")),
List(Sort(Ascending("population")), Group(BSONString("$_id.state"))(
"biggestCity" -> LastField("_id.city"),
"biggestPop" -> LastField("pop"),
"smallestCity" -> FirstField("_id.city"),
"smallestPop" -> FirstField("pop")),
Project(BSONDocument("_id" -> 0, "state" -> "$_id",
"biggestCity" -> BSONDocument("name" -> "$biggestCity",
"population" -> "$biggestPop"),
"smallestCity" -> BSONDocument("name" -> "$smallestCity",
"population" -> "$smallestPop"))))).
map(_.result[StateStats])
}
This function would return statistics like the following.
List(
StateStats(state = "NY",
biggestCity = City(name = "NEW YORK", population = 19746227L),
smallestCity = City(name = "NEW YORK", population = 19746227L)),
StateStats(state = "FR",
biggestCity = City(name = "LE MANS", population = 148169L),
smallestCity = City(name = "LE MANS", population = 148169L)),
StateStats(state = "JP",
biggestCity = City(name = "TOKYO", population = 13185502L),
smallestCity = City(name = "AOGASHIMA", population = 200L)))
The First and the Last operators can be used instead of FirstField and LastField, to use expressions in place of single fields.
The $limit
or the $skip
stages can be used to consider only some states:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def paginatedStats(col: BSONCollection, max: Int, offset: Int = 0): Future[List[StateStats]] = {
import col.BatchCommands.AggregationFramework.{
AggregationResult, Ascending, FirstField, Group, LastField, Limit,
Project, Skip, Sort, SumField
}
col.aggregate(Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
"pop" -> SumField("population")),
List(
Skip(offset), // <-- skip some states if offset > 0
Limit(max), // <-- limit the state groups
Sort(Ascending("population")),
Group(BSONString("$_id.state"))(
"biggestCity" -> LastField("_id.city"),
"biggestPop" -> LastField("pop"),
"smallestCity" -> FirstField("_id.city"),
"smallestPop" -> FirstField("pop")),
Project(BSONDocument("_id" -> 0, "state" -> "$_id",
"biggestCity" -> BSONDocument("name" -> "$biggestCity",
"population" -> "$biggestPop"),
"smallestCity" -> BSONDocument("name" -> "$smallestCity",
"population" -> "$smallestPop"))))).
map(_.result[StateStats])
}
Standard deviation of the Japanese cities
The group accumulators $stdDevPop
and $stdDevSamp
can be used to find the standard deviation of the population of the Japanese cities.
In the MongoDB shell, it can be done as follows.
db.zipcodes.aggregate([
{ $group:
{
_id: "$state",
popDev: { $stdDevPop: "$population" }
}
},
{ $match: { _id: "JP" } }
])
It will find the result:
{ _id: "JP", popDev: 6592651 }
It can be done with ReactiveMongo as below.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def populationStdDeviation(cities: BSONCollection)(implicit ec: ExecutionContext): Future[Option[BSONDocument]] = {
import cities.BatchCommands.AggregationFramework
import AggregationFramework.{ StdDevPopField, Group, Match }
cities.aggregate(Group(BSONString("$state"))(
"popDev" -> StdDevPopField("population")),
List(Match(document("_id" -> "JP")))).map(_.firstBatch.headOption)
}
def populationSampleDeviation(cities: BSONCollection)(implicit ec: ExecutionContext): Future[Option[BSONDocument]] = {
import cities.BatchCommands.AggregationFramework
import AggregationFramework.{ StdDevSampField, Group, Match }
cities.aggregate(Group(BSONString("$state"))(
"popDev" -> StdDevSampField("population")),
List(Match(document("_id" -> "JP")))).map(_.firstBatch.headOption)
}
The StdDevPop and the StdDevSamp operators can be used instead of StdDevPopField and StdDevSampField, to use expressions in place of single fields.
Find documents using text indexing
Consider the following text index is maintained for the fields city and state of the zipcodes collection.
db.zipcodes.ensureIndex({ city: "text", state: "text" })
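As a side note, such a text index can also be ensured from ReactiveMongo itself, using the collection index manager (a minimal sketch, assuming the standard reactivemongo.api.indexes API; the function name ensureTextIndex is just for illustration).
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.indexes.{ Index, IndexType }
import reactivemongo.api.collections.bson.BSONCollection
// Ensures a text index on both the 'city' and 'state' fields.
def ensureTextIndex(zipcodes: BSONCollection)(implicit ec: ExecutionContext): Future[Boolean] =
  zipcodes.indexesManager.ensure(
    Index(Seq("city" -> IndexType.Text, "state" -> IndexType.Text)))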
Then it’s possible to find documents using the $text operator, and the results can also be sorted according to the text scores.
For example, to find the documents matching the text "JP" and sort them according to the text score, the following query can be executed in the MongoDB shell.
db.zipcodes.aggregate([
{ $match: { $text: { $search: "JP" } } },
{ $sort: { score: { $meta: "textScore" } } }
])
A ReactiveMongo function can be written as bellow.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def textFind(coll: BSONCollection): Future[List[BSONDocument]] = {
import coll.BatchCommands.AggregationFramework
import AggregationFramework.{
Cursor,
Match,
MetadataSort,
Sort,
TextScore
}
val firstOp = Match(BSONDocument(
"$text" -> BSONDocument("$search" -> "JP")))
val pipeline = List(Sort(MetadataSort("score", TextScore)))
coll.aggregate1[BSONDocument](firstOp, pipeline).collect[List]()
}
This will return the sorted documents for the cities TOKYO
and AOGASHIMA
.
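If the computed score itself is needed in the output, it can also be projected. The following is an illustrative sketch (not part of the original example), exposing the text score as a hypothetical score field through a $meta expression.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def scoredTextFind(coll: BSONCollection): Future[List[BSONDocument]] = {
  import coll.BatchCommands.AggregationFramework.{
    Match, MetadataSort, Project, Sort, TextScore
  }
  coll.aggregate(
    Match(BSONDocument("$text" -> BSONDocument("$search" -> "JP"))),
    List(
      // keep the city field and expose the text score as 'score'
      Project(BSONDocument(
        "city" -> 1, "score" -> BSONDocument("$meta" -> "textScore"))),
      Sort(MetadataSort("score", TextScore)))).map(_.firstBatch)
}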
Random sample
The $sample aggregation stage can be used (since MongoDB 3.2) to randomly select documents.
In the MongoDB shell, it can be used as follows to fetch a sample of 3 random documents.
db.zipcodes.aggregate([
{ $sample: { size: 3 } }
])
With ReactiveMongo, the $sample
operator can be used as follows.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def randomZipCodes(coll: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
import coll.BatchCommands.AggregationFramework
coll.aggregate(AggregationFramework.Sample(3)).map(_.head[BSONDocument])
}
Places examples
Let's consider a collection of different kinds of places (e.g. Central Park …), with their locations indexed using 2dsphere.
This can be set up in the MongoDB shell as follows.
db.place.createIndex({'loc':"2dsphere"});
db.place.insert({
"type": "public",
"loc": {
"type": "Point", "coordinates": [-73.97, 40.77]
},
"name": "Central Park",
"category": "Parks"
});
db.place.insert({
"type": "public",
"loc": {
"type": "Point", "coordinates": [-73.88, 40.78]
},
"name": "La Guardia Airport",
"category": "Airport"
});
The $geoNear aggregation stage can be used on the collection, to find the places near the geospatial coordinates [ -73.9667, 40.78 ], between 1 km (1000 meters) and 5 km (5000 meters) away.
db.place.aggregate([{
$geoNear: {
near: { type: "Point", coordinates: [ -73.9667, 40.78 ] },
distanceField: "dist.calculated",
minDistance: 1000,
maxDistance: 5000,
query: { type: "public" },
includeLocs: "dist.loc",
num: 5,
spherical: true
}
}])
The results will be of the following form:
{
"type": "public",
"loc": {
"type": "Point",
"coordinates": [ -73.97, 40.77 ]
},
"name": "Central Park",
"category": "Parks",
"dist": {
"calculated": 1147.4220523120696,
"loc": {
"type": "Point",
"coordinates": [ -73.97, 40.77 ]
}
}
}
It can be done with ReactiveMongo as follows.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.{ array, document, Macros }
import reactivemongo.api.collections.bson.BSONCollection
case class GeoPoint(coordinates: List[Double])
case class GeoDistance(calculated: Double, loc: GeoPoint)
case class GeoPlace(
loc: GeoPoint,
name: String,
category: String,
dist: GeoDistance
)
object GeoPlace {
implicit val pointReader = Macros.reader[GeoPoint]
implicit val distanceReader = Macros.reader[GeoDistance]
implicit val placeReader = Macros.reader[GeoPlace]
}
def placeAround(places: BSONCollection)(implicit ec: ExecutionContext): Future[List[GeoPlace]] = {
import places.BatchCommands.AggregationFramework.GeoNear
places.aggregate(GeoNear(document(
"type" -> "Point",
"coordinates" -> array(-73.9667, 40.78)
), distanceField = Some("dist.calculated"),
minDistance = Some(1000),
maxDistance = Some(5000),
query = Some(document("type" -> "public")),
includeLocs = Some("dist.loc"),
limit = 5,
spherical = true)).map(_.head[GeoPlace])
}
Forecast example
Consider a collection of forecasts with the following document.
{
_id: 1,
title: "123 Department Report",
tags: [ "G", "STLW" ],
year: 2014,
subsections: [
{
subtitle: "Section 1: Overview",
tags: [ "SI", "G" ],
content: "Section 1: This is the content of section 1."
},
{
subtitle: "Section 2: Analysis",
tags: [ "STLW" ],
content: "Section 2: This is the content of section 2."
},
{
subtitle: "Section 3: Budgeting",
tags: [ "TK" ],
content: {
text: "Section 3: This is the content of section3.",
tags: [ "HCS" ]
}
}
]
}
Using the $redact
stage, the MongoDB aggregation can be used to restrict the contents of the documents. It can be done in the MongoDB shell as follows:
db.forecasts.aggregate([
{ $match: { year: 2014 } },
{
$redact: {
$cond: {
if: { $gt: [ { $size: {
$setIntersection: [ "$tags", [ "STLW", "G" ] ] } }, 0 ]
},
then: "$$DESCEND",
else: "$$PRUNE"
}
}
}
])
The corresponding result is a redacted document.
{
"_id" : 1,
"title" : "123 Department Report",
"tags" : [ "G", "STLW" ],
"year" : 2014,
"subsections" : [
{
"subtitle" : "Section 1: Overview",
"tags" : [ "SI", "G" ],
"content" : "Section 1: This is the content of section 1."
},
{
"subtitle" : "Section 2: Analysis",
"tags" : [ "STLW" ],
"content" : "Section 2: This is the content of section 2."
}
]
}
With ReactiveMongo, the aggregation framework can perform a similar redaction.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
def redactForecasts(forecasts: BSONCollection)(implicit ec: ExecutionContext) = {
import forecasts.BatchCommands.AggregationFramework.{ Match, Redact }
forecasts.aggregate(Match(document("year" -> 2014)), List(
Redact(document("$cond" -> document(
"if" -> document(
"$gt" -> array(document(
"$size" -> document("$setIntersection" -> array(
"$tags", array("STLW", "G")
))
), 0)
),
"then" -> "$$DESCEND",
"else" -> "$$PRUNE"
))))).map(_.head[BSONDocument])
}
Inventory example
Consider an inventory
collection, with the following document.
{ "_id" : 1, "item" : "ABC1", "sizes": [ "S", "M", "L"] }
The $unwind
stage can be used as below in the MongoDB shell, to return a document for each size.
db.inventory.aggregate( [ { $unwind : "$sizes" } ] )
It will return results as below.
{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }
With ReactiveMongo, it can be done using UnwindField.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.collections.bson.BSONCollection
def sized(inventory: BSONCollection) = {
import inventory.BatchCommands.AggregationFramework.UnwindField
inventory.aggregate(UnwindField("sizes")).map(_.firstBatch)
}
If there is a second prices collection, with the following document.
{ "_id" : 10, "item" : "ABC1", "price" : 12.34 }
This can be joined using the MongoDB shell, with the $lookup
stage.
db.inventory.aggregate([{
$lookup: {
from: "prices",
localField: "item",
foreignField: "item",
as: "prices"
}
}])
It will result in the following document.
{
"_id" : 1,
"item" : "ABC1",
"sizes": [ "S", "M", "L" ],
"prices": [
{ "_id" : 10, "item" : "ABC1", "price" : 12.34 }
]
}
It can be done with ReactiveMongo using the Lookup
operator.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONString
import reactivemongo.api.collections.bson.BSONCollection
def priced(inventory: BSONCollection, prices: BSONCollection) = {
import inventory.BatchCommands.AggregationFramework.Lookup
inventory.aggregate(
Lookup(prices.name, "item", "item", "prices")
).map(_.firstBatch)
}
Book library example
Consider a books collection that contains the following documents.
{ "_id" : 8751, "title" : "The Banquet", "author" : "Dante", "copies" : 2 }
{ "_id" : 8752, "title" : "Divine Comedy", "author" : "Dante", "copies" : 1 }
{ "_id" : 8645, "title" : "Eclogues", "author" : "Dante", "copies" : 2 }
{ "_id" : 7000, "title" : "The Odyssey", "author" : "Homer", "copies" : 10 }
{ "_id" : 7020, "title" : "Iliad", "author" : "Homer", "copies" : 10 }
Then its documents can be aggregated and written to another collection, using the $out stage.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONString
import reactivemongo.api.collections.bson.BSONCollection
def outputBooks(books: BSONCollection, outColl: String): Future[Unit] = {
import books.BatchCommands.AggregationFramework
import AggregationFramework.{ Ascending, Group, PushField, Out, Sort }
books.aggregate(Sort(Ascending("title")), List(
Group(BSONString("$author"))("books" -> PushField("title")),
Out(outColl))).map(_ => {})
}
For the current example, the result collection will contain the following documents.
{ "_id" : "Homer", "books" : [ "Iliad", "The Odyssey" ] }
{ "_id" : "Dante", "books" : [ "Divine Comedy", "Eclogues", "The Banquet" ] }
Database indexes aggregation
The $indexStats
stage returns statistics regarding the use of each index for a collection.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.collections.bson.BSONCollection
def aggregateIndexes(coll: BSONCollection) = {
import coll.BatchCommands.AggregationFramework.{ Ascending, IndexStats, Sort }
import reactivemongo.api.commands.{ bson => bsoncommands }
import bsoncommands.BSONAggregationFramework.{
IndexStatsResult, IndexStatAccesses
}
import bsoncommands.BSONAggregationResultImplicits.BSONIndexStatsReader
val result: Future[List[IndexStatsResult]] =
coll.aggregate(IndexStats, List(Sort(Ascending("name")))).
map(_.head[IndexStatsResult])
result
}
See also:
- The operators available to define an aggregation pipeline are documented in the API reference.
- The Aggregation Framework tests