+ - 0:00:00
Notes for current slide
Notes for next slide

ReactiveMongo 0.12.4

What's new?

  • Connection pool
  • Query and write operations
  • BSON library
  • Streaming
  • Aggregation
  • Play Framework
  • Administration
  • Logging
  • Monitoring
  • Compatibility
1 / 35

Connection pool

  • Database resolution
    • Initial node set discovery
    • Depends on environment/network (latency)
  • Connection options
 
2 / 35

Database resolution - Before

Synchronous:

import reactivemongo.api.{ DB, MongoConnection }
def deprecatedDB(con: MongoConnection) = con.db("db_name")
  • Assume >=1 active channel in connection pool
  • Not sure the DB reference is immediately usable
3 / 35

Database resolution - After

Asynchronous: Future[DB]

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.{ DB, MongoConnection }
def newResolution(con: MongoConnection): Future[DB] =
con.database("db_name")
  • Reactive handling of resolution success (or failure)
  • Use failover strategy
4 / 35

Connection options 1/2

import reactivemongo.api._, commands.WriteConcern
def connection(driver: MongoDriver) =
driver.connection(List("localhost"),
options = MongoConnectionOptions(
readPreference = ReadPreference.primary,
writeConcern = WriteConcern.Acknowledged,
failoverStrategy = FailoverStrategy(retries = 10)
)
)
5 / 35

Connection options 2/2

  • Default authentication is SCRAM SHA1
  • Support for maxIdleTimeMS (e.g. 2s)
  • New option monitorRefreshMS: frequency for pool monitor (e.g. 5s)
import reactivemongo.api._
MongoConnectionOptions(
authMode = ScramSha1Authentication, // default SCRAM SHA1
maxIdleTimeMS = 2000 /* 2s */,
monitorRefreshMS = 5000 /* check nodes each 5s */
)
6 / 35

Query and write operations

The collection API provides new operations.

  • FindAndModify
  • Query builder
  • Error handling
  • GridFS
7 / 35

Query & write - FindAndModify 1/2

Example findAndUpdate: Find Joline and update her age to 35

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson._
import reactivemongo.api.collections.bson.BSONCollection
case class Person(
firstName: String, lastName: String, age: Int)
object Person {
implicit def reader: BSONDocumentReader[Person] =
Macros.reader[Person]
}
def example1(coll: BSONCollection): Future[Option[Person]] =
coll.findAndUpdate(
BSONDocument("name" -> "Joline"),
BSONDocument("$set" -> BSONDocument("age" -> 35)),
fetchNewObject = true
).map(_.result[Person])
8 / 35

Query & write - FindAndModify 2/2

Example findAndRemove: Find document about Jack and remove it

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def example2(coll: BSONCollection) =
coll.findAndRemove(BSONDocument("name" -> "Jack"))
9 / 35

Query & write - Query builder 1/3

New requireOne builder

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
trait PersonService {
def collection: BSONCollection
def example3(f: String, l: String): Future[Person] =
collection.find(BSONDocument(
"firstName" -> f, "lastName" -> l
)).requireOne[Person]
}
10 / 35

Query & write - Query builder 2/3

Option maxTimeMs now supported

Example: Make MongoDB stop if find takes more than 1234ms

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def example4(coll: BSONCollection) =
coll.find(BSONDocument("foo" -> "bar")).maxTimeMs(1234L)
11 / 35

Query & write - Query builder 2/3

Operation explain to get query plan.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def example5(coll: BSONCollection) =
coll.find(BSONDocument.empty).explain().one[BSONDocument]
12 / 35

Query & write - Error handling 1/2

Pattern matching utilities for WriteResult

  • WriteResult.Code: match by error code
  • WriteResult.Message: match by error message
import reactivemongo.api.commands.WriteResult
def example6(res: WriteResult) = res match {
case WriteResult.Code(11000) =>
println("Match the code 11000")
// 11000 code for the Duplicate error
case WriteResult.Message(
"Must match this exact message") =>
println("Match the error message")
case _ => ()
}
13 / 35

Query & write - Error handling 2/2

Pattern matching utilities for CommandError

  • CommandError.Code: match by error code
  • CommandError.Message: match by error message
import reactivemongo.api.commands.CommandError
def example7(res: CommandError) = res match {
case CommandError.Code(11000) =>
println("Match the code 11000")
case CommandError.Message(
"Must match this exact message") =>
println("Match the error message")
case _ => ()
}
14 / 35

Query & write - GridFS

GridFS support provides operations with automatic MD5:
saveWithMD5 & iterateeWithMD5

import scala.concurrent.{ ExecutionContext, Future }
import play.api.libs.iteratee.Enumerator
import reactivemongo.api._, gridfs._, Implicits._
import reactivemongo.bson.BSONValue
type BSONFile = reactivemongo.api.gridfs.
ReadFile[BSONSerializationPack.type, BSONValue]
def example8(
gridfs: GridFS[BSONSerializationPack.type],
filename: String, typ: Option[String],
data: Enumerator[Array[Byte]]
)(implicit ec: ExecutionContext): Future[BSONFile] =
gridfs.saveWithMD5(data,
DefaultFileToSave(Some(filename), typ)
15 / 35

BSON library

Manipulate the BSON values and documents

{
_id: 1,
MongoDB: "represents JSON documents",
in: "binary-encoded format"
}
16 / 35

BSON library - BSONReader

New combinator afterRead

Example: Reader for MyEnum based on the one for BSON string

import reactivemongo.bson._
sealed trait MyEnum
object EnumValA extends MyEnum
object EnumValB extends MyEnum
val underlying: implicitly[BSONReader[BSONString, String]]
implicit def example9: BSONReader[BSONString, MyEnum] =
underlying.afterRead {
case "A" => EnumValA
case "B" => EnumValB
case v => sys.error(s"unexpected value: $v")
}
17 / 35

BSON library - BSONWriter

New combinator beforeWrite

Example: Writer to encode as BSON string the instances of MyEnum

import reactivemongo.bson._
val underlying = implicitly[BSONWriter[String, BSONString]]
implicit def example10: BSONWriter[MyEnum, BSONString] =
underlying.beforeWrite[MyEnum] {
case EnumValA => "A"
case _ => "B"
}
18 / 35

BSON library - Macros

Support for sealed family

Example: Types family of Node and Leaf

import reactivemongo.bson._
sealed trait Tree
case class Node(left: Tree, right: Tree) extends Tree
case class Leaf(data: String) extends Tree
val example11: BSONDocumentHandler[Tree] =
Macros.handler[Tree]
19 / 35

Streaming

  • Instead of accumulating in memory, processing results as document stream
  • Can now be used with several frameworks:
    • Play Iteratees
    • Akka Stream
    • Custom streaming

20 / 35

Streaming - Akka Stream

Example: Find person documents and consume using an Akka Sink

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.akkastream.{ State, cursorProducer }
trait Example12 {
implicit def m: Materializer
def apply(coll: BSONCollection, query: BSONDocument) = {
val sourceOfPeople = coll.find(query).
cursor[BSONDocument].documentSource()
sourceOfPeople.runWith(Sink.seq[BSONDocument])
}
}
21 / 35

Streaming - Custom streaming

New streaming based on primitive Cursor.foldWhileM[A]

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.Cursor
def example13(c: Cursor[String]): Future[List[String]] =
c.foldWhile(List.empty[String], 1000/* max doc */)(
{ (ls, str) => // process next String value
if (str startsWith "#") Cursor.Cont(ls) // Skip
else if (str == "_end") Cursor.Done(ls) // End
else Cursor.Cont(str :: ls) // Continue
},
{ (ls, err) => // handle failure
err match {
case e: RuntimeException => Cursor.Cont(ls) // Skip
case _ => Cursor.Fail(err) // Stop with failure
}
})
22 / 35

Aggregation

MongoDB provides operations to group values from multiple documents, perform operations on the grouped data to return result.

23 / 35

Aggregation - Distinct

Command distinct is now provided as collection operation

Example: Find the distinct state from a collection of cities

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection
def example14(cities: BSONCollection): Future[Set[String]] =
cities.distinct[String, Set]("state")
24 / 35

Aggregation - New builder

Example: Aggregates the states with population > 10000000

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.bson._
import reactivemongo.api.Cursor
import reactivemongo.api.collections.bson.BSONCollection
def example15(cities: BSONCollection): Cursor[BSONDocument] =
cities.aggregateWith[BSONDocument]() { framework =>
import framework.{ Group, Match, SumField }
Group(BSONString("$state"))(
"totalPop" -> SumField("population")) -> List(
Match(document("totalPop" -> document(
"$gte" -> 10000000L)))
)
}
25 / 35

Play Framework

  • JSON serialization
  • Dependency injection
  • Routing

26 / 35

Play Framework - JSON serialization 1/3

Standalone library based on Play JSON, usable outside a Play application

Example: Find all documents as JsObject

import scala.concurrent.ExecutionContext.Implicits.global
import play.api.libs.json._
import reactivemongo.api.ReadPreference
import reactivemongo.play.json._, collection._
def example16(coll: JSONCollection) =
coll.find(Json.obj()).cursor[JsObject](ReadPreference.primary).collect[List]()
27 / 35

Play Framework - JSON serialization 2/3

New helper bulkInsert

Example: Import a JSON resource

import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.play.json.collection._
// Import many JSON objects as documents to JSON `collection`
def example17(collection: JSONCollection, resource: String) =
Helpers.bulkInsert(collection,
getClass.getResourceAsStream(resource))
28 / 35

Play Framework - JSON serialization 3/3

New functions to derive JSON handlers from BSON handlers

Example: jsonOFormat derives a BSONHandler

import play.api.libs.json.OFormat
import reactivemongo.bson._
import reactivemongo.play.json.BSONFormats
def example18(
implicit bsonWriter: BSONDocumentWriter[Person],
implicit bsonReader: BSONDocumentReader[Person]
): OFormat[Perso] = BSONFormats.jsonOFormat[Person]
29 / 35

Play Framework - Dependency injection

New @NamedDatabase annotation

Example:

# The default URI
mongodb.uri = "mongodb://someuser:somepasswd@localhost:27017/foo"

# Another one, named with 'bar'
mongodb.bar.uri = "mongodb://someuser:somepasswd@localhost:27017/lorem"

Different polls can be injected:

import javax.inject.Inject
import play.modules.reactivemongo._
class MyComponent @Inject() (
val defaultApi: ReactiveMongoApi, // for 'mongodb.uri'
@NamedDatabase("bar")
val barApi: ReactiveMongoApi // for 'mongodb.bar'
) { /* ... */ }
30 / 35

Play Framework - Routing

Bindings for Play routing are available for BSON types

Example: Action foo

import play.api.mvc.{ Action, Controller }
import reactivemongo.bson.BSONObjectID
class Application extends Controller {
def foo(id: BSONObjectID) = Action {
Ok(s"Foo: ${id.stringify}")
}
}

BSONObjectID binding can be used in conf/routes:

GET /foo/:id Application.foo(id: reactivemongo.bson.BSONObjectID)
31 / 35

Administration

New operations to manage a MongoDB instance:

  • Rename collection
  • Drop collection
  • Create user
  • Get collection statistics
  • Resync replica set members

Example: Rename a collection by suffixing

import reactivemongo.api.DefaultDB
def renameWithSuffix(
admin: DefaultDB, otherDb: String,
collName: String, suffix: String
) = admin.renameCollection(
otherDb, collName, s"$collName-$suffix")
32 / 35

Monitoring

  • Monitoring of connection pools
  • Provided JMX implementation

33 / 35

Compatibility

ReactiveMongo 0.12.4 is compatible with the following runtime:

34 / 35

Next

Next release will be 1.0 (new BSON, Spark module, GeoJSON, certificate authentication, Kamon monitoring ...)

Detailed release notes

35 / 35

Connection pool

  • Database resolution
    • Initial node set discovery
    • Depends on environment/network (latency)
  • Connection options
 
2 / 35
Paused

Help

Keyboard shortcuts

, , Pg Up, k Go to previous slide
, , Pg Dn, Space, j Go to next slide
Home Go to first slide
End Go to last slide
Number + Return Go to specific slide
b / m / f Toggle blackout / mirrored / fullscreen mode
c Clone slideshow
p Toggle presenter mode
t Restart the presentation timer
?, h Toggle this help
Esc Back to slideshow