📜 ⬆️ ⬇️

Using LevelDB

Faced a situation where my colleagues use SQLite, MemcacheDB, Redis to ignore embedded repositories such as LevelDB, Sophia, HamsterDB, etc. to organize local persistent key-value repositories.

I broke the article into two parts:
  1. small introduction to LevelDB api;
  2. using LevelDB, for storing time series.



LevelDB and its API


Some properties of LevelDB:

')
Opening and closing

Opening:
#include <assert> #include "leveldb/db.h" leveldb::DB* db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db); assert(status.ok()); 


Closing:
 delete db; 


Options:
NameDescriptionDefault value
comparatorkey order setting comparatorBytewiseComparator, uses within itself memcmp
create_if_missingcreate base if absentfalse
error_if_existsthrow error if base existsfalse
paranoid_checksaggressive base integrity checkfalse
envenvironment through which I / O operations will be performedEnv :: Default ()
write_buffer_sizewrite buffer size4MB
max_open_filesnumber of open files1000
block_cacheuse special block cacheNULL, creates and uses 8MB internal cache
block_sizeapproximate amount of user data in the block4K
compressionblock compressionkSnappyCompression
filter_policyfilter (bloom) to reduce disk read operations.Null


Slice

Slice is a structure representing the key and value in a number of methods. Slice contains a pointer to the data and data size, while Slice does not contain a buffer for the data, so you should not allow such situations:
 leveldb::Slice slice; if (...) { std::string str = ...; slice = str; } Use(slice); 

leveldb :: Slice has a number of constructors for usability:
 Slice(const char* d, size_t n) : data_(d), size_(n) { } Slice(const std::string& s) : data_(s.data()), size_(s.size()) { } Slice(const char* s) : data_(s), size_(strlen(s)) { } 

And methods for getting data
 const char* leveldb::Slice::data() const; char leveldb::Slice::operator[](size_t n) const; std::string leveldb::Slice::ToString() const; 


Status


To report possible errors, most functions in LevelDB return status.
By status, you can check the function completed successfully and get a text description of the error.
 leveldb::Status s = ...; if (!s.ok()) cerr << s.ToString() << endl; 


Read, write, delete

Signatures Put, Get, Delete:
 Status leveldb::DB::Put(const WriteOptions& options, const Slice& key, const Slice& value); Status leveldb::DB::Get(const ReadOptions& options, const Slice& key, std::string* value); Status leveldb::DB::Delete(const WriteOptions& options, const Slice& key); 

Usage example:
 std::string value; leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value); if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value); if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1); 


Iterator


The iterator is represented by the leveldb :: Iterator class and has the following interface:
 bool leveldb::Iterator::Valid() const; void leveldb::Iterator::SeekToFirst(); void leveldb::Iterator::SeekToLast(); void leveldb::Iterator::Seek(const Slice& target); void leveldb::Iterator::Next(); void leveldb::Iterator::Prev(); Slice leveldb::Iterator::key() const; Slice leveldb::Iterator::value() const; Status leveldb::Iterator::status() const; 


The iterator interface provides methods for sequential and arbitrary
access. Sequential access can be performed both in direct and in reverse.
direction.

 leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl; } assert(it->status().ok()); // Check for any errors found during the scan delete it; 

 for (it->Seek(start); it->Valid() && it->key().ToString() < limit; it->Next()) { ... } 

 for (it->SeekToLast(); it->Valid(); it->Prev()) { ... } 


Use LevelDB to store time series.



My work is related to monitoring systems, and therefore time series and time series database appeared .

We confine ourselves to two operations:


Data schema


The following scheme is used to store metrics in a key-value repository: key = metric + timestamp + tags (optional tags).
Similarly, OpenTSDB works on top of HBase.
Inside OpenTSDB there is a metric uid scheme and a data schema. The same principle will be involved here.

One database will be used to store identifiers of metrics. The key here will be a number in size_t , the value of the string in the style of C.

The second base will be used for the data, the key here will be the structure of the form:
 struct Key { size_t muid; time_t timestamp; }; 

the value will be stored as double . Here the fact that the key and value in LevelDB is an array of bytes, is used to its fullest
so we can use simple data structures without any serialization.

Storage interface


 #pragma once #include <ctime> #include <cstdint> #include <memory> #include <unordered_map> namespace leveldb { class DB; class Iterator; class Slice; class Comparator; class Cache; } /*! *   */ class Storage { public: class Iterator; typedef size_t MetricUid; /*! *   */ struct Key { MetricUid muid; //!< uid  time_t timestamp; //!<  }; /*! *  * @param dir         * @param cacheSizeMb    */ Storage(const std::string& dir, size_t cacheSizeMb = 16); /*! * @brief  . * @param name   * @return    * *     UID'   UID . *     ,   UID  */ MetricUid addMetric(const std::string& name); /*! *   * @param muid   * @param timestamp   * @param value  * @return true    */ bool put(MetricUid muid, time_t timestamp, double value); /*! *       * @param muid   * @param from   * @param to   * @return  */ Iterator get(MetricUid muid, time_t from, time_t to); Storage(const Storage&) = delete; Storage& operator=(const Storage&) = delete; private: /*! *   uid  */ void initUID(); /*! *   */ void initData(); private: /*! *    UID */ MetricUid m_currentIndx; /*! *   */ std::string m_dir; /*! *    */ size_t m_cacheSizeMb; /*! *    */ std::shared_ptr<leveldb::Cache> m_dataCache; /*! *  UID' */ std::shared_ptr<leveldb::DB> m_uid; /*! *   */ std::shared_ptr<leveldb::DB> m_data; /*! *   -> uid */ std::unordered_map<std::string, MetricUid> m_metric2uid; }; /*! *      */ class Storage::Iterator { public: typedef std::tuple<time_t, double> Value; typedef std::shared_ptr<leveldb::Iterator> IteratorPrivate; Iterator(); Iterator(const IteratorPrivate& iter, const Key& limit); /*! *     * @return true    */ bool valid() const; /*! *   * @return  <, > */ Value value() const; /*! *     */ void next(); private: IteratorPrivate m_iter; //!<  LevelDB Key m_limit; //!<      }; 


The Storage constructor takes the path to the directory where the base with the uid and the database will be placed, the size of the cache block.

Implementation


Let's start with the comparator, because Memcmp is not suitable for comparing numbers. Due to the use of the structure as a key, the code is simple and readable:
 namespace { class TimeMeasurementComporator: public leveldb::Comparator { public: int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const { const char* dataA = a.data(); const char* dataB = b.data(); const Storage::Key* keyA = reinterpret_cast<const Storage::Key*>(dataA); const Storage::Key* keyB = reinterpret_cast<const Storage::Key*>(dataB); if (keyA->muid < keyB->muid) { return -1; } else if (keyA->muid > keyB->muid) { return 1; } if (keyA->timestamp < keyB->timestamp) { return -1; } else if (keyA->timestamp > keyB->timestamp) { return 1; } return 0; } // Ignore the following methods for now: const char* Name() const { return "TimeMeasurementComporator"; } void FindShortestSeparator(std::string*, const leveldb::Slice&) const { } void FindShortSuccessor(std::string*) const { } }; TimeMeasurementComporator GLOBAL_COMPORATOR; } 


Further initialization / creation of a database for data:

 void Storage::initData() { DB* data; Options options; options.create_if_missing = true; options.compression = kNoCompression; options.comparator = &GLOBAL_COMPORATOR; if (m_cacheSizeMb) { options.block_cache = leveldb::NewLRUCache(m_cacheSizeMb * 1048576); m_dataCache.reset(options.block_cache); } Status status = DB::Open(options, m_dir + "/data", &data); if (!status.ok()) { LOG(ERROR)<<"Error opening database "<<status.ToString(); exit(1); } m_data.reset(data); } 

In the options, the global comparator is transmitted, and compression is disabled. LelelDB is going without Snappy.

Database initialization with metric identifiers:
 void Storage::initUID() { Options options; options.create_if_missing = true; options.compression = kNoCompression; DB* cfg; Status status = DB::Open(options, m_dir + "/conf", &cfg); if (!status.ok()) { LOG(ERROR)<<"Error opening database "<<status.ToString(); exit(1); } m_uid.reset(cfg); std::unique_ptr<leveldb::Iterator> it( m_uid->NewIterator(leveldb::ReadOptions())); for (it->SeekToFirst(); it->Valid(); it->Next()) { const size_t* index = reinterpret_cast<const size_t*>(it->key().data()); m_metric2uid[it->value().ToString()] = *index; m_currentIndx = *index; } } 

This is where the base is initialized and the display of the metric in the UID is filled.

Adding data is quite simple:
 Storage::MetricUid Storage::addMetric(const std::string& name) { auto result = m_metric2uid.find(name); if (result != m_metric2uid.end()) { return result->second; } ++m_currentIndx; m_metric2uid[name] = m_currentIndx; const auto s = m_uid->Put(WriteOptions(), Slice(reinterpret_cast<const char*>(&m_currentIndx), sizeof(m_currentIndx)), name); if (!s.ok()) { LOG(ERROR)<<"Error put "<<s.ToString(); } return m_currentIndx; } bool Storage::put(MetricUid muid, time_t timestamp, double value) { const Key key = {muid, timestamp}; const auto s = m_data->Put(WriteOptions(), Slice(reinterpret_cast<const char*>(&key), sizeof(key)), Slice(reinterpret_cast<char*>(&value), sizeof(value))); if (!s.ok()) { LOG(ERROR)<<"Error put "<<s.ToString(); } return s.ok(); } 


Data acquisition is implemented by creating a wrapper over a LevelDB iterator:
 Storage::Iterator Storage::get(MetricUid muid, time_t from, time_t to) { const Key begin = {muid, from}; const Key end = { muid, to }; Storage::Iterator::IteratorPrivate iter(m_data->NewIterator(ReadOptions())); iter->Seek(Slice(reinterpret_cast<const char*>(&begin), sizeof(begin))); return Storage::Iterator(iter, end); } Storage::Iterator::Iterator(): m_iter(nullptr) { memset(&m_limit, 0, sizeof(m_limit)); } Storage::Iterator::Iterator(const IteratorPrivate& iter, const Key& limit) : m_iter(iter), m_limit(limit) { } bool Storage::Iterator::valid() const { if(!m_iter) { return false; } const Slice right(reinterpret_cast<const char*>(&m_limit), sizeof(m_limit)); return m_iter->Valid() && (GLOBAL_COMPORATOR.Compare(m_iter->key(),right) < 0); } Storage::Iterator::Value Storage::Iterator::value() const { if(!m_iter) { return Value(0,0); } const Key* data =reinterpret_cast<const Key*>(m_iter->key().data()); double val = *reinterpret_cast<const double*>(m_iter->value().data()); return Value(data->timestamp, val); } void Storage::Iterator::next() { if(m_iter && m_iter->Valid()) { m_iter->Next(); } } 


The source code for the prototype is on github .

From the interesting:
An overview of popular modern disk storage algorithms: LevelDB, TokuDB, LMDB, Sophia
Deep immersion in disk data structures, B-trees, LSM-trees and fractal trees

Source: https://habr.com/ru/post/256207/


All Articles