Ceci n'est pas encore publié, mais dans la branche master d'Alpakka, MongoSource.apply
prend un paramètre de type :
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Par conséquent, avec la prochaine version 0.18 d'Alpakka, vous pourrez faire ce qui suit :
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Notez que source
suppose ici que todoCollection.find()
renvoie un Observable[TodoMongo]
; ajustez les types selon vos besoins.
En attendant, vous pouvez simplement ajouter manuellement le code ci-dessus. Par exemple :
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Notez que MyMongoSource
est défini pour résider dans le akka.stream.alpakka.mongodb.scaladsl
package (comme MongoSource
), car ObservableToPublisher
est une classe package-private. Vous utiliseriez MyMongoSource
de la même manière que vous utiliseriez MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())