Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Write Documents

MongoDB offers different kinds of write operations: insertion, update or removal. Using ReactiveMongo Data, this can be performed asynchronously.

Insert a document

Insertions are done with the insert function.

import scala.util.{ Failure, Success }

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.collections.bson.BSONCollection

val document1 = BSONDocument(
  "firstName" -> "Stephane",
  "lastName" -> "Godbillon",
  "age" -> 29)

def insertDoc1(coll: BSONCollection, doc: BSONDocument): Future[Unit] = {
  val writeRes: Future[WriteResult] = coll.insert(document1)

  writeRes.onComplete { // Dummy callbacks
    case Failure(e) => e.printStackTrace()
    case Success(writeResult) =>
      println(s"successfully inserted document with result: $writeResult")
  }

  writeRes.map(_ => {}) // in this example, do nothing with the success
}

The type Future[LastError] previously returned by the write operations is replaced by Future[WriteResult] in the new API.

What does WriteResult mean?

A WriteResult is a special document that contains information about the write operation, like the number of documents that were updated.

If the write result actually indicates an error, the Future will be in a failed state.

Like all the other collection operations (in GenericCollection trait), you can insert any writeable value to insert(). With the default BSON serialization, that means provided there a BSONDocumentWriter for its type in the implicit scope. So, considering the Person case class:

import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global

val person = Person("Stephane Godbillon", 29)

val future2 = personColl.insert(person)

future2.onComplete {
  case Failure(e) => throw e
  case Success(writeResult) => {
    println(s"successfully inserted document: $writeResult")
  }
}

When calling a write operation, it’s possible to handle some specific error case by testing the result.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.commands.{ CommandError, WriteResult }

val future: Future[WriteResult] = personColl.insert(person)

val end: Future[Unit] = future.map(_ => {}).recover {
  case err: CommandError if (err.code contains 11000) =>
    // if the result is defined with the error code 11000 (duplicate error)
    println("Just a warning")

  case _ => ()
}

Insert multiple document

The operation bulkInsert makes it possible to insert multiple documents.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.MultiBulkWriteResult

def bsonCollection: reactivemongo.api.collections.bson.BSONCollection = ???
def persons: List[Person] = ???

val personColl = bsonCollection

val bulkResult1: Future[MultiBulkWriteResult] =
  personColl.bulkInsert(ordered = false)(
    BSONDocument("name" -> "document1"),
    BSONDocument("name" -> "document2"),
    BSONDocument("name" -> "document3"))

// Considering `persons` a `Seq[Person]`, 
// provided a `BSONDocumentWriter[Person]` can be resolved.
val bulkDocs = // prepare the person documents to be inserted
  persons.map(implicitly[personColl.ImplicitlyDocumentProducer](_))
  
val bulkResult2 = personColl.bulkInsert(ordered = true)(bulkDocs: _*)

Update a document

Updates are done with the update() method, which follows the same logic as insert.

import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument

val selector = BSONDocument("name" -> "Jack")

val modifier = BSONDocument(
  "$set" -> BSONDocument(
    "lastName" -> "London",
    "firstName" -> "Jack"),
    "$unset" -> BSONDocument("name" -> 1))

// get a future update
val futureUpdate1 = personColl.update(selector, modifier)

By default, the update operation only update a single matching document. You can also specify that the update should be applied to all the documents that are matching, with the multi parameter.

import scala.concurrent.ExecutionContext.Implicits.global

// get a future update
val futureUpdate2 = personColl.update(selector, modifier, multi = true)

It’s possible to automatically insert data if there is no existing document matching the update using the upsert parameter.

import scala.concurrent.ExecutionContext.Implicits.global

val futureUpdate3 = personColl.update(selector, modifier, upsert = true)

Remove a document

import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument

val selector1 = BSONDocument("firstName" -> "Stephane")

val futureRemove1 = personColl.remove(selector1)

futureRemove1.onComplete { // callback
  case Failure(e) => throw e
  case Success(writeResult) => println("successfully removed document")
}

By default, this remove function deletes all the documents that match the selector. You can change this behaviour by setting the firstMatchOnly parameter to true:

import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument

def removeFirst(selector: BSONDocument) =
  personColl.remove(selector, firstMatchOnly = true)

ReactiveMongo can even store instances of a custom class directly by defining a custom writer.

Find and modify

ReactiveMongo also supports the MongoDB findAndModify operation.

In the case you want to update the age of a document in a collection of persons, and at the same time to return the information about the person before this change, it can be done using findAndUpdate.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONDocumentReader, Macros }
import reactivemongo.api.collections.bson.BSONCollection

case class Person(name: String, age: Int)

def update(collection: BSONCollection, age: Int): Future[Option[Person]] = {
  import collection.BatchCommands.FindAndModifyCommand.FindAndModifyResult
  implicit val reader = Macros.reader[Person]  
  
  val result: Future[FindAndModifyResult] = collection.findAndUpdate(
    BSONDocument("name" -> "James"),
    BSONDocument("$set" -> BSONDocument("age" -> 17)),
    fetchNewObject = true)

  result.map(_.result[Person])
}

As on a simple update, it’s possible to insert a new document when one does not already exist.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, Macros }
import reactivemongo.api.collections.bson.BSONCollection

implicit val writer = Macros.writer[Person]

def result(coll: BSONCollection): Future[coll.BatchCommands.FindAndModifyCommand.FindAndModifyResult] = coll.findAndUpdate(
  BSONDocument("name" -> "James"),
  Person(name = "Foo", age = 25),
  upsert = true)
  // insert a new document if a matching one does not already exist

The findAndModify approach can be used on removal.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.collections.bson.BSONCollection

def removedPerson(coll: BSONCollection, name: String)(implicit ec: ExecutionContext, reader: BSONDocumentReader[Person]): Future[Option[Person]] =
  coll.findAndRemove(BSONDocument("name" -> name)).
    map(_.result[Person])

Previous: Database and collections / Next: Find documents

Suggest changes