📜 ⬆️ ⬇️

The data structures in memcached / memcacheDB. Part 2

The continuation of the article about data structures in memcached. In this final part, we will look at three more data structures: an event log, an array, and a table.

Event log


Task


The task of this data structure is to store events that occurred in a distributed system in the last T seconds. Each event has a moment in time when it occurred, the rest of the event is determined by the logic of the application.

Operations on the event log:


')

Decision


def time(): """         (, UNIX Epoch). @return:     @rtype: C{int} """ class Event: """ ,    . """ def when(self): """  ,    (). """ def serialize(self): """  . @return:   @rtype: C{str} """ @static def deserialize(serialized): """   . @param serialized:       @type serialized: C{str} @return:    @rtype: C{list(Event)} """ class MCEventLog(MemcacheObject): def __init__(self, mc, name, timeChunk=10, numChunks=10): """ . @param name:    @type name: C{str} @param timeChunk:       @type timeChunk: C{int} @param numChunks:      @type numChunks: C{int} """ super(MCEventLog, self).__init__(mc) self.keyTemplate = 'messagelog' + name + '_%d'; self.timeChunk = timeChunk self.numChunks = numChunks def put(self, event): """    . @param event:  @type event: L{Event} """ serialized = event.serialize() key = self.keyTemplate % (event.when() // self.timeChunk % self.numChunks) while True: try: self.mc.append(key, serialized) return except KeyError: pass try: self.mc.add(key, serialized, self.timeChunk * (self.numChunks-1)) return except KeyError: pass def fetch(self, first=None, last=None): """        (  ). @param first:     @type first: C{int} @param last:     @type last: C{int} @return:   @rtype: C{list(Event)} """ if last is None or last > time(): last = time() if first is None or last < first or (last-first) > self.timeChunk * (self.numChunks-1): first = time() — self.timeChunk * (self.numChunks-1) firstKey = first / self.timeChunk % self.numChunks lastKey = last / self.timeChunk % self.numChunks if firstKey < lastKey: keyRange = range(firstKey, lastKey+1) else: keyRange = range(firstKey, self.numChunks) + range(0, lastKey+1) keys = [self.keyTemplate % n for n in keyRange] result = [] for key in keys: try: events = Event.deserialize(self.mc.get(key)) except KeyError: continue result.extend(filter(lambda e: e.when() >= first and e.when() <= last, l)) return result 


Discussion


The main idea of ​​the event log is a ring buffer consisting of numChunks keys in memcached. Each key is active (that is, supplemented with values) for timeChunk seconds, after which the next key becomes active (if the last key was active, this role goes to the first key). Full buffer cycle, i.e. the time period between two uses of a single key is numChunks * timeChunk seconds, and the lifetime of each key is (numChunks - 1) * timeChunk seconds, so any time you create a key modulo timeChunk by the time of next use the key is guaranteed to be destroyed. Thus, the capacity of the event log (or the time period for which events are saved) is (numChunks - 1) * timeChunk seconds. Such a partitioning of the log into keys allows, upon receiving events from the log, to remove only those keys that correspond to the time interval of interest to us.

The choice of the timeChunk and numChunks depends on the application of the event log: first, the desired period of event storage is determined, then, according to the frequency of events, the timeChunk value is selected so that the size of each key of the event log is relatively small (for example, 10-20Kb). From these considerations, we can find the value of the second parameter, numChunks .

The example uses some class Event , which has the only interesting property for us - the time when the event occurred. In the event log put method, it is assumed that the event event passed as a parameter occurred “recently”, that is, no more than (numChunks - 1) * timeChunk seconds (log capacity event.when() passed since event.when() ). During put operation, the key is calculated, into which information about the event should be placed, in accordance with its timestamp. After that, with the help of the technique already familiar with the previous examples, the key is either created or a serialized event representation is added to the value of the existing key.

The fetch method calculates a potential set of log keys that can contain events that occurred during the time interval from first to last . If the time frames are not set, last is considered to be equal to the current time, and first - to the time, separated from the current by the capacity of the log. The set of keys is calculated taking into account the ring structure of the method, after which the corresponding keys are selected, the events recorded in them are deserialized and additional filtering is performed to hit the [first, last] segment.

The above method signature allows successive calls to output new events from the log:

  1. The first time is to call events = fetch() . Calculated lastSeen as max(events.when()) .
  2. All subsequent calls are as follows: events = fetch(first=lastSeen) , while
    lastSeen recalculated each time.


Array


Task 1


An array stores a list of values ​​of an arbitrary type, the list is relatively rarely updated, and the entire list is received much more often.

Array operations:



Solution 1


 def serializeArray(array): """     . """ def deserializeArray(str): """     . """ class MCArray1(MemcacheObject): def __init__(self, mc, name): """ . @param name:   @type name: C{str} """ super(MCArray1, self).__init__(mc) self.lock = MCLock(name) self.key = 'array' + name def fetch(self): """    . @return:  @rtype: C{list} """ try: return deserializeArray(self.mc.get(self.key)) except KeyError: return [] def change(self, add_elems=[], delete_elems=[]): """   ,      . @param add_elems: ,    @type add_elems: C{list} @param delete_elems: ,    @type delete_elems: C{list} """ while not self.lock.try_lock(): pass try: try: array = deserializeArray(self.mc.get(self.key)) except KeyError: array = [] array = filter(lambda e: e not in delete_elems, array) + add_elems self.mc.set(self.key, serializeArray(array), 0) finally: self.lock.unlock() 


Talk 1


The above solution actually has nothing to do with arrays, but can be applied to any data structure. It is based on the reader-writer model, when there are many readers and relatively few writers. Readers at any time using the fetch method receive the contents of the array, while it is important that the “writer” hange records the contents of a single memcached command, that is, due to the internal atomicity of get and set operations in memcached and despite the lack of synchronization between the fetch and hange , the fetch result will always be consistent: this will be the value before or after the next change. Writers are blocked from simultaneously modifying an array using the MCLock lock described above.

In this situation, it would be possible to avoid using locks and use the gets , cas and add commands from the memcached protocol to ensure that the changes are atomic using the change function.

Task 2


An array stores a list of values ​​of a certain type; an operation of the form “add value to an array” often occurs. Relatively rare array is requested entirely. For ease of implementation, an array of integers will be considered in the future, although the data type is not significant for solving the problem.

Array operations:



Solution 2


 def serializeInt(int): """       (str). """ def deserializeIntArray(str): """       . """ class MCArray2(MemcacheObject): def __init__(self, mc, name): """ . @param name:   @type name: C{str} """ super(MCArray2, self).__init__(mc) self.key = 'array' + name def fetch(self): """    . @return:  @rtype: C{list} """ try: return deserializeIntArray(self.mc.get(self.key)) except KeyError: return [] def add(self, element): """    . @param element: ,      @type element: C{int} """ element = serializeInt(element) while True: try: self.mc.append(self.key, element) except KeyError: return try: self.mc.add(self.key, element, 0) except KeyError: return 


Talk 2


This implementation practically repeats the similar code for the event log, only simplified due to the presence of only one key. Compared with the first implementation of the “array” data type, the number of memcached operations has decreased, all processes that change the array can be executed without delay (no locks). As in the first variant, the presence of duplicates is not checked when adding an element to an array (it can be both good and bad, depending on the application).

The following improvements (or extensions) of the described example are possible:



Table


Task


It is necessary to store multiple lines. Set operations:



You can consider this data structure as a table in which you can quickly find the desired row. Or as a hash stored in distributed memory.

Decision


 def serializeArray(array): """     . """ def deserializeArray(str): """     . """ class MCTable(MemcacheObject): def __init__(self, mc, name): """ . @param name:   @type name: C{str} """ super(MCTable, self).__init__(mc) self.lock = MCLock(name) self.key = 'table' + name def has(self, key): """     . @param key:  @type key: C{str} @rtype: C{bool} """ try: self.mc.get(self.key + '_v_' + key) return True except KeyError: return False def fetch(self): """     . @return:   @rtype: C{list(str)} """ try: return deserializeArray(self.mc.get(self.key + '_keys')) except KeyError: pass def add(self, key): """    . @param key:  @type key: C{str} """ while not self.lock.try_lock(): pass try: try: array = deserializeArray(self.mc.get(self.key + '_keys')) except KeyError: array = [] if key not in array: array.append(key) self.mc.set(self.key + '_v_' + key, 1, 0) self.mc.set(self.key + '_keys', serializeArray(array), 0) finally: self.lock.unlock() def delete(self, key): """    .    add(). """ 


Discussion


Generally speaking, memcached is a huge hash table, although it lacks one operation that is necessary for our data structure: getting a list of keys. Therefore, the implementation of the table uses separate keys to store each element of the table, and separately another key to store a list of all its elements. The implementation of the storage of the list of all elements actually coincides with the implementation of “array 1”. To serialize access to the list of all elements, blocking is used, and the fetch and add methods are not synchronized with each other, since the list of all elements changes atomically and when we read the key we always get some consistent state.

The check for the presence of a key in the table is performed as quickly as possible: the presence of the corresponding key in memcached is checked. Any change to the list of items always takes place simultaneously in the key storing the entire list, and in separate keys for each item (which are used only for verification).

Based on the above scheme, you can implement a full-fledged hash, when the associated value is stored for each table element, this value will need to be written only to separate keys corresponding to the elements, and the list of elements will not contain values.

Conclusion


So, here is a list of "tricks" or "tricks" described in this article:



The article did not address the issues of memcached-specific optimization, for example, the use of multi-get requests. This was done deliberately so as not to overload the source code and the story. In many situations, the examples above should be considered more like pseudocode than as an example of an ideal implementation in Python.

If you find a mistake, you want to offer a clearer, more optimal solution to the tasks, you want to offer an implementation for some other data structure, I will be happy for comments and criticism.

Source: https://habr.com/ru/post/50247/


All Articles