Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Connect to the database

The first thing you need, is to create a new MongoDriver instance.

val driver1 = new reactivemongo.api.MongoDriver

Then you can connect to a MongoDB server.

import reactivemongo.api.MongoConnection

val connection3 = driver1.connection(List("localhost"))

A MongoDriver instance manages the shared resources (e.g. the actor system for the asynchronous processing); A connection manages a pool of network channels. In general, a MongoDriver or a MongoConnection should not be instantiated more than once.

You can provide a list of one or more servers, the driver will guess if it’s a standalone server or a replica set configuration. Even with one replica node, the driver will probe for other nodes and add them automatically.

Connection options

Some options can be provided while creating a connection.

import reactivemongo.api.MongoConnectionOptions

val conOpts = MongoConnectionOptions(/* connection options */)
val connection4 = driver1.connection(List("localhost"), options = conOpts)

The following options can be used with MongoConnectionOptions to configure the connection behaviour.

The option sslEnabled is needed if the MongoDB server is requiring SSL (mongod --sslMode requireSSL). The related option sslAllowsInvalidCert is required if the server allows invalid certificate (mongod --sslAllowInvalidCertificates).

If the connection pool is defined by an URI, then the options can be given after the ? separator:

mongodb.uri = "mongodb://user:pass@host1:27017,host2:27018,host3:27019/mydatabase?authMode=scram-sha1&rm.tcpNoDelay=true"

See: Connect using MongoDB URI

Connecting to a Replica Set

ReactiveMongo provides support for replica sets as follows.

Connecting to a replica set is pretty much the same as connecting to a unique server. You may have notice that the connection argument is a List[String], so more than one node can be specified.

val servers6 = List("server1:27017", "server2:27017", "server3:27017")
val connection6 = driver1.connection(servers6)

There is no obligation to give all the nodes in the replica set. Actually, just one of them is required. ReactiveMongo will ask the nodes it can reach for the addresses of the other nodes in the replica set. Obviously it is better to give at least 2 or more nodes, in case of unavailability of one node at the start of the application.

Using many connection instances

In some (rare) cases it is perfectly viable to create as many MongoConnection instances you need, from a single MongoDriver instance.

In that case, you will get different connection pools. This is useful when your application has to connect to two or more independent MongoDB nodes (i.e. that do not belong to the same replica set), or different replica sets.

val serversReplicaSet1 = List("rs11", "rs12", "rs13")
val connectionReplicaSet1 = driver1.connection(serversReplicaSet1)

val serversReplicaSet2 = List("rs21", "rs22", "rs23")
val connectionReplicaSet2 = driver1.connection(serversReplicaSet2)

Handling Authentication

There are two ways to give ReactiveMongo your credentials.

It can be done using driver.connection.

import reactivemongo.core.nodeset.Authenticate

def servers7: List[String] = List("server1", "server2")

val dbName = "somedatabase"
val userName = "username"
val password = "password"
val credentials7 = List(Authenticate(dbName, userName, password))
val connection7 = driver1.connection(servers7, authentications = credentials7)

Using this connection function with an URI allows to indicates the credentials in this URI.

There is also a authenticate function for the database references.

import scala.concurrent.{ ExecutionContext, Future },
  ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration

import reactivemongo.api.DefaultDB


def authenticateDB(db: DefaultDB): Future[Unit] = {
  def username = "anyUser"
  def password = "correspondingPass"
  implicit def authTimeout = FiniteDuration(5, "seconds")

  val futureAuthenticated = db.authenticate(username, password)

  futureAuthenticated.map { _ =>
    // doSomething
  }
}

Like any other operation in ReactiveMongo, authentication is done asynchronously.

Connect Using MongoDB URI

You can also give the connection information as a URI:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?[option1=value1][&option2=value2][...&optionN=valueN]]

If credentials and the database name are included in the URI, ReactiveMongo will authenticate the connections on that database.

import scala.util.Try
import reactivemongo.api.MongoConnection

// connect to the replica set composed of `host1:27018`, `host2:27019` and `host3:27020`
// and authenticate on the database `somedb` with user `user123` and password `passwd123`
val uri = "mongodb://user123:passwd123@host1:27018,host2:27019,host3:27020/somedb"

def connection7a(driver: reactivemongo.api.MongoDriver): Try[MongoConnection] =
  MongoConnection.parseURI(uri).map { parsedUri =>
    driver.connection(parsedUri)
  }

def connection7b(driver: reactivemongo.api.MongoDriver): Try[MongoConnection] =
  for {
    parsedUri <- MongoConnection.parseURI(uri)
    connection <- driver.connection(parsedUri, strictUri = true)
  } yield connection // strictUri ~> validate all URI options are supported

The following example is using a connection to asynchronously resolve a database.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.{ MongoDriver, MongoConnection }

val mongoUri = "mongodb://host:port/db"

val driver = new MongoDriver

val database = for {
  uri <- Future.fromTry(MongoConnection.parseURI(mongoUri))
  con = driver.connection(uri)
  dn <- Future(uri.db.get)
  db <- con.database(dn)
} yield db

database.onComplete {
  case resolution =>
    println(s"DB resolution: $resolution")
    driver.close()
}

Additional Notes

MongoConnection stands for a pool of connections.

Do not get confused here: a MongoConnection is a logical connection, not a physical one (not a network channel); It’s actually a connection pool. By default, a MongoConnection creates 10 physical network channels to each node; It can be tuned this by setting the rm.nbChannelsPerNode options (see the connection options.

Why are MongoDriver and MongoConnection distinct?

They manage two different things. MongoDriver holds the actor system, and MongoConnection the references to the actors. This is useful because it enables to work with many different single nodes or replica sets. Thus, your application can communicate with different replica sets or single nodes, with only one MongoDriver instance.

Creation Costs:

MongoDriver and MongoConnection involve creation costs:

It is also a good idea to store the driver and connection instances to reuse them.

On the contrary, DefaultDB and Collection are just plain objects that store references and nothing else. Getting such references is lightweight, and calling connection.database(..) or db.collection(..) may be done many times without any performance hit.

It’s generally a good practice not to assign the database and collection references to val (even to lazy val), as it’s better to get a fresh reference each time, to automatically recover from any previous issues (e.g. network failure).

Virtual Private Network (VPN):

When connecting to a MongoDB replica set over a VPN, if using IP addresses instead of host names to configure the connection nodes, then it’s possible that the nodes are discovered with host names that are only known within the remote network, and so not usable from the driver/client side.

Troubleshooting

The bellow errors may indicate there is a connectivity and/or network issue.

Primary not available

Node set not reachable

Diagnostic:

If one of the error is seen, first retry/refresh to check it wasn’t a temporary system/network issue. If the issue is then reproduced, the following can be checked.

Are the DB nodes accessible from the node running the application?

Is the connection URI used with ReactiveMongo valid?

Connecting to a MongoDB ReplicaSet, is status ok?

Additional actions:

With the ReactiveMongo logging enabled, more details can be found (see a trace example thereafter).

reactivemongo.core.actors.Exceptions$InternalState: null (<time:1469208071685>:-1)
reactivemongo.ChannelClosed(-2079537712, {{NodeSet None Node[localhost:27017: Primary (0/0 available connections), latency=5], auth=Set() }})(<time:1469208071685>)
reactivemongo.Shutdown(<time:1469208071673>)
reactivemongo.ChannelDisconnected(-2079537712, {{NodeSet None Node[localhost:27017: Primary (1/1 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(967102512, {{NodeSet None Node[localhost:27017: Primary (1/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(967102512, {{NodeSet None Node[localhost:27017: Primary (2/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(651496230, {{NodeSet None Node[localhost:27017: Primary (2/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(651496230, {{NodeSet None Node[localhost:27017: Primary (3/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(1503989210, {{NodeSet None Node[localhost:27017: Primary (3/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(1503989210, {{NodeSet None Node[localhost:27017: Primary (4/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-228911231, {{NodeSet None Node[localhost:27017: Primary (4/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-228911231, {{NodeSet None Node[localhost:27017: Primary (5/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-562085577, {{NodeSet None Node[localhost:27017: Primary (5/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-562085577, {{NodeSet None Node[localhost:27017: Primary (6/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-857553810, {{NodeSet None Node[localhost:27017: Primary (6/7 available connections), latency=5], auth=Set() }})(<time:1469208071662>)

The JMX module can be used to check how the node set is seen by the driver.

Previous: Get started / Next: Database and collections

Suggest changes