Sharding, or horizontal scaling, is the separation and distribution of data across multiple servers, or segments (shards). Each shard is an independent database, and together all shards make up a single logical database.
oplog. Such documents carry an additional fromMigrate field; since we are not interested in these operations, we will update our oplog query to exclude them from the result:

```scala
client.getDatabase("local")
  .getCollection("oplog.rs")
  .find(and(
    in(MongoConstants.OPLOG_OPERATION, "i", "d", "u"),
    exists("fromMigrate", false)))
  .cursorType(CursorType.TailableAwait)
  .noCursorTimeout(true)
```
oplog to a Sharded Cluster, we will need to monitor the oplog of each shard (each shard is itself a Replica Set). The config database holds the list of all available shards; documents in its shards collection look like this:

```json
{ "_id" : "shard01", "host" : "shard01/localhost:27018,localhost:27019,localhost:27020" }
```
case classes are more convenient to work with than raw Document objects, so I will declare one:

```scala
case class Shard(name: String, uri: String)
```
Document values returned by the driver can then be converted to Shard instances:

```scala
def parseShardInformation(item: Document): Shard = {
  val document = item.toBsonDocument
  val shardId = document.getString("_id").getValue
  val serversDefinition = document.getString("host").getValue
  val servers =
    if (serversDefinition.contains("/"))
      serversDefinition.substring(serversDefinition.indexOf('/') + 1)
    else
      serversDefinition

  Shard(shardId, "mongodb://" + servers)
}
```
```scala
val shards = client.getDatabase("config")
  .getCollection("shards")
  .find()
  .map(parseShardInformation)
```
Source streams for every shard can now be created by simply going through our list of shards and reusing the method from the previous article:

```scala
def source(client: MongoClient): Source[Document, NotUsed] = {
  val observable = client.getDatabase("local")
    .getCollection("oplog.rs")
    .find(and(
      in("op", "i", "d", "u"),
      exists("fromMigrate", false)))
    .cursorType(CursorType.TailableAwait)
    .noCursorTimeout(true)

  Source.fromPublisher(observable)
}

val sources = shards.map({ shard =>
  val client = MongoClient(shard.uri)
  source(client)
})
```
Source streams could each be processed separately, but it is much easier and more convenient to work with them as a single Source. To do this, we must combine them using one of the Fan-in operations (a short sketch contrasting two of them follows the list):
- Merge[In] - (N incoming streams, 1 output stream) picks elements in random order from the incoming streams and emits them one by one to the output stream.
- MergePreferred[In] - similar to Merge, but if elements are available on the preferred stream, it picks from that stream; otherwise it picks according to the same principle as Merge.
- ZipWith[A, B, ..., Out] - (N incoming streams, 1 output stream) takes a function of N arguments that returns one output element for each set of elements taken from the incoming streams.
- Zip[A, B] - (2 incoming streams, 1 output stream) the same as ZipWith, specialized to combining elements from streams A and B into a stream of pairs (A, B).
- Concat[A] - (2 incoming streams, 1 output stream) concatenates two streams (emits the elements from the first stream, then the elements from the second).
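To get a feel for the difference between these operations, here is a small self-contained sketch (not part of the original pipeline; the object and source names are made up) that contrasts Merge and Concat on plain integer sources:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Merge, Source}

object FanInExample extends App {
  implicit val system: ActorSystem = ActorSystem("fan-in-example")
  // On newer Akka versions the materializer provided by the ActorSystem is enough.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val first  = Source(1 to 3)
  val second = Source(10 to 12)

  // Merge: elements from both sources are emitted as soon as they are available,
  // so the relative order between the two sources is not guaranteed.
  Source.combine(first, second)(Merge(_)).runForeach(n => println(s"merge:  $n"))

  // Concat: all elements of `first` are emitted before any element of `second`.
  Source.combine(first, second)(Concat(_)).runForeach(n => println(s"concat: $n"))
}
```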
STDOUT is enough for our example, so we simply merge all the sources and print every incoming document:

```scala
val allShards: Source[Document, NotUsed] =
  sources.foldLeft(Source.empty[Document]) { (prev, current) =>
    Source.combine(prev, current)(Merge(_))
  }

allShards.runForeach(println)
```
Akka Streams supports supervision strategies for handling errors during stream processing (a short sketch of attaching one follows the list):

- Stop - the stream is completed with an error.
- Resume - the element that caused the error is dropped and stream processing continues.
- Restart - the element that caused the error is dropped and stream processing continues after the current stage is restarted. Restarting a stage means that all of its accumulated state is cleared; in practice this is done by creating a new instance of the stage.
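For illustration, here is a minimal sketch, assuming only Akka Streams, of how a supervision strategy is attached to a stream; the decider and the arithmetic failure are illustrative and not part of the oplog pipeline:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.Source

object SupervisionExample extends App {
  implicit val system: ActorSystem = ActorSystem("supervision-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume // skip the offending element
    case _                      => Supervision.Stop   // fail the stream for anything else
  }

  Source(List(1, 2, 0, 4))
    .map(100 / _) // division by zero on the third element
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runForeach(println) // prints 100, 50 and 25; the failed element is dropped
}
```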
ActorPublisher and ActorSubscriber, however, do not support these supervision strategies, so in the event of any error in our Source we will not be able to correctly recover stream processing.

Finally, a completely different approach would be to follow the updates of most, or even all, of the nodes in the Replica Set. Since the pair of ts and h field values uniquely identifies each transaction, you can easily combine the results from each oplog on the application side, so that the resulting stream contains only the events returned by the majority of MongoDB nodes. With this approach you do not need to worry about whether a node is primary or secondary: you simply follow the oplog of every node, and all events returned by the majority of oplogs are considered valid. Events that do not appear in the majority of oplogs are skipped and discarded.
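One hypothetical way to sketch this majority check with Akka Streams is to merge the per-node oplog sources and count how many nodes have reported each (ts, h) pair; OplogEvent, majorityOnly and the Long-typed ts/h fields are simplifications made up for this example:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

final case class OplogEvent(ts: Long, h: Long, raw: String)

// `merged` is assumed to be the already-merged stream of events coming from the
// oplogs of all replica-set members.
def majorityOnly(merged: Source[OplogEvent, NotUsed], nodeCount: Int): Source[OplogEvent, NotUsed] = {
  val majority = nodeCount / 2 + 1
  merged.statefulMapConcat { () =>
    // Per-materialization state: how many nodes reported each (ts, h) pair so far.
    // A real implementation would also evict old entries to bound memory use.
    val seen = scala.collection.mutable.Map.empty[(Long, Long), Int]
    event => {
      val key = (event.ts, event.h)
      val count = seen.getOrElse(key, 0) + 1
      seen(key) = count
      if (count == majority) List(event) else Nil // emit once, when a majority agrees
    }
  }
}
```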
orphan documents in a MongoDB Sharded Cluster are not covered here: in my case I am interested in all operations from the oplog and treat them as idempotent with respect to the _id field, so orphans do not interfere (a hypothetical illustration of such an idempotent apply step is sketched below).
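As an aside, here is a hypothetical sketch, assuming a recent mongo-scala-driver, of what "idempotent with respect to _id" can mean in practice: each event is applied by overwriting the whole target document keyed by its _id, so replaying the same event changes nothing. The database and collection names are made up for the example.

```scala
import com.mongodb.client.model.ReplaceOptions
import org.mongodb.scala.{Document, MongoClient}
import org.mongodb.scala.model.Filters
import scala.concurrent.Await
import scala.concurrent.duration._

object IdempotentApply extends App {
  val target = MongoClient("mongodb://localhost:27017")
    .getDatabase("mirror")
    .getCollection("events")

  def applyEvent(doc: Document): Unit = {
    val result = target.replaceOne(
      Filters.equal("_id", doc("_id")),
      doc,
      new ReplaceOptions().upsert(true)) // insert if missing, overwrite otherwise
    Await.result(result.toFuture(), 10.seconds)
  }
}
```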
Source: https://habr.com/ru/post/279675/