Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

ReactiveMongo 0.20.13 – Release details

What’s new?

The documentation is available online, and its code samples are compiled to make sure it’s up-to-date.

The next release will be 1.0.0.

The impatient can have a look at the release slideshow.

Compatibility

This release is compatible with the following runtime.

MongoDB versions older than 3.0 are not longer (end of life 2018-2).

Recommended configuration:

The driver core and the modules are tested in a container based environment, with the specifications as bellow.

This can be considered as a recommended environment.

Connection

The MongoDriver type is replaced by AsyncDriver, with asynchronous methods.

The utility function MongoConnection.parseURI is replaced by asynchronous function .fromString.

Also, the following options are deprecated:

SNI is now supported for the SSL connection.

The x.509 certificate authentication is now supported, and can be configured by setting x509 as authenticationMechanism, and with the following new options.

import reactivemongo.api._

def connection(driver: AsyncDriver) =
  driver.connect("mongodb://server:27017/db?ssl=true&authenticationMechanism=x509&keyStore=file:///path/to/keystore.p12&keyStoreType=PKCS12")

The DNS seedlist is now supported, using mongodb+srv:// scheme in the connection URI. It’s also possible to specify the credential directly in the URI.

import reactivemongo.api._

def seedListCon(driver: AsyncDriver) =
  driver.connect("mongodb+srv://usr:pass@mymongo.mydomain.tld/mydb")

The option rm.monitorRefreshMS is renamed heartbeatFrequencyMS.

See the documentation

Netty native

The internal Netty dependency has been updated to the version 4.1.

It comes with various improvements (memory consumption, …), and also to use Netty native support (kqueue for Mac OS X and epoll for Linux, on x86_64 arch).

Note that the Netty dependency is shaded so it won’t conflict with any Netty version in your environment.

See the documentation

BSON library

The current BSON library for ReactiveMongo has been updated.

It now supports BSON Decimal128 (MongoDB 3.4+).

The way Option is handled by the macros has been improved, also with a new annotation @NoneAsNull, which write None values as BSONNull (instead of omitting field/value).

More: BSON Library overview

Types

The Decimal128 type introduced by MongoDB 3.4 is supported, as BSONDecimal, and can be read or write as java.math.BigDecimal.

Handlers

A handler is now available to write and read Scala Map as BSON, provided the key and value types are themselves supported.

import scala.util.Try
import reactivemongo.api.bson._

def bsonMap = {
  val input: Map[String, Int] = Map("a" -> 1, "b" -> 2)

  // Ok as key and value (String, Int) are provided BSON handlers
  val doc: Try[BSONDocument] = BSON.writeDocument(input)

  val output = doc.flatMap { BSON.readDocument[Map[String, Int]](_) }
}

Macros

The compile-time option AutomaticMaterialization has been added, when using the macros with sealed family, to explicitly indicate when you want to automatically materialize required instances for the subtypes (if missing from the implicit scope).

sealed trait Color

case object Red extends Color
case object Blue extends Color
case class Green(brightness: Int) extends Color
case class CustomColor(code: String) extends Color

object Color {
  import reactivemongo.api.bson.{ Macros, MacroOptions },
    MacroOptions.{ AutomaticMaterialization, UnionType, \/ }

  // Use `UnionType` to define a subset of the `Color` type,
  type PredefinedColor =
    UnionType[Red.type \/ Green \/ Blue.type] with AutomaticMaterialization

  val predefinedColor = Macros.handlerOpts[Color, PredefinedColor]
}

A new annotation @Flatten has been added, to indicate to the macros that the representation of a property must be flatten rather than a nested document.

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.Macros.Annotations.Flatten

case class Range(start: Int, end: Int)

case class LabelledRange(
  name: String,
  @Flatten range: Range)

// Flattened with macro as bellow:
BSONDocument("name" -> "foo", "start" -> 0, "end" -> 1)

// Rather than:
// BSONDocument("name" -> "foo", "range" -> BSONDocument(
//   "start" -> 0, "end" -> 1))

Query and write operations

The query builder supports more options (see find).

The collection API provides new builders for write operations. This supports bulk operations (e.g. insert many documents at once).

InsertBuilder

The new insert operation is providing an InsertBuilder, which supports,

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.commands.{ MultiBulkWriteResult, WriteResult }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

val document1 = BSONDocument(
  "firstName" -> "Stephane",
  "lastName" -> "Godbillon",
  "age" -> 29)

// Simple: .insert.one(t)
def simpleInsert(coll: BSONCollection): Future[WriteResult] =
  coll.insert.one(document1)

// Bulk: .insert.many(Seq(t1, t2, ..., tN))
def bulkInsert(coll: BSONCollection): Future[MultiBulkWriteResult] =
  coll.insert(ordered = false).many(Seq(
    document1, BSONDocument(
      "firstName" -> "Foo",
      "lastName" -> "Bar",
      "age" -> 1)))

UpdateBuilder

The new update operation returns an UpdateBuilder, which can be used to perform simple or bulk update.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

def update1(personColl: BSONCollection) = {
  val selector = BSONDocument("name" -> "Jack")

  val modifier = BSONDocument(
    "$set" -> BSONDocument(
      "lastName" -> "London",
      "firstName" -> "Jack"),
      "$unset" -> BSONDocument("name" -> 1))

  // Simple update: get a future update
  val futureUpdate1 = personColl.update.one(
    q = selector, u = modifier,
    upsert = false, multi = false)

  // Bulk update: multiple update
  val updateBuilder1 = personColl.update(ordered = true)
  val updates = Future.sequence(Seq(
    updateBuilder1.element(
      q = BSONDocument("firstName" -> "Jane", "lastName" -> "Doh"),
      u = BSONDocument("age" -> 18),
      upsert = true,
      multi = false),
    updateBuilder1.element(
      q = BSONDocument("firstName" -> "Bob"),
      u = BSONDocument("age" -> 19),
      upsert = false,
      multi = true)))

  val bulkUpdateRes1 = updates.flatMap { ops => updateBuilder1.many(ops) }
}

DeleteBuilder

The .delete function returns a DeleteBuilder, to perform simple or bulk delete.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

def simpleDelete1(personColl: BSONCollection) =
  personColl.delete.one(BSONDocument("firstName" -> "Stephane"))

def bulkDelete1(personColl: BSONCollection) = {
  val deleteBuilder = personColl.delete(ordered = false)

  val deletes = Future.sequence(Seq(
    deleteBuilder.element(
      q = BSONDocument("firstName" -> "Stephane"),
      limit = Some(1), // former option firstMatch
      collation = None),
    deleteBuilder.element(
      q = BSONDocument("lastName" -> "Doh"),
      limit = None, // delete all the matching document
      collation = None)))

  deletes.flatMap { ops => deleteBuilder.many(ops) }
}

The .remove operation is now deprecated.

arrayFilters

The arrayFilters criteria is supported for findAndModify and update operations.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.WriteConcern
import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

def findAndUpdateArrayFilters(personColl: BSONCollection) =
  personColl.findAndModify(
    selector = BSONDocument.empty,
    modifier = personColl.updateModifier(
      update = BSONDocument(f"$$set" -> BSONDocument(
        f"grades.$$[element]" -> 100)),
      fetchNewObject = true,
      upsert = false),
    sort = None,
    fields = None,
    bypassDocumentValidation = false,
    writeConcern = WriteConcern.Journaled,
    maxTime = None,
    collation = None,
    arrayFilters = Seq(
      BSONDocument("elem.grade" -> BSONDocument(f"$$gte" -> 85))))

def updateArrayFilters(personColl: BSONCollection) =
  personColl.update.one(
    q = BSONDocument("grades" -> BSONDocument(f"$$gte" -> 100)),
    u = BSONDocument(f"$$set" -> BSONDocument(
      f"grades.$$[element]" -> 100)),
    upsert = false,
    multi = true,
    collation = None,
    arrayFilters = Seq(
      BSONDocument("element" -> BSONDocument(f"$$gte" -> 100))))

More: Find documents, Write documents

Aggregation

There are newly supported by the Aggregation Framework.

addFields:

The $addFields stage can now be used.

import scala.concurrent.ExecutionContext

import reactivemongo.api.bson.collection.BSONCollection

def sumHomeworkQuizz(students: BSONCollection) =
  students.aggregateWith1[BSONDocument]() { framework =>
    import framework.AddFields

    AddFields(document(
      "totalHomework" -> document(f"$$sum" -> f"$$homework"),
      "totalQuiz" -> document(f"$$sum" -> f"$$quiz"))) -> List(
      AddFields(document(
        "totalScore" -> document(f"$$add" -> array(
        f"$$totalHomework", f"$$totalQuiz", f"$$extraCredit")))))
  }

bucketAuto:

The $bucketAuto stage introduced by MongoDB 3.4 can be used as bellow.

import scala.concurrent.ExecutionContext

import reactivemongo.api.Cursor

import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONCollection

def populationBuckets(zipcodes: BSONCollection)(implicit ec: ExecutionContext) =
  zipcodes.aggregateWith1[BSONDocument]() { framework =>
    import framework.BucketAuto

    BucketAuto(BSONString(f"$$population"), 2, None)() -> List.empty
  }.collect[Set](Int.MaxValue, Cursor.FailOnError[Set[BSONDocument]]())

count:

If the goal is only to count the aggregated documents, the $count stage can be used.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.Cursor

import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader, BSONString }
import reactivemongo.api.bson.collection.BSONCollection

def countPopulatedStates1(col: BSONCollection): Future[Int] = {
  implicit val countReader = BSONDocumentReader[Int] { doc =>
    doc.getAsTry[Int]("popCount").get
  }

  col.aggregateWith[Int]() { framework =>
    import framework.{ Count, Group, Match, SumField }

    Group(BSONString("$state"))(
      "totalPop" -> SumField("population")) -> List(
        Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L))),
        Count("popCount"))
  }.head
}

facet:

The $facet stage is now supported, to create multi-faceted aggregations which characterize data across multiple dimensions, or facets.

import reactivemongo.api.bson.collection.BSONCollection

def useFacetAgg(coll: BSONCollection) = {
  import coll.aggregationFramework.{ Count, Facet, Out, UnwindField }

  Facet(Seq(
    "foo" -> (UnwindField("bar"), List(Count("c"))),
    "lorem" -> (Out("ipsum"), List.empty)))
  /* {
    $facet: {
      'foo': [
        { '$unwind': '$bar' },
        { '$count': 'c' }
      ],
      'lorem': [
        { '$out': 'ipsum' }
      ]
    }
  } */
}

filter:

The $filter stage is now supported.

import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.Cursor

import reactivemongo.api.bson.{ BSONArray, BSONDocument, BSONString }
import reactivemongo.api.bson.collection.BSONCollection

def salesWithItemGreaterThanHundered(sales: BSONCollection) =
  sales.aggregateWith1[BSONDocument]() { framework =>
    import framework._

    val sort = Sort(Ascending("_id"))

    Project(BSONDocument("items" -> Filter(
      input = BSONString(f"$$items"),
      as = "item",
      cond = BSONDocument(
        f"$$gte" -> BSONArray(f"$$$$item.price", 100))))) -> List(sort)

  }.collect[List](Int.MaxValue, Cursor.FailOnError[List[BSONDocument]]())

replaceRoot:

The $replaceRoot stage is now supported.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

/* For a fruits collection:
{
   "_id" : 1,
   "fruit" : [ "apples", "oranges" ],
   "in_stock" : { "oranges" : 20, "apples" : 60 },
   "on_order" : { "oranges" : 35, "apples" : 75 }
}, ...
*/

def replaceRootTest(fruits: BSONCollection): Future[Option[BSONDocument]] = {
  fruits.aggregateWith1[BSONDocument]() { framework =>
    import framework._

    ReplaceRootField("in_stock") -> List.empty
  }.headOption
  // Results: { "oranges": 20, "apples": 60 }, ...
}

slice:

The $slice operator is also supported as bellow.

import scala.concurrent.ExecutionContext

import reactivemongo.api.Cursor

import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONCollection

def sliceFavorites(coll: BSONCollection)(implicit ec: ExecutionContext) =
  coll.aggregateWith1[BSONDocument]() { framework =>
    import framework.{ Project, Slice }

    Project(BSONDocument(
      "name" -> 1,
      "favorites" -> Slice(
        array = BSONString(f"$$favorites"),
        n = BSONInteger(3)).makePipe)) -> List.empty
  }.collect[Seq](4, Cursor.FailOnError[Seq[BSONDocument]]())

Miscellaneous: Other stages are also supported.

More: Aggregation Framework

Monitoring

A new module is available to collect ReactiveMongo metrics with Kamon.

"org.reactivemongo" %% "reactivemongo-kamon" % "0.20.13"

Then the metrics can be configured in dashboards, according the used Kamon reporters. For example if using Kamon APM.

Graph about established connections

More: Monitoring

Administration

The operations to manage a MongoDB instance can be executed using ReactiveMongo. This new release has new functions for DB administration.

Ping:

The DefaultDB has now a ping operation, to execute a ping command.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.DefaultDB

def ping(admin: DefaultDB): Future[Boolean] = admin.ping()

Breaking changes

The Typesafe Migration Manager has been setup on the ReactiveMongo repository. It will validate all the future contributions, and help to make the API more stable.

For the current 0.20.13 release, it has detected the following breaking changes.

Test coverage

Connection

Operations and commands

Core/internal

Suggest changes