Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

ReactiveMongo 0.12.2 – Release details

What’s new?

The documentation is available online, and its code samples are compiled to make sure it’s up-to-date.

The next release will be 1.0.0 (not 0.13.0).

The impatient can have a look at the release slideshow.

Compatibility

This release is compatible with the following runtime.

MongoDB versions older than 2.6 are not longer supported by ReactiveMongo.

Recommended configuration:

The driver core and the modules are tested in a container based environment, with the specifications as bellow.

This can be considered as a recommended environment.

Connection pool

The way ReactiveMongo manages the connection pool has been improved.

Database resolution

A new better DB resolution is available (see connection tutorial).

The synchronous .db has been deprecated as it was assuming it can initially find an active channel in the connection pool (MongoConnection), whereas checking/discovering the initial node set can take time, according the network speed/latency.

The new version fix this assumption with an asynchronous/reactive resolution of the database (possibly using a failover strategy) to make sure at least one initial channel (according the chosen read preference).

The new resolution returns a Future[DefaultDB], and should be used instead of the former connection(..) (or its alias connection.db(..)).

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.{ DefaultDB, MongoConnection }

def newResolution(con: MongoConnection, name: String)(implicit ec: ExecutionContext): Future[DefaultDB] = con.database(name)

Similarly the function .db of the Play module must be replaced by its .database equivalent.

It’s generally a good practice not to assign the database and collection references to val (even to lazy val), as it’s better to get a fresh reference each time, to automatically recover from any previous issues (e.g. network failure).

Consequently to this change, a runtime error such as ConnectionNotInitialized can be raised when calling a database or collection operation (e.g. collection.find(..)), if the deprecated database resolution is still used.

Connection options

Some default read preference and default write concern can be set in the connection configuration.

import reactivemongo.api._, commands.WriteConcern

def connection(driver: MongoDriver) =
  driver.connection(List("localhost"), options = MongoConnectionOptions(
    readPreference = ReadPreference.primary,
    writeConcern = WriteConcern.Default // Acknowledged
  ))

The authentication algorithm is now SCRAM SHA1 by default. To change it (e.g. for MongoDB 2.6.x), see the connection options.

The default failover strategy can also be defined in the connection options.

import reactivemongo.api.{ FailoverStrategy, MongoConnectionOptions }

val options1 = MongoConnectionOptions(
  failoverStrategy = FailoverStrategy(retries = 10))

The option maxIdleTimeMS is now supported, with a default value 0 (no timeout).

import reactivemongo.api.MongoConnectionOptions

val options2 = MongoConnectionOptions(maxIdleTimeMS = 2000 /* 2s */)

The frequency at which the ReactiveMongo monitor refreshes the information about the MongoDB nodes can be configured in the connection options. The default interval is 10 seconds.

import reactivemongo.api.MongoConnectionOptions

val options3 = MongoConnectionOptions(monitorRefreshMS = 5000 /* 5s */)

Query and write operations

The collection API provides new operations.

FindAndModify:

The MongoDB findAndModify command modifies and returns a single document. The ReactiveMongo API now has a corresponding operation.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONDocumentReader, Macros }
import reactivemongo.api.collections.bson.BSONCollection

case class Person(firstName: String, lastName: String, age: Int)

object Person {
  implicit def reader: BSONDocumentReader[Person] =
    Macros.reader[Person]
}

def findAndModifyTests(coll: BSONCollection) = {
  val updateOp = coll.updateModifier(
    BSONDocument("$set" -> BSONDocument("age" -> 35)))

  val personBeforeUpdate: Future[Option[Person]] =
    coll.findAndModify(BSONDocument("name" -> "Joline"), updateOp).
    map(_.result[Person])

  val removedPerson: Future[Option[Person]] = coll.findAndModify(
    BSONDocument("name" -> "Jack"), coll.removeModifier).
    map(_.result[Person])
}

In the previous example, the findAndModify is used to find and update the person whose name is Joline by setting its age to 35, and it’s also used to remove the document about Jack.

The findAndModify can be performed more easily to find and update documents, using findAndUpdate.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONDocumentReader, Macros }
import reactivemongo.api.collections.bson.BSONCollection

def update(collection: BSONCollection, age: Int): Future[Option[Person]] = {
  import collection.BatchCommands.FindAndModifyCommand.FindAndModifyResult
  
  val result: Future[FindAndModifyResult] = collection.findAndUpdate(
    BSONDocument("firstName" -> "James"),
    BSONDocument("$set" -> BSONDocument("age" -> 17)),
    fetchNewObject = true)

  result.map(_.result[Person])
}

For removal, a convenient findAndRemove is also available.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.collections.bson.BSONCollection

def removedPerson(coll: BSONCollection, name: String)(implicit ec: ExecutionContext, reader: BSONDocumentReader[Person]): Future[Option[Person]] =
  coll.findAndRemove(BSONDocument("firstName" -> name)).
    map(_.result[Person])

Query builder:

The new requireOne function, based on the head cursor, allows to more easily find and require a single result.

import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

trait PersonService {
  def collection: BSONCollection

  def requirePerson(firstName: String, lastName: String)(implicit ec: ExecutionContext): Future[Person] = collection.find(BSONDocument(
    "firstName" -> firstName,
    "lastName" -> lastName
  )).requireOne[Person]
}

The option maxTimeMs is supported by the query builder, to specify a cumulative time limit in milliseconds for the processing of the operations.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.ReadPreference

def withMaxTimeMs(col: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = col.find(BSONDocument("foo" -> "bar")).maxTimeMs(1234L).cursor[BSONDocument](ReadPreference.primary).collect[List]()

The explain operation is now supported, to get information on the query plan.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

// If using the Play JSON support
import play.api.libs.json.{ Json, JsObject }
import reactivemongo.play.json._, collection.JSONCollection

def bsonExplain(col: BSONCollection): Future[Option[BSONDocument]] =
  col.find(BSONDocument.empty).explain().one[BSONDocument]

def jsonExplain(col: JSONCollection): Future[Option[JsObject]] =
  col.find(Json.obj()).explain().one[JsObject]

More: Query builder API

Error handling:

The WriteResult that represents the errors from executing commands, is now coming with pattern matching utilities.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.commands.WriteResult 

def insertPerson(personColl: BSONCollection, person: Person) = {
  implicit val writer = reactivemongo.bson.Macros.writer[Person]
  val future: Future[WriteResult] = personColl.insert(person)

  val end: Future[Unit] = future.map(_ => {}).recover {
    case WriteResult.Code(11000) =>
      // if the result is defined with the error code 11000 (duplicate error)
      println("Match the code 11000")

    case WriteResult.Message("Must match this exact message") =>
      println("Match the error message")

    case _ => ()
  }
}

The same approach can be used with CommandError.

GridFS:

The GridFS provides the new saveWithMD5 and iterateeWithMD5, which automatically compute the MD5 digested while storing data.

import scala.concurrent.{ ExecutionContext, Future }

import play.api.libs.iteratee.Enumerator

import reactivemongo.api.BSONSerializationPack
import reactivemongo.api.gridfs.{ DefaultFileToSave, GridFS }
import reactivemongo.api.gridfs.Implicits._
import reactivemongo.bson.BSONValue

type BSONFile = 
  reactivemongo.api.gridfs.ReadFile[BSONSerializationPack.type, BSONValue]

def saveWithComputedMD5(
  gridfs: GridFS[BSONSerializationPack.type],
  filename: String, 
  contentType: Option[String], 
  data: Enumerator[Array[Byte]]
)(implicit ec: ExecutionContext): Future[BSONFile] = {
  // Prepare the GridFS object to the file to be pushed
  val gridfsObj = DefaultFileToSave(Some(filename), contentType)

  gridfs.saveWithMD5(data, gridfsObj)
}

BSON library

The BSON library for ReactiveMongo has been updated.

A BSON handler is provided to respectively, read a java.util.Date from a BSONDateTime, and write a Date as BSONDateTime.

import java.util.Date
import reactivemongo.bson._

def foo(doc: BSONDocument): Option[Date] = doc.getAs[Date]("aBsonDateTime")

def bar(date: Date): BSONDocument = BSONDocument("aBsonDateTime" -> date)

The traits BSONReader and BSONWriter have new combinators, so new instances can be easily defined using the existing ones.

import reactivemongo.bson._

sealed trait MyEnum
object EnumValA extends MyEnum
object EnumValB extends MyEnum

implicit def MyEnumReader(implicit underlying: BSONReader[BSONString, String]): BSONReader[BSONString, MyEnum] = underlying.afterRead {
  case "A" => EnumValA
  case "B" => EnumValB
  case v => sys.error(s"unexpected value: $v")
}

implicit def MyEnumWriter(implicit underlying: BSONWriter[String, BSONString]): BSONWriter[MyEnum, BSONString] = underlying.beforeWrite[MyEnum] {
  case EnumValA => "A"
  case _ => "B"
}

The companion objects for BSONDocumentReader and BSONDocumentWriter provides new factories.

import reactivemongo.bson.{
  BSONDocument, BSONDocumentReader, BSONDocumentWriter, BSONNumberLike
}

case class Foo(bar: String, lorem: Int)

val w1 = BSONDocumentWriter[Foo] { foo =>
  BSONDocument("_bar" -> foo.bar, "ipsum" -> foo.lorem)
}

val r1 = BSONDocumentReader[Foo] { doc =>
  (for {
    bar <- doc.getAsTry[String]("_bar")
    lorem <- doc.getAsTry[BSONNumberLike]("ipsum").map(_.toInt)
  } yield Foo(bar, lorem)).get
}

The new instances of BSONTimestamp can be created from a raw numeric value, representing the milliseconds timestamp, with the time and ordinal properties being extracted.

import reactivemongo.bson.BSONTimestamp

def foo(millis: Long) = BSONTimestamp(millis)

// or...
def bar(time: Long, ordinal: Int) = BSONTimestamp(time, ordinal)

The generic types are now supported:

case class GenFoo[T](bar: T, lorem: Int)

reactivemongo.bson.Macros.reader[GenFoo[String]]

Some undocumented macro features, such as union types and sealed trait support are now explained.

import reactivemongo.bson.{ BSONDocument, BSONHandler, Macros }

sealed trait Tree
case class Node(left: Tree, right: Tree) extends Tree
case class Leaf(data: String) extends Tree

object Tree {
  implicit val node = Macros.handler[Node]
  implicit val leaf = Macros.handler[Leaf]

  implicit val bson: BSONHandler[BSONDocument, Tree] = Macros.handler[Tree]
}

Taking care of backward compatibility, a refactoring of the BSON types has been started.

More: BSON Library overview

Streaming

Instead of accumulating documents in memory, they can be processed as a stream, using a reactive Cursor.

ReactiveMongo can now be used with several streaming frameworks.

Akka Stream

The Akka Stream library can be used to consume ReactiveMongo results.

To enable the Akka Stream support (up to Akka 2.4.8), the import reactivemongo.play.akkastream.cursorProducer must be added.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.ReadPreference

import reactivemongo.akkastream.{ State, cursorProducer }
// Provides the cursor producer with the AkkaStream capabilities

def processPerson0(collection: BSONCollection, query: BSONDocument)(implicit m: Materializer): Future[Seq[BSONDocument]] = {
  val sourceOfPeople: Source[BSONDocument, Future[State]] =
    collection.find(query).cursor[BSONDocument](ReadPreference.primary).documentSource()

  sourceOfPeople.runWith(Sink.seq[BSONDocument])
}

More: ReactiveMongo AkkaStream

Aggregated streams

The results from the new aggregation operation can be processed in a streaming way, using the cursor option.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.collections.bson.BSONCollection

def populatedStates(cities: BSONCollection)(implicit ec: ExecutionContext): Cursor[BSONDocument] = cities.aggregateWith[BSONDocument]() { framework =>
  import framework.{ Group, Match, SumField }

  Group(BSONString("$state"))("totalPop" -> SumField("population")) -> List(
    Match(document("totalPop" -> document("$gte" -> 10000000L)))
  )
}

Custom streaming

The new streaming support is based on the function Cursor.foldWhileM[A] (and its variants), which allows to implement custom stream processing.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.Cursor

def streaming(c: Cursor[String]): Future[List[String]] =
  c.foldWhile(List.empty[String], 1000/* optional: max doc */)(
    { (ls, str) => // process next String value
      if (str startsWith "#") Cursor.Cont(ls) // Skip: continue unchanged `ls`
      else if (str == "_end") Cursor.Done(ls) // End processing
      else Cursor.Cont(str :: ls) // Continue with updated `ls`
    },
    { (ls, err) => // handle failure
      err match {
        case e: RuntimeException => Cursor.Cont(ls) // Skip error, continue
        case _ => Cursor.Fail(err) // Stop with current failure -> Future.failed
      }
    })

An ErrorHandler can be used with the Cursor, instead of the previous stopOnError boolean flag.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.Cursor

def foldStrings(cursor: Cursor[String]): Future[Seq[String]] = {
  val handler: Cursor.ErrorHandler[Seq[String]] =
    { (last: Seq[String], error: Throwable) =>
      println(s"Encounter error: $error")

      if (last.isEmpty) { // continue, skip error if no previous value
        Cursor.Cont(last)
      } else Cursor.Fail(error)
    }

  cursor.foldWhile(Seq.empty[String])({ (agg, str) =>
    Cursor.Cont(agg :+ str)
  }, handler)
}

Some convenient error handlers are provided along with the driver:

Aggregation

The distinct command to find the distinct values for a specified field across a single collection, is now provided as a collection operation.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

def distinctStates(col: BSONCollection)(implicit ec: ExecutionContext): Future[Set[String]] = col.distinct[String, Set]("state")

The ReactiveMongo collections now has the convenient operation .aggregate.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.{ BSONDocument, BSONString }
import reactivemongo.api.collections.bson.BSONCollection

def populatedStates(col: BSONCollection): Future[List[BSONDocument]] = {
  import col.BatchCommands.AggregationFramework.{
    AggregationResult, Group, Match, SumField
  }

  val res: Future[AggregationResult] = col.aggregate(
    Group(BSONString("$state"))( "totalPop" -> SumField("population")),
    List(Match(BSONDocument("totalPop" -> BSONDocument("$gte" -> 10000000L)))))

  res.map(_.documents)
}

About the type AggregationResult the property documents has been renamed to firstBatch, to clearly indicate it returns the first batch from result (which is frequently the single one).

There are also some newly supported Pipeline Aggregation Stages.

filter:

The $filter stage is available in this new version.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.{ BSONString, Macros, array, document }
import reactivemongo.api.collections.bson.BSONCollection

object FilterUseCase {
  case class SaleItem(itemId: Int, quantity: Int, price: Int)
  case class Sale(_id: Int, items: List[SaleItem])

  implicit val saleItemHandler = Macros.handler[SaleItem]
  implicit val saleHandler = Macros.handler[Sale]

  def filterSales(sales: BSONCollection)(implicit ec: ExecutionContext): Future[List[Sale]] = {
    import sales.BatchCommands.AggregationFramework.{ Project, Filter }

    sales.aggregate(Project(document("items" -> Filter(
      input = BSONString("$items"),
      as = "item",
      cond = document("$gte" -> array("$$item.price", 100))
    )))).map(_.head[Sale])
  }
}

geoNear:

The $geoNear stage returns an ordered stream of documents based on the proximity to a geospatial point.

It can be used in the MongoDB shell as follows.

db.places.aggregate([{
  $geoNear: {
    near: { type: "Point", coordinates: [ -73.9667, 40.78 ] },
    distanceField: "dist.calculated",
    minDistance: 1000,
    maxDistance: 5000,
    query: { type: "public" },
    includeLocs: "dist.location",
    num: 5,
    spherical: true
  }
}])

The same can be done with ReactiveMongo.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.{ array, document, Macros }
import reactivemongo.api.collections.bson.BSONCollection

case class GeoPoint(coordinates: List[Double])
case class GeoDistance(calculated: Double, loc: GeoPoint)

case class GeoPlace(
  loc: GeoPoint,
  name: String,
  category: String,
  dist: GeoDistance
)

object GeoPlace {
  implicit val pointReader = Macros.reader[GeoPoint]
  implicit val distanceReader = Macros.reader[GeoDistance]
  implicit val placeReader = Macros.reader[GeoPlace]
}

def placeArround(places: BSONCollection)(implicit ec: ExecutionContext): Future[List[GeoPlace]] = {
  import places.BatchCommands.AggregationFramework.GeoNear

  places.aggregate(GeoNear(document(
    "type" -> "Point",
    "coordinates" -> array(-73.9667, 40.78)
  ), distanceField = Some("dist.calculated"),
    minDistance = Some(1000),
    maxDistance = Some(5000),
    query = Some(document("type" -> "public")),
    includeLocs = Some("dist.loc"),
    limit = 5,
    spherical = true)).map(_.head[GeoPlace])
}

group:

Now all the accumulators of the $group aggregation stage are supported, for example the $avg accumulator.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.{ BSONDocument, BSONString }
import reactivemongo.api.collections.bson.BSONCollection

def avgPopByState(col: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
  import col.BatchCommands.AggregationFramework.{
    AggregationResult, AvgField, Group, SumField
  }

  col.aggregate(Group(BSONDocument("state" -> "$state", "city" -> "$city"))(
    "pop" -> SumField("population")),
    List(Group(BSONString("$_id.state"))("avgCityPop" -> AvgField("pop")))).
    map(_.documents)
}

indexStats:

The $indexStats stage returns statistics regarding the use of each index for the collection.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.collections.bson.BSONCollection

def aggregateIndexes(coll: BSONCollection) = {
  import coll.BatchCommands.AggregationFramework.{ Ascending, IndexStats, Sort }
  import reactivemongo.api.commands.{ bson => bsoncommands }
  import bsoncommands.BSONAggregationFramework.{
    IndexStatsResult, IndexStatAccesses
  }
  import bsoncommands.BSONAggregationResultImplicits.BSONIndexStatsReader

  val result: Future[List[IndexStatsResult]] =
    coll.aggregate(IndexStats, List(Sort(Ascending("name")))).
    map(_.head[IndexStatsResult])

  result
}

lookup:

Using the MongoDB aggregation, the $lookup stage performs a left outer join between two collections in the same database (see the examples). ReactiveMongo now supports this new stage.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.Macros
import reactivemongo.api.collections.bson.BSONCollection

object LookupUseCase {
  implicit def productHandler = Macros.handler[Product]
  implicit def invReportHandler = Macros.handler[InventoryReport]

  def lookupInventoryReports(orders: BSONCollection, inventory: BSONCollection): Future[List[InventoryReport]] = {
    import orders.BatchCommands.AggregationFramework.Lookup

    // Left outer join between the current `orders` collection,
    // and the inventory collection (referenced by its name)
    orders.aggregate(Lookup(inventory.name, "item", "sku", "docs")).
      map(_.head[InventoryReport].toList)
  }

  case class Product(
    _id: Int, sku: Option[String] = None,
    description: Option[String] = None,
    instock: Option[Int] = None
  )

  case class InventoryReport(
    _id: Int,
    item: Option[String] = None,
    price: Option[Int] = None,
    quantity: Option[Int] = None,
    docs: List[Product] = Nil
  )
}

out:

The $out aggregation stage takes the documents returned by the aggregation pipeline and writes them to a specified collection.

Consider a collection books that contains the following documents.

{ "_id" : 8751, "title" : "The Banquet", "author" : "Dante", "copies" : 2 }
{ "_id" : 8752, "title" : "Divine Comedy", "author" : "Dante", "copies" : 1 }
{ "_id" : 8645, "title" : "Eclogues", "author" : "Dante", "copies" : 2 }
{ "_id" : 7000, "title" : "The Odyssey", "author" : "Homer", "copies" : 10 }
{ "_id" : 7020, "title" : "Iliad", "author" : "Homer", "copies" : 10 }

Then its documents can be aggregated and outputted to another collection.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONString
import reactivemongo.api.collections.bson.BSONCollection

def outputBooks(books: BSONCollection, outColl: String): Future[Unit] = {
  import books.BatchCommands.AggregationFramework
  import AggregationFramework.{ Ascending, Group, PushField, Out, Sort }

  books.aggregate(Sort(Ascending("title")), List(
    Group(BSONString("$author"))("books" -> PushField("title")),
    Out(outColl))).map(_ => {})
}

For the current example, the result collection will contain the following documents.

{ "_id" : "Homer", "books" : [ "Iliad", "The Odyssey" ] }
{ "_id" : "Dante", "books" : [ "Divine Comedy", "Eclogues", "The Banquet" ] }

redact:

The $redact stage reshapes each document in the stream by restricting the content for each document based on information stored in the documents themselves.

It can be done in the MongoDB shell as follows.

db.forecasts.aggregate([
  { $match: { year: 2014 } },
  { 
    $redact: {
      $cond: {
        if: { $gt: [ { $size: { 
          $setIntersection: [ "$tags", [ "STLW", "G" ] ] } }, 0 ]
        },
        then: "$$DESCEND",
        else: "$$PRUNE"
      }
    }
  }
])

With ReactiveMongo, the aggregation framework can perform a similar redaction.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection

def redactForecasts(forecasts: BSONCollection)(implicit ec: ExecutionContext) = {
  import forecasts.BatchCommands.AggregationFramework.{ Match, Redact }

  forecasts.aggregate(Match(document("year" -> 2014)), List(
    Redact(document("$cond" -> document(
      "if" -> document(
        "$gt" -> array(document(
          "$size" -> document("$setIntersection" -> array(
            "$tags", array("STLW", "G")
          ))
        ), 0)
      ),
      "then" -> "$$DESCEND",
      "else" -> "$$PRUNE"
    ))))).map(_.head[BSONDocument])
}

sample:

The $sample aggregation stage is also supported (only MongoDB >= 3.2). It randomly selects the specified number of documents from its input. With ReactiveMongo, the Sample stage can be used as follows.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

def randomDocs(coll: BSONCollection, count: Int): Future[List[BSONDocument]] = {
  import coll.BatchCommands.AggregationFramework

  coll.aggregate(AggregationFramework.Sample(count)).map(_.head[BSONDocument])
}

text:

When the $text operator is used in an aggregation pipeline, then new the results can be sorted according the text scores.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

/**
 * 1. Find the documents matching the text `"JP"`,
 * 2. and sort according the (metadata) text score.
 */
def textFind(coll: BSONCollection)(implicit ec: ExecutionContext): Future[List[BSONDocument]] = {
  import coll.BatchCommands.AggregationFramework
  import AggregationFramework.{
    Cursor,
    Match,
    MetadataSort,
    Sort,
    TextScore
  }

  val firstOp = Match(BSONDocument(
    "$text" -> BSONDocument("$search" -> "JP")))

  val pipeline = List(Sort(MetadataSort("score", TextScore)))

  coll.aggregate1[BSONDocument](firstOp, pipeline).collect[List]()
}

With the changes, the aggregation framework provides an API for all the stages supported by MongoDB 3.2.

More: Aggregation Framework

Play Framework

The integration with Play Framework is still a priority for ReactiveMongo.

For Play > 2.4, if you still have a file conf/play.plugins, it’s important to make sure this file no longer mentions ReactiveMongoPlugin, which is replaced by ReactiveMongoModule. With such deprecated configuration, the following error can be raised.

ConfigurationException: Guice configuration errors: 
1) Could not find a suitable constructor in 
play.modules.reactivemongo.ReactiveMongoPlugin.

Considering the configuration with Play, the new setting mongodb.connection.strictUri (true or false) can be added. It makes the ReactiveMongo module for Play will enforce that only strict connection URI is accepted: with no unsupported option in it (otherwise it throws an exception). By default this setting is disabled (false).

As for Play 2.5, due to the Streams Migration, a akka.stream.Materializer is required when implementing Play/ReactiveMongo controllers, otherwise the following error will be raised.

could not find implicit value for parameter materializer: akka.Stream.Materializer

It’s also important to note that the Play support has also been modularized.

JSON serialization

There is now a standalone Play JSON library, providing a serialization pack that can be used outside a Play application.

This new library increases the JSON support to handle the following BSON types.

To use this JSON library, it’s necessary to make sure the right imports are there.

import reactivemongo.play.json._
// import the default BSON/JSON conversions

Without these imports, the following error can occur.

No Json serializer as JsObject found for type play.api.libs.json.JsObject.
Try to implement an implicit OWrites or OFormat for this type.

There are also some helpers coming along with this JSON pack.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.play.json.collection._

// Import a list of JSON object as document into the JSON `collection`,
// and returns the insertion count.
def importJson(collection: JSONCollection, resource: String): Future[Int] =
  Helpers.bulkInsert(collection, getClass.getResourceAsStream(resource)).
    map(_.totalN)

In order to comply with the extended JSON syntax for the timestamps, BSONTimestamp values are written with both $time and $timestamp formats.

{
  "$time": <t>, "$i": <i>,
  "$timestamp": { "t": <t>, "i": <i> }
}

The deprecated properties $time and $i will be removed.

These two formats are also supported when reading from JSON.

The extended syntax is also supported for the BSONMinKey and the BSONMaxKey.

{
  "aMinKey": { "$minKey": 1 },
  "aMaxKey" : { "$maxKey": 1 }
}

New functions from the BSONFormats provides JSON formats derived from BSON handlers.

import play.api.libs.json.OFormat
import reactivemongo.bson._
import reactivemongo.play.json.BSONFormats

def derivesBsonHandlers[T](
  implicit bsonWriter: BSONDocumentWriter[T],
  bsonReader: BSONDocumentReader[T]
): OFormat[T] = BSONFormats.jsonOFormat[T]

More: JSON overview

Dependency injection

Now multiple connection pools can be injected using the @NamedDatabase annotation.

For example with the following configuration:

# The default URI
mongodb.uri = "mongodb://someuser:somepasswd@localhost:27017/foo"

# Another one, named with 'bar'
mongodb.bar.uri = "mongodb://someuser:somepasswd@localhost:27017/lorem"

Then the dependency injection can select the API instances using the names.

import javax.inject.Inject

import play.modules.reactivemongo._

class MyComponent @Inject() (
  val defaultApi: ReactiveMongoApi, // corresponds to 'mongodb.uri'
  @NamedDatabase("bar") val barApi: ReactiveMongoApi // 'mongodb.bar'
) {

}

MVC integration

Instances of Play Formatter are provided for the BSON values.

import play.api.data.format.Formatter
import play.api.libs.json.Json

import reactivemongo.bson.BSONValue

import reactivemongo.play.json._
import play.modules.reactivemongo.Formatters._

def playFormat[T <: BSONValue](bson: T)(implicit formatter: Formatter[T]) = {
  val binding = Map("foo" -> Json.stringify(Json.toJson(bson)))

  formatter.bind("foo", binding)
  // must be Right(bson)

  formatter.unbind("foo", bson)
  // must == binding
}

Routing

The BSON types can be used in the bindings of the Play routing.

For example, consider a Play action as follows.

package mine

import play.api.mvc.{ Action, Controller }
import reactivemongo.bson.BSONObjectID

class Application extends Controller {
  def foo(id: BSONObjectID) = Action {
    Ok(s"Foo: ${id.stringify}")
  }
}

This action can be configured with a BSONObjectID binding, in the conf/routes file.

GET /foo/:id mine.Application.foo(id: reactivemongo.bson.BSONObjectID)

When using BSON types in the route bindings, the Play plugin for SBT must be setup (in your build.sbt or project/Build.scala) to install the appropriate import in the generated routes.

import play.sbt.routes.RoutesKeys

RoutesKeys.routesImport += "play.modules.reactivemongo.PathBindables._"

Play Iteratees

The enumerate on the cursors is now deprecated, and the Play Iteratees support has been moved to a separate module, with a new enumerator operation.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import play.api.libs.iteratee.{ Enumerator, Iteratee }

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.ReadPreference

def workWithIteratees(personColl: BSONCollection): Future[Int] = {
  import reactivemongo.play.iteratees.cursorProducer
  // Provides the cursor producer with the Iteratees capabilities

  val cur = personColl.find(BSONDocument("plop" -> "plop")).
    cursor[BSONDocument](ReadPreference.primary) // can be seen as PlayIterateesCursor ...

  // ... so the new `enumerator` operation is available
  val source: Enumerator[BSONDocument] = cur.enumerator(10)

  source |>>> Iteratee.fold(0) { (r, doc) => r + 1 }
}

To use the Iteratees support for the ReactiveMongo cursors, reactivemongo.play.iteratees.cursorProducer must be imported.

import reactivemongo.play.iteratees.cursorProducer
// Provides the cursor producer with the Iteratees capabilities

Without this import, the following error can occur.

value enumerator is not a member of reactivemongo.api.CursorProducer[reactivemongo.bson.BSONDocument]#ProducedCursor

Administration

The operations to manage a MongoDB instance can be executed using ReactiveMongo. This new release has new functions for DB administration.

Rename collection:

The Database now has a renameCollection operation, which can be easily used with the ‘admin’ database, to rename collections in the other databases.

import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.DefaultDB

def renameWithSuffix(
  admin: DefaultDB,
  otherDb: String,
  collName: String,
  suffix: String
) = admin.renameCollection(otherDb, collName, s"$collName-$suffix")

Drop collection:

The new drop operation can try to perform, without failing if the collection doesn’t exist. The previous behaviour is still available.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

// Doesn't fail if the collection represented by `col` doesn't exists,
// but return Future(false)
def dropNotFail(col: BSONCollection): Future[Boolean] = col.drop(false)

// Fails if the collection represented by `col` doesn't exists,
// as in the previous behaviour
def dropFail(col: BSONCollection): Future[Unit] = col.drop(true).map(_ => {})

def deprecatedDrop(col: BSONCollection): Future[Unit] = col.drop()

Create user:

The DefaultDB is defined with a function to create a database user.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.DefaultDB
import reactivemongo.api.commands.UserRole

// Creates a 'foo' user, with the 'readWrite' role
def createFooUser(db: DefaultDB, password: String): Future[Unit] =
  db.createUser("foo", Some(password), roles = List(UserRole("readWrite")))

Indexes:

In the case class Index, the property partialFilter has been added to support MongoDB index with partialFilterExpression.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.commands.WriteResult
import reactivemongo.api.indexes.{ Index, IndexType }

def createPartialIndex(col: BSONCollection): Future[WriteResult] = 
  col.indexesManager.create(Index(
    key = Seq("username" -> IndexType.Ascending),
    unique = true,
    partialFilter = Some(BSONDocument("age" -> BSONDocument("$gte" -> 21)))))

Collection statistics:

In the case class CollStatsResult, the field maxSize has been added.

import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.commands.CollStatsResult

def maxSize(coll: BSONCollection)(implicit ec: ExecutionContext): Future[Option[Double]] = coll.stats.map(_.maxSize)

Resync replica set members:

The replication command resync is now provided.

import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.MongoConnection
import reactivemongo.api.commands.{ Resync, bson }, bson.BSONResyncImplicits._

def resyncDatabase(con: MongoConnection)(implicit ec: ExecutionContext): Future[Unit] = con.database("admin").flatMap(_.runCommand(Resync)).map(_ => {})

Logging

Log4J is still required for backward compatibility (by deprecated code), but is replaced by SLF4J for the ReactiveMongo logging.

If you see the following message, please make sure you have a Log4J framework available.

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

As SLF4J is now used, if the following error is raised, please make sure to provide a SLF4J binding (e.g. slf4j-simple).

NoClassDefFoundError: : org/slf4j/LoggerFactory

In order to debug the networking issues, the internal state of the node set is provided as details of the related exceptions, as bellow.

reactivemongo.core.actors.Exceptions$InternalState: null (<time:1469208071685>:-1)
reactivemongo.ChannelClosed(-2079537712, {{NodeSet None Node[localhost:27017: Primary (0/0 available connections), latency=5], auth=Set() }})(<time:1469208071685>)
reactivemongo.Shutdown(<time:1469208071673>)
reactivemongo.ChannelDisconnected(-2079537712, {{NodeSet None Node[localhost:27017: Primary (1/1 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(967102512, {{NodeSet None Node[localhost:27017: Primary (1/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(967102512, {{NodeSet None Node[localhost:27017: Primary (2/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(651496230, {{NodeSet None Node[localhost:27017: Primary (2/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(651496230, {{NodeSet None Node[localhost:27017: Primary (3/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(1503989210, {{NodeSet None Node[localhost:27017: Primary (3/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(1503989210, {{NodeSet None Node[localhost:27017: Primary (4/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-228911231, {{NodeSet None Node[localhost:27017: Primary (4/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-228911231, {{NodeSet None Node[localhost:27017: Primary (5/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-562085577, {{NodeSet None Node[localhost:27017: Primary (5/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-562085577, {{NodeSet None Node[localhost:27017: Primary (6/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-857553810, {{NodeSet None Node[localhost:27017: Primary (6/7 available connections), latency=5], auth=Set() }})(<time:1469208071662>)

Monitoring

A new JMX module is available. It can be enabled by adding the corresponding dependency:

"org.reactivemongo" %% "reactivemongo-jmx" % "0.12.2"

More: Monitoring

Dependencies

The internal Netty dependency has been updated to the version 3.10.4, and is now shaded to avoid conflict (dependency hell).

Test coverage

Breaking changes

The Typesafe Migration Manager has been setup on the ReactiveMongo repository. It will validate all the future contributions, and help to make the API more stable.

For the current 0.12 release, it has detected the following breaking changes.

Connection

Since release 0.11, the package reactivemongo.api.collections.default has been refactored as the package reactivemongo.api.collections.bson. If you get a compilation error like the following one, you need to update the corresponding imports.

object default is not a member of package reactivemongo.api.collections
[error] import reactivemongo.api.collections.default.BSONCollection

Operation results

The type hierarchy of the trait WriteResult has changed in new version. It’s no longer an Exception, and no longer inherits from reactivemongo.core.errors.DatabaseException, scala.util.control.NoStackTrace, reactivemongo.core.errors.ReactiveMongoException. As it now longer represents errors in the public API, the following properties have been removed: errmsg, hasErrors, inError and message.

For the type LastError, the properties writeErrors and writeConcernError have been added.

The type hierarchy of the classes DefaultWriteResult and UpdateWriteResult have changed in new version; no longer inherits from java.lang.Exception:

In the class Upserted;

In the case class GetLastError.TagSet, the field s is renamed to tag.

The exception case objects NodeSetNotReachable, NodeSetNotReachable, ChannelNotFound and ClosedException have been refactored as sealed classes. When try to catch such exception the class type must be used, rather than the object patterns.

import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.core.actors.Exceptions.{
  ClosedException, NodeSetNotReachable
}

def handle(mongoOp: Future[String])(implicit ec: ExecutionContext) =
  mongoOp.recover {
    case err1: ClosedException => // rather than `case ClosedException`
      "closed"
  
    case err2: NodeSetNotReachable => // rather than `case NodeSetNotReachable`
      "notReachable"
  }

Aggregation framework

Operations and commands

GridFS

Core/internal