tail -f, the Tailable Cursor has the same concept. MongoDB supports this feature out of the box and does not require additional libraries or tools. As for the Oplog, it is a collection like any other, so nothing new is required there either.

If you want to learn more about the Oplog and the Tailable Cursor, you can find more information in the MongoDB documentation.

The build definition of the project (build.sbt):

    name := "MongoDB Replica Set oplog tailing with Akka Streams"

    version := "1.0"

    scalaVersion := "2.11.7"

    libraryDependencies ++= Seq(
      "ch.qos.logback"    %  "logback-classic"    % "1.1.5",
      "org.mongodb.scala" %% "mongo-scala-driver" % "1.1.0",
      "com.typesafe.akka" %% "akka-slf4j"         % "2.4.2",
      "com.typesafe.akka" %% "akka-stream"        % "2.4.2"
    )

The query against the Oplog collection:

    // placeholder: obtain the Oplog collection from your MongoDB client here
    val collection: MongoCollection[Document] = ???

    val observable = collection.find(in("op", "i", "d", "u"))
      .cursorType(CursorType.TailableAwait)
      .noCursorTimeout(true)

The query uses a tailable cursor with no timeout, and the op field, which defines the type of operation in the Oplog, must be one of the CRUD operations i/d/u (insert, delete, update).

In Akka Streams, a RunnableGraph is a Flow that is connected to a Source and a Sink and is ready to execute the run() command. Here we only need a Source and a Sink, since we will only follow the updates without changing the incoming data.

The MongoDB Scala driver is built on the Observable model, which can be converted to a Reactive Streams Publisher with just a few lines of code, and the MongoDB team has already given an example of an implicit conversion that we will use as the contact point between the two libraries.

The Source definition is very easy. From the Oplog we will receive Document objects, so this will be the element type of the Source stream.

    // placeholder: the value is filled in below
    val source: Source[Document, NotUsed] = ???

So we have a FindObservable[Document] object from the MongoDB Oplog query and a Source[Document, NotUsed] type. How do we convert one into the other? The Source object contains a method:

    def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]

It converts a Publisher to a Source, and we also have an implicit conversion from the MongoDB Observable to a Publisher.
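For readers who have not seen the implicit conversion mentioned above, here is a minimal sketch of what an Observable to Publisher bridge can look like. It is modeled on the example published with the MongoDB Scala driver, not copied from this article, and it assumes the 1.x driver API (Observer with onSubscribe/onNext/onError/onComplete, Subscription with request/unsubscribe/isUnsubscribed). The package name rxStreams and the object name Implicits are chosen only to match the import used in the article.

```scala
package rxStreams

import java.util.concurrent.atomic.AtomicBoolean

import org.mongodb.scala.{Observable, Observer, Subscription}
import org.reactivestreams.{Publisher, Subscriber, Subscription => RsSubscription}

object Implicits {

  // Wraps a MongoDB Observable so it can be used wherever a
  // Reactive Streams Publisher is expected.
  // Assumption: mongo-scala-driver 1.x Observable/Observer/Subscription API.
  implicit class ObservableToPublisher[T](observable: Observable[T]) extends Publisher[T] {

    override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
      observable.subscribe(new Observer[T] {

        // Hand the subscriber a Reactive Streams Subscription that
        // delegates demand and cancellation to the driver's Subscription.
        override def onSubscribe(subscription: Subscription): Unit =
          subscriber.onSubscribe(new RsSubscription {
            private val cancelled = new AtomicBoolean(false)

            override def request(n: Long): Unit =
              if (!subscription.isUnsubscribed && n < 1)
                // Reactive Streams rule 3.9: non-positive demand is an error.
                subscriber.onError(
                  new IllegalArgumentException("request(n) requires n > 0, got " + n))
              else
                subscription.request(n)

            // Unsubscribe from the driver at most once.
            override def cancel(): Unit =
              if (!cancelled.getAndSet(true)) subscription.unsubscribe()
          })

        // Forward elements and terminal signals unchanged.
        override def onNext(result: T): Unit = subscriber.onNext(result)
        override def onError(e: Throwable): Unit = subscriber.onError(e)
        override def onComplete(): Unit = subscriber.onComplete()
      })
  }
}
```

With this object on the classpath, importing rxStreams.Implicits._ lets the compiler pass a FindObservable[Document] to Source.fromPublisher, because the implicit class provides a view from Observable[T] to Publisher[T].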
Now we can link all the parts:

    import rxStreams.Implicits._

    // placeholder: obtain the Oplog collection from your MongoDB client here
    val collection: MongoCollection[Document] = ???

    val observable = collection.find(in("op", "i", "d", "u"))
      .cursorType(CursorType.TailableAwait)
      .noCursorTimeout(true)

    // the implicit conversion turns the Observable into a Publisher
    val source: Source[Document, NotUsed] = Source.fromPublisher(observable)

To print every Oplog entry to STDOUT, run the source with a Sink:

    // requires an implicit materializer in scope
    source.runWith(Sink.foreach(println))

or use the shorter form:

    source.runForeach(println)

This reads the Oplog collection to the end and then keeps track of new arrivals.

You may wonder why we convert the Observable to a Publisher, and then also to a Source, when we could just use the Reactive Streams Publisher or the Observable directly. The Observables model and the Reactive Streams API provide common mechanisms for transferring data asynchronously and without loss, while the Akka Streams API focuses on the transformation of the data stream.

So if you only need to pass the Oplog on somewhere, you can stay with the Observables model provided by the MongoDB driver, but if you need to transform the data stream, Akka Streams is the better choice.

Source: https://habr.com/ru/post/278441/