Sharding, or horizontal scaling, is the separation and distribution of data across multiple servers or segments (shards). Each shard is an independent database, and collectively all shards constitute a single logical database.


When the balancer migrates chunks between shards, the resulting internal operations also end up in the oplog. These documents carry an additional `fromMigrate` field; since we are not interested in these operations, we update our oplog query to exclude them from the result:

```scala
client.getDatabase("local")
  .getCollection("oplog.rs")
  .find(and(
    in(MongoConstants.OPLOG_OPERATION, "i", "d", "u"),
    exists("fromMigrate", false)))
  .cursorType(CursorType.TailableAwait)
  .noCursorTimeout(true)
```

To tail the oplog of a sharded cluster, we need to monitor the oplog of each individual shard (replica set). The `config` database holds the list of all available shards; documents in its `shards` collection look like this:

```json
{ "_id" : "shard01", "host" : "shard01/localhost:27018,localhost:27019,localhost:27020" }
```

I prefer to work with case classes instead of `Document` objects, so I declare a class:

```scala
case class Shard(name: String, uri: String)
```

and a function that converts a `Document` into a `Shard`:

```scala
def parseShardInformation(item: Document): Shard = {
  val document = item.toBsonDocument
  val shardId = document.getString("_id").getValue
  val serversDefinition = document.getString("host").getValue
  val servers =
    if (serversDefinition.contains("/"))
      serversDefinition.substring(serversDefinition.indexOf('/') + 1)
    else
      serversDefinition

  Shard(shardId, "mongodb://" + servers)
}

val shards = client.getDatabase("config")
  .getCollection("shards")
  .find()
  .map(parseShardInformation)
```

To get a `Source` per shard, we can simply go through our list of shards and reuse the method from the previous article:

```scala
def source(client: MongoClient): Source[Document, NotUsed] = {
  val observable = client.getDatabase("local")
    .getCollection("oplog.rs")
    .find(and(
      in("op", "i", "d", "u"),
      exists("fromMigrate", false)))
    .cursorType(CursorType.TailableAwait)
    .noCursorTimeout(true)

  Source.fromPublisher(observable)
}

val sources = shards.map({ shard =>
  val client = MongoClient(shard.uri)
  source(client)
})
```

We could process each `Source` separately, but of course it is much easier and more convenient to work with them as a single `Source`. To do this, we must combine them using one of the fan-in operations (a short illustrative sketch follows the list):
- `Merge[In]` - (N input streams, 1 output stream) picks elements in random order from the input streams and emits them one by one to the output stream.
- `MergePreferred[In]` - similar to `Merge`, but if elements are available on the preferred stream, it picks from that one; otherwise it selects by the same principle as `Merge`.
- `ZipWith[A, B, ..., Out]` - (N input streams, 1 output stream) takes a function of N input elements that returns 1 element to the output stream for each element from every input stream.
- `Zip[A, B]` - (2 input streams, 1 output stream) the same as `ZipWith`, but specialized for joining elements from streams A and B into a stream of pairs (A, B).
- `Concat[A]` - (2 input streams, 1 output stream) concatenates 2 streams (emits the elements of the first stream, and then of the second).
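To see how a few of these combinators behave, here is a minimal, self-contained sketch of my own (not from the article) using simple in-memory sources; it assumes an Akka 2.4-era setup with an explicit `ActorMaterializer`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Merge, Sink, Source}

implicit val system: ActorSystem = ActorSystem("fan-in-demo")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val odds: Source[Int, NotUsed] = Source(List(1, 3, 5))
val evens: Source[Int, NotUsed] = Source(List(2, 4, 6))

// Merge: elements from both sources, interleaved in no guaranteed order
val merged: Source[Int, NotUsed] = Source.combine(odds, evens)(Merge(_))

// Concat: all elements of the first source, then all elements of the second
val concatenated: Source[Int, NotUsed] = Source.combine(odds, evens)(Concat(_))

// Zip: pairs built from one element of each source: (1,2), (3,4), (5,6)
val zipped: Source[(Int, Int), NotUsed] = odds.zip(evens)

merged.runWith(Sink.foreach(println))
```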
We will take the easiest route: merge all the sources into one and print everything to STDOUT:

```scala
val allShards: Source[Document, NotUsed] =
  sources.foldLeft(Source.empty[Document]) { (prev, current) =>
    Source.combine(prev, current)(Merge(_))
  }

allShards.runForeach(println)
```

By default, any failure in a stage terminates the whole stream, so we should also think about error handling and recovery. Akka Streams supports the following supervision strategies (a minimal sketch of attaching one follows the list):
- Stop - the stream is completed with an error.
- Resume - the element that caused the failure is dropped and stream processing continues.
- Restart - the element that caused the failure is dropped and stream processing continues after the current stage is restarted. Restarting a stage means that any accumulated state is cleared; this is usually achieved by creating a new instance of the stage.
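As an illustration of how a supervision strategy is attached in practice, here is a minimal sketch (my own, not from the article) using the standard `ActorAttributes` API; `parse` is a hypothetical transformation that may throw on a malformed document.

```scala
import akka.stream.{ActorAttributes, Supervision}

// Resume on "expected" parsing failures, stop the stream on anything else
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Resume
  case _                           => Supervision.Stop
}

val resilientSource = allShards
  .map(doc => parse(doc)) // hypothetical step that may fail
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
```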
Unfortunately, supervision strategies are not supported for `ActorPublisher` and `ActorSubscriber` stages, so in the event of any error in our `Source` we will not be able to correctly recover stream processing.

Finally, a completely different approach would be to follow the updates of most, or even all, of the nodes in a replica set. Since the pair of `ts` and `h` field values uniquely identifies each transaction, you can easily combine the results from each oplog on the application side, so that the resulting stream contains the events that were returned by the majority of MongoDB nodes. With this approach you do not need to worry about whether a node is primary or secondary: you simply follow the oplog of all the nodes, and every event returned by a majority of the oplogs is considered valid. Events that do not appear in the majority of oplogs are skipped and discarded.
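To make the idea concrete, here is a sketch of my own (not from the article) of such majority filtering on the application side. `nodeSources` is an assumed list of per-node oplog `Source`s, and the map of already-seen `(ts, h)` keys is never pruned, which a real implementation would have to take care of.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Merge, Source}
import org.mongodb.scala.Document
import org.mongodb.scala.bson.BsonValue

def majorityEvents(nodeSources: Seq[Source[Document, NotUsed]]): Source[Document, NotUsed] = {
  val majority = nodeSources.size / 2 + 1

  // Merge the oplog streams of all nodes into one
  val all = nodeSources.foldLeft(Source.empty[Document]) { (prev, current) =>
    Source.combine(prev, current)(Merge(_))
  }

  // Count each operation by its (ts, h) pair and emit it exactly once,
  // as soon as it has been reported by a majority of the nodes
  all.statefulMapConcat { () =>
    val counts = scala.collection.mutable.Map.empty[(BsonValue, BsonValue), Int]
    doc => {
      val bson = doc.toBsonDocument
      val key  = (bson.get("ts"), bson.get("h"))
      val seen = counts.getOrElse(key, 0) + 1
      counts.update(key, seen)
      if (seen == majority) List(doc) else Nil
    }
  }
}
```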
I have not dealt with orphan documents in the MongoDB Sharded Cluster here, since in my case I am interested in all operations from the oplog and consider them idempotent by the `_id` field, so orphans do not get in the way.

Source: https://habr.com/ru/post/279675/