Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Aggregation Framework
The MongoDB Aggregation Framework is available through ReactiveMongo.
Zip codes example
Consider a zipcodes collection in MongoDB, containing the following documents.
[
  { '_id': "10280", 'city': "NEW YORK", 'state': "NY",
    'population': 19746227, 'location': {'lon':-74.016323, 'lat':40.710537} },
  { '_id': "72000", 'city': "LE MANS", 'state': "FR",
    'population': 148169, 'location': {'lon':0.1984, 'lat':48.0077} },
  { '_id': "JP-13", 'city': "TOKYO", 'state': "JP",
    'population': 13185502, 'location': {'lon':139.683333, 'lat':35.683333} },
  { '_id': "AO", 'city': "AOGASHIMA", 'state': "JP",
    'population': 200, 'location': {'lon':139.767, 'lat':32.457} }
]
States with population above 10000000
To determine the states whose total city population is above 10000000, group the documents by state, compute the sum of the population values for each group, then keep only the grouped documents whose population sum matches the filter “above 10000000” (expressed with $gte in the pipeline).
In the MongoDB shell, such an aggregation is written as below.
db.zipcodes.aggregate([
  { $group: { _id: "$state", totalPop: { $sum: "$population" } } },
  { $match: { totalPop: { $gte: 10000000 } } }
])
With ReactiveMongo, it can be done using the .aggregate operation.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONString }
import reactivemongo.api.collections.bson.BSONCollection

def populatedStates(col: BSONCollection): Future[List[BSONDocument]] = {
  import col.BatchCommands.AggregationFramework.{
    AggregationResult, Group, Match, SumField
  }

  val res: Future[AggregationResult] = col.aggregate(
    // $group: sum the population of the cities, by state
    Group(BSONString("$state"))("totalPop" -> SumField("population")),
    // $match: keep only the groups whose total is at least 10000000
    List(Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L)))))

  res.map(_.documents)
}
The local import col.BatchCommands.AggregationFramework._ is required, and cannot be replaced by a global static import reactivemongo.api.collections.BSONCollection.BatchCommands.AggregationFramework._. The type .BatchCommands.AggregationFramework.AggregationResult is a dependent one (it is tied to the col value), used for the intermediary/MongoDB result, and must not be exposed as a public return type in your application/API.
Then, when calling populatedStates(theZipCodeCol), the asynchronous result will be as below.
[
{ "_id" -> "JP", "totalPop" -> 13185702 },
{ "_id" -> "NY", "totalPop" -> 19746227 }
]
Note that for the state “JP”, the population of Aogashima (200) and of Tokyo (13185502) have been summed.
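To make the semantics of the two stages concrete, the same computation can be modelled in memory with plain Scala collections. This is only a sketch for illustration: it does not use ReactiveMongo or MongoDB at all, and the names InMemoryPopulatedStates, City and populatedStates are hypothetical, introduced for this example.

```scala
// In-memory model of the pipeline: $group by state with $sum, then $match.
// Hypothetical names; not part of the ReactiveMongo API.
object InMemoryPopulatedStates {
  final case class City(id: String, name: String, state: String, population: Long)

  // The sample documents of the zipcodes collection (location omitted).
  val zipcodes: List[City] = List(
    City("10280", "NEW YORK", "NY", 19746227L),
    City("72000", "LE MANS", "FR", 148169L),
    City("JP-13", "TOKYO", "JP", 13185502L),
    City("AO", "AOGASHIMA", "JP", 200L))

  def populatedStates(
    cities: List[City],
    threshold: Long = 10000000L): Map[String, Long] =
    cities
      .groupBy(_.state) // $group: { _id: "$state", ... }
      .map { case (state, cs) =>
        state -> cs.map(_.population).sum // totalPop: { $sum: "$population" }
      }
      .filter { case (_, totalPop) =>
        totalPop >= threshold // $match: { totalPop: { $gte: threshold } }
      }
}
```

Calling InMemoryPopulatedStates.populatedStates(InMemoryPopulatedStates.zipcodes) keeps the "JP" and "NY" entries and drops "FR", whose total stays below the threshold.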
As for the other commands in ReactiveMongo, it’s possible to return the aggregation result as custom types (see BSON readers) rather than generic documents. For example, consider a class State as below.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.Macros
import reactivemongo.api.collections.bson.BSONCollection

case class State(name: String, population: Long)

// The macro-generated reader expects documents
// with `name` and `population` fields
implicit val reader = Macros.reader[State]

// Left unimplemented: any aggregation producing such documents
def aggregate(col: BSONCollection): Future[col.BatchCommands.AggregationFramework.AggregationResult] = ???

def states(col: BSONCollection): Future[List[State]] =
  aggregate(col).map(_.result[State])
Average city population by state
The Aggregation Framework can be used to find the average population of the cities by state.
In the MongoDB shell, it can be done as follows.
db.zipcodes.aggregate([
  { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$population" } } },
  { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } }
])
- Group the documents by the combination of city and state, to get intermediate documents of the form { "_id" : { "state" : "NY", "city" : "NEW YORK" }, "pop" : 19746227 }.
- Group the intermediate documents by the _id.state field (i.e. the state field inside the _id document), and get the average population of each group ($avg: "$pop").
Using ReactiveMongo, this aggregation can be written as below.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONString }
import reactivemongo.api.collections.bson.BSONCollection

def avgPopByState(col: BSONCollection): Future[List[BSONDocument]] = {
  import col.BatchCommands.AggregationFramework.{ Avg, Group, SumField }

  col.aggregate(
    // First $group: total population by (state, city)
    Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
      "pop" -> SumField("population")),
    // Second $group: average of the city totals, by state
    List(Group(BSONString("$_id.state"))("avgCityPop" -> Avg("pop")))).
    map(_.documents)
}
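The two grouping stages described above can also be modelled in memory, which makes the shape of the intermediate documents easier to see. This is a sketch using only the Scala standard library; InMemoryAvgCityPop, Zip and avgCityPopByState are hypothetical names, and the sample data is reduced to the fields the pipeline reads.

```scala
// In-memory model of the two $group stages (hypothetical names,
// no ReactiveMongo dependency).
object InMemoryAvgCityPop {
  final case class Zip(city: String, state: String, population: Long)

  val zipcodes: List[Zip] = List(
    Zip("NEW YORK", "NY", 19746227L),
    Zip("LE MANS", "FR", 148169L),
    Zip("TOKYO", "JP", 13185502L),
    Zip("AOGASHIMA", "JP", 200L))

  def avgCityPopByState(zips: List[Zip]): Map[String, Double] = {
    // Stage 1: one entry per (state, city), holding the summed population,
    // like { _id: { state, city }, pop: { $sum: "$population" } }.
    val popByCity: Map[(String, String), Long] =
      zips.groupBy(z => (z.state, z.city)).map {
        case (key, zs) => key -> zs.map(_.population).sum
      }

    // Stage 2: regroup the intermediate entries by state and average them,
    // like { _id: "$_id.state", avgCityPop: { $avg: "$pop" } }.
    popByCity.toList
      .groupBy { case ((state, _), _) => state }
      .map { case (state, entries) =>
        val pops = entries.map(_._2)
        state -> pops.sum.toDouble / pops.size
      }
  }
}
```

With the sample documents, each city appears once, so the first stage leaves the populations unchanged. The second stage averages Tokyo and Aogashima for "JP", giving 13185702 / 2 = 6592851.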
Largest and smallest cities by state
Aggregating the documents can be used to find the largest and the smallest cities for each state:
db.zipcodes.aggregate([
  { $group:
    {
      _id: { state: "$state", city: "$city" },
      pop: { $sum: "$population" }
    }
  },
  { $sort: { pop: 1 } },
  { $group:
    {
      _id : "$_id.state",
      biggestCity: { $last: "$_id.city" },
      biggestPop: { $last: "$pop" },
      smallestCity: { $first: "$_id.city" },
      smallestPop: { $first: "$pop" }
    }
  },
  // the following $project is optional, and
  // modifies the output format.
  { $project:
    { _id: 0,
      state: "$_id",
      biggestCity: { name: "$biggestCity", pop: "$biggestPop" },
      smallestCity: { name: "$smallestCity", pop: "$smallestPop" }
    }
  }
])
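A ReactiveMongo function can be written as below. Here the projected city documents use a population field rather than pop, so that they match the City class.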
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONString, Macros }
import reactivemongo.api.collections.bson.BSONCollection

case class City(name: String, population: Long)
case class StateStats(state: String, biggestCity: City, smallestCity: City)

implicit val cityReader = Macros.reader[City]
implicit val statsReader = Macros.reader[StateStats]

def stateStats(col: BSONCollection): Future[List[StateStats]] = {
  import col.BatchCommands.AggregationFramework.{
    Ascending, First, Group, Last, Project, Sort, SumField
  }

  col.aggregate(
    // $group: total population by (state, city)
    Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
      "pop" -> SumField("population")),
    List(
      // $sort on the intermediate pop field, as in the shell pipeline
      Sort(Ascending("pop")),
      // $group by state: last is the biggest city, first the smallest
      Group(BSONString("$_id.state"))(
        "biggestCity" -> Last("_id.city"), "biggestPop" -> Last("pop"),
        "smallestCity" -> First("_id.city"), "smallestPop" -> First("pop")),
      // $project: reshape to match the StateStats and City classes
      Project(BSONDocument("_id" -> 0, "state" -> "$_id",
        "biggestCity" -> BSONDocument("name" -> "$biggestCity",
          "population" -> "$biggestPop"),
        "smallestCity" -> BSONDocument("name" -> "$smallestCity",
          "population" -> "$smallestPop"))))).
    map(_.result[StateStats])
}
This function would return statistics like the following.
List(
StateStats(state = "NY",
biggestCity = City(name = "NEW YORK", population = 19746227L),
smallestCity = City(name = "NEW YORK", population = 19746227L)),
StateStats(state = "FR",
biggestCity = City(name = "LE MANS", population = 148169L),
smallestCity = City(name = "LE MANS", population = 148169L)),
StateStats(state = "JP",
biggestCity = City(name = "TOKYO", population = 13185502L),
smallestCity = City(name = "AOGASHIMA", population = 200L)))
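The role of the $sort stage before $first and $last can be sketched in memory as well: once the cities are sorted by ascending population, the first entry of each state group is the smallest city and the last one the biggest. The sketch below uses only the Scala standard library, with the hypothetical name InMemoryStateStats, and it assumes the per-city totals have already been computed (one entry per city), so the first $group stage is skipped.

```scala
// In-memory model of $sort followed by $group with $first/$last
// (hypothetical names, no ReactiveMongo dependency).
object InMemoryStateStats {
  final case class City(name: String, population: Long)
  final case class StateStats(
    state: String, biggestCity: City, smallestCity: City)

  // Input: (state, city) pairs, assumed to hold one entry per city.
  def stateStats(cities: List[(String, City)]): Map[String, StateStats] =
    cities
      .sortBy { case (_, city) => city.population } // $sort: { pop: 1 }
      .groupBy { case (state, _) => state } // keeps the sorted order per group
      .map { case (state, sorted) =>
        state -> StateStats(
          state,
          biggestCity = sorted.last._2, // $last
          smallestCity = sorted.head._2) // $first
      }
}
```

For a state with a single city, such as "NY" in the sample data, head and last are the same element, which is why the biggest and smallest cities coincide in the result above.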
The operators available to define an aggregation pipeline are documented in the API reference.