Reactive Scala Driver for MongoDB
Asynchronous & Non-Blocking
GridFS
GridFS is a way to store and retrieve files using MongoDB.
ReactiveMongo provides an API for MongoDB GridFS; a reference to it can be resolved as below.
import reactivemongo.api.DB
import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
type BSONGridFS = GridFS[BSONSerializationPack.type]
def resolveGridFS(db: DB): BSONGridFS = db.gridfs
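As background for the API above: per the MongoDB GridFS specification, a file is stored as one metadata document plus a sequence of chunk documents, with a default chunk size of 255 kB. The following is a minimal, driver-independent sketch of that splitting. The names GridFSChunks, Chunk and splitIntoChunks are hypothetical, used only for illustration, and are not part of ReactiveMongo.

```scala
object GridFSChunks {
  // Hypothetical model of a GridFS chunk: `n` is the 0-based position
  // of the chunk within the file, `data` is its binary payload.
  final case class Chunk(n: Int, data: Array[Byte])

  // Default chunk size from the MongoDB GridFS specification (255 kB).
  val DefaultChunkSize: Int = 255 * 1024

  // Splits the content into consecutive chunks of at most `chunkSize` bytes;
  // only the last chunk may be shorter.
  def splitIntoChunks(
    content: Array[Byte],
    chunkSize: Int = DefaultChunkSize
  ): Vector[Chunk] = {
    require(chunkSize > 0, "chunkSize must be positive")

    content.grouped(chunkSize).zipWithIndex.map {
      case (bytes, n) => Chunk(n, bytes)
    }.toVector
  }
}
```

The driver performs this kind of splitting internally when a file is saved, so application code normally only deals with the file reference.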
Save files to GridFS
Once a reference to GridFS is obtained, it can be used to push a file in a streaming way (for now using Play Iteratees).
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
import reactivemongo.api.bson.BSONValue
def saveToGridFS(
  gfs: GridFS[BSONSerializationPack.type],
  filename: String,
  contentType: Option[String],
  data: java.io.InputStream
)(implicit ec: ExecutionContext): Future[gfs.ReadFile[BSONValue]] = {
  // Prepare the GridFS object for the file to be pushed
  val gridfsObj = gfs.fileToSave(Some(filename), contentType)

  gfs.writeFromInputStream(gridfsObj, data)
}
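Because the save is asynchronous, the input stream must stay open until the returned Future completes. The text above does not say who closes the stream, so the sketch below assumes the caller is responsible for it. StreamResources and withInputStream are hypothetical helper names, not part of ReactiveMongo.

```scala
import java.io.InputStream
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

object StreamResources {
  // Opens a stream, passes it to an asynchronous operation, and closes it
  // once the resulting Future completes (successfully or not).
  def withInputStream[T](open: => InputStream)(
    use: InputStream => Future[T]
  )(implicit ec: ExecutionContext): Future[T] = {
    val in = open // an exception thrown while opening propagates to the caller

    // Turn a synchronous failure of `use` into a failed Future,
    // so that the stream is still closed below.
    val result: Future[T] =
      try use(in) catch { case NonFatal(cause) => Future.failed(cause) }

    // `andThen` keeps the original result and runs the side effect afterwards.
    result.andThen { case _ => in.close() }
  }
}
```

With the saveToGridFS function above, a call might look like StreamResources.withInputStream(new java.io.FileInputStream(path))(saveToGridFS(gfs, filename, contentType, _)), where path is a hypothetical local file path.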
An update function is provided to update the file metadata.
import scala.concurrent.ExecutionContext
import reactivemongo.api.bson.{ BSONDocument, BSONObjectID }
import reactivemongo.api.DB
import reactivemongo.api.gridfs.GridFS
def updateFile(db: DB, fileId: BSONObjectID)(implicit ec: ExecutionContext) =
  db.gridfs.update(fileId, BSONDocument(f"$$set" ->
    BSONDocument("meta" -> "data")))
The GridFS writeFromInputStream operation returns a reference to the stored object, represented by the ReadFile type.
The reference for a file saved in this way has its MD5 property defined (Some).
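One use of the MD5 property is to check a stored file against its local copy. The following is a minimal sketch that computes the MD5 of a local stream with the JDK only. LocalMD5 and md5Hex are hypothetical names. How the stored value is encoded is not shown in the text above, so comparing against a lowercase hexadecimal string is an assumption.

```scala
import java.io.InputStream
import java.security.MessageDigest

object LocalMD5 {
  // Computes the MD5 digest of a stream as a lowercase hexadecimal string,
  // reading it through a fixed-size buffer so the whole content is never
  // held in memory at once.
  def md5Hex(in: InputStream, bufferSize: Int = 8192): String = {
    val digest = MessageDigest.getInstance("MD5")
    val buffer = new Array[Byte](bufferSize)

    @annotation.tailrec
    def loop(): Unit = {
      val read = in.read(buffer)

      if (read >= 0) { // -1 signals the end of the stream
        digest.update(buffer, 0, read)
        loop()
      }
    }

    loop()

    digest.digest().map(b => f"${b & 0xff}%02x").mkString
  }
}
```

The function does not close the stream; that is left to the caller.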
The Akka Stream module provides GridFSStreams.sinkWithMD5, which allows streaming data to a GridFS file.
import scala.concurrent.Future
import akka.NotUsed
import akka.util.ByteString
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import reactivemongo.api.bson.BSONValue
import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
import reactivemongo.akkastream.GridFSStreams
def saveWithComputedMD5(
  gfs: GridFS[BSONSerializationPack.type],
  filename: String,
  contentType: Option[String],
  data: Source[ByteString, NotUsed]
)(implicit m: Materializer): Future[gfs.ReadFile[BSONValue]] = {
  implicit def ec = m.executionContext

  // Prepare the GridFS object for the file to be pushed
  val gridfsObj = gfs.fileToSave(Some(filename), contentType)

  val streams = GridFSStreams[BSONSerializationPack.type](gfs)
  val upload = streams.sinkWithMD5(gridfsObj)

  data.runWith(upload)
}
Find a file from GridFS
A file previously stored in GridFS can be retrieved like any MongoDB document, using a find operation.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.{ GridFS, ReadFile }
import reactivemongo.api.bson.{ BSONDocument, BSONValue }
def gridfsByFilename(
  gridfs: GridFS[BSONSerializationPack.type],
  filename: String
)(implicit ec: ExecutionContext): Future[ReadFile[BSONValue, BSONDocument]] = {
  def cursor = gridfs.find(BSONDocument("filename" -> filename))

  cursor.head
}
The Akka Stream module provides GridFSStreams.source to stream data from a GridFS file.
import scala.concurrent.Future
import akka.util.ByteString
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
import reactivemongo.api.bson.BSONDocument
import reactivemongo.akkastream.{ GridFSStreams, State }
def downloadGridFSFile(
  gridfs: GridFS[BSONSerializationPack.type],
  filename: String
)(implicit m: Materializer): Source[ByteString, Future[State]] = {
  implicit def ec = m.executionContext

  val src = gridfs.find(BSONDocument("filename" -> filename)).head.map { file =>
    val streams = GridFSStreams(gridfs)

    streams.source(file)
  }

  Source.fromFutureSource(src).mapMaterializedValue(_.flatten)
}
Delete a file
A file can be removed from GridFS using the remove operation.
import scala.concurrent.{ ExecutionContext, Future }
import reactivemongo.api.bson.collection.BSONSerializationPack
import reactivemongo.api.gridfs.GridFS
import reactivemongo.api.bson.BSONValue
def removeFrom(
  gridfs: GridFS[BSONSerializationPack.type],
  id: BSONValue // see ReadFile.id
)(implicit ec: ExecutionContext): Future[Unit] =
  gridfs.remove(id).map(_ => ())
See also:
- Some GridFS tests
- An example with Play