Imagine that you have a class:
    class MyCounter(object):
        def __init__(self):
            self.__counter = 0

        def incCounter(self):
            self.__counter += 1

        def getCounter(self):
            return self.__counter
And you want to make it distributed. Just inherit it from SyncObj (passing it the address of this server and the list of servers to synchronize with) and mark every method that changes the internal state of the class with the @replicated decorator:
    from pysyncobj import SyncObj, replicated

    class MyCounter(SyncObj):
        def __init__(self):
            # First argument: this node's address; second: the other nodes.
            super(MyCounter, self).__init__('serverA:4321', ['serverB:4321', 'serverC:4321'])
            self.__counter = 0

        @replicated
        def incCounter(self):
            self.__counter += 1

        def getCounter(self):
            return self.__counter
PySyncObj automatically replicates your class between servers, provides fault tolerance (everything keeps working as long as more than half of the servers are alive) and, if needed, dumps the contents to disk asynchronously.
On top of PySyncObj you can build various distributed systems, for example a distributed mutex, decentralized databases, billing systems and similar things: anything where reliability and fault tolerance come first.
General description
For replication, PySyncObj uses the Raft algorithm. Raft is a simple algorithm for achieving consensus in a distributed system; it was developed as a simpler replacement for the Paxos algorithm. In short, Raft works as follows. A leader is chosen among all nodes, and it pings the remaining nodes at regular intervals. Each node picks a random interval during which it waits for a ping from the leader. If that interval expires and no ping has arrived, the node assumes that the leader has failed and sends a message to the other nodes proposing itself as the new leader. With luck, that is all: a majority of the other nodes agree. If two nodes try to become leader at the same time, the election is repeated with new random waiting times. You can learn more about leader election by viewing the visualization or by reading the scientific article.
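The randomized waiting time is the part of the election that is easiest to see in code. The following is a minimal sketch, not PySyncObj's internal implementation: the function names and the 0.15 to 0.30 second range are assumptions chosen for illustration.

```python
import random


def first_to_time_out(node_ids, min_timeout=0.15, max_timeout=0.30, rng=None):
    """Give every node a random election timeout (in seconds) and return
    the nodes whose timeout expires first, together with all timeouts."""
    rng = rng or random.Random()
    timeouts = {node: rng.uniform(min_timeout, max_timeout) for node in node_ids}
    earliest = min(timeouts.values())
    candidates = [node for node, t in timeouts.items() if t == earliest]
    return candidates, timeouts


def wins_election(votes_received, cluster_size):
    """A candidate becomes leader only with votes from a strict majority."""
    return votes_received > cluster_size // 2
```

Because the timeouts are drawn independently, usually a single node times out first and collects the votes. If the votes are split and no candidate reaches a majority, every node draws a new timeout and the election is retried.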
Once a leader is elected, it is responsible for maintaining the distributed log. Every action that changes the state of the system is written to this log. An action is applied to the system only after a majority of nodes confirm that they have received the entry, which ensures consistency. To keep the log from growing without bound, an operation called log compaction runs periodically: the current log is discarded, and the serialized current state of the system is stored in its place.
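The majority rule and log compaction can be illustrated with a toy, single-process model of the leader's log. This is only a sketch under simplifying assumptions: the class ToyLeaderLog and its methods are hypothetical, the state machine is a plain counter, and no networking is involved.

```python
import json


class ToyLeaderLog:
    """Leader-side view of a replicated log for a counter state machine."""

    def __init__(self, cluster_size):
        self.cluster_size = cluster_size
        self.counter = 0      # state of the state machine
        self.log = []         # entries that have not been compacted yet
        self.snapshot = None  # serialized state written by the last compaction

    def append(self, delta):
        # The leader itself counts as the first node holding the entry.
        self.log.append({"delta": delta, "acks": 1, "applied": False})
        self._apply_committed()
        return len(self.log) - 1  # index is valid until the next compaction

    def acknowledge(self, index):
        # A follower confirmed that it has received the entry.
        self.log[index]["acks"] += 1
        self._apply_committed()

    def _apply_committed(self):
        # Apply entries in log order, stopping at the first entry
        # that is not yet stored on a majority of nodes.
        for entry in self.log:
            if entry["applied"]:
                continue
            if entry["acks"] <= self.cluster_size // 2:
                break
            self.counter += entry["delta"]
            entry["applied"] = True

    def compact(self):
        # Replace the applied prefix of the log with a serialized snapshot.
        applied = 0
        for entry in self.log:
            if not entry["applied"]:
                break
            applied += 1
        self.snapshot = json.dumps({"counter": self.counter})
        del self.log[:applied]
```

In a three-node cluster, an appended entry changes the counter only after one follower acknowledges it, because the leader plus one follower already form a majority. After compact(), the applied entries are gone and the snapshot alone is enough to restore the counter.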
To avoid losing the contents (for example, when all the servers are turned off), they should be periodically saved to disk. Since the amount of data can be very large, it is saved asynchronously. To keep working with the data while it is being written to disk, PySyncObj relies on copy-on-write through a fork of the process. After the fork, the parent and child processes share the same memory pages, and the operating system copies a page only when one of the processes tries to modify it.
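The fork-based approach can be sketched in a few lines. This is an illustration of the technique, not PySyncObj's actual code: the function dump_async, the use of pickle and the temporary-file naming are assumptions, and os.fork is available only on Unix-like systems.

```python
import os
import pickle


def dump_async(state, path):
    """Fork a child process that serializes `state` to `path`.

    Returns the PID of the child so that the caller can wait for it.
    """
    pid = os.fork()
    if pid == 0:
        # Child: it sees the state exactly as it was at the moment of the
        # fork, no matter what the parent does afterwards.
        status = 1
        try:
            tmp_path = path + ".tmp"
            with open(tmp_path, "wb") as f:
                pickle.dump(state, f)
            # Rename only after a complete write, so that readers never
            # observe a half-written dump.
            os.replace(tmp_path, path)
            status = 0
        finally:
            # Leave the child immediately, without running the parent's
            # cleanup handlers.
            os._exit(status)
    return pid
```

The parent can keep modifying the state right after dump_async returns; the dump still contains the values from the moment of the fork. The parent should eventually call os.waitpid on the returned PID to reap the child.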
PySyncObj is implemented entirely in Python (Python 2 and Python 3 are supported) and does not use any external libraries. Networking is done using select or poll, depending on the platform.
Examples
And now a few examples.
Key-value storage
    from pysyncobj import SyncObj, SyncObjConf, replicated

    class KVStorage(SyncObj):
        def __init__(self, selfAddress, partnerAddrs, dumpFile):
            # fullDumpFile: the file the state is periodically saved to
            conf = SyncObjConf(
                fullDumpFile=dumpFile,
            )
            super(KVStorage, self).__init__(selfAddress, partnerAddrs, conf)
            self.__data = {}

        @replicated
        def set(self, key, value):
            self.__data[key] = value

        @replicated
        def pop(self, key):
            self.__data.pop(key, None)

        def get(self, key):
            return self.__data.get(key, None)

In general, this is the same as the counter. To save the data to disk periodically, we create a SyncObjConf and pass fullDumpFile to it.
Callback
PySyncObj supports callbacks: a replicated method can return a value, and that value is automatically passed to the callback:

    from pysyncobj import SyncObj, replicated

    class Counter(SyncObj):
        def __init__(self):
            super(Counter, self).__init__('localhost:1234', ['localhost:1235', 'localhost:1236'])
            self.__counter = 0

        @replicated
        def incCounter(self):
            self.__counter += 1
            return self.__counter

    def onAdd(res, err):
        # res is the value returned by incCounter; err is the error status
        print('OnAdd: counter = %d' % res)

    counter = Counter()
    counter.incCounter(callback=onAdd)
Distributed lock
The next example is a bit more complicated: a distributed lock. The full code is available on GitHub; here I will only describe the main aspects of how it works.
Let's start with the interface. Lock supports the following operations:
- tryAcquireLock - an attempt to take a lock
- isAcquired - check whether the lock is taken or released
- release - release the lock
The first implementation that comes to mind is something like the key-value store. If the key lockA holds a value, the lock is taken; otherwise it is free and we can take it ourselves. But it is not that simple.
First, if we simply use the key-value storage from the example above without any modifications, the operations of checking for the presence of an element (checking whether the lock is taken) and writing the element (taking the lock) will not be atomic, so we could overwrite someone else's lock. Checking and locking must therefore be a single operation implemented inside the replicated class (in this case, tryAcquireLock).
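The difference between the two approaches can be shown with a plain in-memory table. This is a sketch without replication: ToyLockTable and its method names are hypothetical, and in a replicated class try_acquire would be the method marked with @replicated.

```python
class ToyLockTable:
    """In-memory lock table that contrasts get/set with a single try_acquire."""

    def __init__(self):
        self._owners = {}

    # Separate check and write, as in the key-value storage example.
    def get(self, name):
        return self._owners.get(name)

    def set(self, name, client_id):
        self._owners[name] = client_id

    # Check and write as one operation.
    def try_acquire(self, name, client_id):
        owner = self._owners.get(name)
        if owner is None or owner == client_id:
            self._owners[name] = client_id
            return True
        return False


# Non-atomic version: both clients check first, then both write.
racy = ToyLockTable()
a_sees_free = racy.get("lockA") is None
b_sees_free = racy.get("lockA") is None
if a_sees_free:
    racy.set("lockA", "clientA")
if b_sees_free:
    racy.set("lockA", "clientB")  # silently overwrites clientA's lock

# Atomic version: the second request is rejected.
safe = ToyLockTable()
a_got_lock = safe.try_acquire("lockA", "clientA")
b_got_lock = safe.try_acquire("lockA", "clientB")
```

In the first case both clients believe they hold the lock, and the table ends up naming clientB as the owner. In the second case the requests are processed one after another as whole operations, so only clientA succeeds.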
Second, if a client that holds a lock crashes, the lock stays taken forever (or at least until the client comes back up and releases it). In most cases this is undesirable. We therefore introduce a timeout after which a lock is considered released. We also have to add an operation that confirms a lock is still held (let's call it ping); it is called every timeout / 4 and extends the lifetime of the locks taken.
Third, replicated classes must behave identically on all servers. This means they must not use any data that may differ between servers, such as the list of processes on the server, random values, or the current time. So if we still want to use time, we have to pass it as a parameter to every method of the class that uses it.
With this in mind, the resulting implementation consists of two classes: LockImpl, the replicated object, and Lock, a wrapper around it. Inside Lock, we automatically add the current time to every operation on LockImpl and perform the periodic ping that confirms the locks taken. The resulting lock is only a minimal example that can be extended with whatever functionality is required, for example callbacks that report when a lock is taken or released.
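The split into a deterministic core and a time-supplying wrapper might look roughly like this. It is a simplified sketch, not the code from the repository: the names ToyLockImpl and ToyLock are hypothetical, replication is left out (in the real class the state-changing methods would be replicated), and the background timer that calls ping every timeout / 4 is omitted.

```python
import time


class ToyLockImpl:
    """Deterministic core: it never reads the clock itself, so every
    server that replays the same calls ends up in the same state."""

    def __init__(self, timeout):
        self._timeout = timeout
        self._locks = {}  # lock name -> (client id, time of last acquire or ping)

    def try_acquire(self, name, client_id, now):
        held = self._locks.get(name)
        if held is not None and held[0] != client_id and now - held[1] < self._timeout:
            return False  # held by another client and not expired yet
        self._locks[name] = (client_id, now)
        return True

    def ping(self, client_id, now):
        # Extend the lifetime of every lock recorded for this client.
        for name, (owner, _) in list(self._locks.items()):
            if owner == client_id:
                self._locks[name] = (client_id, now)

    def is_acquired(self, name, client_id, now):
        held = self._locks.get(name)
        return held is not None and held[0] == client_id and now - held[1] < self._timeout

    def release(self, name, client_id):
        held = self._locks.get(name)
        if held is not None and held[0] == client_id:
            del self._locks[name]


class ToyLock:
    """Wrapper that adds the client id and the current time to every call."""

    def __init__(self, impl, client_id, clock=time.time):
        self._impl = impl
        self._client_id = client_id
        self._clock = clock  # injectable, which also makes testing easy

    def try_acquire(self, name):
        return self._impl.try_acquire(name, self._client_id, self._clock())

    def ping(self):
        self._impl.ping(self._client_id, self._clock())

    def is_acquired(self, name):
        return self._impl.is_acquired(name, self._client_id, self._clock())

    def release(self, name):
        self._impl.release(name, self._client_id)
```

Because the time always arrives as an argument, the core never consults a local clock, which is the property the third point above asks for. A crashed client simply stops pinging, and its locks expire once the timeout has passed since the last ping.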
Conclusion
We use PySyncObj in the WOT Blitz project to synchronize data between servers in different regions, for example for the counter of remaining tanks during the IS-3 Defender event. PySyncObj is a good alternative to existing data storage mechanisms in distributed systems. The main analogues are various distributed databases, such as Apache ZooKeeper, etcd, and others. In contrast, PySyncObj is not a database. It is a lower-level tool that allows replication of complex state machines. In addition, it does not require external servers and integrates easily into Python applications. Among the shortcomings of the current version are potentially modest performance (it is currently pure Python code; there are plans to try rewriting it as C++ extensions) and the lack of a split into server and client parts: sometimes you need a large number of client nodes that connect and disconnect often, and only a few constantly running servers.
Links