Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Streaming

Instead of accumulating documents in memory, they can be processed as a stream, using a reactive Cursor.

ReactiveMongo can be used with several streaming frameworks: Akka Stream, Pekko Stream and Play Iteratees. A custom stream processor can also be implemented.

Akka Stream

The Akka Stream library can be used to consume ReactiveMongo results.

The following dependency must be configured in your project/Build.scala (or build.sbt).

libraryDependencies += "org.reactivemongo" %% "reactivemongo-akkastream" % "1.1.0-RC14"

This module provides Akka Stream support for the ReactiveMongo cursors. To use it, reactivemongo.akkastream.cursorProducer must be imported.

import scala.concurrent.Future

import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

import reactivemongo.akkastream.{ State, cursorProducer }
// Provides the cursor producer with the Akka Stream capabilities

def processPerson1(collection: BSONCollection, query: BSONDocument)(implicit m: Materializer): Future[Seq[BSONDocument]] = {
  val sourceOfPeople: Source[BSONDocument, Future[State]] =
    collection.find(query).cursor[BSONDocument]().documentSource()

  sourceOfPeople.runWith(Sink.seq[BSONDocument])
}

The operation AkkaStreamCursor.documentSource returns a Source[T, Future[State]] (with Future[State] representing the completion of the asynchronous materialization). In this case, we get a producer of documents (of type BSONDocument).

Now that we have the producer, we need to define how the documents are processed, using a Sink or a Flow (with transformations).

The line sourceOfPeople.runWith(Sink.seq[BSONDocument]) returns a Future[Seq[BSONDocument]]. This future eventually completes with the final value of the sink, which is a Seq in our case.
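As an illustration of a Flow with a transformation, here is a minimal sketch that keeps only the lastName of each document before collecting the results. The names LastNames, lastNames and collectLastNames are hypothetical (they are not part of ReactiveMongo), and the lastName field is an assumption about the stored documents.

```scala
import scala.concurrent.Future

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Flow, Sink, Source }

import reactivemongo.api.bson.BSONDocument

object LastNames {
  // Hypothetical transformation: emits the `lastName` of each document,
  // and drops the documents that have no such String field.
  val lastNames: Flow[BSONDocument, String, NotUsed] =
    Flow[BSONDocument].mapConcat { doc =>
      doc.getAsOpt[String]("lastName").toList
    }

  // Plugs the flow between the source and a sink collecting a Seq
  def collectLastNames[M](sourceOfPeople: Source[BSONDocument, M])(
    implicit m: Materializer): Future[Seq[String]] =
    sourceOfPeople.via(lastNames).runWith(Sink.seq[String])
}
```

The source passed to collectLastNames can be the one returned by documentSource(), since the materialized value type is left generic.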

Obviously, we may use a pure Sink that performs some computation.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }

import reactivemongo.api.bson.BSONDocument

def processPerson2(sourceOfPeople: Source[BSONDocument, NotUsed])(implicit m: Materializer): Future[Float] = {
  val cumulateAge: Sink[BSONDocument, Future[(Int, Int)]] =
    Sink.fold(0 -> 0) {
      case ((cumulatedAge, n), doc) =>
        val age = doc.getAsOpt[Int]("age").getOrElse(0)
        (cumulatedAge + age, n + 1)
    }

  val cumulated: Future[(Int, Int)] = sourceOfPeople runWith cumulateAge

  // Convert to Float before dividing, to avoid a truncating Int division
  val meanAge: Future[Float] =
    cumulated.map { case (cumulatedAge, n) =>
      if (n == 0) 0F
      else cumulatedAge.toFloat / n
    }

  meanAge
}

The cumulateAge sink extracts the age from each document and adds it to the current result. At the same time, it counts the processed documents. Once all the documents have been processed, the cumulated age is divided by the number of documents to get the mean age.

Pekko Stream

The Pekko Stream library can be used to consume ReactiveMongo results.

The following dependency must be configured in your project/Build.scala (or build.sbt).

libraryDependencies += "org.reactivemongo" %% "reactivemongo-pekkostream" % "1.1.0-RC14"

This module provides Pekko Stream support for the ReactiveMongo cursors. To use it, reactivemongo.pekkostream.cursorProducer must be imported.

import scala.concurrent.Future

import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

import reactivemongo.pekkostream.{ State, cursorProducer }
// Provides the cursor producer with the Pekko Stream capabilities

def processPerson1(collection: BSONCollection, query: BSONDocument)(implicit m: Materializer): Future[Seq[BSONDocument]] = {
  val sourceOfPeople: Source[BSONDocument, Future[State]] =
    collection.find(query).cursor[BSONDocument]().documentSource()

  sourceOfPeople.runWith(Sink.seq[BSONDocument])
}

The operation PekkoStreamCursor.documentSource returns a Source[T, Future[State]] (with Future[State] representing the completion of the asynchronous materialization). In this case, we get a producer of documents (of type BSONDocument).

Now that we have the producer, we need to define how the documents are processed, using a Sink or a Flow (with transformations).

The line sourceOfPeople.runWith(Sink.seq[BSONDocument]) returns a Future[Seq[BSONDocument]]. This future eventually completes with the final value of the sink, which is a Seq in our case.
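When the documents only need to be processed for a side effect, rather than collected, the source can be consumed with runForeach, which is a shorthand for running it with Sink.foreach. The following is a minimal sketch under that assumption; PrintPeople, printPeople and the max parameter are hypothetical names introduced for the illustration.

```scala
import scala.concurrent.Future

import org.apache.pekko.Done
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Source

import reactivemongo.api.bson.BSONDocument

object PrintPeople {
  // Prints at most `max` documents, then completes the stream.
  // The returned future completes with Done once processing is finished.
  def printPeople[M](sourceOfPeople: Source[BSONDocument, M], max: Int)(
    implicit m: Materializer): Future[Done] =
    sourceOfPeople.take(max.toLong).runForeach { person =>
      println(BSONDocument.pretty(person))
    }
}
```

Here the documents are never accumulated in memory: each one is printed as it is received.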

Obviously, we may use a pure Sink that performs some computation.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import reactivemongo.api.bson.BSONDocument

def processPerson2(sourceOfPeople: Source[BSONDocument, NotUsed])(implicit m: Materializer): Future[Float] = {
  val cumulateAge: Sink[BSONDocument, Future[(Int, Int)]] =
    Sink.fold(0 -> 0) {
      case ((cumulatedAge, n), doc) =>
        val age = doc.getAsOpt[Int]("age").getOrElse(0)
        (cumulatedAge + age, n + 1)
    }

  val cumulated: Future[(Int, Int)] = sourceOfPeople runWith cumulateAge

  // Convert to Float before dividing, to avoid a truncating Int division
  val meanAge: Future[Float] =
    cumulated.map { case (cumulatedAge, n) =>
      if (n == 0) 0F
      else cumulatedAge.toFloat / n
    }

  meanAge
}

The cumulateAge sink extracts the age from each document and adds it to the current result. At the same time, it counts the processed documents. Once all the documents have been processed, the cumulated age is divided by the number of documents to get the mean age.

Play Iteratees

The Play Iteratees library can work with streams of MongoDB documents.

The dependencies can be updated as follows.

val reactiveMongoVer = "1.1.0-RC14"
val playVer = "2.5.4" // or greater

libraryDependencies ++= Seq(
  "org.reactivemongo" %% "reactivemongo" % reactiveMongoVer,
  "org.reactivemongo" %% "reactivemongo-iteratees" % reactiveMongoVer,
  "com.typesafe.play" %% "play-iteratees" % playVer)

To use the Iteratees support for the ReactiveMongo cursors, reactivemongo.play.iteratees.cursorProducer must be imported.

Then the corresponding operations are available on the cursors.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import play.api.libs.iteratee._

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

import reactivemongo.play.iteratees.cursorProducer
// Provides the cursor producer with the Iteratees capabilities

def processPerson3(collection: BSONCollection, query: BSONDocument): Future[Unit] = {
  val enumeratorOfPeople: Enumerator[BSONDocument] =
    collection.find(query).cursor[BSONDocument]().enumerator()

  val processDocuments: Iteratee[BSONDocument, Unit] =
    Iteratee.foreach { person =>
      val lastName = person.getAsOpt[String]("lastName")
      val prettyBson = BSONDocument.pretty(person)
      println(s"found $lastName: $prettyBson")
    }

  enumeratorOfPeople.run(processDocuments)
}

The operation PlayIterateesCursor.enumerator returns an Enumerator[T]. In this case, we get a producer of documents (of type BSONDocument).

Now that we have the producer, we need to define how the documents are processed: that is the Iteratee’s job. Iteratees, as the opposite of Enumerators, are consumers: they are fed by enumerators and do some computation with the chunks they get.

Here, we build an Iteratee[BSONDocument, Unit] that takes BSONDocument as an input and eventually returns Unit – which is normal because we just print the results without computing any final value. Each time it gets a document, it extracts the lastName and prints it on the console along with the whole document. Note that none of these operations are blocking: when the running thread is not processing the callback of our Iteratee, it can be used to compute other things.

When this snippet is run, we get the following:

found London: {
  _id: BSONObjectID("4f899e7eaf527324ab25c56b"),
  firstName: BSONString(Jack),
  lastName: BSONString(London),
  age: BSONInteger(40)
}
found Whitman: {
  _id: BSONObjectID("4f899f9baf527324ab25c56c"),
  firstName: BSONString(Walt),
  lastName: BSONString(Whitman),
  age: BSONInteger(72)
}
found Hemingway: {
  _id: BSONObjectID("4f899f9baf527324ab25c56d"),
  firstName: BSONString(Ernest),
  lastName: BSONString(Hemingway),
  age: BSONInteger(61)
}

The line enumeratorOfPeople.run(processDocuments) returns a Future[Unit]. It will eventually return the final value of the Iteratee, which is Unit in our case.

The run method on Enumerator has an operator alias, |>>>. So we can rewrite the last line like this: enumeratorOfPeople |>>> processDocuments.

Obviously, we may use a pure Iteratee that performs some computation:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import play.api.libs.iteratee._
import reactivemongo.api.bson.BSONDocument

def processPerson4(enumeratorOfPeople: Enumerator[BSONDocument]): Future[Float] = {
  val cumulateAge: Iteratee[BSONDocument, (Int, Int)] =
    Iteratee.fold(0 -> 0) {
      case ((cumulatedAge, n), doc) =>
        val age = doc.getAsOpt[Int]("age").getOrElse(0)
        (cumulatedAge + age, n + 1)
    }

  val cumulated: Future[(Int, Int)] = enumeratorOfPeople |>>> cumulateAge

  // Convert to Float before dividing, to avoid a truncating Int division
  val meanAge: Future[Float] =
    cumulated.map { case (cumulatedAge, n) =>
      if (n == 0) 0F
      else cumulatedAge.toFloat / n
    }

  meanAge
}

At each step, this Iteratee will extract the age from the document and add it to the current result. It also counts the number of documents processed. It eventually produces a tuple of two integers; in our case (173, 3). When the cumulated age is completed, we divide it by the number of documents to get the mean age.
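As a worked check of the arithmetic, the tuple (173, 3) gives a mean age of about 57.67. The sketch below uses a hypothetical helper named mean (not part of ReactiveMongo) to illustrate that the cumulated age has to be converted to a Float before the division, since dividing two Int values truncates the result.

```scala
object MeanAge {
  // Mean age from a (cumulated age, number of documents) pair.
  // The conversion to Float keeps the fractional part of the result.
  def mean(cumulatedAge: Int, n: Int): Float =
    if (n == 0) 0F
    else cumulatedAge.toFloat / n
}

// Int division drops the fractional part
val truncated: Int = 173 / 3 // 57

// Float division keeps it
val exact: Float = MeanAge.mean(173, 3) // about 57.67
```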

More: ReactiveMongo Iteratees API

Custom streaming

ReactiveMongo streaming is based on the function Cursor.foldWhileM[A], which also allows you to implement a custom stream processor. The following example uses foldWhile, its counterpart for a synchronous processing function.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.Cursor

def streaming(c: Cursor[String]): Future[List[String]] =
  c.foldWhile(List.empty[String], 1000/* optional: max doc */)(
    { (ls, str) => // process next String value
      if (str startsWith "#") Cursor.Cont(ls) // Skip: continue unchanged `ls`
      else if (str == "_end") Cursor.Done(ls) // End processing
      else Cursor.Cont(str :: ls) // Continue with updated `ls`
    },
    { (ls, err) => // handle failure
      err match {
        case _: RuntimeException => Cursor.Cont(ls) // Skip error, continue
        case _ => Cursor.Fail(err) // Stop with current failure -> Future.failed
      }
    })

At each streaming step, for each new value or error, you choose how you want to proceed, using the cases Cursor.{ Cont, Done, Fail }.

Convenient handler functions are also provided to help implement custom streaming: Cursor.{ ContOnError, DoneOnError, FailOnError, Ignore }.
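For instance, one of these handlers can be passed as the error handler of foldWhile instead of a hand-written function. The following is a minimal sketch, assuming that Cursor.ContOnError[A]() continues with the current accumulated value when an error occurs; CountValues, step and countNonEmpty are hypothetical names.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.Cursor

object CountValues {
  // Pure step function: counts the non-empty values
  def step(count: Int, str: String): Cursor.State[Int] =
    if (str.isEmpty) Cursor.Cont(count) // Skip: continue with unchanged count
    else Cursor.Cont(count + 1) // Continue with updated count

  // On error, ContOnError continues with the count accumulated so far
  def countNonEmpty(c: Cursor[String])(
    implicit ec: ExecutionContext): Future[Int] =
    c.foldWhile(0)(step, Cursor.ContOnError[Int]())
}
```

Keeping the step function separate from the fold makes it easy to check without a database connection.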

Each fold operation (foldResponses, foldBulks or foldWhile) has a variant working with a function returning a Future[State[T]] (instead of a synchronous State[T]).
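As a minimal sketch of such an asynchronous variant, the function foldWhileM can be used when each step itself depends on a Future. It is assumed here that foldWhileM takes the same parameters as foldWhile, apart from the asynchronous step function. The names AsyncFold, isAllowed, stepM and streamingM are hypothetical; isAllowed stands for any asynchronous check.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.Cursor

object AsyncFold {
  // Hypothetical asynchronous check (e.g. a call to another service)
  def isAllowed(str: String)(implicit ec: ExecutionContext): Future[Boolean] =
    Future(!str.startsWith("#"))

  // Asynchronous step: the next state is only known once the check completes
  def stepM(ls: List[String], str: String)(
    implicit ec: ExecutionContext): Future[Cursor.State[List[String]]] =
    isAllowed(str).map[Cursor.State[List[String]]] { allowed =>
      if (allowed) Cursor.Cont(str :: ls) // Continue with updated `ls`
      else Cursor.Cont(ls) // Skip: continue with unchanged `ls`
    }

  def streamingM(c: Cursor[String])(
    implicit ec: ExecutionContext): Future[List[String]] =
    c.foldWhileM(List.empty[String])(
      (ls, str) => stepM(ls, str),
      Cursor.FailOnError[List[String]]())
}
```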
