Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Write Documents

MongoDB offers different kinds of write operations: insertion, update, and removal. Using ReactiveMongo, these can be performed asynchronously.

Insert a document

Insertions are done with the insert function.

import scala.util.{ Failure, Success }

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.bson.collection.BSONCollection

val document1 = BSONDocument(
  "firstName" -> "Stephane",
  "lastName" -> "Godbillon",
  "age" -> 29)

// Simple: .insert.one(t)
def simpleInsert(coll: BSONCollection): Future[Unit] = {
  val writeRes: Future[WriteResult] = coll.insert.one(document1)

  writeRes.onComplete { // Dummy callbacks
    case Failure(e) => e.printStackTrace()
    case Success(writeResult) =>
      println(s"successfully inserted document with result: $writeResult")
  }

  writeRes.map(_ => {}) // in this example, do nothing with the success
}

// Bulk: .insert.many(Seq(t1, t2, ..., tN))
def bulkInsert(coll: BSONCollection): Future[Unit] = {
  val writeRes: Future[coll.MultiBulkWriteResult] =
    coll.insert(ordered = false).many(Seq(
      document1, BSONDocument(
        "firstName" -> "Foo",
        "lastName" -> "Bar",
        "age" -> 1)))

  writeRes.onComplete { // Dummy callbacks
    case Failure(e) => e.printStackTrace()
    case Success(writeResult) =>
      println(s"successfully inserted document with result: $writeResult")
  }

  writeRes.map(_ => {}) // in this example, do nothing with the success
}

What does WriteResult mean?

A WriteResult is a special document that contains information about the write operation, like the number of documents that were updated.

If the write result actually indicates an error, the Future will be in a failed state (no need to check for WriteResult.ok).

Like all the other collection operations (in the GenericCollection trait), insert() accepts any writeable value. With the default BSON serialization, that means a BSONDocumentWriter for its type must be available in the implicit scope. So, considering the Person case class:

import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global

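import reactivemongo.api.bson.{ BSONDocumentWriter, Macros }
import reactivemongo.api.bson.collection.BSONCollection

case class Person(name: String, age: Int)

// Writer resolved implicitly by insert.one
implicit val personWriter: BSONDocumentWriter[Person] = Macros.writer[Person]

val person = Person("Stephane Godbillon", 29)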

def testInsert(personColl: BSONCollection) = {
  val future2 = personColl.insert.one(person)

  future2.onComplete {
    case Failure(e) => throw e
    case Success(writeResult) => {
      println(s"successfully inserted document: $writeResult")
    }
  }
}
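To make the role of the implicit writer more concrete, here is a minimal sketch of a hand-written BSONDocumentWriter for Person, checked without any database through BSON.writeDocument. The names PersonWriter and written are chosen for illustration only, and the field names are an assumption about how the documents should be stored.

```scala
import scala.util.{ Success, Try }

import reactivemongo.api.bson.{ BSON, BSONDocument, BSONDocumentWriter }

case class Person(name: String, age: Int)

// Hypothetical hand-written writer (illustration only):
// maps each field of Person to a BSON field with the same name
implicit object PersonWriter extends BSONDocumentWriter[Person] {
  def writeTry(person: Person): Try[BSONDocument] = Success(
    BSONDocument("name" -> person.name, "age" -> person.age))
}

// Serialize outside of any collection operation,
// to inspect the document that insert.one would send
val written: Try[BSONDocument] =
  BSON.writeDocument(Person("Stephane Godbillon", 29))
```

With such a writer in the implicit scope, personColl.insert.one(person) should resolve it the same way.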

Error handling:

When calling a write operation, it’s possible to handle specific error cases by testing the result with some pattern matching utilities.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.commands.WriteResult

import reactivemongo.api.bson.collection.BSONCollection

def insertErrors(personColl: BSONCollection): Future[Unit] = {
  val future: Future[WriteResult] = personColl.insert.one(person)

  // Discard the successful result, and recover from some specific errors
  future.map(_ => {}).recover {
    case WriteResult.Code(11000) =>
      // if the result is defined with the error code 11000 (duplicate error)
      println("Match the code 11000")

    case WriteResult.Message("Must match this exact message") =>
      println("Match the error message")

    case WriteResult.Exception(cause) =>
      cause.printStackTrace() // Print any other Exception

    case _ => ()
  }
}
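As a variation on the pattern above, a duplicate key error can be turned into a regular value instead of a failed Future. This is a sketch only: insertIfAbsent is a hypothetical helper name, and it assumes that the code 11000 is the only error worth recovering from; any other error is left as a failure.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
import reactivemongo.api.commands.WriteResult

// Hypothetical helper: true if the document has been inserted,
// false if a duplicate key (code 11000) prevented the insertion
def insertIfAbsent(coll: BSONCollection, doc: BSONDocument)(
  implicit ec: ExecutionContext): Future[Boolean] =
  coll.insert.one(doc).map(_ => true).recover {
    case WriteResult.Code(11000) => false // already there
  }
```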

Update a document

Updates are done with the update operation, which follows the same logic as insert.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument

import reactivemongo.api.bson.collection.BSONCollection

def update1(personColl: BSONCollection) = {
  val selector = BSONDocument("name" -> "Jack")

  val modifier = BSONDocument(
    "$set" -> BSONDocument(
      "lastName" -> "London",
      "firstName" -> "Jack"),
    "$unset" -> BSONDocument("name" -> 1))

  // Simple update: get a future update
  val futureUpdate1 = personColl.update.one(
    q = selector, u = modifier,
    upsert = false, multi = false)

  // Bulk update: multiple update
  val updateBuilder1 = personColl.update(ordered = true)
  val updates = Future.sequence(Seq(
    updateBuilder1.element(
      q = BSONDocument("firstName" -> "Jane", "lastName" -> "Doh"),
      u = BSONDocument("age" -> 18),
      upsert = true,
      multi = false),
    updateBuilder1.element(
      q = BSONDocument("firstName" -> "Bob"),
      // with multi = true, MongoDB requires update operators
      u = BSONDocument("$set" -> BSONDocument("age" -> 19)),
      upsert = false,
      multi = true)))

  val bulkUpdateRes1 = updates.flatMap { ops => updateBuilder1.many(ops) }
}

By default, the update operation only updates a single matching document. To apply the update to all the matching documents, set the multi parameter to true.

It’s possible to automatically insert data if there is no matching document using the upsert parameter.
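The multi parameter described above can be sketched as follows, assuming documents with lastName and age fields. The helpers setAge and setAgeForAll are hypothetical names used for illustration; the returned count is the n field of the write result.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

// Hypothetical helper: builds a modifier using the $set operator
def setAge(age: Int): BSONDocument =
  BSONDocument("$set" -> BSONDocument("age" -> age))

// Hypothetical helper: applies the modifier to every matching document
// (multi = true), without inserting anything when nothing matches
def setAgeForAll(coll: BSONCollection, lastName: String, age: Int)(
  implicit ec: ExecutionContext): Future[Int] =
  coll.update.one(
    q = BSONDocument("lastName" -> lastName),
    u = setAge(age),
    upsert = false,
    multi = true).map(_.n)
```

Switching upsert to true in such a call would ask MongoDB to insert a document when the selector matches nothing.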

The arrayFilters criteria can also be specified on update.

import scala.concurrent.ExecutionContext

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

def updateArrayFilters(personColl: BSONCollection)(
  implicit ec: ExecutionContext) =
  personColl.update.one(
    q = BSONDocument("grades" -> BSONDocument(f"$$gte" -> 100)),
    u = BSONDocument(f"$$set" -> BSONDocument(
      f"grades.$$[element]" -> 100)),
    upsert = false,
    multi = true,
    collation = None,
    arrayFilters = Seq(
      BSONDocument("element" -> BSONDocument(f"$$gte" -> 100))))

Delete a document

The .delete function returns a DeleteBuilder, which allows you to perform simple or bulk deletes.

import scala.util.{ Failure, Success }

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument

import reactivemongo.api.bson.collection.BSONCollection

def simpleDelete1(personColl: BSONCollection) = {
  val selector1 = BSONDocument("firstName" -> "Stephane")

  val futureRemove1 = personColl.delete.one(selector1)

  futureRemove1.onComplete { // callback
    case Failure(e) => throw e
    case Success(writeResult) => println("successfully removed document")
  }
}

def bulkDelete1(personColl: BSONCollection) = {
  val deleteBuilder = personColl.delete(ordered = false)

  val deletes = Future.sequence(Seq(
    deleteBuilder.element(
      q = BSONDocument("firstName" -> "Stephane"),
      limit = Some(1), // former option firstMatch
      collation = None),
    deleteBuilder.element(
      q = BSONDocument("lastName" -> "Doh"),
      limit = None, // delete all the matching documents
      collation = None)))

  deletes.flatMap { ops => deleteBuilder.many(ops) }
}
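The limit option used in the bulk elements is also accepted by a simple delete. A minimal sketch, assuming that only the first matching document should be removed; deleteFirstMatch is a hypothetical name, and the result is the n field of the write result.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

// Hypothetical helper: removes at most one matching document
def deleteFirstMatch(coll: BSONCollection, firstName: String)(
  implicit ec: ExecutionContext): Future[Int] =
  coll.delete.one(
    q = BSONDocument("firstName" -> firstName),
    limit = Some(1)).map(_.n)
```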

The .remove operation is now deprecated.

Find and modify

ReactiveMongo also supports the MongoDB findAndModify operation.

To update the age of a document in a collection of persons and, at the same time, get the corresponding person back, use findAndUpdate. The fetchNewObject parameter selects which version is returned: the document as it was before the change (false), or the updated one (true).

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader, Macros }
import reactivemongo.api.bson.collection.BSONCollection

case class Person(name: String, age: Int)

def update(collection: BSONCollection, age: Int): Future[Option[Person]] = {
  implicit val reader: BSONDocumentReader[Person] = Macros.reader[Person]

  val result = collection.findAndUpdate(
    BSONDocument("name" -> "James"),
    BSONDocument("$set" -> BSONDocument("age" -> age)),
    fetchNewObject = true) // return the document as it is after the update

  result.map(_.result[Person])
}
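To get the person as it was before the change, the same call can be made with fetchNewObject = false. The following is a sketch under that assumption; previousPerson is a hypothetical name, and the reader is expected from the implicit scope of the caller.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.bson.collection.BSONCollection

case class Person(name: String, age: Int)

// Hypothetical helper: updates the age,
// and returns the person as stored before the update (if any)
def previousPerson(coll: BSONCollection, name: String, newAge: Int)(
  implicit
  ec: ExecutionContext,
  reader: BSONDocumentReader[Person]): Future[Option[Person]] =
  coll.findAndUpdate(
    BSONDocument("name" -> name),
    BSONDocument("$set" -> BSONDocument("age" -> newAge)),
    fetchNewObject = false).map(_.result[Person])
```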

As with a simple update, it’s possible to insert a new document when a matching one does not already exist.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.{ BSONDocument, BSONDocumentHandler, Macros }
import reactivemongo.api.bson.collection.BSONCollection

implicit val handler: BSONDocumentHandler[Person] = Macros.handler[Person]

/** Insert a new document if a matching one does not already exist. */
def result(coll: BSONCollection): Future[Option[Person]] =
  coll.findAndUpdate(
    BSONDocument("name" -> "James"),
    Person(name = "Foo", age = 25),
    upsert = true).map(_.result[Person])

The findAndModify approach can also be used for removal.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.bson.collection.BSONCollection

def removedPerson(coll: BSONCollection, name: String)(
  implicit
  ec: ExecutionContext,
  reader: BSONDocumentReader[Person]): Future[Option[Person]] =
  coll.findAndRemove(BSONDocument("name" -> name)).
    map(_.result[Person])

As with update, arrayFilters criteria can be specified for a findAndModify operation.

import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument

import reactivemongo.api.WriteConcern
import reactivemongo.api.bson.collection.BSONCollection

def findAndUpdateArrayFilters(personColl: BSONCollection) =
  personColl.findAndModify(
    selector = BSONDocument.empty,
    modifier = personColl.updateModifier(
      update = BSONDocument(f"$$set" -> BSONDocument(
        f"grades.$$[element]" -> 100)),
      fetchNewObject = true,
      upsert = false),
    sort = None,
    fields = None,
    bypassDocumentValidation = false,
    writeConcern = WriteConcern.Journaled,
    maxTime = None,
    collation = None,
    arrayFilters = Seq(
      // the filter identifier must match the one used in $[element]
      BSONDocument("element" -> BSONDocument(f"$$gte" -> 85))))

Session/transaction

Starting in 3.6, MongoDB offers session management to gather operations, and since MongoDB 4.0, transactions can be defined for a session.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.BSONDocument

import reactivemongo.api.DB

def testTx(db: DB)(implicit ec: ExecutionContext): Future[Unit] = 
  for {
    dbWithSession <- db.startSession()
    dbWithTx <- dbWithSession.startTransaction(None)
    coll = dbWithTx.collection("foo")

    _ <- coll.insert.one(BSONDocument("id" -> 1, "bar" -> "lorem"))
    r <- coll.find(BSONDocument("id" -> 1)).one[BSONDocument] // found

    _ <- db.collection("foo").find(
      BSONDocument("id" -> 1)).one[BSONDocument]
      // not found for DB outside transaction

    _ <- dbWithTx.commitTransaction() // or abortTransaction()
      // session still open, can start another transaction, or other ops

    _ <- dbWithSession.endSession()
  } yield ()

The support for session and transaction is defined in the database API (still experimental).
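The abortTransaction() alternative mentioned in the example can be sketched as a rollback on failure. This is only an illustration: insertOrAbort is a hypothetical name, the collection "foo" is reused from the example above, and the session is left open if the abort itself fails.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.DB
import reactivemongo.api.bson.BSONDocument

// Hypothetical helper: commits if the insert succeeds;
// otherwise aborts the transaction and propagates the original failure
def insertOrAbort(db: DB, doc: BSONDocument)(
  implicit ec: ExecutionContext): Future[Unit] =
  for {
    dbWithSession <- db.startSession()
    dbWithTx <- dbWithSession.startTransaction(None)
    coll = dbWithTx.collection("foo")

    _ <- coll.insert.one(doc).
      flatMap(_ => dbWithTx.commitTransaction()).
      map(_ => ()).
      recoverWith {
        case cause => // roll back, then fail with the initial cause
          dbWithTx.abortTransaction().
            flatMap(_ => Future.failed[Unit](cause))
      }

    _ <- dbWithSession.endSession()
  } yield ()
```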
