Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Connect to the database

The first thing you need is to create a new AsyncDriver instance.

val driver1 = new reactivemongo.api.AsyncDriver

Then you can connect to a MongoDB server.

import scala.concurrent.Future
import reactivemongo.api.MongoConnection

val connection3: Future[MongoConnection] = driver1.connect(List("localhost"))

An AsyncDriver instance manages the shared resources (e.g. the actor system for the asynchronous processing); a connection manages a pool of network channels. In general, an AsyncDriver or a MongoConnection should not be instantiated more than once.

You can provide a list of one or more servers; the driver will guess whether it’s a standalone server or a replica set configuration. Even with one replica node, the driver will probe for other nodes and add them automatically.

Connection options

Some options can be provided while creating a connection.

import reactivemongo.api.MongoConnectionOptions

val conOpts = MongoConnectionOptions(/* connection options */)
val connection4 = driver1.connect(List("localhost"), options = conOpts)

The following options can be used with MongoConnectionOptions to configure the connection behaviour.

Authentication:

SSL & certificates:

The option ssl is needed if the MongoDB server requires SSL (mongod --sslMode requireSSL). The related option sslAllowsInvalidCert is required if the server allows invalid certificates (mongod --sslAllowInvalidCertificates).

SNI is supported for the SSL connection.
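As a minimal sketch, the two options could be given in the connection URI, assuming both are accepted as URI parameters under the names used above (ssl and sslAllowsInvalidCert). The host, port and database name are placeholders, and sslConnection is a hypothetical helper name.

```scala
import scala.concurrent.Future
import reactivemongo.api.{ AsyncDriver, MongoConnection }

// Placeholder host and database; `ssl` and `sslAllowsInvalidCert`
// are the options described above (assumed valid as URI parameters)
val sslUri =
  "mongodb://host1:27017/mydatabase?ssl=true&sslAllowsInvalidCert=true"

// Hypothetical helper: opens a connection pool using the SSL-enabled URI
def sslConnection(driver: AsyncDriver): Future[MongoConnection] =
  driver.connect(sslUri)
```

Allowing invalid certificates is usually only appropriate for development or test servers.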

Network, timeouts & failover:

Read isolation & consistency:

If the connection pool is defined by a URI, then the options can be given after the ? separator:

mongodb.uri = "mongodb://user:pass@host1:27017,host2:27018,host3:27019/mydatabase?authenticationMechanism=scram-sha1&rm.tcpNoDelay=true"

See: Connect using MongoDB URI
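When the options are assembled programmatically, a small helper can keep the ? and & separators right. The sketch below is plain Scala and not part of ReactiveMongo: mongoUri is a hypothetical helper, it leaves out the credentials, and it does not escape special characters in the values.

```scala
// Hypothetical helper (not a ReactiveMongo API):
// joins hosts, database name and options into a MongoDB URI
def mongoUri(
  hosts: Seq[String],
  database: String,
  options: Seq[(String, String)] = Seq.empty): String = {
  // Options go after the `?` separator, as `name=value` pairs joined with `&`
  val query =
    if (options.isEmpty) ""
    else options.map { case (k, v) => s"$k=$v" }.mkString("?", "&", "")

  s"mongodb://${hosts.mkString(",")}/$database$query"
}

val builtUri = mongoUri(
  hosts = Seq("host1:27017", "host2:27018", "host3:27019"),
  database = "mydatabase",
  options = Seq(
    "authenticationMechanism" -> "scram-sha1",
    "rm.tcpNoDelay" -> "true"))
// builtUri: mongodb://host1:27017,host2:27018,host3:27019/mydatabase?authenticationMechanism=scram-sha1&rm.tcpNoDelay=true
```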

Connecting to a Replica Set

ReactiveMongo provides support for replica sets as follows.

Connecting to a replica set is pretty much the same as connecting to a single server. You may have noticed that the connection argument is a List[String], so more than one node can be specified.

val servers6 = List("server1:27017", "server2:27017", "server3:27017")
val connection6 = driver1.connect(servers6)

There is no obligation to give all the nodes in the replica set. Actually, just one of them is required. ReactiveMongo will ask the nodes it can reach for the addresses of the other nodes in the replica set. It is still better to give at least two nodes, in case one of them is unavailable when the application starts.

Using many connection instances

In some (rare) cases it is perfectly viable to create as many MongoConnection instances as you need, from a single AsyncDriver instance.

In that case, you will get different connection pools. This is useful when your application has to connect to two or more independent MongoDB nodes (i.e. that do not belong to the same replica set), or different replica sets.

val serversReplicaSet1 = List("rs11", "rs12", "rs13")
val connectionReplicaSet1 = driver1.connect(serversReplicaSet1)

val serversReplicaSet2 = List("rs21", "rs22", "rs23")
val connectionReplicaSet2 = driver1.connect(serversReplicaSet2)

Handling Authentication

There are two ways to give ReactiveMongo your credentials.

It can be done using driver.connect.

import reactivemongo.core.nodeset.Authenticate

def servers7: List[String] = List("server1", "server2")

val dbName = "somedatabase"
val userName = "username"
val password = "password"
val credentials7 = List(Authenticate(dbName, userName, Some(password)))
val connection7 = driver1.connect(servers7, authentications = credentials7)

Using this connection function with a URI allows the credentials to be given in the URI itself.

There is also an authenticate function for the database references.

import scala.concurrent.{ ExecutionContext, Future },
  ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

import reactivemongo.api.MongoConnection

def authenticateDB(con: MongoConnection): Future[Unit] = {
  def username = "anyUser"
  def password = "correspondingPass"

  val futureAuthenticated = con.authenticate("mydb", username, password)

  futureAuthenticated.map { _ =>
    // doSomething
  }
}

Like any other operation in ReactiveMongo, authentication is done asynchronously.
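Since the result is a Future, a failed authentication shows up as a failed Future rather than as an exception thrown at the call site. A minimal sketch, reusing the authenticate call from the example above; isAuthenticated is a hypothetical helper name.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.MongoConnection

// Hypothetical helper: resolves to true if the authentication succeeds,
// or to false (after printing the cause) if the Future fails
def isAuthenticated(
  con: MongoConnection,
  db: String,
  user: String,
  password: String): Future[Boolean] =
  con.authenticate(db, user, password).map(_ => true).recover {
    case cause =>
      println(s"Authentication failed: $cause")
      false
  }
```

Turning the failure into false is only one possible choice; keeping the failed Future and handling it further up the call chain works just as well.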

Connect Using MongoDB URI

You can also give the connection information as a URI:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?[option1=value1][&option2=value2][...&optionN=valueN]]

If credentials and the database name are included in the URI, ReactiveMongo will authenticate the connections on that database.

import scala.util.Try
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.{ AsyncDriver, MongoConnection }

// connect to the replica set composed of `host1:27018`, `host2:27019` and `host3:27020`
// and authenticate on the database `somedb` with user `user123` and password `passwd123`
val uri = "mongodb://user123:passwd123@host1:27018,host2:27019,host3:27020/somedb"

def connection7(driver: AsyncDriver): Future[MongoConnection] = for {
  parsedUri <- MongoConnection.fromString(uri)
  con <- driver.connect(parsedUri)
} yield con

The following example uses a connection to asynchronously resolve a database.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.{ AsyncDriver, MongoConnection }

val mongoUri = "mongodb://host:port/db"

val driver = new AsyncDriver

val database = for {
  uri <- MongoConnection.fromString(mongoUri)
  con <- driver.connect(uri)
  dn <- Future(uri.db.get)
  db <- con.database(dn)
} yield db

database.onComplete {
  case resolution =>
    println(s"DB resolution: $resolution")
    driver.close()
}

Note that a DNS seed list is supported, using the mongodb+srv:// scheme in the connection URI.

import reactivemongo.api._

def seedListCon(driver: AsyncDriver) =
  driver.connect("mongodb+srv://usr:pass@mymongo.mydomain.tld/mydb")

Netty native

ReactiveMongo is internally using (as a shaded dependency) Netty 4.1.x.

This makes it possible to use the native optimization of Netty. To do so, reactivemongo-shaded-native must be added as a runtime dependency, with the appropriate version.

// For Mac OS X (x86-64), kqueue native support
libraryDependencies += "org.reactivemongo" % "reactivemongo-shaded-native" % "0.1x-osx-x86-64" % "runtime"

// For Linux (x86-64), epoll native support
libraryDependencies += "org.reactivemongo" % "reactivemongo-shaded-native" % "0.1x-linux-x86-64" % "runtime"

To make sure this optimization is loaded, you can enable the INFO level for the logger reactivemongo.core.netty.Pack (e.g. in your logback configuration), then check for a log entry containing “NettyPack”.

16:03:43.098 ReactiveMongo INFO  [r.c.n.Pack] :: Instantiated NettyPack(class reactivemongo.io.netty.channel.kqueue.KQueueSocketChannel)

Additional Notes

MongoConnection stands for a pool of connections.

Do not get confused here: a MongoConnection is a logical connection, not a physical one (not a network channel); it’s actually a connection pool. By default, a MongoConnection creates 10 physical network channels to each node; this can be tuned by setting the rm.nbChannelsPerNode option (see the connection options).
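For instance, the pool size could be changed in the connection URI. This is a sketch, assuming rm.nbChannelsPerNode is accepted as a URI option in the same way as rm.tcpNoDelay in the URI example of the connection options; the host, the database name and the value 20 are arbitrary placeholders, and pooledConnection is a hypothetical helper name.

```scala
import scala.concurrent.Future
import reactivemongo.api.{ AsyncDriver, MongoConnection }

// 20 channels per node instead of the default 10 (arbitrary value)
val pooledUri =
  "mongodb://host1:27017/mydatabase?rm.nbChannelsPerNode=20"

// Hypothetical helper: opens a connection pool with the custom size
def pooledConnection(driver: AsyncDriver): Future[MongoConnection] =
  driver.connect(pooledUri)
```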

Why are AsyncDriver and MongoConnection distinct?

They manage two different things. AsyncDriver holds the actor system, and MongoConnection holds the references to the actors. This separation makes it possible to work with many different single nodes or replica sets. Thus, your application can communicate with different replica sets or single nodes, with only one AsyncDriver instance.

Creation Costs:

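AsyncDriver and MongoConnection involve creation costs: the driver sets up the actor system, and each connection opens its own pool of network channels.

It is therefore a good idea to store the driver and connection instances and reuse them.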

In contrast, DefaultDB and Collection are just plain objects that store references and nothing else. Getting such references is lightweight, and calling connection.database(..) or db.collection(..) may be done many times without any performance hit.

It’s generally a good practice not to assign the database and collection references to val (even to lazy val), as it’s better to get a fresh reference each time, to automatically recover from any previous issues (e.g. network failure).
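The advice above can be sketched as a small holder object: the driver and the connection are created once and kept, while the database reference is resolved through a def on each use. The MongoResources name, the localhost node and the mydb database name are assumptions made for illustration.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.{ AsyncDriver, MongoConnection }

// Hypothetical holder for the costly, shared resources
object MongoResources {
  // Created once: holds the actor system
  lazy val driver = new AsyncDriver

  // Created once: the connection pool
  lazy val connection: Future[MongoConnection] =
    driver.connect(List("localhost"))

  // A `def`, not a `val`: a fresh database reference is resolved on each call
  def database = connection.flatMap(_.database("mydb"))
}
```

Note that if the first connection attempt fails, the stored Future stays failed, so a real application may need to handle that case separately.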

Virtual Private Network (VPN):

When connecting to a MongoDB replica set over a VPN, if using IP addresses instead of host names to configure the connection nodes, then it’s possible that the nodes are discovered with host names that are only known within the remote network, and so not usable from the driver/client side.

Troubleshooting

The errors below may indicate a connectivity and/or network issue.

Primary not available

Node set not reachable

Diagnostic:

If one of these errors is seen, first retry/refresh to check that it wasn’t a temporary system/network issue. If the issue is reproduced, the following can be checked.

Are the DB nodes accessible from the node running the application?

Is the connection URI used with ReactiveMongo valid?

If using the Play module, the strictUri setting can be enabled (e.g. mongodb.connection.strictUri=true).

If calling the function AsyncDriver.connection directly, the strictUri parameter can be set to true.

Connect without any non-mandatory options (e.g. connectTimeoutMS), using the SBT Playground to try the alternative URI.

Using the following code, make sure there is no authentication issue.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api._

def troubleshootAuth() = {
  val strictUri = "mongodb://..."
  val dbname = "db-name"
  val user = "your-user"
  val pass = "your-password"
  val driver = AsyncDriver()
  
  driver.connect(strictUri).flatMap {
    _.authenticate(dbname, user, pass)
  }.onComplete {
    case res => println(s"Auth: $res")
  }
}

troubleshootAuth()

// Would display something like `Auth: Failure(...)` in case of failure

When connecting to a MongoDB replica set, is its status OK?
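For the first check above (are the DB nodes accessible from the node running the application?), a plain TCP connection attempt can serve as a quick test. This sketch uses only the JDK and is independent of ReactiveMongo; the isReachable and checkNodes names and the 2 second timeout are assumptions. A successful TCP connection does not prove that a MongoDB server is answering on that port.

```scala
import java.io.IOException
import java.net.{ InetSocketAddress, Socket }
import scala.util.Try

// Hypothetical helper: true if a TCP connection to host:port
// can be opened within the given timeout
def isReachable(host: String, port: Int, timeoutMillis: Int = 2000): Boolean = {
  val socket = new Socket()

  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMillis)
    true
  } catch {
    // Refused connection, timeout or unknown host
    case _: IOException => false
  } finally {
    Try(socket.close())
  }
}

// Prints the reachability of each (host, port) pair
def checkNodes(nodes: Seq[(String, Int)]): Unit =
  nodes.foreach {
    case (host, port) =>
      println(s"$host:$port reachable: ${isReachable(host, port)}")
  }
```

It could be called with the nodes of the connection URI, e.g. checkNodes(Seq("server1" -> 27017, "server2" -> 27017)).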

Additional actions:

With the ReactiveMongo logging enabled, more details can be found (see the trace example below).

reactivemongo.core.actors.Exceptions$InternalState: null (<time:1469208071685>:-1)
reactivemongo.ChannelClosed(-2079537712, {{NodeSet None Node[localhost:27017: Primary (0/0 available connections), latency=5], auth=Set() }})(<time:1469208071685>)
reactivemongo.Shutdown(<time:1469208071673>)
reactivemongo.ChannelDisconnected(-2079537712, {{NodeSet None Node[localhost:27017: Primary (1/1 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(967102512, {{NodeSet None Node[localhost:27017: Primary (1/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(967102512, {{NodeSet None Node[localhost:27017: Primary (2/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(651496230, {{NodeSet None Node[localhost:27017: Primary (2/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(651496230, {{NodeSet None Node[localhost:27017: Primary (3/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(1503989210, {{NodeSet None Node[localhost:27017: Primary (3/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(1503989210, {{NodeSet None Node[localhost:27017: Primary (4/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-228911231, {{NodeSet None Node[localhost:27017: Primary (4/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-228911231, {{NodeSet None Node[localhost:27017: Primary (5/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-562085577, {{NodeSet None Node[localhost:27017: Primary (5/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-562085577, {{NodeSet None Node[localhost:27017: Primary (6/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-857553810, {{NodeSet None Node[localhost:27017: Primary (6/7 available connections), latency=5], auth=Set() }})(<time:1469208071662>)

The JMX module can be used to check how the node set is seen by the driver.
