Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Write Documents

MongoDB offers different kinds of write operations: insertion, update, and removal. Using ReactiveMongo, these operations can be performed asynchronously.

Insert a document

Insertions are done with the insert function.

import scala.util.{ Failure, Success }

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.collections.bson.BSONCollection

val document1 = BSONDocument(
  "firstName" -> "Stephane",
  "lastName" -> "Godbillon",
  "age" -> 29)

def insertDoc1(coll: BSONCollection, doc: BSONDocument): Future[Unit] = {
  val writeRes: Future[WriteResult] = coll.insert(doc) // insert the given document

  writeRes.onComplete { // Dummy callbacks
    case Failure(e) => e.printStackTrace()
    case Success(writeResult) =>
      println(s"successfully inserted document with result: $writeResult")
  }

  writeRes.map(_ => {}) // in this example, do nothing with the success
}

The type Future[LastError] previously returned by the write operations is replaced by Future[WriteResult] in the new API.

What does WriteResult mean?

A WriteResult is a special document that contains information about the write operation, like the number of documents that were updated.

If the write result actually indicates an error, the Future will be in a failed state (no need to check for WriteResult.ok).
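As a minimal sketch of how a successful result might be inspected, the helper below reads the `ok` and `n` fields of a `WriteResult`. The names `describeWrite` and `insertAndDescribe`, as well as the message format, are illustrative assumptions and not part of the driver; the sketch assumes the same API version as the other examples on this page.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.collections.bson.BSONCollection

// Hypothetical helper: summarizes a successful write result
def describeWrite(res: WriteResult): String =
  s"ok = ${res.ok}, affected documents = ${res.n}"

// Hypothetical usage: a failed write never reaches `map`,
// as the Future is then already in a failed state
def insertAndDescribe(coll: BSONCollection, doc: BSONDocument)(
  implicit ec: ExecutionContext): Future[String] =
  coll.insert(doc).map(describeWrite).recover {
    case cause => s"write failed: ${cause.getMessage}"
  }
```

With this shape, the success path and the failure path are both turned into a plain `String`, so the caller does not need to test `ok` itself.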

Like all the other collection operations (in the GenericCollection trait), insert() accepts any writeable value. With the default BSON serialization, that means a BSONDocumentWriter for its type must be available in the implicit scope. So, considering the Person case class:

import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global

val person = Person("Stephane Godbillon", 29)

val future2 = personColl.insert(person)

future2.onComplete {
  case Failure(e) => e.printStackTrace() // report the error rather than rethrowing in the callback
  case Success(writeResult) => {
    println(s"successfully inserted document: $writeResult")
  }
}

Error handling:

When calling a write operation, it’s possible to handle specific error cases by testing the result with pattern matching utilities.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.commands.WriteResult

val future: Future[WriteResult] = personColl.insert(person)

val end: Future[Unit] = future.map(_ => {}).recover {
  case WriteResult.Code(11000) =>
    // if the result is defined with the error code 11000 (duplicate key error)
    println("Match the code 11000")

  case WriteResult.Message("Must match this exact message") =>
    println("Match the error message")

  case _ => ()
}

Insert multiple documents

The operation bulkInsert makes it possible to insert multiple documents.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.MultiBulkWriteResult

def bsonCollection: reactivemongo.api.collections.bson.BSONCollection = ???
def persons: List[Person] = ???

val personColl = bsonCollection

val bulkResult1: Future[MultiBulkWriteResult] =
  personColl.bulkInsert(ordered = false)(
    BSONDocument("name" -> "document1"),
    BSONDocument("name" -> "document2"),
    BSONDocument("name" -> "document3"))

// Considering `persons` a `Seq[Person]`, 
// provided a `BSONDocumentWriter[Person]` can be resolved.
val bulkDocs = // prepare the person documents to be inserted
  persons.map(implicitly[personColl.ImplicitlyDocumentProducer](_))
  
val bulkResult2 = personColl.bulkInsert(ordered = true)(bulkDocs: _*)

Update a document

Updates are done with the update() method, which follows the same logic as insert.

import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument

val selector = BSONDocument("name" -> "Jack")

val modifier = BSONDocument(
  "$set" -> BSONDocument(
    "lastName" -> "London",
    "firstName" -> "Jack"),
  "$unset" -> BSONDocument("name" -> 1))

// get a future update
val futureUpdate1 = personColl.update(selector, modifier)

By default, the update operation only updates a single matching document. To apply the update to all the matching documents, set the multi parameter to true.

import scala.concurrent.ExecutionContext.Implicits.global

// get a future update
val futureUpdate2 = personColl.update(selector, modifier, multi = true)

It’s possible to automatically insert data if there is no matching document using the upsert parameter.

import scala.concurrent.ExecutionContext.Implicits.global

val futureUpdate3 = personColl.update(selector, modifier, upsert = true)
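The effect of the multi and upsert parameters can be observed in the result of the update. The sketch below assumes that `update` yields an `UpdateWriteResult` exposing `n`, `nModified` and `upserted`, as in the API version used on this page; `describeUpdate` and `updateAll` are hypothetical helper names introduced only for illustration.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.BSONDocument
import reactivemongo.api.commands.UpdateWriteResult
import reactivemongo.api.collections.bson.BSONCollection

// Hypothetical helper: tells an upsert apart from a plain update
def describeUpdate(res: UpdateWriteResult): String =
  if (res.upserted.nonEmpty) s"inserted ${res.upserted.size} new document(s)"
  else s"selected ${res.n} document(s), modified ${res.nModified}"

// Hypothetical usage combining both parameters
def updateAll(
  coll: BSONCollection,
  selector: BSONDocument,
  modifier: BSONDocument)(
  implicit ec: ExecutionContext): Future[String] =
  coll.update(selector, modifier, upsert = true, multi = true).
    map(describeUpdate)
```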

Remove a document

import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument

val selector1 = BSONDocument("firstName" -> "Stephane")

val futureRemove1 = personColl.remove(selector1)

futureRemove1.onComplete { // callback
  case Failure(e) => e.printStackTrace() // report the error rather than rethrowing in the callback
  case Success(writeResult) => println("successfully removed document")
}

By default, this remove function deletes all the documents that match the selector. You can change this behaviour by setting the firstMatchOnly parameter to true:

import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument

def removeFirst(selector: BSONDocument) =
  personColl.remove(selector, firstMatchOnly = true)

ReactiveMongo can even store instances of a custom class directly by defining a custom writer.
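A hand-written writer could look like the following minimal sketch. It assumes the `Person(name, age)` case class used elsewhere on this page; the object name `PersonWriter` and the BSON field names are illustrative choices.

```scala
import reactivemongo.bson.{ BSONDocument, BSONDocumentWriter }

case class Person(name: String, age: Int)

// Hand-written writer: maps each property of Person to a BSON field
implicit object PersonWriter extends BSONDocumentWriter[Person] {
  def write(person: Person): BSONDocument = BSONDocument(
    "name" -> person.name,
    "age" -> person.age)
}
```

Once such a writer is in the implicit scope, a `Person` value can be passed directly to write operations such as `insert`, without converting it to a `BSONDocument` by hand.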

Find and modify

ReactiveMongo also supports the MongoDB findAndModify operation.

To update the age of a document in a collection of persons and, in the same operation, get back the corresponding person, use findAndUpdate. The fetchNewObject parameter indicates whether the returned document is the one after the change (true) or the one before it (false, the default).

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONDocumentReader, Macros }
import reactivemongo.api.collections.bson.BSONCollection

case class Person(name: String, age: Int)

def update(collection: BSONCollection, age: Int): Future[Option[Person]] = {
  import collection.BatchCommands.FindAndModifyCommand.FindAndModifyResult
  implicit val reader = Macros.reader[Person]  
  
  val result: Future[FindAndModifyResult] = collection.findAndUpdate(
    BSONDocument("name" -> "James"),
    BSONDocument("$set" -> BSONDocument("age" -> age)), // use the given age
    fetchNewObject = true) // return the document as it is after the update

  result.map(_.result[Person])
}

As with a simple update, it’s possible to insert a new document when no matching one exists.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, Macros }
import reactivemongo.api.collections.bson.BSONCollection

implicit val writer = Macros.writer[Person]

def result(coll: BSONCollection): Future[coll.BatchCommands.FindAndModifyCommand.FindAndModifyResult] = coll.findAndUpdate(
  BSONDocument("name" -> "James"),
  Person(name = "Foo", age = 25),
  upsert = true)
  // insert a new document if a matching one does not already exist

The findAndModify approach can also be used for removal, with findAndRemove.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.collections.bson.BSONCollection

def removedPerson(coll: BSONCollection, name: String)(implicit ec: ExecutionContext, reader: BSONDocumentReader[Person]): Future[Option[Person]] =
  coll.findAndRemove(BSONDocument("name" -> name)).
    map(_.result[Person])

Troubleshooting

The synchronous .db has been deprecated, as it didn’t offer a sufficient guarantee that it can initially find an active channel in the connection pool (MongoConnection). The new .database resolution must be used instead (see the connection tutorial).

If the deprecated database resolution is still used, a runtime error such as ConnectionNotInitialized can be raised when writing documents.
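As a sketch of the asynchronous resolution, the function below obtains the collection through `.database` and only exposes it inside a `Future`. The function name `personCollection` and the database and collection names are placeholders chosen for this example.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.MongoConnection
import reactivemongo.api.collections.bson.BSONCollection

// Resolves the collection asynchronously with `.database`
// ("mydb" and "persons" are placeholder names)
def personCollection(connection: MongoConnection)(
  implicit ec: ExecutionContext): Future[BSONCollection] =
  connection.database("mydb").map(_.collection("persons"))
```

Write operations are then chained on the resolved collection, for example with `personCollection(connection).flatMap(_.insert(doc))`, so that no write is attempted before the database is resolved.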
