def time():
    """Return the current time in seconds since the UNIX Epoch (stub).

    @return: current timestamp, in whole seconds
    @rtype: C{int}
    """


class Event:
    """A single log entry; the only property we need is when it happened."""

    def when(self):
        """Return the moment the event occurred (UNIX timestamp, seconds).

        @rtype: C{int}
        """

    def serialize(self):
        """Serialize this event to a string.

        @return: serialized representation of the event
        @rtype: C{str}
        """

    # was: @static — a NameError; the built-in decorator is @staticmethod
    @staticmethod
    def deserialize(serialized):
        """Deserialize a concatenation of serialized events.

        @param serialized: one or more serialized events, concatenated
        @type serialized: C{str}
        @return: the decoded events
        @rtype: C{list(Event)}
        """


class MCEventLog(MemcacheObject):
    """Event log kept in memcached as a ring of ``numChunks`` keys.

    Each key is "active" (appended to) for ``timeChunk`` seconds; every key
    lives ``(numChunks - 1) * timeChunk`` seconds, so by the time the ring
    wraps around to a slot, the old key there has already expired.
    """

    def __init__(self, mc, name, timeChunk=10, numChunks=10):
        """
        @param name: log name, used to build the memcached key names
        @type name: C{str}
        @param timeChunk: seconds each chunk key stays active
        @type timeChunk: C{int}
        @param numChunks: number of chunk keys in the ring
        @type numChunks: C{int}
        """
        super(MCEventLog, self).__init__(mc)
        self.keyTemplate = 'messagelog' + name + '_%d'
        self.timeChunk = timeChunk
        self.numChunks = numChunks

    def put(self, event):
        """Record an event that happened "recently" (within the log capacity).

        @param event: event to store
        @type event: L{Event}
        """
        serialized = event.serialize()
        # Ring slot that corresponds to the event's timestamp.
        key = self.keyTemplate % (event.when() // self.timeChunk % self.numChunks)
        while True:
            # Fast path: the chunk key already exists — append to it.
            try:
                self.mc.append(key, serialized)
                return
            except KeyError:
                pass
            # Key absent: create it with the ring lifetime.  If another
            # client created it concurrently, loop and retry the append.
            try:
                self.mc.add(key, serialized, self.timeChunk * (self.numChunks - 1))
                return
            except KeyError:
                pass

    def fetch(self, first=None, last=None):
        """Fetch events in the [first, last] time range (defaults to the
        whole log capacity).

        @param first: start of the range, UNIX timestamp
        @type first: C{int}
        @param last: end of the range, UNIX timestamp
        @type last: C{int}
        @return: events whose timestamps fall inside [first, last]
        @rtype: C{list(Event)}
        """
        # Sample the clock once so the defaults are computed consistently
        # (the original called time() up to three times).
        now = time()
        capacity = self.timeChunk * (self.numChunks - 1)
        if last is None or last > now:
            last = now
        if first is None or last < first or (last - first) > capacity:
            # was: time() — ... (a typographic em dash, i.e. a SyntaxError)
            first = now - capacity
        # Floor division: chunk indices must be ints (plain / yields floats
        # on Python 3, producing invalid keys).
        firstKey = first // self.timeChunk % self.numChunks
        lastKey = last // self.timeChunk % self.numChunks
        if firstKey < lastKey:
            keyRange = range(firstKey, lastKey + 1)
        else:
            # Interval wraps around the ring; range objects cannot be
            # concatenated with +, so materialize them as lists.
            keyRange = list(range(firstKey, self.numChunks)) + list(range(0, lastKey + 1))
        keys = [self.keyTemplate % n for n in keyRange]
        result = []
        for key in keys:
            try:
                events = Event.deserialize(self.mc.get(key))
            except KeyError:
                continue
            # was: filter(..., l) — the name ``l`` is undefined; filter the
            # events just decoded from this chunk.
            result.extend(e for e in events if first <= e.when() <= last)
        return result
numChunks
keys in memcached. Each key is active (that is, supplemented with values) for timeChunk
seconds, after which the next key becomes active (if the last key was active, this role goes to the first key). Full buffer cycle, i.e. the time period between two uses of a single key is numChunks * timeChunk
seconds, and the lifetime of each key is (numChunks - 1) * timeChunk
seconds, so any time you create a key modulo timeChunk
by the time of next use the key is guaranteed to be destroyed. Thus, the capacity of the event log (or the time period for which events are saved) is (numChunks - 1) * timeChunk
seconds. Such a partitioning of the log into keys allows, upon receiving events from the log, to remove only those keys that correspond to the time interval of interest to us.timeChunk
and numChunks
depends on the application of the event log: first, the desired period of event storage is determined, then, according to the frequency of events, the timeChunk
value is selected so that the size of each key of the event log is relatively small (for example, 10-20Kb). From these considerations, we can find the value of the second parameter, numChunks
. Events are represented by the class Event
, which has the only interesting property for us - the time when the event occurred. In the event log put
method, it is assumed that the event event
passed as a parameter occurred “recently”, that is, no more than (numChunks - 1) * timeChunk
seconds (log capacity event.when()
passed since event.when()
). During put
operation, the key is calculated, into which information about the event should be placed, in accordance with its timestamp. After that, with the help of the technique already familiar from the previous examples, the key is either created or a serialized event representation is appended to the value of the existing key. The fetch
method calculates a potential set of log keys that can contain events that occurred during the time interval from first
to last
. If the time frames are not set, last
is considered to be equal to the current time, and first
- to the time, separated from the current by the capacity of the log. The set of keys is calculated taking into account the ring structure of the method, after which the corresponding keys are selected, the events recorded in them are deserialized and additional filtering is performed to hit the [first, last]
segment. On the first poll the client calls events = fetch()
. Calculated lastSeen
as max(events.when())
. Subsequent polls call events = fetch(first=lastSeen)
, whilelastSeen
def serializeArray(array):
    """Serialize a list into a string (stub)."""


def deserializeArray(serialized):
    """Deserialize a string back into a list (stub)."""


class MCArray1(MemcacheObject):
    """List stored under a single memcached key; writers serialized by MCLock,
    readers need no lock because set() replaces the value atomically."""

    def __init__(self, mc, name):
        """
        @param name: array name, used for the memcached key and the lock
        @type name: C{str}
        """
        super(MCArray1, self).__init__(mc)
        self.lock = MCLock(name)
        self.key = 'array' + name

    def fetch(self):
        """Return the current contents of the array.

        @return: stored elements (empty list if the key does not exist)
        @rtype: C{list}
        """
        try:
            return deserializeArray(self.mc.get(self.key))
        except KeyError:
            return []

    def change(self, add_elems=None, delete_elems=None):
        """Atomically remove ``delete_elems`` from and append ``add_elems``
        to the array.

        @param add_elems: elements to append
        @type add_elems: C{list}
        @param delete_elems: elements to remove
        @type delete_elems: C{list}
        """
        # was: add_elems=[], delete_elems=[] — mutable default arguments.
        add_elems = [] if add_elems is None else add_elems
        delete_elems = [] if delete_elems is None else delete_elems
        # Spin until we own the writer lock.
        while not self.lock.try_lock():
            pass
        try:
            try:
                array = deserializeArray(self.mc.get(self.key))
            except KeyError:
                array = []
            # was: filter(...) + add_elems — fails on Python 3, where
            # filter() returns a lazy iterator, not a list.
            array = [e for e in array if e not in delete_elems] + add_elems
            # A single set() keeps unsynchronized readers consistent: they
            # observe either the old or the new value, never a partial one.
            self.mc.set(self.key, serializeArray(array), 0)
        finally:
            self.lock.unlock()
fetch
method receives the contents of the array, while it is important that the "writer" change
records the contents of a single memcached command, that is, due to the internal atomicity of get
and set
operations in memcached and despite the lack of synchronization between the fetch
and change
, the fetch
result will always be consistent: this will be the value before or after the next change. Writers are blocked from simultaneously modifying an array using the MCLock
lock described above.gets
, cas
and add
commands from the memcached protocol to ensure that the changes are atomic using the change
def serializeInt(value):
    """Serialize a single integer into a string (stub).

    Note: the original parameter was named ``int``, shadowing the builtin.
    """


def deserializeIntArray(serialized):
    """Deserialize a string into a list of integers (stub)."""


class MCArray2(MemcacheObject):
    """Append-only integer array in memcached, lock-free: relies on the
    atomicity of the memcached append/add commands."""

    def __init__(self, mc, name):
        """
        @param name: array name, used for the memcached key
        @type name: C{str}
        """
        super(MCArray2, self).__init__(mc)
        self.key = 'array' + name

    def fetch(self):
        """Return the current contents of the array.

        @return: stored elements (empty list if the key does not exist)
        @rtype: C{list}
        """
        try:
            return deserializeIntArray(self.mc.get(self.key))
        except KeyError:
            return []

    def add(self, element):
        """Append an element to the array.

        @param element: element to append
        @type element: C{int}
        """
        element = serializeInt(element)
        # Bug fix: the original returned inside the ``except KeyError``
        # handlers and fell through on success, so a successful append was
        # followed by a duplicate add(), and a missing key aborted silently.
        # Mirror MCEventLog.put(): return on success, fall through on miss.
        while True:
            try:
                self.mc.append(self.key, element)
                return
            except KeyError:
                pass
            try:
                self.mc.add(self.key, element, 0)
                return
            except KeyError:
                pass
+1 +3 +4 -3 +5
will form an array after deserialization [1, 4, 5]
; this works thanks to the semantics of append
def serializeArray(array):
    """Serialize a list into a string (stub)."""


def deserializeArray(serialized):
    """Deserialize a string back into a list (stub)."""


class MCTable(MemcacheObject):
    """Hash-table-like set of string keys in memcached.

    Each member gets its own value key ('<name>_v_<key>') for O(1)
    membership tests, plus a single '<name>_keys' list for enumeration.
    """

    def __init__(self, mc, name):
        """
        @param name: table name, used to build the memcached key names
        @type name: C{str}
        """
        super(MCTable, self).__init__(mc)
        self.lock = MCLock(name)
        self.key = 'table' + name

    def has(self, key):
        """Check whether ``key`` is in the table.

        @param key: key to test
        @type key: C{str}
        @rtype: C{bool}
        """
        # Existence of the per-key value is the membership test; the stored
        # value itself is irrelevant.
        try:
            self.mc.get(self.key + '_v_' + key)
            return True
        except KeyError:
            return False

    def fetch(self):
        """Return all keys of the table.

        @return: list of keys (empty if the table does not exist yet)
        @rtype: C{list(str)}
        """
        try:
            return deserializeArray(self.mc.get(self.key + '_keys'))
        except KeyError:
            # was: pass — which implicitly returned None, contradicting the
            # declared list return type and MCArray1.fetch's behavior.
            return []

    def add(self, key):
        """Add ``key`` to the table (idempotent).

        @param key: key to add
        @type key: C{str}
        """
        # Spin until we own the writer lock.
        while not self.lock.try_lock():
            pass
        try:
            try:
                array = deserializeArray(self.mc.get(self.key + '_keys'))
            except KeyError:
                array = []
            if key not in array:
                array.append(key)
                # Create the per-key marker first, then publish the key list
                # atomically with a single set().
                self.mc.set(self.key + '_v_' + key, 1, 0)
                self.mc.set(self.key + '_keys', serializeArray(array), 0)
        finally:
            self.lock.unlock()

    def delete(self, key):
        """Remove ``key`` from the table; mirror image of add() (stub)."""
fetch
and add
methods are not synchronized with each other, since the list of all elements changes atomically and when we read the key we always get some consistent state.add
/ set
pair, etc.);Source: https://habr.com/ru/post/50247/
All Articles