Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Consume streams of documents
Instead of accumulating documents in memory as in the two previous examples, we can process them as a stream. This is achieved with the play-iteratees library, in two steps:
- get an Enumerator of documents from ReactiveMongo; this is a producer of data;
- apply an Iteratee (built for this purpose), which consumes the data and eventually produces a result.
cursor.enumerate() returns an
Enumerator[T]. In this case, we get a producer of documents (of type
Now that we have the producer, we need to define how the documents are processed: that is the Iteratee's job. Iteratees are the counterpart of Enumerators: they are consumers, fed by enumerators, that perform some computation on the chunks they receive.
Here, we build an Iteratee[BSONDocument, Unit] that takes a BSONDocument as input and eventually returns Unit, which is expected because we only print the results without computing any final value. Each time it receives a document, it extracts the lastName and prints it to the console along with the whole document. Note that none of these operations is blocking: when the running thread is not processing the callback of our Iteratee, it can be used for other work.
When this snippet is run, we get the following:
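enumeratorOfPeople.run(processDocuments) returns a Future[Unit]; it eventually completes with the final value of the Iteratee, which is Unit in our case. (Calling apply instead only feeds the Iteratee and returns a Future of the resulting Iteratee, which would still have to be run.)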
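A minimal sketch of a print-only consumer and of running it, assuming play-iteratees is on the classpath. To stay runnable without a database, it uses a hypothetical Person case class and an in-memory Enumerator of made-up data in place of BSONDocument and cursor.enumerate(); the names ForeachSketch, Person, people, processPeople and printAll are illustrative only and do not come from ReactiveMongo.

```scala
import play.api.libs.iteratee.{ Enumerator, Iteratee }
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForeachSketch {
  // Hypothetical stand-in for a BSONDocument that has a lastName field.
  final case class Person(lastName: String, age: Int)

  // In-memory producer of made-up data, standing in for cursor.enumerate().
  val people: Enumerator[Person] =
    Enumerator(Person("Doe", 31), Person("Roe", 42), Person("Poe", 28))

  // Consumer: prints each element as it arrives; its final value is Unit.
  val processPeople: Iteratee[Person, Unit] =
    Iteratee.foreach[Person] { p => println(s"found ${p.lastName}: $p") }

  // Feeds the producer into the consumer and returns the Iteratee's final value.
  def printAll(): Future[Unit] = people.run(processPeople)

  def main(args: Array[String]): Unit =
    Await.result(printAll(), 5.seconds) // block only at the very edge of the program
}
```

The Await call is there only so that a standalone program does not exit before the stream is consumed; inside an application the Future would normally be composed rather than awaited.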
Enumerator has an operator alias for run, |>>>, which feeds the Iteratee and returns its final value. So we can rewrite the last line like this: enumeratorOfPeople |>>> processDocuments.
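The relationship between the named methods and the operators can be sketched as follows, assuming play-iteratees is on the classpath. In that library, |>>> is the operator form of run and yields the final value, whereas apply (operator form |>>) yields the fed Iteratee, which must then be run. AliasSketch, names and count are hypothetical names, and the data is made up.

```scala
import play.api.libs.iteratee.{ Enumerator, Iteratee }
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AliasSketch {
  // Hypothetical in-memory producer, standing in for enumeratorOfPeople.
  def names: Enumerator[String] = Enumerator("Doe", "Roe", "Poe")

  // Consumer that counts the elements it is fed.
  def count: Iteratee[String, Int] =
    Iteratee.fold[String, Int](0) { (n, _) => n + 1 }

  // Named method and operator form: both yield the Iteratee's final value.
  def viaRun: Future[Int] = names.run(count)
  def viaOperator: Future[Int] = names |>>> count

  // apply only feeds the input; the resulting Iteratee still has to be run.
  def viaApply: Future[Int] = names.apply(count).flatMap(_.run)

  def main(args: Array[String]): Unit = {
    val results = Seq(viaRun, viaOperator, viaApply).map(Await.result(_, 5.seconds))
    println(results.distinct.size == 1) // all three approaches agree
  }
}
```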
Of course, we can also use a pure Iteratee that performs some computation:
At each step, this Iteratee extracts the age from the document and adds it to the current result; it also counts the number of documents processed. It eventually produces a tuple of two integers, in our case (173, 3). When the cumulated future completes, we divide the cumulated age by the number of documents to get the mean age.
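The fold described above can be sketched as follows, assuming play-iteratees is on the classpath. As a simplification, a hypothetical Person case class replaces BSONDocument so that the age is read from a field instead of being extracted from a document, and the three ages are made up so that they total 173, matching the tuple in the text. MeanAgeSketch, cumulateAge and meanAge are illustrative names.

```scala
import play.api.libs.iteratee.{ Enumerator, Iteratee }
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MeanAgeSketch {
  // Hypothetical stand-in for a BSONDocument that has an age field.
  final case class Person(lastName: String, age: Int)

  // Pure consumer: accumulates (sum of ages, number of elements seen).
  val cumulateAge: Iteratee[Person, (Int, Int)] =
    Iteratee.fold[Person, (Int, Int)]((0, 0)) { (acc, person) =>
      (acc._1 + person.age, acc._2 + 1)
    }

  // Runs the fold, then divides once the cumulated future completes.
  def meanAge(people: Enumerator[Person]): Future[Double] =
    (people |>>> cumulateAge).map { case (sum, n) =>
      if (n == 0) 0.0 else sum.toDouble / n // avoid dividing by zero on empty input
    }

  def main(args: Array[String]): Unit = {
    // Made-up data: the ages sum to 173 over 3 elements.
    val people = Enumerator(Person("Doe", 58), Person("Roe", 57), Person("Poe", 58))
    println(Await.result(meanAge(people), 5.seconds))
  }
}
```

Keeping the division outside the Iteratee means the fold itself only carries integers, and the empty-stream case is handled in a single place.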