
Recently, we wrote about how we faced a large-scale migration of user data between data centers for the first time and how we solved it. This time we'll take a closer look at how users' photos were migrated and which data structures we used to limit the load on the photo servers.
Badoo users upload approximately 3 million photos daily. To store them, we have a dedicated cluster of servers that also handles resizing, watermarking, importing photos from other social networks and other file manipulations.
All the machines of this cluster can be divided into three groups. The first is the servers responsible for quick delivery of photos to users (you could call it our own CDN implementation); in the context of the migration these servers are of no interest to us. The second is the storages with the disks that actually hold all the photos. And the third is the servers that provide the interface to the second group; let's call them photoservers. The disk arrays of the storages are mounted on them via optical fiber, photos are uploaded to these same machines, and all the scripts that perform any operations on the files run there.
Thus, for the PHP code it does not matter at all which disk of which storage a photo is on. All we need to do is transfer the user's photos from one photoserver to another and update this information in the database and in a few daemons. It is important to note here that all of a user's photos always live on the same photoserver.
Problem statement
The total volume of all photos ever uploaded by our users is about 600 TB. This number includes both the originals and the sets of resized versions needed for display in various contexts.
A rough estimate shows that if 190 million users uploaded 600 TB of data, then the 1.5 million users from Thailand (the largest of the countries being moved between data centers) uploaded about 4.7 TB. The bandwidth between our data centers at the time of the migration was 200 Mbps. A simple calculation gives about 55 hours to transfer all the photos of users from Thailand. Naturally, this channel is already partially occupied by other data constantly circulating between the data centers, so in reality it would take even longer. And our goal was to fit into 8 hours.
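For reference, the estimate above can be reproduced in a few lines of PHP; the inputs are the rounded figures quoted in this section, so the result lands near the ~55 hours mentioned above:
<?php
// Back-of-the-envelope transfer-time estimate, using the rounded figures from above.
$totalBytes     = 4.7e12; // ~4.7 TB of Thai users' photos
$linkBitsPerSec = 200e6;  // 200 Mbps channel between the data centers
$seconds        = $totalBytes * 8 / $linkBitsPerSec;
echo round($seconds / 3600) . " hours\n"; // ~52 hours with these inputs, close to the ~55 hours above
// ...and that assumes the whole channel is free, which it never is.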
We could have transferred only the originals and resized them on the new photoserver, but that would have caused an undesirable increase in CPU load. So we decided to transfer photos in advance, hoping that users would not manage to upload many new photos during the time it takes to transfer the photos of a whole country.
That is, first we simply copy the already existing photos, and then, during the migration of all the other user data (when the site is unavailable to that user and their photos cannot change), we check whether anything changed after the photos were transferred. If there were changes, we copy the photos again (or rather, run rsync once more), and only after that do we update the databases and daemons so that the user's photos are served from and uploaded to the new photoserver.
As practice showed, our expectations were met, and rsync had to be run a second time only for a very small percentage of users.
Another limitation for us was the performance of the disk storage. It turned out that even with a channel of hundreds of terabits we would not be able to use it to full capacity, since photos are constantly being uploaded to the photoservers and various operations are performed on them. In addition, our CDN reads these photos from the same disks, and the extra read load could significantly slow down everyday operations. So the intensity of the photo migration had to be artificially limited.
Implementation
The first thing that came to mind was a queue in the database (we use MySQL) containing all the users whose photos need to be transferred. The queue is processed by multiple processes. By limiting the number of processes per photoserver we limit the load on the disks, and a limit on the total number of processes lets us regulate the load on the channel between the data centers.
Suppose we have a table MigrationPhoto of the following structure:
CREATE TABLE MigrationPhoto (
    user_id INT PRIMARY KEY,
    updated TIMESTAMP,
    photoserver VARCHAR(255),
    script_name VARCHAR(255) DEFAULT NULL,
    done TINYINT NOT NULL DEFAULT 0
)
First, we add all users whose photos we want to transfer to our table:
INSERT INTO MigrationPhoto (user_id,photoserver) VALUES (00000000, 'photoserver1')
The limit on the total number of processes is easily achieved with the pcntl extension and is not of great interest, so from here on we will look at a single process involved in the transfer.
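As a rough illustration of that part (not our production code), capping the total number of workers with pcntl could look something like the sketch below; TOTAL_PROCESSES and runMigrationWorker() are made-up names used only for the example:
<?php
// Minimal sketch: keep at most TOTAL_PROCESSES worker processes alive at any moment.
// TOTAL_PROCESSES and runMigrationWorker() are illustrative names, not our real code.
define('TOTAL_PROCESSES', 50);

$children = [];
while (true) {
    while (count($children) < TOTAL_PROCESSES) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            exit(1);                  // fork failed, bail out
        }
        if ($pid === 0) {
            runMigrationWorker();     // child: process one batch and exit
            exit(0);
        }
        $children[$pid] = true;       // parent: remember the child
    }
    $exitedPid = pcntl_wait($status); // block until any child finishes
    unset($children[$exitedPid]);
}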
We need to ensure a specific number of processes per photoserver. First, let's figure out how many users of each photoserver are in the queue. In order not to run
SELECT photoserver, COUNT(*) FROM MigrationPhoto GROUP BY photoserver
every time, we create a separate table:
CREATE TABLE MigrationPhotoCounters ( photoserver VARCHAR(255) PRIMARY KEY, users INT )
We will fill it in when inserting each user into the MigrationPhoto table:
INSERT INTO MigrationPhotoCounters (photoserver, users) VALUES ('photoserver1', 1) ON DUPLICATE KEY UPDATE users = users + VALUES(users)
Or, after MigrationPhoto has been filled, we do it in one go:
INSERT INTO MigrationPhotoCounters (photoserver, users) SELECT photoserver, COUNT(*) AS users FROM MigrationPhoto GROUP BY photoserver
Having such a table, at the start of each process we will do
SELECT photoserver FROM MigrationPhotoCounters WHERE users>0 ORDER BY RAND()
Having received the list of all photoservers, we determine for which of them we can start a process without exceeding the limits. To do this, we take a lock in the database whose name consists of the photoserver name and the sequence number of the process within that server:
$processNumber = null;
$scriptName = null;
foreach ($photoservers as $serverName) {
    for ($i = 1; $i <= PROCESSES_PER_SERVER; $i++) {
        // try to take a free slot for this photoserver: lock names look like 'migration_photoserver1_2'
        $lock = executeQuery("SELECT GET_LOCK('migration_" . $serverName . '_' . $i . "', 0)");
        if ($lock === '1') {
            $processNumber = $i;
            break;
        }
    }
    if ($processNumber) {
        $scriptName = 'migration_' . $serverName . '_' . $processNumber;
        break;
    }
}
Thus, in two nested loops we iterate over all the photoservers and the process numbers for them. If all iterations of the inner loop complete and $processNumber is still not set, the limit on the number of processes for this photoserver has been reached. If all iterations of the outer loop complete, the limit has been reached for all photoservers that still have users to transfer.
Suppose we selected the photoserver photoserver1 and this is the second process for it, so the process identifier is $scriptName = 'migration_photoserver1_2'.
Before going further, we return to the common queue those users who, for one reason or another, were left marked with our chosen process identifier ($scriptName) by previous runs:
UPDATE MigrationPhoto SET script_name = NULL WHERE done = 0 AND script_name = 'migration_photoserver1_2'
Next we mark a batch of users as taken by this process:
UPDATE MigrationPhoto SET script_name = 'migration_photoserver1_2' WHERE photoserver = 'photoserver1' AND done = 0 AND script_name IS NULL LIMIT 100;
Now let's take several users from the queue that correspond to the selected photoserver and are not yet processed by another process:
SELECT * FROM MigrationPhoto WHERE script_name='migration_photoserver1_2' AND done=0;
After that, we can be sure that the records we have selected are not processed by any other process.
We record when we transferred the user's photos:
UPDATE MigrationPhoto SET updated=NOW() WHERE user_id = 00000000
Then we perform all the operations we need; in simplified form this is an rsync. After the photos have been transferred successfully, we mark it in the database (for each of the selected users):
BEGIN;
UPDATE MigrationPhotoCounters SET users = users - 1 WHERE photoserver = 'photoserver1';
UPDATE MigrationPhoto SET done = 1 WHERE user_id = 00000000;
COMMIT;
It may happen that out of the 100 users taken for processing, the transfer fails for some of them for a variety of reasons. These users need to be returned to the queue so they can be transferred later:
UPDATE MigrationPhoto SET script_name = NULL WHERE user_id IN (<failed_ids>)
Finishing the process:
SELECT RELEASE_LOCK('migration_photoserver1_2')
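Putting the steps above together, the body of one worker (after it has acquired its $scriptName, e.g. 'migration_photoserver1_2') can be sketched roughly as follows; executeQuery() is the same helper as above, while fetchAll() and transferUserPhotos() are placeholders for our real fetch and rsync code:
<?php
// Sketch of a single worker process; $serverName and $scriptName come from the lock acquisition above.
// fetchAll() and transferUserPhotos() are placeholder names.

// 1. Return users left over from a previous crashed run with the same identifier
executeQuery("UPDATE MigrationPhoto SET script_name = NULL WHERE done = 0 AND script_name = '$scriptName'");

// 2. Claim a batch of 100 users of the chosen photoserver
executeQuery("UPDATE MigrationPhoto SET script_name = '$scriptName' WHERE photoserver = '$serverName' AND done = 0 AND script_name IS NULL LIMIT 100");

$users  = fetchAll("SELECT * FROM MigrationPhoto WHERE script_name = '$scriptName' AND done = 0");
$failed = [];

foreach ($users as $user) {
    // remember when we started transferring this user's photos
    executeQuery("UPDATE MigrationPhoto SET updated = NOW() WHERE user_id = {$user['user_id']}");

    if (transferUserPhotos($user)) { // rsync plus whatever else is needed
        executeQuery("BEGIN");
        executeQuery("UPDATE MigrationPhotoCounters SET users = users - 1 WHERE photoserver = '{$user['photoserver']}'");
        executeQuery("UPDATE MigrationPhoto SET done = 1 WHERE user_id = {$user['user_id']}");
        executeQuery("COMMIT");
    } else {
        $failed[] = (int)$user['user_id'];
    }
}

// 3. Return failed users to the queue and free the slot
if ($failed) {
    executeQuery("UPDATE MigrationPhoto SET script_name = NULL WHERE user_id IN (" . implode(',', $failed) . ")");
}
executeQuery("SELECT RELEASE_LOCK('$scriptName')");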
It would seem that this can be finished. But our scheme has a fundamental flaw.
Suppose a process started with $scriptName = 'migration_photoserver1_10' while PROCESSES_PER_SERVER = 10, and this process crashed without returning the users it had taken to the queue. For these users to be picked up again, either a process with the same $scriptName must start again, or someone must set script_name = NULL for them in the database. A process with the same $scriptName may never start again.
For example, say we have 100 photoservers in MigrationPhotoCounters, the limit on the total number of processes is 50, and the limit per photoserver is 10. Then, even if at some point 10 processes were running for one photoserver, later that photoserver may get only one process at a time, so slot 10 may never be reused. Therefore, we write another script that, say, once a minute sets script_name = NULL for those users whose processes are not currently running:
foreach ($photoservers as $serverName) {
    for ($i = 1; $i <= PROCESSES_PER_SERVER; $i++) {
        // if the lock is free, no process with this name is running right now
        $lock = executeQuery("SELECT GET_LOCK('migration_" . $serverName . '_' . $i . "', 0)");
        if ($lock === '1') {
            executeQuery("UPDATE MigrationPhoto SET script_name = NULL WHERE done = 0 AND script_name = 'migration_" . $serverName . '_' . $i . "'");
            // release the slot so a real worker can take it
            executeQuery("SELECT RELEASE_LOCK('migration_" . $serverName . '_' . $i . "')");
        }
    }
}
Now, even if a process crashes, its users become available to other processes. In addition, this allows us to change the limit on the number of processes per photoserver on the fly.
When the pre-copying is done and the migration of all the other user data begins, it is enough to check that the user's photos have not changed since the time stored in the updated field of the MigrationPhoto table. If they have, we simply repeat the rsync. This does not take much time, since hardly any user replaces all of their photos within 2-3 days.
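A minimal sketch of that final check, under the assumption of hypothetical helpers fetchOne(), photosChangedSince() (comparing the user's photo directory, e.g. by mtime, with the stored timestamp) and rsyncUserPhotos():
<?php
// Final check during the downtime window for the user being migrated ($userId, $newPhotoserver).
// fetchOne(), photosChangedSince() and rsyncUserPhotos() are hypothetical helpers.
$row = fetchOne("SELECT updated, photoserver FROM MigrationPhoto WHERE user_id = $userId AND done = 1");

if ($row && photosChangedSince($userId, $row['updated'])) {
    // The user uploaded or changed photos after the pre-copy: rsync picks up only the delta.
    rsyncUserPhotos($userId, $row['photoserver'], $newPhotoserver);
}
// Only after this do we switch the user's photoserver in the databases and daemons.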
As a result, we had 63 photoservers to read photos from and 30 servers to write them to. All of this was handled by 80 processes, with a limit of no more than 3 processes per photoserver. With these restrictions the traffic was 150 Mbps, and transferring the photos of the users from Thailand took a little less than three days. Given the amount of data, that is an excellent result.
Conclusion
Of course, our scheme allows for extensions and improvements. For example, you can limit the number of worker processes for each photoserver individually and adjust that number depending on how many users of a particular photoserver are in the queue. You can add priorities, a progressive timeout before retrying a record after a failure, a maximum number of failed attempts, logs, graphs, and much more.
But what we wanted to convey is the core idea of parallel processing of a queue under given constraints, which, of course, can be used for more than just transferring files between servers.
We have not described here the handling of various errors and exceptions, the mechanism for creating and maintaining the required number of processes, or the implementation of the file transfer itself. If you are interested, ask and we will answer in the comments.
Anton Stepanenko, Team Lead, PHP developer