Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Consume streams of documents

Instead of accumulating documents in memory as in the two previous examples, we can process them as a stream. This is done with the play-iteratees library, in two steps: we get a producer of documents from the cursor, then we define a consumer that processes them.

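import play.api.libs.iteratee._
import reactivemongo.bson.BSONDocument
import scala.concurrent.ExecutionContext.Implicits.global

// Step 1, the producer: result type is Enumerator[BSONDocument]
val enumeratorOfPeople =
  collection.
    find(query).
    cursor[BSONDocument].
    enumerate()

// Step 2, the consumer: prints each document as it arrives
val processDocuments: Iteratee[BSONDocument, Unit] =
  Iteratee.foreach[BSONDocument] { person =>
    // getAs returns an Option; unwrap it to print "London" rather than "Some(London)"
    val lastName = person.getAs[String]("lastName").getOrElse("")
    val prettyBson = BSONDocument.pretty(person)
    println(s"found $lastName: $prettyBson")
  }

// apply returns Future[Iteratee[BSONDocument, Unit]]; run extracts the final value
enumeratorOfPeople.apply(processDocuments).flatMap(_.run) // Future[Unit]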

The method cursor.enumerate() returns an Enumerator[T]. In this case, we get a producer of documents (of type BSONDocument).

Now that we have the producer, we need to define how the documents are processed: that is the Iteratee's job. Iteratees are the counterpart of Enumerators: they are consumers, fed by Enumerators, that perform some computation on each chunk they receive.

Here, we build an Iteratee[BSONDocument, Unit] that takes BSONDocument as input and eventually returns Unit, which makes sense because we only print the results without computing any final value. Each time it receives a document, it extracts the lastName and prints it to the console along with the whole document. Note that none of these operations is blocking: when the running thread is not processing the callback of our Iteratee, it can be used to compute other things.
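The per-document logic of processDocuments can be tried without a database. The following plain-Scala sketch is only an approximation, not ReactiveMongo code: it replaces BSONDocument with a hypothetical Doc alias for Map[String, Any], getAs with a hypothetical getString helper, and the Enumerator with an ordinary Iterator. Like getAs, the helper returns an Option, which has to be unwrapped before printing.

```scala
// Plain-Scala sketch; Doc and getString are hypothetical stand-ins,
// not part of ReactiveMongo or play-iteratees.
object ForeachSketch {
  type Doc = Map[String, Any]

  // Mimics getAs: Some(value) only when the field exists and is a String.
  def getString(doc: Doc, field: String): Option[String] =
    doc.get(field).collect { case s: String => s }

  // The per-document callback: extract lastName and build the output line.
  def describe(person: Doc): String = {
    val lastName = getString(person, "lastName").getOrElse("<no lastName>")
    s"found $lastName: $person"
  }

  // A foreach-style consumer: run the callback on each document in turn,
  // keeping no state and producing Unit.
  def consumeAll(docs: Iterator[Doc]): Unit =
    docs.foreach(person => println(describe(person)))

  def main(args: Array[String]): Unit = {
    val people = Iterator[Doc](
      Map("firstName" -> "Jack", "lastName" -> "London", "age" -> 40),
      Map("firstName" -> "Walt", "lastName" -> "Whitman", "age" -> 72)
    )
    consumeAll(people)
  }
}
```

Interpolating the Option directly (s"found $maybeName") would print Some(London), which is why the callback unwraps it first.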

When this snippet is run, we get the following:

found London: {
  _id: BSONObjectID("4f899e7eaf527324ab25c56b"),
  firstName: BSONString(Jack),
  lastName: BSONString(London),
  age: BSONInteger(40)
}
found Whitman: {
  _id: BSONObjectID("4f899f9baf527324ab25c56c"),
  firstName: BSONString(Walt),
  lastName: BSONString(Whitman),
  age: BSONInteger(72)
}
found Hemingway: {
  _id: BSONObjectID("4f899f9baf527324ab25c56d"),
  firstName: BSONString(Ernest),
  lastName: BSONString(Hemingway),
  age: BSONInteger(61)
}

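Note that enumeratorOfPeople.apply(processDocuments) returns a Future[Iteratee[BSONDocument, Unit]]: the Iteratee in the state it reached after consuming every document, not its result. Running that Iteratee (for example with .flatMap(_.run)) yields a Future[Unit], which eventually holds the final value of the Iteratee, Unit in our case.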

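The apply method on Enumerator has an operator alias, |>>. The |>>> operator goes one step further: it feeds the Iteratee and then runs it, returning a Future of its final value. So we can get the Future[Unit] directly like this: enumeratorOfPeople |>>> processDocuments.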
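To make the difference between apply and |>>> concrete, here is a toy model of the pattern in plain Scala. All names in it are hypothetical and far simpler than play-iteratees (no Input or EOF signals, no early termination): the consumer is an accumulated state plus a step function, and the producer is a fixed list of chunks.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Toy model; these are NOT the play-iteratees classes.
object IterateeSketch {
  // Consumer: an accumulated state plus a step applied to every chunk.
  final case class Iteratee[E, A](state: A, step: (A, E) => A) {
    def feed(chunk: E): Iteratee[E, A] = Iteratee(step(state, chunk), step)
    // End of input: the final value is the accumulated state.
    def run: A = state
  }

  // Producer: a fixed list of chunks.
  final case class Enumerator[E](chunks: List[E]) {
    // Mirrors Enumerator.apply: yields the consumer after it has been fed
    // every chunk, not the consumer's final value.
    def apply[A](it: Iteratee[E, A])(implicit ec: ExecutionContext): Future[Iteratee[E, A]] =
      Future(chunks.foldLeft(it)(_ feed _))

    // Mirrors |>>>: feed every chunk, then run the consumer.
    def |>>>[A](it: Iteratee[E, A])(implicit ec: ExecutionContext): Future[A] =
      apply(it).map(_.run)
  }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val lastNames = Enumerator(List("London", "Whitman", "Hemingway"))
    val count = Iteratee[String, Int](0, (n, _) => n + 1)

    val fed: Future[Iteratee[String, Int]] = lastNames.apply(count)
    val result: Future[Int] = lastNames |>>> count
    println(Await.result(fed, 1.second).run)
    println(Await.result(result, 1.second))
  }
}
```

Keeping apply separate from run is what allows an Iteratee returned by apply to be fed by another Enumerator before it is run; |>>> is the shorthand for the common case where we just want the result.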

We can also use a pure Iteratee, one that computes a value instead of performing side effects:

val cumulateAge: Iteratee[BSONDocument, (Int, Int)] =
  Iteratee.fold[BSONDocument, (Int, Int)](0 -> 0) { case ((cumulatedAge, n), doc) =>
    // a missing age counts as 0, but the document is still counted
    val age = doc.getAs[Int]("age").getOrElse(0)
    (cumulatedAge + age, n + 1)
  }

val cumulated = enumeratorOfPeople |>>> cumulateAge // Future[(Int, Int)]

val meanAge =
  cumulated.map { case (cumulatedAge, n) =>
    if (n == 0) 0f
    else cumulatedAge.toFloat / n // convert first to avoid integer division
  }

// meanAge is of type Future[Float]

At each step, this Iteratee extracts the age from the document, adds it to the running total, and counts the document. It eventually produces a tuple of two integers, (173, 3) for our three documents. When the cumulated future completes, we divide the cumulated age by the number of documents to get the mean age, about 57.67 here.
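The arithmetic of cumulateAge and meanAge can be checked with plain collections. This sketch hard-codes the three ages from the sample output above (no database or iteratees involved), applies the same fold step with foldLeft, and converts to Float before dividing.

```scala
// Plain-collections check of the fold; ages are copied from the sample output.
object MeanAgeSketch {
  val ages: List[Int] = List(40, 72, 61)

  // Same step as cumulateAge: add the age and count the document.
  val (cumulatedAge, n) = ages.foldLeft(0 -> 0) { case ((sum, count), age) =>
    (sum + age, count + 1)
  }

  // Convert before dividing; Int / Int would truncate the mean to 57.
  val meanAge: Float = if (n == 0) 0f else cumulatedAge.toFloat / n

  def main(args: Array[String]): Unit =
    println(s"($cumulatedAge, $n) -> $meanAge")
}
```

With integer division the result would silently drop the fractional part, which is why the conversion happens before the division rather than after.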