The Tailable Cursor follows the same concept as the Unix command tail -f. MongoDB provides this feature out of the box and requires no additional libraries or tools. As for the Oplog, it is a collection like any other, so nothing new is required there either. For more information about the Oplog and the Tailable Cursor, see the MongoDB documentation.

The sbt build definition for the project:

    name := "MongoDB Replica Set oplog tailing with Akka Streams"

    version := "1.0"

    scalaVersion := "2.11.7"

    libraryDependencies ++= Seq(
      "ch.qos.logback"    %  "logback-classic"    % "1.1.5",
      "org.mongodb.scala" %% "mongo-scala-driver" % "1.1.0",
      "com.typesafe.akka" %% "akka-slf4j"         % "2.4.2",
      "com.typesafe.akka" %% "akka-stream"        % "2.4.2"
    )
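The query against the Oplog collection:

    import com.mongodb.CursorType
    import org.mongodb.scala.{Document, MongoCollection}
    import org.mongodb.scala.model.Filters.in

    // Placeholder: obtain the Oplog collection from your MongoClient.
    val collection: MongoCollection[Document] = ???

    val observable = collection.find(in("op", "i", "d", "u"))
      .cursorType(CursorType.TailableAwait)
      .noCursorTimeout(true)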
As you can see, the cursor is tailable and has no timeout, and the op field, which defines the type of operation in the Oplog, must be one of the CRUD operations i/d/u (insert, delete, update).

In Akka Streams, a RunnableGraph is a Flow that is connected to a Source and a Sink and is ready to execute the run() command. Here we only need a Source and a Sink, since we will only follow the updates without changing the incoming data.

The MongoDB Scala driver uses the Observable model, which can be converted to a Reactive Streams Publisher with just a few lines of code, and the MongoDB team has already given an example of an implicit conversion that we will use as the contact point between the two libraries.

The Source definition is very easy. From the Oplog we will receive Document objects, so this will be the element type of the Source stream:

    // Placeholder: the actual value is built further on.
    val source: Source[Document, NotUsed] = ???
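A Flow connected to a Source and a Sink and ready for run() is what Akka Streams calls a RunnableGraph. The following is a minimal sketch of that idea, assuming the Akka Streams 2.4.x dependency from the build definition. The numbers, the doubling step, and the object name RunnableGraphSketch are illustrative only and do not come from the article.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of the article's project.
object RunnableGraphSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Source: emits the numbers 1 to 5.
  val numbers: Source[Int, NotUsed] = Source(1 to 5)

  // Flow: a transformation step with one input and one output.
  val doubled: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // Sink: consumes the elements; this one sums them and materializes the total.
  val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Connecting all three yields a RunnableGraph; nothing happens until run().
  val graph: RunnableGraph[Future[Int]] =
    numbers.via(doubled).toMat(sum)(Keep.right)

  val total: Int = Await.result(graph.run(), 5.seconds)
  println(total) // 2 + 4 + 6 + 8 + 10 = 30

  system.terminate()
}
```

For the Oplog the Flow step is left out: the Source is attached directly to a Sink, because the documents are only observed and not transformed.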
So we have a FindObservable[Document] object from the MongoDB Oplog query and a Source[Document, NotUsed] type for the stream. How do we convert one into the other? The Source object contains a method:

    def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]
This method converts the Publisher type to a Source, and we also have an implicit conversion from the MongoDB Observable to a Publisher. Now we can link all the parts:

    import rxStreams.Implicits._

    // Placeholder: obtain the Oplog collection from your MongoClient.
    val collection: MongoCollection[Document] = ???

    val observable = collection.find(in("op", "i", "d", "u"))
      .cursorType(CursorType.TailableAwait)
      .noCursorTimeout(true)

    val source: Source[Document, NotUsed] = Source.fromPublisher(observable)
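The rxStreams.Implicits import refers to the implicit conversion from the MongoDB example, which the article does not reproduce. The sketch below shows roughly what such a conversion looks like, assuming the Observable, Observer and Subscription traits of mongo-scala-driver 1.1.0. It is an approximation written for illustration, not a verbatim copy of the MongoDB code, and later driver versions may not need it at all.

```scala
package rxStreams

import org.mongodb.scala.{Observable, Observer, Subscription => MongoSubscription}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

object Implicits {

  // Wraps a MongoDB Observable so it can be used wherever a
  // Reactive Streams Publisher is expected.
  implicit class ObservableToPublisher[T](observable: Observable[T]) extends Publisher[T] {

    override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
      observable.subscribe(new Observer[T] {

        // Translate the MongoDB subscription into a Reactive Streams one,
        // forwarding demand and cancellation.
        override def onSubscribe(subscription: MongoSubscription): Unit =
          subscriber.onSubscribe(new Subscription {
            override def request(n: Long): Unit = subscription.request(n)
            override def cancel(): Unit = subscription.unsubscribe()
          })

        // Forward elements and terminal signals unchanged.
        override def onNext(result: T): Unit = subscriber.onNext(result)
        override def onError(e: Throwable): Unit = subscriber.onError(e)
        override def onComplete(): Unit = subscriber.onComplete()
      })
  }
}
```

With this object on the classpath, passing a FindObservable[Document] to Source.fromPublisher compiles because the compiler applies the implicit wrapper automatically.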
For example, to print every Oplog entry to STDOUT:

    source.runWith(Sink.foreach(println))

or, in shorter form:

    source.runForeach(println)
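The snippets above leave out the surrounding setup, so here is a sketch of how the pieces might fit into one program. It assumes a replica set reachable at mongodb://localhost:27017 (a made-up address), the Oplog in its usual place, the oplog.rs collection of the local database, and an rxStreams.Implicits object providing the Observable-to-Publisher conversion from the MongoDB example. The object name OplogTailing and the helper oplogSource are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.mongodb.CursorType
import org.mongodb.scala.{Document, MongoClient, MongoCollection}
import org.mongodb.scala.model.Filters.in
import rxStreams.Implicits._ // assumed: conversion from the MongoDB example

// Hypothetical wrapper object, not part of the article.
object OplogTailing {

  // Operation codes to follow: insert, delete, update.
  val crudOps: Seq[String] = Seq("i", "d", "u")

  // Builds a Source that tails the given Oplog collection.
  def oplogSource(collection: MongoCollection[Document]): Source[Document, NotUsed] = {
    val observable = collection
      .find(in("op", crudOps: _*))
      .cursorType(CursorType.TailableAwait)
      .noCursorTimeout(true)
    Source.fromPublisher(observable)
  }

  def main(args: Array[String]): Unit = {
    // runForeach needs a materializer, which in turn needs an ActorSystem.
    implicit val system: ActorSystem = ActorSystem("oplog-tailing")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Assumed connection string; adjust to your replica set.
    val client = MongoClient("mongodb://localhost:27017")
    val collection = client.getDatabase("local").getCollection("oplog.rs")

    // Prints existing entries, then keeps waiting for new ones.
    oplogSource(collection).runForeach(println)
  }
}
```

Because the cursor is tailable, the stream does not complete on its own, so the program keeps running until it is stopped.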
This reads the Oplog collection to the end and then keeps track of new arrivals.

You may wonder why we convert the Observable to a Publisher, and then also to a Source, when we could just use the Reactive Streams Publisher or the Observable directly. The Observables model and the Reactive Streams API provide common mechanisms for transferring data asynchronously without loss, while the Akka Streams API focuses on transforming the data stream.

If you only need to pass the Oplog along somewhere, you can stay with the Observables model provided by the MongoDB driver, but if you need to transform the data stream, Akka Streams is the better choice.

Source: https://habr.com/ru/post/278441/