Recently, there have been frequent discussions in smoking rooms on the topic of comparing the performance of various data storage formats in Apache Hadoop - including CSV, JSON, Apache Avro and Apache Parquet. Most participants immediately dismiss text formats as obvious outsiders, leaving the main intrigue to the contest between Avro and Parquet.
The prevailing opinions were unconfirmed rumors that one format looked “better” when working with the whole dataset, while the second “better” handled requests for a subset of columns.
Like any self-respecting engineer, I thought that it would be nice to conduct full-fledged performance tests to finally check on which side the truth is. The result of the comparison is under the cut.
Translator's Note:
Initially, the article was conceived as a free translation of Don Drake's text ( @dondrake ) for the Cloudera Engineering Blog on the experience of comparing Apache Avro and Apache Parquet using Apache Spark . However, in the process of translation, I went deep into details and found a lot of controversial points in the tests. I added a subtitle to the article, and the text provided comments with maliciously inaccuracies.
I thought that for tests it would be correct to use real data and real requests. In this case, it can be expected that the performance in the production environment will behave similarly to the test one. In other words, for the test, it is not enough to count the lines on the surrogate data.
The choice of "real data" and "real requests" for the test seems to be a highly controversial idea, since Everyone has different real data and requests. To solve this problem, typical storage performance tests are synthesized, for example, TPC Benchmarks .
I rummaged through the datasets I recently worked with and found two excellent ones for the test. The first of them, let's call it "narrow", consists of only three columns and contains 82.3 million lines, which in the CSV is 3.9 GB.
As will be seen below, this will result in 750-1000 MB of serialized data, and it will be processed into 50 workers. Each worker will get 15-20 MB of data. Most likely, the initialization of the worker will take longer than reading and processing data.
The second one, let's call it “wide”, contains 103 columns and 694 million lines, which gives a CSV file of 194 GB in size. I think this approach will allow us to assess which format works best with large and small files.
"Wide" dataset is not only 30 times wider, but 8 times longer. And 49 times the original size. It is more correct to call datasets "small" and "large".
In addition, judging by the size ratio, it seems that columns of different types are represented in datasets. In this paper, differences in data types are generally ignored. Meanwhile, this is a key aspect of the storage format.
I chose Apache Spark 1.6 as a workhorse for tests. Spark supports Parquet out of the box, support for Avro and CSV is connected separately. All operations were performed on a CDH 5.5.x cluster of 100+ machines.
I was interested in measuring the performance of formats on various types of processing - downloads, simple requests, non-trivial requests, processing of a whole dataset, as well as the amount of disk space used.
I ran tests through a spark-shell
with the same configuration for both datasets (the only difference was in the number of executors). Shell mode :paste
saved my life by allowing me to copy the Scala code directly into the REPL, without worrying about multi-line commands that might confuse the interpreter.
#!/bin/bash -x # Drake export HADOOP_CONF_DIR=/etc/hive/conf export SPARK_HOME=/home/drake/coolstuff/spark/spark-1.6.0-bin-hadoop2.6 export PATH=$SPARK_HOME/bin:$PATH # use Java8 export JAVA_HOME=/usr/java/latest export PATH=$JAVA_HOME/bin:$PATH # NARROW NUM_EXECUTORS=50 # WIDE NUM_EXECUTORS=500 spark-shell —master yarn-client \ —conf spark.eventLog.enabled=true \ —conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory \ —conf spark.yarn.historyServer.address=http://yarnhistserver.allstate.com:18088 \ —packages com.databricks:spark-csv_2.10:1.3.0,com.databricks:spark-avro_2.10:2.0.1 \ —driver-memory 4G \ —executor-memory 2G \ —num-executors $NUM_EXECUTORS \ ...
I took the query execution time from the Job tab in the Spark Web UI. I repeated each test three times and then calculated the average time. Requests to a narrow dataset were performed on a relatively loaded cluster, while requests to a wide dataset were executed during the moments of complete cluster idleness. It did not happen on purpose, rather, it is a coincidence.
Using different environments between tests (including a different number of workers and a different cluster workload) makes it impossible to compare absolute values.
The use of a loaded cluster in itself negatively affects the reproducibility of the measurement results when it is restarted — they simply cannot be trusted.
A three-time repetition of the experiment looks statistically frivolous - the confidence interval of the assessment will be very large. However, the author does not even mention the confidence intervals.
When reading a narrow dataset from CSV, I didn’t draw diagrams, but I had to convert a String
column to a Timestamp
. I did not include the time for this conversion in the result, since it does not apply to storage formats. When working with a wide dataset, I used the output of the circuit, but I also did not take into account the time for this.
The schema output (in the original - infer schema ) is implicit conversion fromRDD
toDataFrame
using Reflection.
During the testing process, I was surprised to learn that you cannot save an Avro file with a Timestamp column. In fact, Avro versions 1.7.x basically does not support either Date
or Timestamp
.
Avro 1.8 supports the logical typesDate
,Timestamp
and their derivatives. In essence, they are just a wrapper overint
orlong
.
To begin with, I estimated the time over which a narrow dataset can be written to disk in Avro or Parquet format. I counted only the effective time to write, after the data was read into the data frame. It turned out the difference within the statistical error. Thus, the recording performance of a narrow dataset is approximately the same for both formats.
The serialization time was incredibly large, even taking into account the possible network overhead and so on - after all, one worker has less than 20 MB of output.
It looks as if the author incorrectly separated the time for reading and processing and the time for writing. In this case, it may well turn out that most of this time is reading a 4-gigabyte CSV file, perhaps even in one stream. And everything else takes 5-10 seconds.
The recording time on a narrow dataset disk, in seconds (the smaller, the better):
After that, I looked at how long it took to calculate the number of rows in a narrow dataset. Avro and Parquet worked equally fast [^ fast-row-count]. For comparison and to intimidate readers, I also calculated the time for counting uncompressed CSV.
Parquet files contain the number of objects in the block in the metadata. With this ratio of data volume per worker each gets no more than one Parquet block. Thus, for counting, it is enough for everyone to read one number, and then make a general reduce to get the total amount.
For Avro, the task is much more complicated - Avro blocks also contain the number of objects in a block, but the blocks themselves are much smaller ( 64 KB by default ), and the file contains many blocks. Theoretically, the counting time for all objects in the avro file should be longer. In practice, for such small files, the difference can be overlooked.
To count the number of lines in a CSV file, you must fully read this file, as is the case with Avro. If you shard a 4 GB file correctly, each worker has 80 MB of data, which can be read in a few seconds. However, the author’s reading process takes 45 seconds, which testifies to the fact that the file is not parallelized enough.
The time of counting the number of rows in a narrow dataset, in seconds (the smaller, the better):
After I tried a more complex query with GROUP BY
grouping. One of the columns in this dataset is a time stamp, and I calculated the amount on another column for each day. Since Avro does not support Date
and Timestamp
, I had to correct the request in order to get a similar result.
Parquet Request:
val sums = sqlContext.sql("""select to_date(precise_ts) as day, sum(replacement_cost) from narrow_parq group by to_date(precise_ts) """)
Request for Avro query:
val a_sums = sqlContext.sql("""select to_date(from_unixtime(precise_ts/1000)) as day, sum(replacement_cost) from narrow_avro group by to_date(from_unixtime(precise_ts/1000)) """)
For a request with the Parquet grouping, Avro turned out to be 2.6 times faster:
Next, I decided to perform a .map()
DataFrame
on a DataFrame
e to simulate the processing of the entire dataset. I chose a transformation that counts the number of columns in a row and collects all their unique values.
def numCols(x: Row): Int = { x.length } val numColumns = narrow_parq.rdd.map(numCols).distinct.collect
The.distinct()
operation significantly complicates the task. For simplicity, we can assume that it adds the reduce phase to the process, which in itself means that not only the.map()
is measured for the entire dataset, but also the overhead to the data exchange between the workers.
This is not exactly the task that will be performed during actual data processing, but nevertheless, it forces the entire dataset to process. And again Parquet turns out to be almost 2 times faster Avro:
The last thing that needs to be done is to compare the dimensions of the dataset on the disk. The graph shows the size in bytes. Avro was configured to use the Snappy compression codec, and the default settings were used for Parquet.
Dataset in Parquet turned out to be less than 25% Avro.
Using the default compression settings and not even looking at them is a very bad practice.
However, Parquet uses gzip by default . Gzip squeezes noticeably stronger than Snappy. Suddenly the difference in size is due exclusively to different codecs? For correct comparison, you need to calculate the dimensions of datasets using the same compression or without it at all.
Also for fairness it should be noted that usually a text file can be compressed at times. I admit that aggressively gzip-avanny source CSV-file will take no more than 1.5 GB. So the advantage of binary formats will not be so dramatic.
I performed similar operations on a large "wide" dataset. Let me remind you that this dataset contains 103 columns and 694 million lines, which translates into 194 GB of uncompressed CSV file.
And I, running ahead, will inform you that this translates into 5 GB Parquet and 17 GB Avro. With 500 workers, we have a load of 100 MB for Parquet or 340 MB for Avro. By compact storage won, of course, Parquet. But in Avro files more blocks came out, which means that their processing speed can be increased by increasing the number of workers. So, if the cluster is not loaded into the ceiling and the number of workers is dynamically calculated, Avro can achieve better performance than in these tests.
First, I measured the time to save a wide dataset in both formats. Parquet every time was faster Avro:
In counting the number of lines, Parquet utterly crashed Avro, producing a result in less than 3 seconds:
Parquet by default uses a block size of 128 MB , which is larger than the average amount of data per worker. Thus, when working with Parquet, the trick from the “narrow” dataset continues to operate, when to calculate the number of rows in a dataset, it suffices to read one number from the metadata.
For Avro files, you have to do a full dataset reading, interpreting only the metadata of each block and skipping (without deserializing) the data itself. This translates into a "real" disc work. For CSV, the situation is even worse - there you have to parse every byte.
For more complex GROUP BY
queries, Parquet again takes the lead:
Here it is worth remembering that it is possible to run 3.4 times more workers for Avro. Will Parquet then keep the lead?
And even for .map()
transformations of the entire dataset Parquet wins again with a convincing margin:
And here it is also worth remembering that it is possible to run 3.4 times more workers for Avro. And what share in the operation time of the operation does .distinct()
, and which share is actually reading from the disk?
The last test, a test of disk utilization efficiency, showed impressive results for both participants. Parquet was able to compress the original 194 GB to 4.7 GB, providing a grand compression above 97%. Avro also showed impressive results, compressing data to 16.9 GB (91% compression). Applaud to both participants:
As a result, Parquet showed at least not the worst performance on each test. With increasing data volume, its advantage became apparent. Parquet partially owes its good results to better compression efficiency, since Avro had to read 3.5 times more than Parquet. And Avro did not show that high performance when reading the entire dataset, which attributed to him rumor.
When you have to choose the storage format in Hadoop, you need to take into account many factors, such as integration with third-party applications, circuit evolution, support for specific data types ... But if you put performance at the forefront, then the tests above clearly show that Parquet is your best choice .
And from myself add. This is a good measure of the performance of formats. It is confirmed by numerous isolated observations from the industrial experience of our team. Nevertheless, the testing methodology distorts measurements with outside actions (reading CSV, GROUP BY', '.distinct()
, ...), and sometimes completely ignores important issues (compression, data formats, ...). I realize that it is not easy to do a canonical test with "distilled" metrics. But from Cloudera's blog, I was expecting that.
Source: https://habr.com/ru/post/282552/
All Articles