Reactive Scala Driver for MongoDB

Asynchronous & Non-Blocking

Command API

A MongoDB Command is a special query that returns documents. It’s executed at either the database level (db.runCommand in the MongoDB shell), or at the collection level (db.aCol.runCommand in the shell).

In ReactiveMongo, the database command can be executed using db.runCommand(<command>).

The collection command can be executed with collection.runCommand(<command>).

The return type of .runCommand operations depends on the kind of command you gave it as a parameter; For example, with Count it would return Future[Int]:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection

// BSON implementation of the count command
import reactivemongo.api.commands.bson.BSONCountCommand.{ Count, CountResult }

// BSON serialization-deserialization for the count arguments and result
import reactivemongo.api.bson.compat._
import reactivemongo.api.commands.bson.BSONCountCommandImplicits._

def run1(collection: BSONCollection) = {
  // count the number of documents which tag equals "closed"
  val query = BSONDocument("tag" -> "closed")
  val command = Count(query)
  val result: Future[CountResult] = collection.runCommand(command)

  result.map { res =>
    val numberOfDocs: Int = res.value
    // do something with this number
  }
}

The .count operation is now directly available on collection.

Some widely used commands, like Count or FindAndModify, are available in ReactiveMongo. But how to run commands that are not yet provided as operations?

Run a raw command

It is possible to run any kind of command, even if they are not yet specifically implemented in ReactiveMongo. Since a command in MongoDB is nothing more than a query on the special collection $cmd, you can make your own command.

Let’s take a look to the following example involving the Aggregation Framework (you can find this example in the MongoDB documentation):

// MongoDB Console example of Aggregate command
db.orders.aggregate([
  { $match: { status: "A" } },
  { $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
  { $sort: { total: -1 } }
])

Actually, the MongoDB console sends a document that is a little bit more complex to the server:

// document sent to the database using the MongoDB console
var command =
  {
    "aggregate": "orders", // name of the collection on which we run this command
    "pipeline": [
      { $match: { status: "A" } },
      { $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
      { $sort: { total: -1 } }
    ]
  }

// run the command
db.runCommand(command)

We do exactly the same thing with raw command, using document that contains the same fields:

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.{ BSONArray, BSONDocument }

import reactivemongo.api.{
  DefaultDB, FailoverStrategy, ReadPreference
}
import reactivemongo.api.commands.Command

def commandResult(db: DefaultDB)(implicit ec: ExecutionContext): Future[BSONDocument] = {
  val commandDoc = BSONDocument(
    "aggregate" -> "orders", // we aggregate on collection `orders`
    "pipeline" -> BSONArray(
      BSONDocument(f"$$match" -> BSONDocument("status" -> "A")),
      BSONDocument(
        f"$$group" -> BSONDocument(
          "_id" -> f"$$cust_id",
          "total" -> BSONDocument(f"$$sum" -> f"$$amount"))),
      BSONDocument(f"$$sort" -> BSONDocument("total" -> -1))
    )
  ) // For example, otherwise rather use `.aggregatorContext` with a collection

  db.runCommand(commandDoc, FailoverStrategy.default).
    cursor[BSONDocument](ReadPreference.primaryPreferred).head
}

The MongoDB aggregation is already provided by ReactiveMongo with a specific support.

Defining custom commands

It’s possible to define a not yet implemented or custom command using the command API.

Database command:

Considering a database command executed in the Shell using db.runCommand({ "custom": name, "query": { ... } }), with a result like { "count": int, "matching": [ "value1", "value2", ..., "valueN" ] }, it can be defined as following.

package customcmd

import reactivemongo.api.SerializationPack
import reactivemongo.api.commands.{
  Command,
  CommandWithPack,
  CommandWithResult,
  ImplicitCommandHelpers
}

trait CustomCommand[P <: SerializationPack] extends ImplicitCommandHelpers[P] {
  case class Custom(
    name: String,
    query: pack.Document) extends Command
      with CommandWithPack[pack.type] with CommandWithResult[CustomResult]

  case class CustomResult(count: Int, matching: List[String])
}

It specifies what is the command input (arguments), and what kind of result will be deserialized from the output, using the trait CommandWithResult[CustomResult]. If the command returns a document and you want to directly get that, it can be specified with CommandWithResult[pack.Document].

The next step is to implement the custom command.

package customcmd
package bson1

import scala.util.Try
import reactivemongo.api.bson.collection.BSONSerializationPack

object BSONCustomCommand extends CustomCommand[BSONSerializationPack.type] {
  val pack = BSONSerializationPack

  object Implicits {
    import reactivemongo.api.bson.{
      BSONDocument, BSONDocumentReader, BSONDocumentWriter, BSONNumberLike
    }

    implicit val writer: BSONDocumentWriter[Custom] =
      BSONDocumentWriter[Custom] { custom =>
        // { "custom": name, "query": { ... } }
        BSONDocument("custom" -> custom.name, "query" -> custom.query)
      }

    implicit object BSONReader extends BSONDocumentReader[CustomResult] {
      def readDocument(result: BSONDocument): Try[CustomResult] = for {
        count <- result.getAsTry[BSONNumberLike]("count").flatMap(_.toInt)
        matching <- result.getAsTry[List[String]]("matching")
      } yield CustomResult(count, matching)
    }
  }
}

In the previous example, the custom command is implemented using the BSON serialization, providing the writers and readers for the command input and result.

A command can be implemented with various serialization pack (e.g. it can also be implemented using the JSON serialization provided by the Play JSON support).

It’s also possible to gather the command definition and implementation, if only one kind of serialization is needed.

package customcmd
package bson2

import scala.util.Try
import reactivemongo.api.bson.collection.BSONSerializationPack

import reactivemongo.api.commands.{
  Command,
  CommandWithPack,
  CommandWithResult,
  ImplicitCommandHelpers
}

object BSONCustomCommand
    extends ImplicitCommandHelpers[BSONSerializationPack.type] {

  val pack = BSONSerializationPack

  case class Custom(
    name: String,
    query: pack.Document) extends Command
      with CommandWithPack[pack.type] with CommandWithResult[CustomResult]

  case class CustomResult(count: Int, matching: List[String])

  object Implicits {
    import reactivemongo.api.bson.{
      BSONDocument, BSONDocumentReader, BSONDocumentWriter, BSONNumberLike
    }

    implicit val writer: BSONDocumentWriter[Custom] =
      BSONDocumentWriter[Custom] { custom =>
        // { "custom": name, "query": { ... } }
        BSONDocument("custom" -> custom.name, "query" -> custom.query)
      }

    implicit object BSONReader extends BSONDocumentReader[CustomResult] {
      def readDocument(result: BSONDocument): Try[CustomResult] = for {
        count <- result.getAsTry[BSONNumberLike]("count").flatMap(_.toInt)
        matching <- result.getAsTry[List[String]]("matching")
      } yield CustomResult(count, matching)
    }
  }
}

Once the command is implemented, it can be executed on the database.

package customcmd.bson2

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.{ BSONSerializationPack, DefaultDB, FailoverStrategy }

object MyRunner {
  import BSONCustomCommand._
  import BSONCustomCommand.Implicits._

  def custom(
    db: DefaultDB,
    name: String,
    query: BSONDocument)(implicit ec: ExecutionContext): Future[CustomResult] =
    db.runCommand(Custom(name, query), FailoverStrategy())
}

Collection command:

For a collection command db.aCollection.runCommand({ "custom": name, "query": { ... } }), the ReactiveMongo definition will be similar to those at the database level, but based on CollectionCommand (rather than Command).

import reactivemongo.api.SerializationPack
import reactivemongo.api.commands.{
  CollectionCommand,
  CommandWithPack,
  CommandWithResult,
  ImplicitCommandHelpers
}

// { "custom": name, "query": { ... } }
trait CustomCommand[P <: SerializationPack] extends ImplicitCommandHelpers[P] {
  case class Custom(
    name: String,
    query: pack.Document) extends CollectionCommand
      with CommandWithPack[pack.type] with CommandWithResult[CustomResult]

  // { "count": int, "matching": [ "value1", "value2", ..., "valueN" ] }
  case class CustomResult(count: Int, matching: List[String])
}

Once the input and output of a collection command are specified, it must be implemented.

import scala.util.Try
import reactivemongo.api.bson.collection.BSONSerializationPack

object BSONCustomCommand extends CustomCommand[BSONSerializationPack.type] {
  val pack = BSONSerializationPack

  object Implicits {
    import reactivemongo.api.commands.ResolvedCollectionCommand
    import reactivemongo.api.bson.{
      BSONDocument, BSONDocumentReader, BSONDocumentWriter, BSONNumberLike
    }

    // type `Custom` inherited from the specification `CustomCommand` trait
    implicit val BSONWriter =
      BSONDocumentWriter[ResolvedCollectionCommand[Custom]] { custom =>
        val cmd: Custom = custom.command
        val colName: String = custom.collection        

        // { "custom": name, "query": { ... } }
        BSONDocument("custom" -> cmd.name, "query" -> cmd.query)
      }

    implicit object BSONReader extends BSONDocumentReader[CustomResult] {
      // type `CustomResult` inherited from the `CustomCommand` trait
      def readDocument(result: BSONDocument): Try[CustomResult] = for {
        count <- result.getAsTry[BSONNumberLike]("count").flatMap(_.toInt)
        matching <- result.getAsTry[List[String]]("matching")
      } yield CustomResult(count, matching)
    }
  }
}

The writer of a collection collection must serialize a ResolvedCollectionCommand[Custom], rather than directly Custom. The ResolvedCollectionCommand provides the information about the collection against which the command is executed (e.g. the collection name colName in the previous example).

Then the collection command can be executed using runCommand.

import scala.concurrent.{ ExecutionContext, Future }

import reactivemongo.api.bson.BSONDocument
import reactivemongo.api.bson.collection.BSONCollection
import BSONCustomCommand._

def custom(
  col: BSONCollection,
  name: String,
  query: BSONDocument)(implicit ec: ExecutionContext): Future[CustomResult] = {

  import BSONCustomCommand.Implicits._

  col.runCommand(Custom(name, query))
}

See also:

Suggest changes