
How to make Bagri and MongoDB friends

About a month ago, I told Habr readers about the Bagri project, an open-source NoSQL database built on top of a distributed cache.

The response was fairly positive, so I decided to write a follow-up on how to extend Bagri's functionality by writing extensions against the system's built-in API.



At the moment, Bagri exposes two APIs for connecting to external systems: the DataFormat API and the DataStore API.
The first is designed to parse documents in a new format and convert them into the system's internal representation, and, conversely, to build documents in that format from the internal representation.

The second API is used to load, save, and delete documents in external storage systems. Connecting a new document source often requires implementing both interfaces.

I will show how to implement a DataStore connector for MongoDB and use MongoDB as a document store. No DataFormat implementation is needed in this case, because Mongo returns documents as JSON, which Bagri supports out of the box.

A couple of remarks first:

  1. What is the practical benefit of such a connector? Most obviously, Mongo can simply serve as a centralized document repository. The connector may also help in scenarios like those described in this article, where the data is already stored in Mongo but Mongo's capabilities are no longer enough for the system's growing functionality;
  2. I am not a MongoDB expert, so if there are better ways to work with it, I will be glad to hear about them.

So, let's begin.

The DataStore API requires implementing the com.bagri.xdm.cache.api.DocumentStore interface:

    public interface DocumentStore {

        /**
         * Lifecycle method. Invoked when the store initialized.
         *
         * @param context the environment context
         */
        void init(Map<String, Object> context);

        /**
         * Lifecycle method. Invoked when parent schema is closing
         */
        void close();

        /**
         * Load document from persistent store
         *
         * @param key the document key
         * @return XDM Document instance if corresponding document found, null otherwise
         */
        Document loadDocument(DocumentKey key);

        /**
         * Load bunch of documents from persistent store
         *
         * @param keys the collection of document keys to load
         * @return the map of loaded documents with their keys
         */
        Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys);

        /**
         * Load document keys. Can do it in synch or asynch way.
         *
         * @return iterator over found document keys
         */
        Iterable<DocumentKey> loadAllDocumentKeys();

        /**
         * Stores document to persistent store.
         *
         * @param key the document key
         * @param value the XDM document instance
         */
        void storeDocument(DocumentKey key, Document value);

        /**
         * Stores bunch of documents to persistent store
         *
         * @param entries the map of document keys and corresponding document instances
         */
        void storeAllDocuments(Map<DocumentKey, Document> entries);

        /**
         * Deletes document from persistent store
         *
         * @param key the document key
         */
        void deleteDocument(DocumentKey key);

        /**
         * Deletes bunch of documents from persistent store
         *
         * @param keys the keys identifying documents to be deleted
         */
        void deleteAllDocuments(Collection<DocumentKey> keys);
    }

The easiest approach is to derive our DocumentStore implementation from the abstract DocumentStoreBase class, which provides a set of helper methods for accessing the system context. We begin by handling the connection parameters for the external system and initializing the connection.

To connect to Mongo, we need the mongod server address, the database name, and the names of the collections from which we want to load documents. Let's name these parameters mongo.db.uri, mongo.db.database, and mongo.db.collections. The initialization code for connecting to the Mongo server might then look like this:

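    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientURI;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoDatabase;
    // Bagri imports (DocumentStoreBase, DocumentStore, ...) omitted

    public class MongoDBStore extends DocumentStoreBase implements DocumentStore {

        private MongoClient client;
        private MongoDatabase db;
        // cached collection handles, keyed by collection name
        private Map<String, MongoCollection<org.bson.Document>> clMap = new HashMap<>();

        @Override
        public void init(Map<String, Object> context) {
            super.setContext(context);
            // connect to mongod using the URI from the store/schema settings
            String uri = (String) context.get("mongo.db.uri");
            MongoClientURI mcUri = new MongoClientURI(uri);
            client = new MongoClient(mcUri);
            String dbName = (String) context.get("mongo.db.database");
            db = client.getDatabase(dbName);
            // "*" means all collections, otherwise a comma-separated list of names;
            // spaces around the commas are ignored
            String clNames = (String) context.get("mongo.db.collections");
            boolean all = "*".equals(clNames.trim());
            List<String> clns = Arrays.asList(clNames.trim().split("\\s*,\\s*"));
            for (String clName: db.listCollectionNames()) {
                if (all || clns.contains(clName)) {
                    MongoCollection<org.bson.Document> cln = db.getCollection(clName);
                    clMap.put(clName, cln);
                }
            }
        }

        @Override
        public void close() {
            client.close();
        }
    }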
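
The collection list is a comma-separated string, with "*" meaning all collections. Entries need to be trimmed: a value written with spaces, such as "rest-short, grades", would otherwise yield " grades", and that collection would be silently skipped. The parsing can be isolated in a small helper; the CollectionFilter class below is hypothetical (not part of Bagri or the MongoDB driver) and is only a sketch of the idea:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical helper that interprets the mongo.db.collections parameter:
 * "*" selects every collection, otherwise the value is a comma-separated list.
 */
final class CollectionFilter {

    private final boolean all;
    private final Set<String> names;

    public CollectionFilter(String param) {
        // treat a missing parameter as an empty list in this sketch
        String value = param == null ? "" : param.trim();
        this.all = "*".equals(value);
        if (all || value.isEmpty()) {
            this.names = Collections.<String>emptySet();
        } else {
            this.names = Arrays.stream(value.split(","))
                    .map(String::trim)           // " grades" -> "grades"
                    .filter(s -> !s.isEmpty())   // tolerate "a,,b" or a trailing comma
                    .collect(Collectors.toSet());
        }
    }

    public boolean accepts(String collectionName) {
        return all || names.contains(collectionName);
    }
}
```

With such a helper, the loop condition in init() could become filter.accepts(clName).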

The init method reads the mongod connection settings from the context, establishes a connection, and caches a MongoCollection object for each collection declared for loading. Next, we need to implement the method that loads all document keys:

    // imports used below (add to the class file):
    //   import static com.mongodb.client.model.Projections.include;
    //   import com.mongodb.client.MongoCursor;

    private String getMappingKey(String id, String cln) {
        return id + "::" + cln;
    }

    private String[] getMappingParts(String keyMap) {
        return keyMap.split("::");
    }

    @Override
    public Iterable<DocumentKey> loadAllDocumentKeys() {
        SchemaRepository repo = getRepository();
        if (repo == null) {
            return null;
        }
        String id;
        DocumentKey key;
        Map<DocumentKey, String> keyMap = new HashMap<>();
        for (MongoCollection<org.bson.Document> cln: clMap.values()) {
            String clName = cln.getNamespace().getCollectionName();
            // load _ids only; the cursor is closed when the block exits
            try (MongoCursor<org.bson.Document> cursor = cln.find().projection(include("_id")).iterator()) {
                while (cursor.hasNext()) {
                    org.bson.Document doc = cursor.next();
                    id = doc.get("_id").toString();
                    // TODO: handle possible duplicates via revisions
                    key = repo.getFactory().newDocumentKey(id, 0, 1);
                    keyMap.put(key, getMappingKey(id, clName));
                }
            }
        }
        // remember key -> "<_id>::<collection>" so that loadDocument() can find the source document
        PopulationManagement popManager = repo.getPopulationManagement();
        popManager.setKeyMappings(keyMap);
        return keyMap.keySet();
    }
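
getMappingKey() and getMappingParts() record where a document came from as "<_id>::<collection>". Because split("::") cuts at every separator, a string _id that itself contains "::" would come back in more than two parts, and mParts[1] would no longer be the collection name. A hypothetical variant (KeyMapping is an illustration, not Bagri code) that splits at the last separator instead, assuming collection names never contain "::":

```java
/**
 * Hypothetical helper mirroring getMappingKey()/getMappingParts():
 * a mapping key has the form "<_id>::<collection>".
 * Assumes collection names never contain "::".
 */
final class KeyMapping {

    private static final String SEP = "::";

    private KeyMapping() {
    }

    static String compose(String id, String collection) {
        return id + SEP + collection;
    }

    /** Returns {id, collection}, splitting at the last separator. */
    static String[] split(String mapping) {
        int pos = mapping.lastIndexOf(SEP);
        if (pos < 0) {
            throw new IllegalArgumentException("not a mapping key: " + mapping);
        }
        return new String[] {mapping.substring(0, pos), mapping.substring(pos + SEP.length())};
    }
}
```

For example, KeyMapping.split("a::b::grades") returns {"a::b", "grades"}, whereas "a::b::grades".split("::") returns three parts.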

And the methods that load the documents corresponding to those keys:

    // imports used below (add to the class file):
    //   import static com.mongodb.client.model.Filters.eq;
    //   import org.bson.json.JsonWriterSettings;
    //   import org.bson.types.ObjectId;
    //   import java.util.Collection;
    //   import java.util.Date;

    @Override
    public Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys) {
        Map<DocumentKey, Document> entries = new HashMap<>(keys.size());
        for (DocumentKey key: keys) {
            Document doc = loadDocument(key);
            if (doc != null) {
                entries.put(key, doc);
            }
        }
        return entries;
    }

    @Override
    public Document loadDocument(DocumentKey key) {
        SchemaRepository repo = getRepository();
        PopulationManagement popManager = repo.getPopulationManagement();
        // "<_id>::<collection>" registered in loadAllDocumentKeys()
        String id = popManager.getKeyMapping(key);
        if (id == null) {
            return null;
        }
        String[] mParts = getMappingParts(id);
        MongoCollection<org.bson.Document> cln = clMap.get(mParts[1]);
        if (cln == null) {
            return null;
        }
        // Bagri collection the document will belong to, if it is declared in the schema
        int[] clns = null;
        com.bagri.xdm.system.Collection xcl = repo.getSchema().getCollection(mParts[1]);
        if (xcl != null) {
            clns = new int[] {xcl.getId()};
        }
        // an ObjectId _id carries its creation time; other _id values fall back to "now"
        Object oid;
        Date creDate;
        try {
            oid = new ObjectId(mParts[0]);
            creDate = ((ObjectId) oid).getDate();
        } catch (IllegalArgumentException ex) {
            oid = mParts[0];
            creDate = new Date();
        }
        org.bson.Document mongoDoc = cln.find(eq("_id", oid)).first();
        if (mongoDoc == null) {
            return null; // e.g. deleted from Mongo after the keys were loaded
        }
        String content = mongoDoc.toJson(new JsonWriterSettings(true));
        Document newDoc = null;
        try {
            DocumentManagement docMgr = (DocumentManagement) repo.getDocumentManagement();
            newDoc = docMgr.createDocument(key, mParts[0], content, "JSON", creDate, "owner", 1, clns, true);
        } catch (XDMException ex) {
            // TODO: log error, but do not stop the whole loading
        }
        return newDoc;
    }
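
loadDocument() takes the creation date from ObjectId.getDate() when the _id parses as an ObjectId, and falls back to the current time otherwise. This works because the first 4 bytes of an ObjectId hold its creation time in seconds since the Unix epoch. The standalone sketch below (ObjectIdDates is a hypothetical helper, not the driver's ObjectId class) shows that extraction on the 24-character hex form:

```java
import java.util.Date;
import java.util.Optional;

/** Hypothetical helper: creation date encoded in a MongoDB ObjectId hex string. */
final class ObjectIdDates {

    private ObjectIdDates() {
    }

    /** An ObjectId is 12 bytes, written as 24 hexadecimal characters. */
    static boolean isObjectIdHex(String id) {
        if (id == null || id.length() != 24) {
            return false;
        }
        for (int i = 0; i < id.length(); i++) {
            char c = id.charAt(i);
            boolean hex = (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F');
            if (!hex) {
                return false;
            }
        }
        return true;
    }

    /** The first 8 hex characters hold the creation time in seconds since the Unix epoch. */
    static Optional<Date> creationDate(String id) {
        if (!isObjectIdHex(id)) {
            return Optional.empty();
        }
        long seconds = Long.parseLong(id.substring(0, 8), 16);
        return Optional.of(new Date(seconds * 1000L));
    }

    /** Same fallback as loadDocument(): the current time for non-ObjectId keys. */
    static Date creationDateOrNow(String id) {
        return creationDate(id).orElseGet(Date::new);
    }
}
```

For instance, "507f1f77bcf86cd799439011" starts with 0x507f1f77 = 1350508407 seconds, so its creation date is October 2012, while a restaurant_id-like value such as "40362098" is not an ObjectId at all.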

The connector code shown so far is enough to get started; implementing storeDocument/storeAllDocuments and deleteDocument/deleteAllDocuments is left as an exercise for the reader. Please also note that this code only demonstrates how a connector is built: it does not handle the various exceptions or possible additional configuration parameters. The full connector code can be browsed and built from the bagri-extensions repository.
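
Another simplification in the demo code: loadAllDocuments() calls loadDocument() once per key, so it issues one find() per document. A possible optimization, which is an assumption and not something the connector above does, is to group the requested keys by collection and fetch each group with a single $in query (Filters.in("_id", ids) in the Java driver); ObjectId keys would first have to be converted back to ObjectId, as loadDocument() does for a single key. The grouping step needs only the standard library (KeyBatches is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: groups "<_id>::<collection>" mapping keys by collection. */
final class KeyBatches {

    private KeyBatches() {
    }

    static Map<String, List<String>> idsByCollection(Iterable<String> mappingKeys) {
        // LinkedHashMap keeps collections in the order they were first seen
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String mapping : mappingKeys) {
            int pos = mapping.lastIndexOf("::");
            if (pos < 0) {
                continue; // skip malformed keys in this sketch
            }
            String id = mapping.substring(0, pos);
            String collection = mapping.substring(pos + 2);
            result.computeIfAbsent(collection, c -> new ArrayList<>()).add(id);
        }
        return result;
    }
}
```

Each resulting list could then be passed to one find() call per collection instead of one call per document.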

Now we need to register the DataStore connector and declare the schema that will use it. To do this, add the connector configuration to the dataStores section of <BAGRI_HOME>/config/config.xml:

    <dataStore name="mongo">
        <version>1</version>
        <createdAt>2016-08-01T16:17:20.542+03:00</createdAt>
        <createdBy>admin</createdBy>
        <description>MongoDB data store</description>
        <enabled>true</enabled>
        <storeClass>com.bagri.samples.MongoDBStore</storeClass>
        <properties>
            <entry name="mongo.db.uri">mongodb://localhost:27017</entry>
            <entry name="mongo.db.database">test</entry>
            <entry name="mongo.db.collections">*</entry>
        </properties>
    </dataStore>

We will test the connector on the restaurants collection, which is used in many MongoDB examples. Import the test documents into Mongo as described here: docs.mongodb.com/getting-started/shell/import-data. Next, we will register a schema for working with MongoDB and configure it to load data from this collection. In the same config.xml file, add a new schema to the schemas section and give it the following parameters:

    <entry name="xdm.schema.store.enabled">true</entry>
    <entry name="xdm.schema.store.type">mongo</entry>
    <entry name="mongo.db.collections">restaurants</entry>

The complete schema definition in config.xml with the new parameters:
    <schema name="Mongo" active="true">
        <version>1</version>
        <createdAt>2016-08-01T21:30:58.096+04:00</createdAt>
        <createdBy>admin</createdBy>
        <description>Schema for MongoDB</description>
        <properties>
            <entry name="xdm.schema.store.tx.buffer.size">1024</entry>
            <entry name="xdm.schema.data.backup.read">false</entry>
            <entry name="xdm.schema.trans.backup.async">0</entry>
            <entry name="xdm.schema.store.enabled">true</entry>
            <entry name="xdm.schema.thread.pool">10</entry>
            <entry name="xdm.schema.data.stats.enabled">true</entry>
            <entry name="xdm.schema.query.cache">true</entry>
            <entry name="xdm.schema.store.type">mongo</entry>
            <entry name="mongo.db.collections">restaurants</entry>
            <entry name="xdm.schema.format.default">JSON</entry>
            <entry name="xdm.schema.ports.first">10300</entry>
            <entry name="xdm.schema.ports.last">10400</entry>
            <entry name="xdm.schema.population.size">1</entry>
            <entry name="xdm.schema.population.buffer.size">1000000</entry>
            <entry name="xdm.schema.data.backup.async">1</entry>
            <entry name="xdm.schema.store.data.path">../data/mongo</entry>
            <entry name="xdm.schema.dict.backup.sync">0</entry>
            <entry name="xdm.schema.trans.backup.sync">1</entry>
            <entry name="xdm.schema.query.backup.sync">0</entry>
            <entry name="xdm.schema.buffer.size">16</entry>
            <entry name="xdm.schema.dict.backup.async">1</entry>
            <entry name="xdm.schema.dict.backup.read">true</entry>
            <entry name="xdm.schema.trans.backup.read">false</entry>
            <entry name="xdm.schema.query.backup.async">0</entry>
            <entry name="xdm.schema.members">localhost</entry>
            <entry name="xdm.schema.data.backup.sync">0</entry>
            <entry name="xdm.schema.partition.count">157</entry>
            <entry name="xdm.schema.query.backup.read">true</entry>
            <entry name="xdm.schema.transaction.timeout">0</entry>
            <entry name="xdm.schema.health.threshold.low">25</entry>
            <entry name="xdm.schema.health.threshold.high">0</entry>
            <entry name="xdm.schema.query.parallel">true</entry>
            <entry name="xdm.schema.partition.pool">32</entry>
            <entry name="xqj.schema.baseUri">file:///../data/mongo/</entry>
            <entry name="xqj.schema.orderingMode">2</entry>
            <entry name="xqj.schema.queryLanguageTypeAndVersion">1</entry>
            <entry name="xqj.schema.bindingMode">0</entry>
            <entry name="xqj.schema.boundarySpacePolicy">1</entry>
            <entry name="xqj.schema.scrollability">1</entry>
            <entry name="xqj.schema.holdability">2</entry>
            <entry name="xqj.schema.copyNamespacesModePreserve">1</entry>
            <entry name="xqj.schema.queryTimeout">0</entry>
            <entry name="xqj.schema.defaultFunctionNamespace"></entry>
            <entry name="xqj.schema.defaultElementTypeNamespace"></entry>
            <entry name="xqj.schema.copyNamespacesModeInherit">1</entry>
            <entry name="xqj.schema.defaultOrderForEmptySequences">2</entry>
            <entry name="xqj.schema.defaultCollationUri"></entry>
            <entry name="xqj.schema.constructionMode">1</entry>
        </properties>
        <collections>
            <collection id="1" name="restaurants">
                <version>1</version>
                <createdAt>2016-08-01T01:01:26.965+03:00</createdAt>
                <createdBy>admin</createdBy>
                <description>Mongo restaurants collection</description>
                <enabled>true</enabled>
            </collection>
        </collections>
        <fragments/>
        <indexes/>
        <triggers/>
    </schema>


In addition, you need to create a new server profile file. In the <BAGRI_HOME>/config directory, create a mongo.properties file and specify in it the schema the server will use:

 xdm.cluster.node.schemas=Mongo 

Make sure that the MongoDB server is running and accepting connections at the address specified in the connector settings. Now you can start the Bagri server. In the <BAGRI_HOME>/bin directory, run bgcache.cmd mongo (on Windows) or ./bgcache.sh mongo (on Linux). This script starts a single Bagri server with the settings from the mongo.properties profile. Once loading is complete, the server log should contain lines showing that the connector initialized the Mongo schema and loaded 25359 documents into it from the external MongoDB server.

Now I will show how you can manipulate JSON documents using XQuery queries.

To run XQuery queries interactively, we need a client that supports them. The Bagri distribution includes a VisualVM plugin that provides this functionality; installation instructions are here.

Start the Bagri administration server, <BAGRI_HOME>/bin/bgadmin. Open VisualVM, connect to the Bagri Manager administration server, and select the Mongo schema. The DocumentManagement tab lets you work with documents and collections, and the QueryManagement tab lets you run XQuery queries. Run the following simple query to select a restaurant by its ID:

    declare namespace m="http://www.w3.org/2005/xpath-functions/map";
    let $props := map{'method': 'json', 'indent': fn:true()}
    for $uri in fn:uri-collection("restaurants")
    let $map := fn:json-doc($uri)
    where m:get($map, 'restaurant_id') = '40362098'
    return (fn:serialize($map, $props), '&#xa;')

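* Note: each query here ends with the newline character reference '&#xa;', which puts every result on its own line. Habr converts that reference into an actual line break, so in the original post it was escaped as '\&\#xa;'; if you copy a query in the escaped form, remove the backslashes before running it.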


Or another one, which selects restaurants by cuisine:

    declare namespace m="http://www.w3.org/2005/xpath-functions/map";
    let $props := map{'method': 'json'}
    for $uri in fn:uri-collection("restaurants")
    let $map := fn:json-doc($uri)
    where m:get($map, 'cuisine') = 'Bakery'
    return (fn:serialize($map, $props), '&#xa;')


XQuery makes it easy to express any selection that Mongo supports (except queries on geospatial indexes, which are not yet supported out of the box).

Now I'll show a type of query that regular MongoDB find queries do not support: joins. To do this, first bring the data in the restaurants collection into a more normalized form, for example by separating the restaurant reviews from the data about the restaurant itself and storing them in different collections.

Run the following query, save the results to a file, and then import the data into MongoDB as the rest-short collection.

    declare namespace m="http://www.w3.org/2005/xpath-functions/map";
    let $props := map{'method': 'json'}
    for $uri in fn:uri-collection("restaurants")
    let $rest := fn:json-doc($uri)
    let $rest := m:remove($rest, '_id')
    let $rest := m:remove($rest, 'grades')
    return (fn:serialize($rest, $props), '&#xa;')

The following query outputs the review (grades) data. Save it to a separate file as well, and then import it into MongoDB as the grades collection.

    declare namespace a="http://www.w3.org/2005/xpath-functions/array";
    declare namespace m="http://www.w3.org/2005/xpath-functions/map";
    let $props := map{'method': 'json'}
    for $uri in fn:uri-collection("restaurants")
    let $rest := fn:json-doc($uri)
    let $grades := m:get($rest, 'grades')
    return
        for $i in (1 to a:size($grades))
        let $grade := a:get($grades, $i)
        let $date := m:get($grade, 'date')
        return ('{"restaurant_id": "', m:get($rest, 'restaurant_id'),
                '", "date": ', fn:serialize($date, $props),
                ', "grade": "', m:get($grade, 'grade'),
                '", "score": "', m:get($grade, 'score'), '"}', '&#xa;')

Now update the schema settings to declare the new collections to load:

    <schema name="Mongo" active="true">
        ………...
        <properties>
            <entry name="mongo.db.collections">rest-short,grades</entry>
            ……...
        </properties>
        <collections>
            <collection id="2" name="rest-short">
                <version>1</version>
                <createdAt>2016-08-01T01:01:26.965+03:00</createdAt>
                <createdBy>admin</createdBy>
                <description>Restaurant headers collection</description>
                <enabled>true</enabled>
            </collection>
            <collection id="3" name="grades">
                <version>1</version>
                <createdAt>2016-08-01T01:01:26.965+03:00</createdAt>
                <createdBy>admin</createdBy>
                <description>Restaurant grades collection</description>
                <enabled>true</enabled>
            </collection>
        </collections>
        <fragments/>
        <indexes/>
        <triggers/>
    </schema>

Restart the Bagri server to load the new collections. Now you can check how joins work. Run the following query to assemble the full restaurant structure from the two collections:

    declare namespace m="http://www.w3.org/2005/xpath-functions/map";
    let $props := map{'method': 'json'}
    for $ruri in fn:uri-collection("rest-short")
    let $rest := fn:json-doc($ruri)
    let $rid := m:get($rest, 'restaurant_id')
    let $addr := m:get($rest, 'address')
    let $txt := ('{"restaurant_id": "', $rid, '", "cuisine": "', m:get($rest, 'cuisine'),
                 '", "name": "', m:get($rest, 'name'), '", "borough": "', m:get($rest, 'borough'),
                 '", "address": ', fn:serialize($addr, $props), ', "grades": [')
    return ($txt, fn:string-join(
        for $guri in fn:uri-collection("grades")
        let $grade := fn:json-doc($guri)
        let $gid := m:get($grade, 'restaurant_id')
        where $gid = $rid
        return fn:serialize(m:remove(m:remove($grade, '_id'), 'restaurant_id'), $props),
        ', '), ']}&#xa;')
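
As written, this join query is a nested loop: for every restaurant in rest-short it iterates over the whole grades collection, so its cost grows with the product of the two collection sizes (whether Bagri's query engine optimizes this is not covered here). For comparison, the classic alternative is a hash join: index the grades by restaurant_id in one pass, then look up each restaurant. A standalone Java sketch on in-memory objects; Restaurant and Grade are hypothetical stand-ins for the two collections' documents, not Bagri types:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GradesJoin {

    /** Hypothetical stand-in for a rest-short document. */
    static final class Restaurant {
        final String restaurantId;
        final String name;

        Restaurant(String restaurantId, String name) {
            this.restaurantId = restaurantId;
            this.name = name;
        }
    }

    /** Hypothetical stand-in for a grades document. */
    static final class Grade {
        final String restaurantId;
        final String grade;
        final int score;

        Grade(String restaurantId, String grade, int score) {
            this.restaurantId = restaurantId;
            this.grade = grade;
            this.score = score;
        }
    }

    private GradesJoin() {
    }

    /** Returns restaurant_id -> its grades, in the order of the restaurants list. */
    static Map<String, List<Grade>> join(List<Restaurant> restaurants, List<Grade> grades) {
        // build phase: a single pass over grades
        Map<String, List<Grade>> byRestaurant = new HashMap<>();
        for (Grade g : grades) {
            byRestaurant.computeIfAbsent(g.restaurantId, id -> new ArrayList<>()).add(g);
        }
        // probe phase: one hash lookup per restaurant
        Map<String, List<Grade>> result = new LinkedHashMap<>();
        for (Restaurant r : restaurants) {
            // restaurants without reviews get an empty list, like "grades": [] in the query output
            result.put(r.restaurantId, byRestaurant.getOrDefault(r.restaurantId, Collections.<Grade>emptyList()));
        }
        return result;
    }
}
```

The trade-off is memory: the grades index has to fit in memory, whereas the nested loop needs none.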

So, we have looked at how to implement a DataStore connector for MongoDB and use MongoDB as a document store. I hope this article serves as a starting point for writing other Bagri extensions, or simply encourages you to get to know this interesting product better. The project is always looking for Java developers interested in Bagri's development; the project code can be viewed in more detail on GitHub.

Source: https://habr.com/ru/post/312626/

