Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Write Documents
MongoDB offers different kinds of write operations: insertion, update or removal. Using ReactiveMongo Data, this can be performed asynchronously.
Insert a document
Insertions are done with the insert
function.
import scala.util.{ Failure, Success }
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.bson.collection.BSONCollection
val document1 = BSONDocument(
"firstName" -> "Stephane",
"lastName" -> "Godbillon",
"age" -> 29)
// Simple: .insert.one(t)
def simpleInsert(coll: BSONCollection): Future[Unit] = {
val writeRes: Future[WriteResult] = coll.insert.one(document1)
writeRes.onComplete { // Dummy callbacks
case Failure(e) => e.printStackTrace()
case Success(writeResult) =>
println(s"successfully inserted document with result: $writeResult")
}
writeRes.map(_ => {}) // in this example, do nothing with the success
}
// Bulk: .insert.many(Seq(t1, t2, ..., tN))
def bulkInsert(coll: BSONCollection): Future[Unit] = {
val writeRes: Future[coll.MultiBulkWriteResult] =
coll.insert(ordered = false).many(Seq(
document1, BSONDocument(
"firstName" -> "Foo",
"lastName" -> "Bar",
"age" -> 1)))
writeRes.onComplete { // Dummy callbacks
case Failure(e) => e.printStackTrace()
case Success(writeResult) =>
println(s"successfully inserted document with result: $writeResult")
}
writeRes.map(_ => {}) // in this example, do nothing with the success
}
What does WriteResult
mean?
A WriteResult
is a special document that contains information about the write operation, like the number of documents that were updated.
If the write result actually indicates an error, the Future
will be in a failed
state (no need to check for WriteResult.ok
).
Like all the other collection operations (in GenericCollection
trait), you can insert any writeable value to insert()
. With the default BSON serialization, that means provided there a BSONDocumentWriter
for its type in the implicit scope. So, considering the Person
case class:
import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.collection.BSONCollection
val person = Person("Stephane Godbillon", 29)
def testInsert(personColl: BSONCollection) = {
val future2 = personColl.insert.one(person)
future2.onComplete {
case Failure(e) => throw e
case Success(writeResult) => {
println(s"successfully inserted document: $writeResult")
}
}
}
Error handling:
When calling a write operation, it’s possible to handle some specific error case by testing the result, using some pattern matching utilities.
WriteResult.Code
: matches the errors according the specified code (e.g. the 11000 code for the Duplicate error)WriteResult.Message
: matches the errors according the messageWriteResult.Exception
: matches the exception details
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.bson.collection.BSONCollection
def insertErrors(personColl: BSONCollection) = {
val future: Future[WriteResult] = personColl.insert.one(person)
val end: Future[Unit] = future.map(_ => {}).recover {
case WriteResult.Code(11000) =>
// if the result is defined with the error code 11000 (duplicate error)
println("Match the code 11000")
case WriteResult.Message("Must match this exact message") =>
println("Match the error message")
case WriteResult.Exception(cause) =>
cause.printStackTrace() // Print any other Exception
case _ => ()
}
}
Update a document
Updates are done with the update
operation, which follows the same logic as insert
.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def update1(personColl: BSONCollection) = {
val selector = BSONDocument("name" -> "Jack")
val modifier = BSONDocument(
"$set" -> BSONDocument(
"lastName" -> "London",
"firstName" -> "Jack"),
"$unset" -> BSONDocument("name" -> 1))
// Simple update: get a future update
val futureUpdate1 = personColl.update.one(
q = selector, u = modifier,
upsert = false, multi = false)
// Bulk update: multiple update
val updateBuilder1 = personColl.update(ordered = true)
val updates = Future.sequence(Seq(
updateBuilder1.element(
q = BSONDocument("firstName" -> "Jane", "lastName" -> "Doh"),
u = BSONDocument("age" -> 18),
upsert = true,
multi = false),
updateBuilder1.element(
q = BSONDocument("firstName" -> "Bob"),
u = BSONDocument("age" -> 19),
upsert = false,
multi = true)))
val bulkUpdateRes1 = updates.flatMap { ops => updateBuilder1.many(ops) }
}
By default, the update operation only updates a single matching document. You can also indicate that the update should be applied to all the documents that are matching, with the multi
parameter.
It’s possible to automatically insert data if there is no matching document using the upsert
parameter.
The arrayFilters
criteria can also be specified on update.
import scala.concurrent.ExecutionContext
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def updateArrayFilters(personColl: BSONCollection)(
implicit ec: ExecutionContext) =
personColl.update.one(
q = BSONDocument("grades" -> BSONDocument(f"$$gte" -> 100)),
u = BSONDocument(f"$$set" -> BSONDocument(
f"grades.$$[element]" -> 100)),
upsert = false,
multi = true,
collation = None,
arrayFilters = Seq(
BSONDocument("element" -> BSONDocument(f"$$gte" -> 100))))
Delete a document
The .delete
function returns a DeleteBuilder
, which allows to perform simple or bulk delete.
import scala.util.{ Failure, Success }
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
def simpleDelete1(personColl: BSONCollection) = {
val selector1 = BSONDocument("firstName" -> "Stephane")
val futureRemove1 = personColl.delete.one(selector1)
futureRemove1.onComplete { // callback
case Failure(e) => throw e
case Success(writeResult) => println("successfully removed document")
}
}
def bulkDelete1(personColl: BSONCollection) = {
val deleteBuilder = personColl.delete(ordered = false)
val deletes = Future.sequence(Seq(
deleteBuilder.element(
q = BSONDocument("firstName" -> "Stephane"),
limit = Some(1), // former option firstMatch
collation = None),
deleteBuilder.element(
q = BSONDocument("lastName" -> "Doh"),
limit = None, // delete all the matching document
collation = None)))
deletes.flatMap { ops => deleteBuilder.many(ops) }
}
The
.remove
operation is now deprecated.
Find and modify
ReactiveMongo also supports the MongoDB findAndModify
operation.
In the case you want to update the age of a document in a collection of persons, and at the same time to return the information about the person before this change, it can be done using findAndUpdate
.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.{ BSONDocument, Macros }
import reactivemongo.api.bson.collection.BSONCollection
case class Person(name: String, age: Int)
def update(collection: BSONCollection, age: Int): Future[Option[Person]] = {
implicit val reader = Macros.reader[Person]
val result = collection.findAndUpdate(
BSONDocument("name" -> "James"),
BSONDocument("$set" -> BSONDocument("age" -> 17)),
fetchNewObject = true)
result.map(_.result[Person])
}
As on a simple update, it’s possible to insert a new document when one does not already exist.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.{ BSONDocument, BSONDocumentHandler, Macros }
import reactivemongo.api.bson.collection.BSONCollection
implicit val handler: BSONDocumentHandler[Person] = Macros.handler[Person]
/** Insert a new document if a matching one does not already exist. */
def result(coll: BSONCollection): Future[Option[Person]] =
coll.findAndUpdate(
BSONDocument("name" -> "James"),
Person(name = "Foo", age = 25),
upsert = true).map(_.result[Person])
The findAndModify
approach can be used on removal.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.bson.collection.BSONCollection
def removedPerson(coll: BSONCollection, name: String)(implicit ec: ExecutionContext, reader: BSONDocumentReader[Person]): Future[Option[Person]] =
coll.findAndRemove(BSONDocument("name" -> name)).
map(_.result[Person])
As when using update
arrayFilters
criteria can be specified for a findAndModify
operation.
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.WriteConcern
import reactivemongo.api.bson.collection.BSONCollection
def findAndUpdateArrayFilters(personColl: BSONCollection) =
personColl.findAndModify(
selector = BSONDocument.empty,
modifier = personColl.updateModifier(
update = BSONDocument(f"$$set" -> BSONDocument(
f"grades.$$[element]" -> 100)),
fetchNewObject = true,
upsert = false),
sort = None,
fields = None,
bypassDocumentValidation = false,
writeConcern = WriteConcern.Journaled,
maxTime = None,
collation = None,
arrayFilters = Seq(
BSONDocument("elem.grade" -> BSONDocument(f"$$gte" -> 85))))
Session/transaction
Starting in 3.6, MongoDB offers session management to gather operations, and since MongoDB 4.0, transactions can be defined for session.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.DB
def testTx(db: DB)(implicit ec: ExecutionContext): Future[Unit] =
for {
dbWithSession <- db.startSession()
dbWithTx <- dbWithSession.startTransaction(None)
coll = dbWithTx.collection("foo")
_ <- coll.insert.one(BSONDocument("id" -> 1, "bar" -> "lorem"))
r <- coll.find(BSONDocument("id" -> 1)).one[BSONDocument] // found
_ <- db.collection("foo").find(
BSONDocument("id" -> 1)).one[BSONDocument]
// not found for DB outside transaction
_ <- dbWithTx.commitTransaction() // or abortTransaction()
// session still open, can start another transaction, or other ops
_ <- dbWithSession.endSession()
} yield ()
The support for session and transaction is defined in the database API (still experimental).
startSession
startTransaction
, for a DB reference with a session started.abortTransaction
orcommitTransaction
on a DB reference with transaction.endSession
orkillSession