Tsafin notes:
Before publishing my series of articles on MapReduce in Caché, I thought it important to present last year's point of view from Adam Drake's article "Command-line tools can be 235x faster than your Hadoop cluster". Unfortunately, the original article by Tom Hayden that he refers to is no longer available on Tom's site, but it can still be found in the archives. For completeness, I suggest reading it as well.
Introduction
While visiting my favorite sites again, I came across a cool article by Tom Hayden about using Amazon Elastic Map Reduce (EMR) and mrjob to compute win/loss statistics for a dataset of chess games that he downloaded from the millionbase archive and then started exploring with EMR. Since the data volume was only 1.75GB, describing about 2 million chess games, I was skeptical about using Hadoop for this task, although I understood that his goal was simply to play around and get to know mrjob and the EMR infrastructure better on a real example.
The problem as stated (find the result lines in each file and aggregate them) looked better suited to stream processing with shell commands. I tried this approach on a similar amount of data on my laptop and got the result in about 12 seconds (an effective processing speed of about 270MB/s), while the Hadoop processing took 26 minutes (an effective processing speed of about 1.14MB/s).
Describing the processing of this data volume on a cluster of 7 c1.medium machines, Tom said that it took 26 minutes on the cluster and that "perhaps this time is better than if I had run it serially on my machine, but it is still slower than handling it with a clever multi-threaded application locally."
This is absolutely true, although note that even local sequential processing can easily beat those 26 minutes. Yes, Tom did this project just for fun, but many others often use Big Data (tm) tools to process and analyze such "not very large" amounts of data, for which results can be obtained faster with simpler tools and techniques.
One of the most underrated approaches to data processing is the use of good old shell tools and constructs. The advantages of this approach are enormous: by building a data pipeline out of shell commands, you run all the processing steps in parallel with minimal overhead. It is as if you had your own local Storm cluster. You can even try to translate the concepts of Spouts, Bolts and Sinks into simple shell commands and pipes. You can easily build a data processing pipeline from simple commands, and it will work faster than most tools from the Big Data (tm) arsenal.
Let's also talk about the difference between the streaming and batch approaches to data processing. Tom mentioned at the beginning of his article that when he loaded more than 10,000 game results locally, his [Python] program ran out of memory very quickly. This happened because all the game data was first loaded into memory for later analysis. However, on closer examination of the problem, you can build a streaming pipeline that uses practically no memory.
The resulting pipeline that we will create is more than 235 times faster than the Hadoop implementation and uses almost no memory.
Research data
The first step of the exercise is to download the PGN files. Since I had no idea what this format looks like, I looked it up on Wikipedia.
[Event "F/S Return Match"]
[Site "Belgrade, Serbia Yugoslavia|JUG"]
[Date "1992.11.04"]
[Round "29"]
[White "Fischer, Robert J."]
[Black "Spassky, Boris V."]
[Result "1/2-1/2"]
(moves from the game follow...)
We are only interested in the game results, which have 3 possible values: 1-0 means that White won, 0-1 means that Black won, and 1/2-1/2 means a draw. There is also a - result, which means that the game is still in progress or was interrupted, but we ignore this case in our exercise.
Getting a dataset
First of all, we need to download the game data. This was not so easy, but after a short search on the web I found the rozim Git repository, which contains many different sources of chess game data for various periods of time. I used the code from this repository to compile a 3.46GB dataset, which is almost twice the amount of data Tom used in his test. The next step is to feed all this data into our pipeline.
Build a data processing pipeline
If you are following along and measuring each step, do not forget to clear your operating system's page cache in order to get more accurate timings.
Using shell commands is convenient for data processing because you get parallel execution of the commands for free. Check, for example, how this command runs in the terminal:
sleep 3 | echo "Hello world"
Intuitively, it may seem that we first sleep for 3 seconds in the first command and then print Hello world to the terminal. In fact, both commands run at the same time. This is one of the main reasons we can get such a speedup on a single machine for simple commands that are not IO-bound.
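The concurrency of pipeline stages can be checked with a small timing experiment. This is only an illustrative sketch: the two `sleep` commands and the variable names are arbitrary choices, not part of the original article.

```shell
# Both sides of a pipe are started together, so the total run time is
# bounded by the slowest stage, not by the sum of all stages.
start=$(date +%s)
sleep 2 | sleep 2        # two 2-second stages connected by a pipe
end=$(date +%s)

elapsed=$((end - start))
echo "elapsed: ${elapsed}s"   # roughly 2 seconds rather than 4
```

If the stages ran one after another, the elapsed time would be about 4 seconds.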
Before we start building the data processing pipeline, let's determine the upper bound on throughput by simply copying the data to /dev/null.
In this case the operation takes 13 seconds, which for a 3.46GB dataset means 272MB/s. This will be our upper limit on the speed of reading from disk.
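The exact command used for this baseline is not shown in this copy of the text. A measurement of this kind is commonly taken by reading every file and discarding the bytes, so the sketch below assumes `cat *.pgn > /dev/null`; the `mb_per_s` helper is hypothetical and only reproduces the arithmetic behind the quoted figure.

```shell
# Hypothetical helper: convert a size in GB and a duration in seconds
# into whole megabytes per second (1GB taken as 1024MB).
mb_per_s() {
    awk -v gb="$1" -v s="$2" 'BEGIN { printf "%d\n", gb * 1024 / s }'
}

# Assumed baseline command, to be run in the directory holding the data:
#   time cat *.pgn > /dev/null

# Figures quoted in the text: 3.46GB read in 13 seconds.
mb_per_s 3.46 13    # prints 272
```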
Now we can start building the pipeline, and in the first step we will use cat to generate the data flow:
cat *.pgn
Since we are only interested in the lines with the results, we can scan the files and select only the lines containing 'Result' using the grep utility:
cat *.pgn | grep "Result"
[It is not clear why he did not run it as `grep Result *.pgn` right away.]
This gives us only the lines containing Result. Now, if you want, you can use the sort and uniq commands to get the list of unique values and count how often each occurs.
cat *.pgn | grep "Result" | sort | uniq -c
This is a very simple approach to the analysis, and it gives us the result in 70 seconds. Of course we can speed it up, but note that even now, assuming linear scaling, the Hadoop cluster would need about 52 minutes to process the same data.
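To see what the sort | uniq -c stage produces, the same steps can be run on a few hand-written lines. The sample data and the variable names are made up for illustration, and the trailing awk is an addition that only strips the padding `uniq -c` puts in front of each count.

```shell
# Hand-written sample standing in for the contents of the .pgn files.
sample='[Result "1-0"]
[Result "0-1"]
[Result "1-0"]
[Result "1/2-1/2"]'

# sort brings identical lines together; uniq -c collapses each group
# and prefixes it with the number of occurrences.
counts=$(printf '%s\n' "$sample" | grep "Result" | sort | uniq -c |
    awk '{ print $1, $2, $3 }')
echo "$counts"
```

Each output line holds a count followed by the result tag it belongs to, for example 2 for the White wins in this sample.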
To speed things up, we can remove sort | uniq from the pipeline and replace them with a call to AWK, which is a great tool for handling record-oriented data.
cat *.pgn | grep "Result" | awk '{ split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print white+black+draw, white, black, draw }'
Here we take the entire record, split it on the hyphen, and take the character immediately to its left: 0 means that Black won, 1 that White won, and 2 a draw. Note that $0 is a built-in variable representing the entire record.
This reduces the execution time to about 65 seconds, and since we are processing about twice as much data, the speedup is about 47 times.
So even at this step we have a 47-fold speedup with local execution. Moreover, memory usage is close to zero, since only the current record is stored, and incrementing 3 integer variables is negligible in terms of memory consumption. However, htop shows that grep is the bottleneck in this pipeline and fully occupies one CPU core.
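The awk program used in this section can be checked on a handful of hand-written result lines. The `count_results` wrapper and the sample input are illustrative assumptions; the awk body is the one from the pipeline.

```shell
# Wrap the awk program in a function so it can be fed any input stream.
count_results() {
    awk '{ split($0, a, "-")                       # text left of the first hyphen
           res = substr(a[1], length(a[1]), 1)     # its last character
           if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ }
         END { print white+black+draw, white, black, draw }'
}

# Two White wins, one Black win and one draw.
printf '%s\n' '[Result "1-0"]' '[Result "0-1"]' '[Result "1/2-1/2"]' '[Result "1-0"]' |
    count_results    # prints: 4 2 1 1
```

Only three counters are kept no matter how many lines pass through, which is why memory use stays flat.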
Bottleneck parallelization
The problem of the unused additional cores can be solved with the remarkable xargs command, which can run grep in parallel. Since xargs expects its input in a specific format, we use find with the -print0 argument so that each file name passed to xargs is terminated by a null byte. Accordingly, we pass -0 so that xargs expects null-terminated input. The -n option controls how many arguments are passed to each invocation, and -P sets the number of commands run in parallel. It is important to note that such a parallel pipeline does not guarantee the order of the output, but this is not a problem if you were counting on distributed processing from the start. The -F option tells grep that we are looking for a fixed string rather than a complicated regular expression, which in theory can give an additional speedup, although none was observed in the experiments.
[This statement is not quite true for our example: GNU grep builds a deterministic finite automaton, which for such a simple expression is equivalent to a fixed-string match. The only thing we save is the compilation time of the regular expression, which is negligible in this case.]
find . -type f -name '*.pgn' -print0 | xargs -0 -n1 -P4 grep -F "Result" | gawk '{ split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print NR, white, black, draw }'
As a result, we get a time of about 38 seconds, a gain of about 40% that comes solely from running grep in parallel in our pipeline. We are now about 77 times faster than the Hadoop implementation.
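How xargs splits null-terminated input into batches can be seen on a toy example. The file names here are made up and `echo` stands in for grep; -P is left out so that the output order stays deterministic.

```shell
# Four null-terminated names, as find -print0 would emit them.
# -0 makes xargs split on the null byte, -n2 passes two names per invocation.
batches=$(printf 'a.pgn\0b.pgn\0c.pgn\0d.pgn\0' | xargs -0 -n2 echo)
echo "$batches"
# a.pgn b.pgn
# c.pgn d.pgn
```

Adding -P2 would let both echo invocations run at the same time, at which point the two lines could appear in either order.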
Although we have improved performance "dramatically" by parallelizing the grep step, we can get rid of grep altogether by having awk itself find the relevant records and process only those that contain the string "Result".
find . -type f -name '*.pgn' -print0 | xargs -0 -n1 -P4 awk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print white+black+draw, white, black, draw }'
This may seem like the right solution, but it produces a separate result for each file, whereas we need a single aggregated result. The correct implementation of the aggregation is conceptually very similar to MapReduce:
find . -type f -name '*.pgn' -print0 | xargs -0 -n4 -P4 awk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ } END { print white+black+draw, white, black, draw }' | awk '{games += $1; white += $2; black += $3; draw += $4; } END { print games, white, black, draw }'
By adding a second awk call, we end up with the desired aggregate information about the games.
This further increases the overall speed, reducing the run time to about 18 seconds, which is 174 times faster than the Hadoop implementation.
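The map-then-reduce shape of this pipeline can be tried on a tiny made-up dataset. The temporary directory, file names and batch sizes below are assumptions for illustration. The `+0` in the first awk is also an addition: it makes a counter that was never incremented print as 0 rather than as an empty field, which would otherwise shift the columns read by the second awk.

```shell
# Build three small sample files in a scratch directory.
workdir=$(mktemp -d)
printf '[Result "1-0"]\n[Result "1/2-1/2"]\n' > "$workdir/a.pgn"
printf '[Result "0-1"]\n[Result "1-0"]\n'     > "$workdir/b.pgn"
printf '[Result "1/2-1/2"]\n[Result "0-1"]\n' > "$workdir/c.pgn"

# "Map": each awk process counts results for its batch of files.
# "Reduce": the final awk sums the per-batch lines column by column.
totals=$(find "$workdir" -type f -name '*.pgn' -print0 |
    xargs -0 -n2 -P2 awk '/Result/ { split($0, a, "-")
        res = substr(a[1], length(a[1]), 1)
        if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ }
        END { print white+black+draw, white+0, black+0, draw+0 }' |
    awk '{ games += $1; white += $2; black += $3; draw += $4 }
         END { print games, white, black, draw }')

echo "$totals"    # prints: 6 2 2 2
rm -rf "$workdir"
```

The totals do not depend on how xargs groups the files or on the order in which the batches finish, because the final step is a plain sum.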
However, we can speed it up a bit more by using mawk, which can serve as a drop-in replacement for gawk here, keeping the same command-line options while providing better performance.
find . -type f -name '*.pgn' -print0 | xargs -0 -n4 -P4 mawk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ } END { print white+black+draw, white, black, draw }' | mawk '{games += $1; white += $2; black += $3; draw += $4; } END { print games, white, black, draw }'
Thus, the find | xargs mawk | mawk pipeline completed the processing in about 12 seconds, or about 270MB/s, which is more than 235 times faster than the Hadoop implementation.
Conclusion
Hopefully we have managed to make the case against the bad practice of using Hadoop tools to process a not very large dataset instead of processing it on a single computer with ordinary shell commands and tools. If you have a truly gigantic dataset or really need distributed computing, then of course you may need the [heavy] Hadoop tools, but more often we see Hadoop used where a traditional relational database or another solution would be faster, cheaper and easier to maintain.