Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Write Documents
MongoDB offers different kinds of write operations: insertion, update or removal. Using ReactiveMongo Data, this can be performed asynchronously.
Insert a document
Insertions are done with the insert
function.
import scala.util.{ Failure, Success }
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.collections.bson.BSONCollection
val document1 = BSONDocument(
"firstName" -> "Stephane",
"lastName" -> "Godbillon",
"age" -> 29)
def insertDoc1(coll: BSONCollection, doc: BSONDocument): Future[Unit] = {
val writeRes: Future[WriteResult] = coll.insert(document1)
writeRes.onComplete { // Dummy callbacks
case Failure(e) => e.printStackTrace()
case Success(writeResult) =>
println(s"successfully inserted document with result: $writeResult")
}
writeRes.map(_ => {}) // in this example, do nothing with the success
}
The type
Future[LastError]
previously returned by the write operations is replaced byFuture[WriteResult]
in the new API.
What does WriteResult
mean?
A WriteResult
is a special document that contains information about the write operation, like the number of documents that were updated.
If the write result actually indicates an error, the Future
will be in a failed
state.
Like all the other collection operations (in GenericCollection
trait), you can insert any writeable value to insert()
. With the default BSON serialization, that means provided there a BSONDocumentWriter
for its type in the implicit scope. So, considering the Person
case class:
import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global
val person = Person("Stephane Godbillon", 29)
val future2 = personColl.insert(person)
future2.onComplete {
case Failure(e) => throw e
case Success(writeResult) => {
println(s"successfully inserted document: $writeResult")
}
}
When calling a write operation, it’s possible to handle some specific error case by testing the result.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.commands.{ CommandError, WriteResult }
val future: Future[WriteResult] = personColl.insert(person)
val end: Future[Unit] = future.map(_ => {}).recover {
case err: CommandError if (err.code contains 11000) =>
// if the result is defined with the error code 11000 (duplicate error)
println("Just a warning")
case _ => ()
}
Insert multiple document
The operation bulkInsert
makes it possible to insert multiple documents.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.MultiBulkWriteResult
def bsonCollection: reactivemongo.api.collections.bson.BSONCollection = ???
def persons: List[Person] = ???
val personColl = bsonCollection
val bulkResult1: Future[MultiBulkWriteResult] =
personColl.bulkInsert(ordered = false)(
BSONDocument("name" -> "document1"),
BSONDocument("name" -> "document2"),
BSONDocument("name" -> "document3"))
// Considering `persons` a `Seq[Person]`,
// provided a `BSONDocumentWriter[Person]` can be resolved.
val bulkDocs = // prepare the person documents to be inserted
persons.map(implicitly[personColl.ImplicitlyDocumentProducer](_))
val bulkResult2 = personColl.bulkInsert(ordered = true)(bulkDocs: _*)
Update a document
Updates are done with the update()
method, which follows the same logic as insert
.
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
val selector = BSONDocument("name" -> "Jack")
val modifier = BSONDocument(
"$set" -> BSONDocument(
"lastName" -> "London",
"firstName" -> "Jack"),
"$unset" -> BSONDocument("name" -> 1))
// get a future update
val futureUpdate1 = personColl.update(selector, modifier)
By default, the update operation only update a single matching document. You can also specify that the update should be applied to all the documents that are matching, with the multi
parameter.
import scala.concurrent.ExecutionContext.Implicits.global
// get a future update
val futureUpdate2 = personColl.update(selector, modifier, multi = true)
It’s possible to automatically insert data if there is no existing document matching the update using the upsert
parameter.
import scala.concurrent.ExecutionContext.Implicits.global
val futureUpdate3 = personColl.update(selector, modifier, upsert = true)
Remove a document
import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
val selector1 = BSONDocument("firstName" -> "Stephane")
val futureRemove1 = personColl.remove(selector1)
futureRemove1.onComplete { // callback
case Failure(e) => throw e
case Success(writeResult) => println("successfully removed document")
}
By default, this remove
function deletes all the documents that match the selector
. You can change this behaviour by setting the firstMatchOnly
parameter to true
:
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
def removeFirst(selector: BSONDocument) =
personColl.remove(selector, firstMatchOnly = true)
ReactiveMongo can even store instances of a custom class directly by defining a custom writer.
Find and modify
ReactiveMongo also supports the MongoDB findAndModify
operation.
In the case you want to update the age of a document in a collection of persons, and at the same time to return the information about the person before this change, it can be done using findAndUpdate
.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.{ BSONDocument, BSONDocumentReader, Macros }
import reactivemongo.api.collections.bson.BSONCollection
case class Person(name: String, age: Int)
def update(collection: BSONCollection, age: Int): Future[Option[Person]] = {
import collection.BatchCommands.FindAndModifyCommand.FindAndModifyResult
implicit val reader = Macros.reader[Person]
val result: Future[FindAndModifyResult] = collection.findAndUpdate(
BSONDocument("name" -> "James"),
BSONDocument("$set" -> BSONDocument("age" -> 17)),
fetchNewObject = true)
result.map(_.result[Person])
}
As on a simple update, it’s possible to insert a new document when one does not already exist.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.{ BSONDocument, Macros }
import reactivemongo.api.collections.bson.BSONCollection
implicit val writer = Macros.writer[Person]
def result(coll: BSONCollection): Future[coll.BatchCommands.FindAndModifyCommand.FindAndModifyResult] = coll.findAndUpdate(
BSONDocument("name" -> "James"),
Person(name = "Foo", age = 25),
upsert = true)
// insert a new document if a matching one does not already exist
The findAndModify
approach can be used on removal.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.collections.bson.BSONCollection
def removedPerson(coll: BSONCollection, name: String)(implicit ec: ExecutionContext, reader: BSONDocumentReader[Person]): Future[Option[Person]] =
coll.findAndRemove(BSONDocument("name" -> name)).
map(_.result[Person])