Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
Connect to the database
The first thing you need is to create a new AsyncDriver instance.
val driver1 = new reactivemongo.api.AsyncDriver
Then you can connect to a MongoDB server.
import scala.concurrent.Future
import reactivemongo.api.MongoConnection
val connection3: Future[MongoConnection] = driver1.connect(List("localhost"))
An AsyncDriver instance manages the shared resources (e.g. the actor system for the asynchronous processing); a connection manages a pool of network channels.
In general, an AsyncDriver or a MongoConnection should not be instantiated more than once.
You can provide a list of one or more servers; the driver will guess whether it is a standalone server or a replica set configuration. Even when given a single replica set node, the driver will probe for other nodes and add them automatically.
Connection options
Some options can be provided while creating a connection.
import reactivemongo.api.MongoConnectionOptions
val conOpts = MongoConnectionOptions(/* connection options */)
val connection4 = driver1.connect(List("localhost"), options = conOpts)
The following options can be used with MongoConnectionOptions
to configure the connection behaviour.
Authentication:
- authSource: DEPRECATED since 0.13, see authenticationDatabase.
- authMode: DEPRECATED since 0.14, see authenticationMechanism.
- authenticationDatabase: (optional) The database source for authentication credentials.
- authenticationMechanism: (optional) The authentication mechanism, by default set to scram-sha1 for SCRAM-SHA-1. Can be configured with:
  - scram-sha1 (the default since MongoDB 3.x),
  - scram-sha256 (since MongoDB 4.0),
  - mongocr for the backward compatible MONGODB-CR,
  - x509 for x.509 certificate authentication.
SSL & certificates:
- sslEnabled: DEPRECATED, see ssl.
- ssl: (optional) It enables the SSL support for the connection (true|false, default is false).
- sslAllowsInvalidCert: (optional) If sslEnabled is true, this one indicates whether to accept invalid certificates (e.g. self-signed) (true|false, default is false).
- keyStore: (optional) A URI to a key store (e.g. file:///path/to/keystore.p12).
- keyStorePassword: (optional) If keyStore is set, then provides the password to load it (if required).
- keyStoreType: (optional) If keyStore is set, indicates the type of the store.
The option ssl is needed if the MongoDB server requires SSL (mongod --sslMode requireSSL). The related option sslAllowsInvalidCert is required if the server allows invalid certificates (mongod --sslAllowInvalidCertificates).
SNI is supported for the SSL connection.
Network, timeouts & failover:
- connectTimeoutMS: The number of milliseconds to wait for a connection to be established before giving up.
- maxIdleTimeMS: The maximum number of milliseconds that a connection can remain idle in the pool before being removed and closed.
- rm.tcpNoDelay: TCPNoDelay boolean flag (true|false).
- rm.keepAlive: TCP KeepAlive boolean flag (true|false).
- rm.nbChannelsPerNode: The number of user channels (connections) per node (default: 10). Note that an extra signaling channel is always created, to manage the pool state.
- rm.maxInFlightRequestsPerChannel (EXPERIMENTAL): The maximum number of in flight/concurrent requests per user channel (default: 200).
- heartbeatFrequencyMS (formerly rm.monitorRefreshMS): The interval (in milliseconds) used by the ReactiveMongo monitor to refresh the node set (default: 10s); the minimal value is 100ms.
- rm.failover: The default failover strategy.
  - default: The default/minimal strategy, with 10 retries with an initial delay of 100ms and a delay factor of retry count * 1.25 (100ms .. 125ms, 250ms, 375ms, 500ms, 625ms, 750ms, 875ms, 1s, 1125ms, 1250ms).
  - remote: The strategy for remote MongoDB node(s); same as default but with 16 retries.
  - strict: A more strict strategy; same as default but with only 5 retries.
  - <delay>:<retries>x<factor>: The definition of a custom strategy;
    - delay: The initial delay as a finite duration string accepted by the Duration factory.
    - retries: The number of retries (Int).
    - factor: The Double value to multiply the retry counter with, to define the delay factor (retryCount * factor).
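The delay arithmetic of the failover strategies can be sketched with the standard library alone. This is only an illustration of the rule described above (delay before retry n = initial delay × n × factor), not the driver's own implementation; the names FailoverSketch, Strategy and parse are hypothetical.

```scala
import scala.concurrent.duration._
import scala.util.Try

object FailoverSketch {
  // Hypothetical model of a strategy; this is NOT the driver's own class.
  final case class Strategy(
    initialDelay: FiniteDuration,
    retries: Int,
    factor: Double) {

    // Delay before the n-th retry: initialDelay * (n * factor)
    def delays: Seq[FiniteDuration] = (1 to retries).map { n =>
      math.round(initialDelay.toMillis * n * factor).millis
    }
  }

  // Matches the custom form `<delay>:<retries>x<factor>`, e.g. `100ms:10x1.25`
  private val Custom = """(.+):(\d+)x(\d+(?:\.\d+)?)""".r

  def parse(spec: String): Option[Strategy] = spec match {
    case Custom(delay, retries, factor) =>
      // `Duration(String)` is the standard factory for duration strings
      Try(Duration(delay)).toOption.collect {
        case d: FiniteDuration => Strategy(d, retries.toInt, factor.toDouble)
      }

    case _ => None
  }
}
```

With the values of the default strategy, `FailoverSketch.parse("100ms:10x1.25").map(_.delays)` yields the sequence 125ms, 250ms, up to 1250ms listed for default.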
Read isolation & consistency:
- writeConcern: The default write concern (default: acknowledged).
  - unacknowledged: Option w set to 0, journaling off (j), fsync off, no timeout.
  - acknowledged: Option w set to 1, journaling off, fsync off, no timeout.
  - journaled: Option w set to 1, journaling on, fsync off, no timeout.
- writeConcernW: The option w for the default write concern. If writeConcern is specified, its w will be replaced by this writeConcernW.
  - majority: The write operations have to be propagated to the majority of voting nodes.
  - 0: Disable the acknowledgement.
  - 1: Acknowledgement from the standalone server or primary one.
  - positive integer: Acknowledgement by at least the specified number of replica set members.
  - tag: Acknowledgement by the member of the replica set matching the given tag.
- writeConcernJ: Toggle journaling on the default write concern. If writeConcern is specified, its j will be replaced by this writeConcernJ boolean flag (true|false).
- writeConcernTimeout: The time limit (in milliseconds) for the default write concern. If writeConcern is specified, its timeout is replaced by this one.
- readPreference: The default read preference (primary|primaryPreferred|secondary|secondaryPreferred|nearest) (default is primary).
  - nearest: Do not consider whether nodes are primary or secondary, but select one according to how fast it answers an isMaster request (ping time).
- readConcernLevel: The level for the default read concern.
- appName: The optional application name.
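The way writeConcernW, writeConcernJ and writeConcernTimeout override the preset chosen by writeConcern can be modelled as follows. This is a sketch under assumptions: Concern, presets and resolve are hypothetical names, and the model only mirrors the rules listed above; it is not the driver's WriteConcern type.

```scala
object WriteConcernSketch {
  // Hypothetical model; NOT the driver's own WriteConcern type.
  final case class Concern(
    w: String,
    j: Boolean,
    fsync: Boolean,
    timeoutMs: Option[Int])

  // The three presets described for the `writeConcern` option
  val presets: Map[String, Concern] = Map(
    "unacknowledged" -> Concern("0", j = false, fsync = false, None),
    "acknowledged" -> Concern("1", j = false, fsync = false, None),
    "journaled" -> Concern("1", j = true, fsync = false, None))

  // Start from the preset (default: acknowledged),
  // then apply the more specific options on top of it.
  def resolve(options: Map[String, String]): Concern = {
    val base = options.get("writeConcern").
      flatMap(presets.get).getOrElse(presets("acknowledged"))

    base.copy(
      w = options.getOrElse("writeConcernW", base.w),
      j = options.get("writeConcernJ").map(_.toBoolean).getOrElse(base.j),
      timeoutMs = options.get("writeConcernTimeout").
        map(_.toInt).orElse(base.timeoutMs))
  }
}
```

For instance, combining writeConcern=journaled with writeConcernW=majority keeps journaling on while replacing w with majority.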
If the connection pool is defined by a URI, then the options can be given after the ?
separator:
mongodb.uri = "mongodb://user:pass@host1:27017,host2:27018,host3:27019/mydatabase?authenticationMechanism=scram-sha1&rm.tcpNoDelay=true"
See: Connect using MongoDB URI
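When the options come from configuration rather than from a literal string, the query part of the URI can be assembled programmatically. The helper below is a minimal sketch using only the JDK; connectionUri is a hypothetical name, and credentials are deliberately left out.

```scala
import java.net.URLEncoder

object UriOptionsSketch {
  // Hypothetical helper: joins hosts, database and options into a URI string.
  def connectionUri(
    hosts: Seq[String],
    database: String,
    options: Seq[(String, String)]): String = {

    def enc(s: String): String = URLEncoder.encode(s, "UTF-8")

    // Options go after the `?` separator, joined with `&`
    val query =
      if (options.isEmpty) ""
      else options.map {
        case (k, v) => s"${enc(k)}=${enc(v)}"
      }.mkString("?", "&", "")

    s"mongodb://${hosts.mkString(",")}/$database$query"
  }
}
```

Calling it with the hosts host1:27017 and host2:27018, the database mydatabase and the options authenticationMechanism=scram-sha1 and rm.tcpNoDelay=true gives a URI of the same shape as the one above, without the user and password.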
Connecting to a Replica Set
ReactiveMongo provides support for replica sets as follows.
- The driver will detect if it is connected to a replica set.
- It will probe for the other nodes in the set and connect to them.
- It will detect when the primary has changed and guess which is the new one.
- It will allow running queries on secondaries, if allowed by the read preference (See the MongoDB documentation for more details about querying secondary nodes).
Connecting to a replica set is pretty much the same as connecting to a single server. You may have noticed that the connection argument is a List[String]
, so more than one node can be specified.
val servers6 = List("server1:27017", "server2:27017", "server3:27017")
val connection6 = driver1.connect(servers6)
There is no obligation to give all the nodes in the replica set; just one of them is required. ReactiveMongo will ask the nodes it can reach for the addresses of the other nodes in the replica set. It is nevertheless better to give at least two nodes, in case one of them is unavailable when the application starts.
Using many connection instances
In some (rare) cases it is perfectly viable to create as many MongoConnection instances as you need, from a single AsyncDriver instance.
In that case, you will get different connection pools. This is useful when your application has to connect to two or more independent MongoDB nodes (i.e. that do not belong to the same replica set), or different replica sets.
val serversReplicaSet1 = List("rs11", "rs12", "rs13")
val connectionReplicaSet1 = driver1.connect(serversReplicaSet1)
val serversReplicaSet2 = List("rs21", "rs22", "rs23")
val connectionReplicaSet2 = driver1.connect(serversReplicaSet2)
Handling Authentication
There are two ways to give ReactiveMongo your credentials.
It can be done using driver.connect.
import reactivemongo.core.nodeset.Authenticate
def servers7: List[String] = List("server1", "server2")
val dbName = "somedatabase"
val userName = "username"
val password = "password"
val credentials7 = List(Authenticate(dbName, userName, Some(password)))
val connection7 = driver1.connect(servers7, authentications = credentials7)
Using this function with a URI allows the credentials to be given in the URI itself.
There is also an authenticate function for the database references.
import scala.concurrent.{ ExecutionContext, Future },
  ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
import reactivemongo.api.MongoConnection
def authenticateDB(con: MongoConnection): Future[Unit] = {
  def username = "anyUser"
  def password = "correspondingPass"
  val futureAuthenticated = con.authenticate("mydb", username, password)
  futureAuthenticated.map { _ =>
    // doSomething
  }
}
Like any other operation in ReactiveMongo, authentication is done asynchronously.
Connect Using MongoDB URI
You can also give the connection information as a URI:
mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?[option1=value1][&option2=value2][...&optionN=valueN]]
If credentials and the database name are included in the URI, ReactiveMongo will authenticate the connections on that database.
import scala.util.Try
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.{ AsyncDriver, MongoConnection }
// connect to the replica set composed of `host1:27018`, `host2:27019` and `host3:27020`
// and authenticate on the database `somedb` with user `user123` and password `passwd123`
val uri = "mongodb://user123:passwd123@host1:27018,host2:27019,host3:27020/somedb"
def connection7(driver: AsyncDriver): Future[MongoConnection] = for {
  parsedUri <- MongoConnection.fromString(uri)
  con <- driver.connect(parsedUri)
} yield con
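To make the URI layout described above more concrete, the following sketch splits a mongodb:// URI into its parts with a regular expression. It is only an illustration of the format, not how the driver parses URIs (MongoConnection.fromString does that); UriPartsSketch, Parts and split are hypothetical names, and the mongodb+srv:// scheme and escaped characters are not handled.

```scala
object UriPartsSketch {
  // Hypothetical representation of the URI components
  final case class Parts(
    credentials: Option[(String, String)],
    hosts: List[String],
    database: Option[String],
    options: Map[String, String])

  // mongodb://[username:password@]host1[:port1][,...][/[database][?options]]
  private val Uri =
    """mongodb://(?:([^:@/]+):([^@/]+)@)?([^/?]+)(?:/([^?]*))?(?:\?(.*))?""".r

  def split(uri: String): Option[Parts] = uri match {
    case Uri(user, pass, hosts, db, query) =>
      // Unmatched optional groups are bound to null, hence the Option(..)
      val opts = Option(query).toList.flatMap(_.split("&").toList).flatMap {
        _.split("=", 2) match {
          case Array(k, v) => Some(k -> v)
          case _ => None
        }
      }.toMap

      Some(Parts(
        credentials = for {
          u <- Option(user)
          p <- Option(pass)
        } yield (u, p),
        hosts = hosts.split(",").toList,
        database = Option(db).filter(_.nonEmpty),
        options = opts))

    case _ => None
  }
}
```

Applied to the URI of the previous example, it gives the credentials user123/passwd123, the three hosts, and the database somedb.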
The following example uses a connection to asynchronously resolve a database.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.{ AsyncDriver, MongoConnection }
val mongoUri = "mongodb://host:port/db"
val driver = new AsyncDriver
val database = for {
  uri <- MongoConnection.fromString(mongoUri)
  con <- driver.connect(uri)
  dn <- Future(uri.db.get)
  db <- con.database(dn)
} yield db
database.onComplete {
  case resolution =>
    println(s"DB resolution: $resolution")
    driver.close()
}
Note that DNS seedlist is supported, using the mongodb+srv://
scheme in the connection URI.
import reactivemongo.api._
def seedListCon(driver: AsyncDriver) =
  driver.connect("mongodb+srv://usr:pass@mymongo.mydomain.tld/mydb")
See:
- How to connect to MongoDB Atlas
- How to connect to Azure CosmosDB
- How to connect to Amazon DocumentDB
- How to connect to Alibaba ApsaraDB
Netty native
ReactiveMongo is internally using (as a shaded dependency) Netty 4.1.x.
This makes it possible to use the native optimization of Netty. To do so, the reactivemongo-shaded-native artifact must be added as a runtime dependency, with the appropriate version.
// For Mac OS X (x86-64), kqueue native support
libraryDependencies += "org.reactivemongo" % "reactivemongo-shaded-native" % "0.1x-osx-x86-64" % "runtime"
// For Linux (x86-64), epoll native support
libraryDependencies += "org.reactivemongo" % "reactivemongo-shaded-native" % "0.1x-linux-x86-64" % "runtime"
In order to make sure such optimization is loaded, you can enable the INFO
level for the logger reactivemongo.core.netty.Pack
(e.g. in your logback configuration), then check for a log entry containing “NettyPack”.
16:03:43.098 ReactiveMongo INFO [r.c.n.Pack] :: Instantiated NettyPack(class reactivemongo.io.netty.channel.kqueue.KQueueSocketChannel)
Additional Notes
MongoConnection
stands for a pool of connections.
Do not get confused here: a MongoConnection
is a logical connection, not a physical one (not a network channel); it is actually a connection pool. By default, a MongoConnection creates 10 physical network channels to each node; this can be tuned by setting the rm.nbChannelsPerNode option (see the connection options).
Why are AsyncDriver
and MongoConnection
distinct?
They manage two different things. AsyncDriver
holds the actor system, and MongoConnection
the references to the actors. This is useful because it makes it possible to work with many different single nodes or replica sets. Thus, your application can communicate with different replica sets or single nodes, with only one AsyncDriver
instance.
Creation Costs:
AsyncDriver and MongoConnection involve creation costs:
- the driver creates a new actor system,
- and the connection will connect to the servers (creating network channels).
It is therefore a good idea to store the driver and connection instances to reuse them.
On the contrary, DefaultDB
and Collection
are just plain objects that store references and nothing else.
Getting such references is lightweight, and calling connection.database(..)
or db.collection(..)
may be done many times without any performance hit.
It’s generally a good practice not to assign the database and collection references to val (even to lazy val), as it’s better to get a fresh reference each time, to automatically recover from any previous issues (e.g. network failure).
Virtual Private Network (VPN):
When connecting to a MongoDB replica set over a VPN, if using IP addresses instead of host names to configure the connection nodes, then it’s possible that the nodes are discovered with host names that are only known within the remote network, and so not usable from the driver/client side.
Troubleshooting
The errors below may indicate a connectivity and/or network issue.
Primary not available
- Error:
reactivemongo.core.actors.Exceptions$PrimaryUnavailableException: MongoError['No primary node is available! ...']
- In logging: “The primary is unavailable, is there a network problem?” (not critical if no application error occurs)
Node set not reachable
- Error:
reactivemongo.core.actors.Exceptions$NodeSetNotReachable: MongoError['The node set can not be reached! Please check your network connectivity ...']
- In logging: “The entire node set is unreachable, is there a network problem?” (not critical if no application error occurs)
Diagnostic:
If one of these errors is seen, first retry/refresh to check that it wasn’t a temporary system/network issue. If the issue can then be reproduced, the following can be checked.
Are the DB nodes accessible from the node running the application?
- Using the MongoDB Shell: mongo primary-host:primary-port/name-of-database (replace the primary host & port and the database name with the same used in the ReactiveMongo connection URI).
- With the SBT Playground, using the same connection URI.
- Possible causes: Broken network, authentication issue (before 0.13, MONGODB-CR was the default mode; see the authenticationMechanism option), SSL issue (check the sslEnabled and sslAllowsInvalidCert options).
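Before looking at the driver itself, plain TCP reachability of a node can be checked from the application host. The following is a minimal sketch using only the JDK; canConnect is a hypothetical helper, and it only tells whether a TCP connection can be opened, nothing about authentication or SSL.

```scala
import java.io.IOException
import java.net.{ InetSocketAddress, Socket }
import scala.util.Try

object ReachabilitySketch {
  // Returns true if a TCP connection to host:port can be opened
  // within the given timeout (in milliseconds).
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()

    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Refused connection, timeout or unknown host
      case _: IOException => false
    } finally {
      Try(socket.close()) // best-effort cleanup
    }
  }
}
```

For example, `ReachabilitySketch.canConnect("primary-host", 27017)` can be run with the same host and port as in the connection URI (primary-host being a placeholder here).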
Is the connection URI used with ReactiveMongo valid?
If using the Play module, the strictUri
setting can be enabled (e.g. mongodb.connection.strictUri=true
).
If calling the function AsyncDriver.connection directly
, the strictUri
parameter can be set to true
.
Connect without any non-mandatory options (e.g. connectTimeoutMS
), using the SBT Playground to try the alternative URI.
Using the following code, make sure there is no authentication issue.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api._
def troubleshootAuth() = {
  val strictUri = "mongodb://..."
  val dbname = "db-name"
  val user = "your-user"
  val pass = "your-password"
  val driver = AsyncDriver()
  driver.connect(strictUri).flatMap {
    _.authenticate(dbname, user, pass)
  }.onComplete {
    case res => println(s"Auth: $res")
  }
}
troubleshootAuth()
// Would display something like `Auth: Failure(...)` in case of failure
When connecting to a MongoDB replica set, is its status OK?
- Using the MongoDB Shell to connect to the primary node, execute
rs.status()
.
Additional actions:
With the ReactiveMongo logging enabled, more details can be found (see the trace example below).
reactivemongo.core.actors.Exceptions$InternalState: null (<time:1469208071685>:-1)
reactivemongo.ChannelClosed(-2079537712, {{NodeSet None Node[localhost:27017: Primary (0/0 available connections), latency=5], auth=Set() }})(<time:1469208071685>)
reactivemongo.Shutdown(<time:1469208071673>)
reactivemongo.ChannelDisconnected(-2079537712, {{NodeSet None Node[localhost:27017: Primary (1/1 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(967102512, {{NodeSet None Node[localhost:27017: Primary (1/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(967102512, {{NodeSet None Node[localhost:27017: Primary (2/2 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(651496230, {{NodeSet None Node[localhost:27017: Primary (2/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelDisconnected(651496230, {{NodeSet None Node[localhost:27017: Primary (3/3 available connections), latency=5], auth=Set() }})(<time:1469208071663>)
reactivemongo.ChannelClosed(1503989210, {{NodeSet None Node[localhost:27017: Primary (3/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(1503989210, {{NodeSet None Node[localhost:27017: Primary (4/4 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-228911231, {{NodeSet None Node[localhost:27017: Primary (4/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-228911231, {{NodeSet None Node[localhost:27017: Primary (5/5 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-562085577, {{NodeSet None Node[localhost:27017: Primary (5/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelDisconnected(-562085577, {{NodeSet None Node[localhost:27017: Primary (6/6 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
reactivemongo.ChannelClosed(-857553810, {{NodeSet None Node[localhost:27017: Primary (6/7 available connections), latency=5], auth=Set() }})(<time:1469208071662>)
The JMX module can be used to check how the node set is seen by the driver.