
How to process terabytes of data in 1000 threads in PHP: Hadoop / MapReduce

Hello!

Have you heard about Bigdata yet? Well, yes: the web is growing, there is more and more data, and it has to be kept under control and analyzed periodically. Databases burst under the load, relational theory does not quite cope with the problem, and a solution is needed. Marketing pushes hard from above, the hardware with its sharp corners presses from below, and the whole thing smacks of suicide.

In this post I will try to give concrete working recipes and pieces of code, with brief theoretical conclusions, on how to process >= terabytes of data in >= 1000 threads in PHP, so that you can take them and solve the problem without wasting time or stuffing your head with theory.

However, if you suddenly feel nauseous and dizzy, you can stop reading, admire the beautiful birds, and forget about all of the above. But stay alert: Bigdata may knock on your door tomorrow ;-)



How it is usually done



This is what usually happens on the web: data is added to the database until it bursts. When it bursts, talk turns to MySQL sharding and partitioning, and someone remembers the multi-master in-memory cluster.

If that does not help, the search for and rollout of NoSQL solutions such as Redis, or cloud services such as DynamoDB, begins. Sphinx has proven itself well as an efficient search engine for large volumes of data.

Subconsciously, the plan is to save everything in the database and analyze the information later. And it often works. But not always... and this "not always" is becoming more frequent.

More data, online analytics required



You cannot always tell the business to wait a day while you analyze logs / data and produce the numbers. The business often needs the numbers online, so that it can manage the situation by instruments with live needles.


It is frightening to imagine flying an aircraft by analyzing the black-box recordings once a day in the pilots' hotel :-)



When the data flow becomes even more intense, or the business logic requires up-to-date information on data that has not yet been processed, "stream analysis" tools come to the rescue, such as:
1) Pinba
2) Amazon Kinesis
3) Stream parsers based on nginx / ragel

It is useful to work through each of these invaluable tools at least once with paper and pencil, and even more useful to "sleep" with the manual and build a prototype, at least overnight.


I especially want to highlight Pinba here for its ease of setup and operation and the minimal load it creates. Collecting statistics on a web application's performance in its clients' browsers, based on the JS Navigation Timing API, takes 2 PHP files of 30 lines.

When the data cannot be analyzed online, the search begins for a way to analyze the accumulated data in parallel, and for the algorithms that go with it.

Parallel array processing



Suppose there is a list of objects, say files in the s3 cloud, and you have tens of millions of them. However much we trust the cloud, these files need to be copied periodically to another cloud / server. Each file is encrypted, compressed, put through other operations, and copied.

There are many similar problems in nature.


These tasks fall under the general divide-and-conquer algorithm:
- split the problem into parts
- process each part separately and in parallel with the other parts
- combine the results through aggregation
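
The three steps above can be sketched in a few lines of PHP (7 or newer is assumed). This is only an illustration: the function names, the sample file names, and the "skip empty names" rule are made up for the demo, and the parts are processed one after another here, whereas a cluster would run them in parallel.

```php
<?php
// Hypothetical sketch: split a list of work items into parts, process each
// part independently, then aggregate the per-part results.

/** Split the work into parts of at most $partSize items each. */
function splitIntoParts(array $items, int $partSize): array
{
    return array_chunk($items, $partSize);
}

/**
 * Process one part. In a real job this is where files would be compressed,
 * encrypted and copied; here we only count items by a made-up outcome.
 */
function processPart(array $part): array
{
    $stats = ['processed' => 0, 'skipped' => 0];
    foreach ($part as $item) {
        // Assumption for the demo: empty names are skipped.
        if ($item === '') {
            $stats['skipped']++;
        } else {
            $stats['processed']++;
        }
    }
    return $stats;
}

/** Combine per-part results by summing the counters key by key. */
function aggregate(array $partResults): array
{
    $total = [];
    foreach ($partResults as $stats) {
        foreach ($stats as $key => $count) {
            $total[$key] = ($total[$key] ?? 0) + $count;
        }
    }
    return $total;
}

$items = ['a.jpg', 'b.jpg', '', 'c.jpg', 'd.jpg'];
$parts = splitIntoParts($items, 2);
// Each part is independent of the others, so these calls could run in parallel.
$results = array_map('processPart', $parts);
print_r(aggregate($results));
```

Because no part depends on another, the same structure carries over to many processes or many servers without changing the per-part logic.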


For PHP, you can try to solve this problem with a queue such as RabbitMQ and / or Gearman, but you will have to tinker a lot with exception handling, a shared file system, clustering across 20 servers, and so on.

So if your task can be solved in 30 PHP threads on a single server, the tools listed above are usually sufficient. However, if you are "unlucky" and need to process several terabytes in an hour, and you are given as much hardware as you can carry, there is a way out :-)

Yes, yes, of course it is Hadoop, which implements the MapReduce paradigm, the one that matches the photos of the girls above ;-)

For those who are too lazy to read further and just want the recipe, here is an example of the original problem and its solution on Hadoop:

The task is to compress, encrypt, and transfer 10 million files from s3 bucket1 to s3 bucket2.
If you do it in PHP on a single server, you can fork at most 20-30 PHP "threads", each running in its own process. That will take several weeks. Meanwhile, the amount of data keeps growing, and you need a systematic solution.
If you do the same with Hadoop, the task can be completed in an hour, but on a large number of machines. With a reasonable number of machines running 15 threads each, you can finish within 2 days.
That is, if in six months the number of files to process grows from 10 million to 50 million, you only need to change a single number in the Hadoop cluster launch configuration, increasing the number of machines.
Isn't that beautiful and systematic? :-)
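
A back-of-the-envelope calculation helps when choosing the number of machines. The sketch below (PHP 7 or newer assumed) is not a measurement from the project: the figure of 5 seconds per file and the machine counts are assumptions picked for illustration, and the model ignores cluster startup, retries, and network limits.

```php
<?php
/**
 * Rough wall-clock estimate in hours for processing $files files when every
 * thread handles one file at a time and needs $secondsPerFile on average.
 */
function estimateHours(int $files, float $secondsPerFile, int $machines, int $threadsPerMachine): float
{
    $totalThreads = $machines * $threadsPerMachine;
    return $files * $secondsPerFile / $totalThreads / 3600;
}

// Assumed figure: 5 seconds per file. Measure your own before planning.
$secondsPerFile = 5.0;

// One server with 30 PHP processes.
printf("1 x 30 threads: %.0f hours\n", estimateHours(10000000, $secondsPerFile, 1, 30));
// Twenty machines with 15 threads each.
printf("20 x 15 threads: %.0f hours\n", estimateHours(10000000, $secondsPerFile, 20, 15));
// Five times more files on five times more machines.
printf("50M files, 100 x 15 threads: %.0f hours\n", estimateHours(50000000, $secondsPerFile, 100, 15));
```

Under this simple linear model the duration stays the same when the number of files and the number of machines grow by the same factor, which is the "change one number" property described above.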


Hadoop



In general, this is a rather large product, and a few weeks of reading manuals 24/7 probably would not be enough, but that is not required. We will learn to use this technology efficiently and quickly, saving both your time and ours.

Installation


Besides installing the Java software, you would need to set up a cluster file system. Why? How else would the cluster nodes exchange shared files? But we will be smarter and launch the Hadoop cluster in Amazon, where everything is already installed and configured.

Preparing map and reduce scripts


This is the most interesting part. Hadoop lets you use scripts in any language: you can sort a file in bash, or do the processing in PHP / Python / Perl.

The scripts read from standard input and write to standard output. What could be simpler?

There should be 2 scripts: a mapper and a reducer.

If you just need to parallelize a task across N servers, writing a single mapper is enough.

Mapper example


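    #!/usr/bin/php
    <?php
    error_reporting(-1);
    set_time_limit(0);
    ini_set('memory_limit', '2048M');
    gc_enable();

    require '/usr/share/php/aws.phar';

    // Counters printed at the end for the reducer to sum up
    $copy_count = 0;
    $copy_precond_count = 0;
    $s3_src_not_found = 0;

    $fp = fopen("php://stdin", "r");
    // stream_get_line() returns false at end of input, which ends the loop
    while (($line = stream_get_line($fp, 65535, "\n")) !== false) {
        // process the line here: compress, encrypt, copy, ...
    }

    echo "s3 copied direct\t" . $copy_count . "\n";
    echo "s3 copied precond\t" . $copy_precond_count . "\n";
    echo "s3 src not found\t" . $s3_src_not_found . "\n";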
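
To try the mapper logic locally without Hadoop or S3, the read-process-count loop can be wrapped in a function that works on any stream. The sketch below is an illustration under assumptions, not the project's code: it assumes each input line holds one object key, the `runMapper` function and the `s3 copy failed` counter are hypothetical, and the handler is a fake that never touches the AWS SDK. PHP 7 or newer, run from the command line, is assumed.

```php
<?php
/**
 * Read one key per line from $in, hand each key to $handler and count the
 * outcome names it returns. Writes "name<TAB>count" lines to $out at the end.
 */
function runMapper($in, $out, callable $handler): array
{
    $counts = [];
    while (($line = fgets($in)) !== false) {
        $key = trim($line);
        if ($key === '') {
            continue; // ignore blank lines
        }
        try {
            $outcome = $handler($key);
        } catch (Exception $e) {
            // One bad object should not stop the whole map task.
            fwrite(STDERR, "failed on {$key}: {$e->getMessage()}\n");
            $outcome = 's3 copy failed'; // hypothetical extra counter
        }
        $counts[$outcome] = ($counts[$outcome] ?? 0) + 1;
    }
    foreach ($counts as $name => $count) {
        fwrite($out, $name . "\t" . $count . "\n");
    }
    return $counts;
}

// Demo with an in-memory stream instead of php://stdin and a fake handler.
$in = fopen('php://memory', 'w+');
fwrite($in, "photos/1.jpg\nphotos/2.jpg\n\nmissing/3.jpg\n");
rewind($in);

$fakeCopy = function (string $key): string {
    // Made-up rule for the demo only; real code would call the AWS SDK here.
    return strpos($key, 'missing/') === 0 ? 's3 src not found' : 's3 copied direct';
};

runMapper($in, fopen('php://output', 'w'), $fakeCopy);
```

In the real script the handler would do the compression, encryption, and copying, and the function would be called with `php://stdin` and `php://output`.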


If you do not need aggregated statistics, the second script is not needed. If you do, write a reducer:

Reducer example


    #!/usr/bin/php
    <?php
    error_reporting(-1);
    ini_set('memory_limit', '1024M');
    set_time_limit(0);
    gc_enable();

    // Sum the values of all "key<TAB>value" lines by key
    $ar_reduce = array();
    while (($line = fgets(STDIN)) !== false) {
        $line = str_replace("\n", "", $line);
        $ar_line = explode("\t", $line);
        if (!isset($ar_reduce[$ar_line[0]])) {
            $ar_reduce[$ar_line[0]] = 0;
        }
        $ar_reduce[$ar_line[0]] += intval($ar_line[1]);
    }

    // Emit one "key<TAB>total" line per key
    foreach ($ar_reduce as $key => $value) {
        echo $key . "\t" . $value . "\n";
    }
    ?>
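
Before paying for a cluster, it may help to dry-run the map, sort, reduce sequence on a laptop. The sketch below (PHP 7 or newer assumed) imitates it in a single process: the counter lines of two imaginary mapper processes are merged, sorted by key, and summed. The sample numbers and the `reduceLines` helper are made up for the illustration; unlike the reducer above, the helper also skips lines without a tab.

```php
<?php
/** Sum the integer values of "key<TAB>value" lines by key. */
function reduceLines(array $lines): array
{
    $totals = [];
    foreach ($lines as $line) {
        $fields = explode("\t", rtrim($line, "\r\n"));
        if (count($fields) < 2) {
            continue; // skip malformed lines
        }
        $totals[$fields[0]] = ($totals[$fields[0]] ?? 0) + (int) $fields[1];
    }
    return $totals;
}

// Made-up output of two mapper processes.
$mapperA = ["s3 copied direct\t120", "s3 src not found\t3"];
$mapperB = ["s3 copied direct\t80", "s3 copied precond\t7"];

// Hadoop sorts the combined mapper output by key before the reducer sees it;
// a plain string sort stands in for that step here.
$shuffled = array_merge($mapperA, $mapperB);
sort($shuffled, SORT_STRING);

foreach (reduceLines($shuffled) as $key => $value) {
    echo $key . "\t" . $value . "\n";
}
```

Since the reducer keeps a running total per key, it does not depend on the order of the lines, so the same check works on unsorted input as well.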


Initializing Cluster Servers


Since our scripts are written in PHP, you need to prepare an initialization script that runs on each server in the cluster:

    sudo apt-get -y update
    sudo apt-get -y install libssh2-php
    sudo apt-get -y install php5-curl
    sudo rm -f /etc/php5/cli/conf.d/suhosin.ini
    sudo mkdir -p /usr/share/php
    cd /usr/share/php
    sudo wget https://github.com/aws/aws-sdk-php/releases/download/2.5.0/aws.phar
    ...


Uploading the PHP and bash scripts to the cloud (s3)


    for FILE in bkp_s3_folder_hadoop_bootstrap.sh bkp_s3_folder_hadoop_mapper.php bkp_s3_folder_hadoop_reducer.php; do
        s3cmd -c /root/.s3cfg-key put /home/project/cron_jobs/$FILE s3://#your-bucket#/code/
    done


Uploading the data to be processed to s3


Simply upload the source data for processing to a folder in s3, for example with s3cmd. The data will then be distributed across the cluster automatically. You can upload as much data as you like and let the cluster struggle with it.
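
One possible way to prepare such input is to write the list of objects into several text files, one object key per line, and upload that folder. The sketch below (PHP 7 or newer assumed) only illustrates the idea: the one-key-per-line format, the `writeInputFiles` helper, and the file naming are assumptions, not something prescribed by Hadoop or taken from the project.

```php
<?php
/**
 * Write $keys into numbered text files of at most $linesPerFile lines each,
 * one key per line, and return the paths of the files created.
 */
function writeInputFiles(array $keys, string $dir, int $linesPerFile): array
{
    if (!is_dir($dir) && !mkdir($dir, 0777, true)) {
        throw new RuntimeException("cannot create directory {$dir}");
    }
    $paths = [];
    foreach (array_chunk($keys, $linesPerFile) as $i => $chunk) {
        $path = sprintf('%s/input_%05d.txt', $dir, $i);
        file_put_contents($path, implode("\n", $chunk) . "\n");
        $paths[] = $path;
    }
    return $paths;
}

// Demo: 5 made-up keys, 2 per file, written to a temporary directory.
$dir = sys_get_temp_dir() . '/hadoop_input_' . getmypid();
$keys = ['a/1.jpg', 'a/2.jpg', 'b/3.jpg', 'b/4.jpg', 'c/5.jpg'];
foreach (writeInputFiles($keys, $dir, 2) as $path) {
    echo $path, ': ', count(file($path)), " lines\n";
}
```

Several moderately sized files, rather than one huge list, give the cluster separate pieces of input to hand out to its map tasks.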

Starting data processing in a cluster


And finally, the tasty part: we launch a cluster to process our data.

    D=$(date +"%Y-%m-%d_%H-%M-%S")
    /opt/aws/emr/elastic-mapreduce --create --stream \
        --name myproject_$D \
        --step-name step_$D \
        --with-termination-protection \
        --step-action CANCEL_AND_WAIT \
        --ami-version '2.4.2' \
        --bootstrap-action '#path to your bootstrap script in s3#' \
        --bootstrap-action 's3://elasticmapreduce/bootstrap-actions/configure-hadoop' \
        --args "-m,mapred.map.max.attempts=20,-m,mapred.tasktracker.map.tasks.maximum=15,-m,mapred.task.timeout=600000" \
        --input 's3://#your-bucket#/input/' \
        --mapper 's3://#your-bucket#/code/#your-mapper#.php' \
        --reducer 's3://#your-bucket#/code/#your-reducer#.php' \
        --output 's3://#your-bucket#/output_'$D \
        --log-uri 's3://#your-bucket#/logs/' \
        --num-instances 5 \
        --master-instance-type m1.small \
        --slave-instance-type m1.xlarge \
        --key-pair 'myproject_mapreduce'


Here it is important to choose the right number of machines for the cluster: the more, the faster, of course. In this example we run no more than 15 processes per server. More is possible, depending on the amount of RAM, but be careful and monitor its consumption.
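
The RAM side of that choice can be checked with simple arithmetic. The sketch below (PHP 7 or newer assumed) is a rough guide only: the 15 GB node size, the 3 GB reserved for the operating system and Hadoop daemons, and the 512 MB "measured" footprint are assumed numbers, and `maxTasksPerNode` is a hypothetical helper.

```php
<?php
/**
 * Largest number of concurrent PHP processes that fit into a node's RAM when
 * each process may use up to $perTaskMb megabytes.
 */
function maxTasksPerNode(int $ramMb, int $reservedMb, int $perTaskMb): int
{
    return max(0, intdiv($ramMb - $reservedMb, $perTaskMb));
}

// Worst case: every mapper grows to the 2048M memory_limit set in the script.
echo maxTasksPerNode(15 * 1024, 3 * 1024, 2048), "\n";

// The same node if monitoring shows a mapper really needs about 512 MB.
echo maxTasksPerNode(15 * 1024, 3 * 1024, 512), "\n";
```

The `memory_limit` value is only an upper bound per process, so the number of processes a node can really carry depends on what the mappers actually consume, which is why consumption has to be monitored.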

After the cluster has finished its work, the aggregated statistics can be found in the logs, which are also uploaded to s3.

Usually, the speed of processing something that used to take weeks amazes, inspires, and lifts your awareness of the IT continuum to a new level better than the latest installment of "300 Spartans" :-)


Results



As a result, you get a business tool that is driven by 2 PHP scripts. The number of servers (--num-instances 5) directly affects how fast the loaded data is processed. In principle, nothing stops you from launching 100 servers with 10 threads each and processing the data much faster than a single server with a task queue could.

Using this technology in a simple and understandable way, we reduced the processing time for tens of millions of objects in s3 from weeks to 2 days on one of our projects.

Colleagues, if you have questions, please ask in the comments and come to our conferences; we will be happy to share our experience. Good luck to everyone with your web projects and with your victories over Bigdata!

Source: https://habr.com/ru/post/218003/

