Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
ReactiveMongo 0.20.13 – Release details
What’s new?
The documentation is available online, and its code samples are compiled to make sure it’s up-to-date.
- Compatibility
- Connection
  - Support x.509 certificate to authenticate.
  - Support DNS seedlist in the connection URI.
  - New rm.reconnectDelayMS setting.
  - Add credentials in the MongoConnectionOptions.
- Netty native
- BSON library
  - BSON Decimal128
  - Option support & new @NoneAsNull annotation
- New query and write operations,
  - bulk operations (e.g. .delete.many) on collection,
  - arrayFilters on update operations.
- Aggregation
  - CursorOptions parameter when using .aggregatorContext.
  - New stages: $addFields, $bucketAuto, $count, $filter, $replaceRoot, $slice
- Monitoring
- Administration
- Breaking changes
The next release will be 1.0.0.
The impatient can have a look at the release slideshow.
Compatibility
This release is compatible with the following runtimes.
- MongoDB from 3.0 up to 4.2.
- Akka from 2.3.13 up to 2.5.23 (see Setup)
- Play Framework from 2.3.13 to 2.7.1
MongoDB versions older than 3.0 are no longer supported (end of life 2018-2).
Recommended configuration:
The driver core and the modules are tested in a container-based environment, with the specifications below.
- 2 cores (64 bits)
- 4 GB of system memory, with a maximum of 2 GB for the JVM
This can be considered as a recommended environment.
Connection
The MongoDriver type is replaced by AsyncDriver, with asynchronous methods.
- MongoDriver.connection is replaced by AsyncDriver.connect
- close is asynchronous.
The utility function MongoConnection.parseURI is replaced by the asynchronous function .fromString.
Also, the following options are deprecated:
- authSource replaced by authenticationDatabase (as the MongoShell option)
- authMode replaced by authenticationMechanism (as the MongoShell option)
- sslEnabled replaced by ssl (as the MongoShell option)
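The renames above can be applied mechanically to existing connection URIs. The following is a minimal sketch in plain Scala, not part of the driver: the `UriOptionMigration` object and its `migrate` function are hypothetical names, and it assumes that only the option names change while the values stay valid.

```scala
object UriOptionMigration {
  // Deprecated option name -> replacement, as listed above
  val renamed: Map[String, String] = Map(
    "authSource" -> "authenticationDatabase",
    "authMode" -> "authenticationMechanism",
    "sslEnabled" -> "ssl")

  /** Rewrites deprecated option names in the query part of a connection URI. */
  def migrate(uri: String): String = uri.split("\\?", 2) match {
    case Array(base, query) =>
      val options = query.split("&").map { opt =>
        opt.split("=", 2) match {
          case Array(name, value) =>
            s"${renamed.getOrElse(name, name)}=$value"

          case _ => opt // leave value-less or malformed options untouched
        }
      }

      options.mkString(s"$base?", "&", "")

    case _ => uri // no options to rewrite
  }
}
```

For instance, `UriOptionMigration.migrate("mongodb://server:27017/db?authSource=admin&sslEnabled=true")` yields a URI using `authenticationDatabase=admin&ssl=true`.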
SNI is now supported for the SSL connection.
The x.509 certificate authentication is now supported, and can be configured by setting x509 as authenticationMechanism, with the following new options.
- keyStore: A URI to a key store (e.g. file:///path/to/keystore.p12)
- keyStorePassword: Provides the password to load it (if required)
- keyStoreType: Indicates the type of the store
import reactivemongo.api._
def connection(driver: AsyncDriver) =
driver.connect("mongodb://server:27017/db?ssl=true&authenticationMechanism=x509&keyStore=file:///path/to/keystore.p12&keyStoreType=PKCS12")
The DNS seedlist is now supported, using the mongodb+srv:// scheme in the connection URI.
It’s also possible to specify the credentials directly in the URI.
import reactivemongo.api._
def seedListCon(driver: AsyncDriver) =
driver.connect("mongodb+srv://usr:pass@mymongo.mydomain.tld/mydb")
The option rm.monitorRefreshMS is renamed to heartbeatFrequencyMS.
Netty native
The internal Netty dependency has been updated to version 4.1.
It comes with various improvements (memory consumption, …), and also makes it possible to use Netty native support (kqueue for Mac OS X and epoll for Linux, on x86_64 arch).
Note that the Netty dependency is shaded so it won’t conflict with any Netty version in your environment.
BSON library
The current BSON library for ReactiveMongo has been updated.
It now supports BSON Decimal128 (MongoDB 3.4+).
The way Option is handled by the macros has been improved, along with a new annotation @NoneAsNull, which writes None values as BSONNull (instead of omitting the field/value).
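As a rough illustration of the difference between a plain Option field and one annotated with @NoneAsNull, here is a minimal sketch. The `Player` model is hypothetical, and the sketch assumes the annotation is exposed as `Macros.Annotations.NoneAsNull` in the `reactivemongo.api.bson` package, next to the other macro annotations.

```scala
import scala.util.Try
import reactivemongo.api.bson.{ BSON, BSONDocument, BSONDocumentWriter, Macros }
import reactivemongo.api.bson.Macros.Annotations.NoneAsNull

object NoneAsNullExample {
  // Hypothetical model, for illustration only
  case class Player(
    name: String,
    nickname: Option[String], // omitted from the document when None
    @NoneAsNull score: Option[Int]) // written as BSONNull when None

  implicit val playerWriter: BSONDocumentWriter[Player] =
    Macros.writer[Player]

  // Intended shape, per the description above:
  // { "name": "foo", "score": null } (no "nickname" field)
  val written: Try[BSONDocument] =
    BSON.writeDocument(Player("foo", None, None))
}
```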
More: BSON Library overview
Types
The Decimal128 type introduced by MongoDB 3.4 is supported, as BSONDecimal, and can be read or written as java.math.BigDecimal.
Handlers
A handler is now available to write and read Scala Map
as BSON, provided the key and value types are themselves supported.
import scala.util.Try
import reactivemongo.api.bson._
def bsonMap = {
val input: Map[String, Int] = Map("a" -> 1, "b" -> 2)
// Ok as key and value (String, Int) are provided BSON handlers
val doc: Try[BSONDocument] = BSON.writeDocument(input)
val output = doc.flatMap { BSON.readDocument[Map[String, Int]](_) }
}
Macros
The compile-time option AutomaticMaterialization has been added, when using the macros with a sealed family, to explicitly indicate when you want to automatically materialize required instances for the subtypes (if missing from the implicit scope).
sealed trait Color
case object Red extends Color
case object Blue extends Color
case class Green(brightness: Int) extends Color
case class CustomColor(code: String) extends Color
object Color {
import reactivemongo.api.bson.{ Macros, MacroOptions },
MacroOptions.{ AutomaticMaterialization, UnionType, \/ }
// Use `UnionType` to define a subset of the `Color` type,
type PredefinedColor =
UnionType[Red.type \/ Green \/ Blue.type] with AutomaticMaterialization
val predefinedColor = Macros.handlerOpts[Color, PredefinedColor]
}
A new annotation @Flatten has been added, to indicate to the macros that the representation of a property must be flattened rather than written as a nested document.
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.Macros.Annotations.Flatten
case class Range(start: Int, end: Int)
case class LabelledRange(
name: String,
@Flatten range: Range)
// Flattened with macro as below:
BSONDocument("name" -> "foo", "start" -> 0, "end" -> 1)
// Rather than:
// BSONDocument("name" -> "foo", "range" -> BSONDocument(
// "start" -> 0, "end" -> 1))
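To see the flattening end to end, the same model can be written and read back. This is a minimal sketch, wrapped in a hypothetical `FlattenExample` object, which assumes that the flattened property type needs its own document handler in the implicit scope.

```scala
import scala.util.Try
import reactivemongo.api.bson.{ BSON, BSONDocument, BSONDocumentHandler, Macros }
import reactivemongo.api.bson.Macros.Annotations.Flatten

object FlattenExample {
  case class Range(start: Int, end: Int)

  case class LabelledRange(
    name: String,
    @Flatten range: Range)

  // Assumption: the flattened property requires a document handler
  implicit val rangeHandler: BSONDocumentHandler[Range] =
    Macros.handler[Range]

  implicit val labelledHandler: BSONDocumentHandler[LabelledRange] =
    Macros.handler[LabelledRange]

  // Write: "start" and "end" are expected at the top level of the document
  val written: Try[BSONDocument] =
    BSON.writeDocument(LabelledRange("foo", Range(0, 1)))

  // Read back from the flattened representation
  val readBack: Try[LabelledRange] =
    written.flatMap { BSON.readDocument[LabelledRange](_) }
}
```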
Query and write operations
The query builder supports more options (see find).
- singleBatch: boolean; Optional. Determines whether to close the cursor after the first batch. Defaults to false.
- maxScan: integer; Optional. Maximum number of documents or index keys to scan when executing the query.
- max: document; Optional. The exclusive upper bound for a specific index.
- min: document; Optional. The inclusive lower bound for a specific index.
- returnKey: boolean; Optional. If true, returns only the index keys in the resulting documents.
- showRecordId: boolean; Optional. Determines whether to return the record identifier for each document.
- collation: document; Optional. Specifies the collation to use for the operation (since 3.4).
The collection API provides new builders for write operations. These support bulk operations (e.g. inserting many documents at once).
The new insert operation provides an InsertBuilder, which supports both simple and bulk inserts.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.commands.{ MultiBulkWriteResult, WriteResult }
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
val document1 = BSONDocument(
"firstName" -> "Stephane",
"lastName" -> "Godbillon",
"age" -> 29)
// Simple: .insert.one(t)
def simpleInsert(coll: BSONCollection): Future[WriteResult] =
coll.insert.one(document1)
// Bulk: .insert.many(Seq(t1, t2, ..., tN))
def bulkInsert(coll: BSONCollection): Future[MultiBulkWriteResult] =
coll.insert(ordered = false).many(Seq(
document1, BSONDocument(
"firstName" -> "Foo",
"lastName" -> "Bar",
"age" -> 1)))
The new update operation returns an UpdateBuilder, which can be used to perform simple or bulk updates.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def update1(personColl: BSONCollection) = {
val selector = BSONDocument("name" -> "Jack")
val modifier = BSONDocument(
"$set" -> BSONDocument(
"lastName" -> "London",
"firstName" -> "Jack"),
"$unset" -> BSONDocument("name" -> 1))
// Simple update: get a future update
val futureUpdate1 = personColl.update.one(
q = selector, u = modifier,
upsert = false, multi = false)
// Bulk update: multiple update
val updateBuilder1 = personColl.update(ordered = true)
val updates = Future.sequence(Seq(
updateBuilder1.element(
q = BSONDocument("firstName" -> "Jane", "lastName" -> "Doh"),
u = BSONDocument("age" -> 18),
upsert = true,
multi = false),
updateBuilder1.element(
q = BSONDocument("firstName" -> "Bob"),
u = BSONDocument("age" -> 19),
upsert = false,
multi = true)))
val bulkUpdateRes1 = updates.flatMap { ops => updateBuilder1.many(ops) }
}
The .delete function returns a DeleteBuilder, to perform simple or bulk deletes.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def simpleDelete1(personColl: BSONCollection) =
personColl.delete.one(BSONDocument("firstName" -> "Stephane"))
def bulkDelete1(personColl: BSONCollection) = {
val deleteBuilder = personColl.delete(ordered = false)
val deletes = Future.sequence(Seq(
deleteBuilder.element(
q = BSONDocument("firstName" -> "Stephane"),
limit = Some(1), // former option firstMatch
collation = None),
deleteBuilder.element(
q = BSONDocument("lastName" -> "Doh"),
limit = None, // delete all the matching documents
collation = None)))
deletes.flatMap { ops => deleteBuilder.many(ops) }
}
The .remove operation is now deprecated.
arrayFilters
The arrayFilters criteria are supported for findAndModify and update operations.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.WriteConcern
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def findAndUpdateArrayFilters(personColl: BSONCollection) =
personColl.findAndModify(
selector = BSONDocument.empty,
modifier = personColl.updateModifier(
update = BSONDocument(f"$$set" -> BSONDocument(
f"grades.$$[element]" -> 100)),
fetchNewObject = true,
upsert = false),
sort = None,
fields = None,
bypassDocumentValidation = false,
writeConcern = WriteConcern.Journaled,
maxTime = None,
collation = None,
arrayFilters = Seq(
BSONDocument("element" -> BSONDocument(f"$$gte" -> 85))))
def updateArrayFilters(personColl: BSONCollection) =
personColl.update.one(
q = BSONDocument("grades" -> BSONDocument(f"$$gte" -> 100)),
u = BSONDocument(f"$$set" -> BSONDocument(
f"grades.$$[element]" -> 100)),
upsert = false,
multi = true,
collation = None,
arrayFilters = Seq(
BSONDocument("element" -> BSONDocument(f"$$gte" -> 100))))
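To make the effect of an update such as `updateArrayFilters` concrete, here is a minimal in-memory sketch in plain Scala, not driver code. The `setMatching` helper is hypothetical and only mimics, for a single `grades` array of integers, what `$set` on `grades.$[element]` does when the array filter is `{ "element": { "$gte": threshold } }`: only the elements satisfying the filter are replaced.

```scala
object ArrayFiltersSemantics {
  /**
   * In-memory equivalent of
   *   { $set: { "grades.$[element]": replacement } }
   * with arrayFilters [ { "element": { $gte: threshold } } ],
   * applied to the `grades` array of one document.
   */
  def setMatching(
    grades: Seq[Int],
    threshold: Int,
    replacement: Int): Seq[Int] =
    grades.map { g => if (g >= threshold) replacement else g }
}
```

With a threshold of 85 and a replacement of 100, the grades 70, 85 and 90 become 70, 100 and 100: the first element does not match the filter and is left untouched.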
More: Find documents, Write documents
Aggregation
New stages are now supported by the Aggregation Framework.
addFields:
The $addFields stage can now be used.
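import scala.concurrent.ExecutionContext
// Required for BSONDocument and the document/array factories used below
import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONCollection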
def sumHomeworkQuizz(students: BSONCollection) =
students.aggregateWith1[BSONDocument]() { framework =>
import framework.AddFields
AddFields(document(
"totalHomework" -> document(f"$$sum" -> f"$$homework"),
"totalQuiz" -> document(f"$$sum" -> f"$$quiz"))) -> List(
AddFields(document(
"totalScore" -> document(f"$$add" -> array(
f"$$totalHomework", f"$$totalQuiz", f"$$extraCredit")))))
}
bucketAuto:
The $bucketAuto stage introduced by MongoDB 3.4 can be used as below.
import scala.concurrent.ExecutionContext
import reactivemongo.api.Cursor
import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONCollection
def populationBuckets(zipcodes: BSONCollection)(implicit ec: ExecutionContext) =
zipcodes.aggregateWith1[BSONDocument]() { framework =>
import framework.BucketAuto
BucketAuto(BSONString(f"$$population"), 2, None)() -> List.empty
}.collect[Set](Int.MaxValue, Cursor.FailOnError[Set[BSONDocument]]())
count:
If the goal is only to count the aggregated documents, the $count stage can be used.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.Cursor
import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader, BSONString }
import reactivemongo.api.bson.collection.BSONCollection
def countPopulatedStates1(col: BSONCollection): Future[Int] = {
implicit val countReader = BSONDocumentReader[Int] { doc =>
doc.getAsTry[Int]("popCount").get
}
col.aggregateWith[Int]() { framework =>
import framework.{ Count, Group, Match, SumField }
Group(BSONString("$state"))(
"totalPop" -> SumField("population")) -> List(
Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L))),
Count("popCount"))
}.head
}
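For readers less familiar with aggregation pipelines, the Group, Match and Count stages used above can be compared with the following in-memory equivalent. This is only an illustrative sketch in plain Scala: the `Zip` case class and the `countPopulatedStates` function are hypothetical stand-ins for the collection documents and are not part of the driver.

```scala
object CountPopulatedStates {
  // Hypothetical in-memory stand-in for a document of the collection
  final case class Zip(state: String, population: Long)

  def countPopulatedStates(
    zips: Seq[Zip],
    minPop: Long = 10000000L): Int =
    zips
      .groupBy(_.state) // $group: { _id: "$state", ... }
      .map {
        // totalPop: { $sum: "$population" }
        case (state, docs) => state -> docs.map(_.population).sum
      }
      .count {
        // $match on totalPop, then $count of the remaining groups
        case (_, totalPop) => totalPop >= minPop
      }
}
```

The aggregation performs the same steps on the server side, so only the final count is returned to the application.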
facet:
The $facet stage is now supported, to create multi-faceted aggregations which characterize data across multiple dimensions, or facets.
import reactivemongo.api.bson.collection.BSONCollection
def useFacetAgg(coll: BSONCollection) = {
import coll.aggregationFramework.{ Count, Facet, Out, UnwindField }
Facet(Seq(
"foo" -> (UnwindField("bar"), List(Count("c"))),
"lorem" -> (Out("ipsum"), List.empty)))
/* {
$facet: {
'foo': [
{ '$unwind': '$bar' },
{ '$count': 'c' }
],
'lorem': [
{ '$out': 'ipsum' }
]
}
} */
}
filter:
The $filter operator is now supported, for example within a $project stage.
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.Cursor
import reactivemongo.api.bson.{ BSONArray, BSONDocument, BSONString }
import reactivemongo.api.bson.collection.BSONCollection
def salesWithItemGreaterThanHundered(sales: BSONCollection) =
sales.aggregateWith1[BSONDocument]() { framework =>
import framework._
val sort = Sort(Ascending("_id"))
Project(BSONDocument("items" -> Filter(
input = BSONString(f"$$items"),
as = "item",
cond = BSONDocument(
f"$$gte" -> BSONArray(f"$$$$item.price", 100))))) -> List(sort)
}.collect[List](Int.MaxValue, Cursor.FailOnError[List[BSONDocument]]())
replaceRoot:
The $replaceRoot stage is now supported.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
/* For a fruits collection:
{
"_id" : 1,
"fruit" : [ "apples", "oranges" ],
"in_stock" : { "oranges" : 20, "apples" : 60 },
"on_order" : { "oranges" : 35, "apples" : 75 }
}, ...
*/
def replaceRootTest(fruits: BSONCollection): Future[Option[BSONDocument]] = {
fruits.aggregateWith1[BSONDocument]() { framework =>
import framework._
ReplaceRootField("in_stock") -> List.empty
}.headOption
// Results: { "oranges": 20, "apples": 60 }, ...
}
slice:
The $slice operator is also supported as below.
import scala.concurrent.ExecutionContext
import reactivemongo.api.Cursor
import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONCollection
def sliceFavorites(coll: BSONCollection)(implicit ec: ExecutionContext) =
coll.aggregateWith1[BSONDocument]() { framework =>
import framework.{ Project, Slice }
Project(BSONDocument(
"name" -> 1,
"favorites" -> Slice(
array = BSONString(f"$$favorites"),
n = BSONInteger(3)).makePipe)) -> List.empty
}.collect[Seq](4, Cursor.FailOnError[Seq[BSONDocument]]())
Miscellaneous: Other stages and operators are also supported.
$and
$allElementsTrue
$acosh
$acos
$abs
$planCacheStats
$collStats
$bucket
$merge
$listSessions
$listLocalSessions
$currentOp
$unset
$sortByCount
$set
$replaceWith
More: Aggregation Framework
Monitoring
A new module is available to collect ReactiveMongo metrics with Kamon.
"org.reactivemongo" %% "reactivemongo-kamon" % "0.20.13"
Then the metrics can be configured in dashboards, according to the Kamon reporters in use (for example, Kamon APM).
More: Monitoring
Administration
The operations to manage a MongoDB instance can be executed using ReactiveMongo. This new release has new functions for DB administration.
Ping:
The DefaultDB now has a ping operation, to execute a ping command.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.DefaultDB
def ping(admin: DefaultDB): Future[Boolean] = admin.ping()
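One possible use of ping is a health check that never fails. The following is a minimal sketch in plain Scala; the `HealthCheck` object and its `isHealthy` function are hypothetical helpers, not part of the driver, and simply map a failed ping to `false`.

```scala
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

object HealthCheck {
  /** Resolves to `false` when the given ping fails, instead of failing. */
  def isHealthy(ping: => Future[Boolean])(
    implicit
    ec: ExecutionContext): Future[Boolean] =
    ping.recover { case NonFatal(_) => false }
}

// Usage with the driver, assuming a `db: DefaultDB` is available:
//   HealthCheck.isHealthy(db.ping())
```

Taking the ping as a by-name `Future` keeps the helper independent from the driver types, which also makes it easy to exercise without a running MongoDB instance.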
Breaking changes
The Typesafe Migration Manager has been set up on the ReactiveMongo repository. It will validate all future contributions, and help make the API more stable.
For the current 0.20.13 release, it has detected the following breaking changes.
Connection
reactivemongo.api.ReadPreference.Taggable
Operations and commands
reactivemongo.api.commands.DeleteCommand.DeleteElement
Core/internal
reactivemongo.core packages, after the Netty 4.1.25 upgrade.