Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Write Documents
MongoDB offers different kinds of write operations: insertion, update or removal. Using ReactiveMongo, these operations can be performed asynchronously.
Insert a document
Insertions are done with the insert function.
import scala.util.{ Failure, Success }
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.collections.bson.BSONCollection
val document1 = BSONDocument(
  "firstName" -> "Stephane",
  "lastName" -> "Godbillon",
  "age" -> 29)
def insertDoc1(coll: BSONCollection, doc: BSONDocument): Future[Unit] = {
  // insert the document given as argument (e.g. `document1`)
  val writeRes: Future[WriteResult] = coll.insert(doc)
  writeRes.onComplete { // Dummy callbacks
    case Failure(e) => e.printStackTrace()
    case Success(writeResult) =>
      println(s"successfully inserted document with result: $writeResult")
  }
  writeRes.map(_ => {}) // in this example, do nothing with the success
}
The type Future[LastError] previously returned by the write operations is replaced by Future[WriteResult] in the new API.
What does WriteResult mean?
A WriteResult is a special document that contains information about the write operation, like the number of documents that were updated.
If the write result actually indicates an error, the Future will be in a failed state (no need to check for WriteResult.ok).
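Because a write error fails the Future itself, it can be handled with the standard Future combinators instead of inspecting a flag on the result. The sketch below illustrates the idea using only the Scala standard library; FakeWriteResult and fakeWrite are hypothetical stand-ins for a write operation and are not part of ReactiveMongo.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for a write result; not a ReactiveMongo type.
final case class FakeWriteResult(n: Int)

// Hypothetical stand-in for a write operation: either fails the Future
// or completes it with a result reporting one written document.
def fakeWrite(fail: Boolean): Future[FakeWriteResult] =
  if (fail) Future.failed(new RuntimeException("write rejected"))
  else Future.successful(FakeWriteResult(n = 1))

// The success path is handled with `map`, the error path with `recover`.
def describe(fail: Boolean): Future[String] =
  fakeWrite(fail).map(r => s"written: ${r.n}").recover {
    case e: RuntimeException => s"failed: ${e.getMessage}"
  }

// Blocking here is only for demonstration purposes.
val ok = Await.result(describe(fail = false), 5.seconds)
val ko = Await.result(describe(fail = true), 5.seconds)
```

In application code the resulting Future would normally be composed further rather than awaited.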
Like all the other collection operations (in the GenericCollection trait), insert() accepts any writable value. With the default BSON serialization, this means that a BSONDocumentWriter for the type of the value must be available in the implicit scope. So, considering the Person case class:
import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global
val person = Person("Stephane Godbillon", 29)
val future2 = personColl.insert(person)
future2.onComplete {
  case Failure(e) => throw e
  case Success(writeResult) => {
    println(s"successfully inserted document: $writeResult")
  }
}
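The snippet above relies on a Person case class and on a BSONDocumentWriter[Person] that are not defined at that point. A minimal sketch of what they could look like follows, assuming the Person(name, age) shape and the Macros.writer helper that appear later on this page; the name personWriter and the choice to place the writer in the companion object are illustrative.

```scala
import reactivemongo.bson.{ BSONDocumentWriter, Macros }

// Same shape as the Person case class used in the find-and-modify examples
case class Person(name: String, age: Int)

object Person {
  // Macro-generated writer; as it lives in the companion object,
  // it is found in the implicit scope wherever a Person is written.
  implicit val personWriter: BSONDocumentWriter[Person] =
    Macros.writer[Person]
}
```

With such a writer in scope, personColl.insert(person) can serialize the Person value without an explicit conversion to BSONDocument.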
Error handling:
When calling a write operation, it’s possible to handle specific error cases by testing the result with the following pattern matching utilities.
WriteResult.Code: matches the errors according to the specified code (e.g. the code 11000 for the duplicate key error)
WriteResult.Message: matches the errors according to the message
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.commands.WriteResult
val future: Future[WriteResult] = personColl.insert(person)
val end: Future[Unit] = future.map(_ => {}).recover {
  case WriteResult.Code(11000) =>
    // if the result is defined with the error code 11000 (duplicate error)
    println("Match the code 11000")
  case WriteResult.Message("Must match this exact message") =>
    println("Match the error message")
  case _ => ()
}
Insert multiple documents
The operation bulkInsert makes it possible to insert multiple documents.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.MultiBulkWriteResult
def bsonCollection: reactivemongo.api.collections.bson.BSONCollection = ???
def persons: List[Person] = ???
val personColl = bsonCollection
val bulkResult1: Future[MultiBulkWriteResult] =
  personColl.bulkInsert(ordered = false)(
    BSONDocument("name" -> "document1"),
    BSONDocument("name" -> "document2"),
    BSONDocument("name" -> "document3"))
// Considering `persons` a `Seq[Person]`,
// provided a `BSONDocumentWriter[Person]` can be resolved.
val bulkDocs = // prepare the person documents to be inserted
  persons.map(implicitly[personColl.ImplicitlyDocumentProducer](_))
val bulkResult2 = personColl.bulkInsert(ordered = true)(bulkDocs: _*)
Update a document
Updates are done with the update()
method, which follows the same logic as insert
.
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
val selector = BSONDocument("name" -> "Jack")
val modifier = BSONDocument(
  "$set" -> BSONDocument(
    "lastName" -> "London",
    "firstName" -> "Jack"),
  "$unset" -> BSONDocument("name" -> 1))
// get a future update
val futureUpdate1 = personColl.update(selector, modifier)
By default, the update operation only updates a single matching document. You can also indicate that the update should be applied to all the matching documents, with the multi parameter.
import scala.concurrent.ExecutionContext.Implicits.global
// get a future update
val futureUpdate2 = personColl.update(selector, modifier, multi = true)
It’s possible to automatically insert data if there is no matching document using the upsert parameter.
import scala.concurrent.ExecutionContext.Implicits.global
val futureUpdate3 = personColl.update(selector, modifier, upsert = true)
Remove a document
import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
val selector1 = BSONDocument("firstName" -> "Stephane")
val futureRemove1 = personColl.remove(selector1)
futureRemove1.onComplete { // callback
  case Failure(e) => throw e
  case Success(writeResult) => println("successfully removed document")
}
By default, this remove function deletes all the documents that match the selector. You can change this behaviour by setting the firstMatchOnly parameter to true:
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
def removeFirst(selector: BSONDocument) =
  personColl.remove(selector, firstMatchOnly = true)
ReactiveMongo can even store instances of a custom class directly by defining a custom writer.
Find and modify
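ReactiveMongo also supports the MongoDB findAndModify operation.
Suppose you want to update the age of a document in a collection of persons and, in the same operation, get back the information about that person. This can be done using findAndUpdate. With fetchNewObject = true, as in the following example, the returned document reflects the update; with false, it is the document as it was before the change.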
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.{ BSONDocument, BSONDocumentReader, Macros }
import reactivemongo.api.collections.bson.BSONCollection
case class Person(name: String, age: Int)
def update(collection: BSONCollection, age: Int): Future[Option[Person]] = {
  import collection.BatchCommands.FindAndModifyCommand.FindAndModifyResult
  implicit val reader = Macros.reader[Person]
  val result: Future[FindAndModifyResult] = collection.findAndUpdate(
    BSONDocument("name" -> "James"),
    BSONDocument("$set" -> BSONDocument("age" -> age)), // set the given age
    fetchNewObject = true)
  result.map(_.result[Person])
}
As with a simple update, it’s possible to insert a new document when one does not already exist.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.{ BSONDocument, Macros }
import reactivemongo.api.collections.bson.BSONCollection
implicit val writer = Macros.writer[Person]
def result(coll: BSONCollection): Future[coll.BatchCommands.FindAndModifyCommand.FindAndModifyResult] = coll.findAndUpdate(
  BSONDocument("name" -> "James"),
  Person(name = "Foo", age = 25),
  upsert = true)
// insert a new document if a matching one does not already exist
The findAndModify approach can also be used for removal.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.collections.bson.BSONCollection
def removedPerson(coll: BSONCollection, name: String)(implicit ec: ExecutionContext, reader: BSONDocumentReader[Person]): Future[Option[Person]] =
  coll.findAndRemove(BSONDocument("name" -> name)).
    map(_.result[Person])
Troubleshooting
The synchronous .db has been deprecated as it didn’t offer a sufficient guarantee that it can initially find an active channel in the connection pool (MongoConnection).
The new .database resolution must be used (see connection tutorial).
If the deprecated database resolution is still used, a runtime error such as ConnectionNotInitialized can be raised when writing documents.
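As an illustration of the asynchronous resolution, the following sketch resolves a collection through .database. It assumes the API version used on this page, where MongoConnection.database returns a Future of the database reference. The helper name personCollection and the database and collection names ("mydb", "persons") are hypothetical.

```scala
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.MongoConnection
import reactivemongo.api.collections.bson.BSONCollection

// Hypothetical helper: the database is resolved asynchronously, so the
// collection reference is itself only available inside a Future.
def personCollection(connection: MongoConnection)(
  implicit ec: ExecutionContext): Future[BSONCollection] =
  connection.database("mydb"). // hypothetical database name
    map(_.collection[BSONCollection]("persons")) // hypothetical collection
```

Write operations can then be chained on the result, for example personCollection(connection).flatMap(_.insert(document1)), so that a connection problem surfaces as a failed Future rather than as an exception at resolution time.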