📜 ⬆️ ⬇️

Track updates from MongoDB Replica Set Oplog using Scala and Akka Streams

I present to your attention the translation of the article Tailing the MongoDB Replica Set Oplog with Scala and Akka Streams .

Introduction


In this article I will try to explain how to monitor updates in MongoDB Oplog with the help of the Scala driver MongoDB and Akka Streams .
The examples given in this article should not be considered and used in production environment.
Each of us knows the Unix command tail -f , the Tailable Cursor has the same concept. MongoDB provides the ability to use this feature by default and does not require additional libraries and tools. As for Oplog , this is the same collection as everyone else and nothing new is required.
If you want to learn more about Oplog and the Tailable Cursor , then you can find more information in the MongoDB documentation:

The project created in this article is conveniently located on Github .

Libraries and Tools



An example of the build.sbt file:
 name := "MongoDB Replica Set oplog tailing with Akks Streams" version := "1.0" scalaVersion := "2.11.7" libraryDependencies ++= Seq( "ch.qos.logback" % "logback-classic" % "1.1.5", "org.mongodb.scala" %% "mongo-scala-driver" % "1.1.0", "com.typesafe.akka" %% "akka-slf4j" % "2.4.2", "com.typesafe.akka" %% "akka-stream" % "2.4.2" ) 

You will also need MongoDB Replica Set, I can recommend the official mongo docker image.

Request to monitor MongoDB Oplog


Assuming that you already have an established connection, let's define the request. The following is an example:
 val collection: MongoCollection[Document] = _ val observable = collection.find(in("op", "i", "d", "u")) .cursorType(CursorType.TailableAwait) .noCursorTimeout(true) 

As you can see from the request, we define a tailable cursor tailable no timeout , and the op field that defines the type of operation in Oplog should be a CRUD operation i/d/u .

A bit about Akka Streams terminology


Complete documentation in English is available here .
In Akka Stream, the data stream processing scheme is defined by the following abstractions:
Source — A data processing stage with only one exit point, providing data elements as soon as the underlying data processing elements are ready to be received.
Sink - The stage of processing data with only one entry point, requesting and receiving data elements with the possibility of slowing down the receipt of data from the overlying element.
Flow - The stage of processing data with only one entry and exit point, which connects the flow of data and transforms the elements passing through it.
RunnableGraph - This is a Flow that is connected to Source and Sink and is ready to execute the run() command.
In our case, we will only use Source and Sink , since we will only follow the updates without changing the incoming data.

MongoDB Scala Driver and Akka Stream


Unfortunately, there is no default option for integrating Akka Streams and MongoDB drivers, but Akka Streams has the ability to integrate with Reactive Streams , as well as the newly published, new, official, asynchronous MongoDB Scala driver. The new driver uses the Observable model, which can be converted to Reactive Streams Publisher with just a few lines of code, and the MongoDB team has already given an example of an implicit conversion that we will use as the contact point between the two libraries.

Stream Source Declaration


Source definition is very easy. From Oplog we will receive Document objects, this will be the type of the Source stream.
 val source: Source[Document, NotUsed] = _ 

At the moment, we have a FindObservable[Document] object from a MongoDB Oplog request and a Source[Document, NotUsed] resource type, so how do we convert one into the other?
The magic of implicit conversion will help us in this. Type Source contains a method:
 def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] 

which converts the Reactive Streams Publisher type to Source , we also have an implicit conversion from MongoDB Observable to Publisher . Now we can link all the parts:
 import rxStreams.Implicits._ val collection: MongoCollection[Document] = _ val observable = collection.find(in("op", "i", "d", "u")) .cursorType(CursorType.TailableAwait) .noCursorTimeout(true) val source: Source[Document, NotUsed] = Source.fromPublisher(observable) 

It was pretty easy, wasn't it?

What to do next?


All that you can imagine, for the time being, I will simply output all data in STDOUT :
 source.runWith(Sink.foreach(println)) 

or you can use shortcuts:
 source.runForeach(println) 

this will output all CRUD operations from the MongoDB Replica Set from the beginning of the Oplog collection to the end and will keep track of new arrivals.
You can specify a more specific query, define databases and collections from which you want to receive updates, as well as you can define the time frame for incoming documents. This I will leave for you.

Why do we need it?


Perhaps you thought about why we need to convert Observable to Publisher , and then also to Source , while we could just use Reactive Streams Publisher or Observable .
The point is that the Observables model and the Reactive Streams API provide common mechanisms for transferring data in an asynchronous lossless framework, while the Akka Streams API focuses on the transformation of the data stream.
So if you are only interested in transferring data from Oplog somewhere, you can follow the Observables model provided by the MongoDB driver, but if you need to transform the data stream, the choice falls on the Akka Stream.

Conclusion


As you can see from the article, tracking MongoDB Oplog is a very simple task, especially in the Replica Set. Other pitfalls may arise, if it is MongoDB Sharded Cluster, it will be covered in the next article.
Of course, this post does not cover all aspects of this topic, for example, error handling, delivery guarantees, etc. This can be implemented in various ways and is not the topic of this article.

')

Source: https://habr.com/ru/post/278441/


All Articles