📜 ⬆️ ⬇️

Introduction to Data Engineering. ETL, star schema and airflow

The ability of the data scientist to extract value from data is closely related to how well the data storage and processing infrastructure is developed in the company. This means that the analyst should not only be able to build models, but also have sufficient skills in the field of data engineering to meet the needs of the company and take on more and more ambitious projects.

At the same time, despite all the importance, education in the field of data engineering continues to be very limited. I was lucky, because I managed to work with many engineers who patiently explained to me every aspect of working with data, but not all have this capability. That is why I decided to write this article - an introduction to data engineering, in which I will talk about what ETL is, the difference between SQL and JVM-oriented ETL, normalization and partitioning of data, and finally consider an example query in Airflow.



Data Engineering


Maxime Beauchemin, one of the developers of Airflow, described data engineering as follows: “This is an area that can be viewed as a mixture of business intelligence and databases that brings in more programming elements. This area includes a specialization in working with distributed big data systems, an expanded Hadoop ecosystem, and scalable computing. "
')
Among the many skills of a data engineer, we can single out one that is most important — the ability to develop, build, and maintain data warehouses. The lack of a quality data storage infrastructure means that any activity related to data analysis is either too expensive or unscalable.

ETL: Extract, Transform, Load


Extract, Transform, and Load are 3 conceptually important steps that determine how most modern data pipelines are arranged. Today, this is the basic model of how raw data is made ready for analysis.



Extract. This is the step at which sensors take input from various sources (user logs, copies of a relational database, external data set, etc.), and then pass them on for subsequent transformations.

Transform. This is the “heart” of any ETL, the stage when we apply business logic and do filtering, grouping and aggregation to convert raw data into analysis data ready for analysis. This procedure requires an understanding of business objectives and the availability of basic knowledge in the field.

Load. Finally, we load the processed data and send it to the place of final use. The resulting data set can be used by end users, or it can be an input stream to another ETL.

What ETL framework to choose?


There are several open source platforms in the world of batch data processing that you can try to play with. Some of them are: Azkaban is an open-source workflow manager from Linkedin, which features lightweight dependency management in Hadoop, Luigi is a framework from Spotify, based on Python and Airflow, which is also based on Python, from Airbnb.

Each platform has its pros and cons, many experts try to compare them (see here and here ). When choosing a particular framework, it is important to consider the following characteristics:



Configuration. ETLs are by their nature quite complex, so it’s important how the user of the framework will design them. Is it based on the user interface or are the queries created in any programming language? Today, the second method is gaining more and more popularity, because programming of the pipelines makes them more flexible, allowing you to change any detail.

Monitoring errors and alerts. Bulk and long batch requests sooner or later fall with an error, even if there are no bugs in the job itself. As a result, monitoring and error reporting come to the fore. How well does the framework visualize query progress? Do alerts arrive on time?

Backfilling data. Often, after building the finished pipeline, we need to go back and re-process historical data. Ideally, we would not like to build two independent jobs, one for the reverse of historical data and one for current activities. How easy is backfilling with this framework? Is the solution obtained scalable and effective?

2 paradigms: SQL vs. JVM


As we found out, companies have a huge choice of which tools to use for ETL, and for the novice data scientist, it’s not always clear what framework to devote time to. This is about me: in Washington Post Labs, the sequence of jobs was carried out primitively, with the help of Cron, on Twitter, ETL jobs were built in Pig, and now in Airbnb we write pipelines to Hive via Airflow. Therefore, before you go to this or that company, try to find out exactly how ETL is organized in them. Simplified, two main paradigms can be distinguished: SQL and JVM-oriented ETL.

JVM-oriented ETL is usually written in a JVM-oriented language (Java or Scala). Building data pipelines in such languages ​​means defining data transformations through key-value pairs, however, it becomes easier to write user-defined functions and test jobs, since there is no need to use another programming language to do this. This paradigm is very popular among engineers.

SQL-oriented ETL is most often written in SQL, Presto or Hive. In them, almost everything revolves around SQL and tables, which is very convenient. At the same time, writing custom functions can be problematic, since it requires the use of another language (for example, Java or Python). This approach is popular among data scientists.

Having worked with both paradigms, I still prefer SQL-oriented ETL, because, being a beginner data scientist, it is much easier to learn SQL than Java or Scala (if, of course, you are not familiar with them) and concentrate on learning new practices than to impose it on top of learning a new language.

Data modeling, normalization and star schema


In the process of building a high-quality analytical platform, the main goal of the system designer is to make analytical queries easy to write, and various statistics considered effective. To do this, first of all you need to determine the data model.

As one of the first steps in data modeling, it is necessary to understand the extent to which tables should be normalized . In the general case, normalized tables are distinguished by simpler schemes, more standardized data, and also exclude some types of redundancy. At the same time, the use of such tables leads to the fact that to establish the relationship between the tables requires more accuracy and diligence, queries become more difficult (more JOIN-s), and also need to support more ETL jobs.

On the other hand, it is much easier to write queries to denormalized tables, since all dimensions and metrics are already connected. However, given the larger size of the tables, data processing becomes slower (“You can argue, because everything depends on how the data is stored and what requests are. For example, you can store large tables in Hbase and access individual columns, then the queries will be fast ”- lane comment).

Among all the data models that are trying to find the perfect balance between the two approaches, one of the most popular (we use it in Airbnb) is the star scheme . This scheme is based on the construction of normalized tables (fact tables and dimension tables), from which, in which case, denormalized tables can be obtained. As a result, this design attempts to strike a balance between the ease of analytics and the complexity of ETL support.



Fact tables and dimension tables


To better understand how to build denormalized tables from fact tables and dimension tables, we will discuss the roles of each of them:

Fact tables often contain transactional data at specific points in time. Each row in a table can be extremely simple and most often is a single transaction. We have a lot of fact tables in Airbnb that store data by type of transaction: booking, ordering, cancellation, etc.

Dimension tables contain slowly changing attributes of certain keys from the fact table, and they can be connected to it by these keys. The attributes themselves can be organized within a hierarchical structure. Airbnb, for example, has dimension tables with users, orders, and markets that help us analyze data in detail.

Below is a simple example of how fact tables and dimension tables (normalized) can be combined to answer a simple question: how many bookings have been made in the last week for each of the markets?

SELECT b.dim_market , SUM(a.m_bookings) AS m_bookings FROM ( SELECT id_listing , 1 AS m_bookings , m_a # not used (for illustration only) , m_b # not used (for illustration only) , m_c # not used (for illustration only) FROM fct_bookings WHERE ds BETWEEN '{{ last_sunday }}' AND '{{ this_saturday }}' ) a JOIN ( SELECT id_listing , dim_market , dim_x # not used (for illustration only) , dim_y # not used (for illustration only) , dim_z # not used (for illustration only) FROM dim_listings WHERE ds BETWEEN '{{ latest_ds }}' ) b ON (a.id_listing = b.id_listing) GROUP BY b.dim_market ; 

Timestamp Partitioning


Now, when the cost of storing data is very small, companies can afford to store historical data in their vaults, rather than throwing it away. The flip side of this trend is that with the accumulation of data, analytical queries become ineffective and slow. Along with such SQL principles as “filter data more often and earlier” and “use only those fields that are needed”, we can single out another one that allows increasing the efficiency of queries: data partitioning .

The basic idea of ​​partitioning is quite simple - instead of storing data in one piece, we divide them into several independent parts. All parts retain the primary key from the original piece, so you can access any data fairly quickly.

In particular, the use of a timestamp as a key for which the partitioning takes place has several advantages. First, in S3-type repositories, raw data is often sorted by timestamp and stored in directories, also labeled. Secondly, the batch-ETL job usually takes about one day, that is, new data partitions are created every day for each job. Finally, many analytical queries include counting the number of events that have occurred over a certain time period, so partitioning over time is very useful here.

Backfilling of historical data


Another important advantage of using a timestamp as a partitioning key is the ease of backfilling data. If the ETL pipeline is already built, then it calculates the metrics and measurements in advance, not retrospectively. Often we would like to look at the established trends by calculating measurements in the past - this process is called backfilling .

Backfilling is so common that Hive has built-in dynamic partitioning to perform the same SQL queries across several partitions at once. We illustrate this idea with an example: let it be required to fill in the number of bookings for each market for dashboards, starting with earliest_ds and ending with the latest_ds . One of the possible solutions looks like this:

 INSERT OVERWRITE TABLE bookings_summary PARTITION (ds= '{{ earliest_ds }}') SELECT dim_market , SUM(m_bookings) AS m_bookings FROM fct_bookings WHERE ds = '{{ earliest_ds }}' GROUP BY dim_market ; # after many insertions from '{{ earliest_ds + 1 day }}' to '{{ latest_ds - 1 day }}' INSERT OVERWRITE TABLE bookings_summary PARTITION (ds= '{{ latest_ds }}') SELECT dim_market , SUM(m_bookings) AS m_bookings FROM fct_bookings WHERE ds = '{{ latest_ds }}' GROUP BY dim_market ; 

Such a query is possible, but it is too cumbersome, since we perform the same operation, only on different partitions. Using dynamic partitioning, we can simplify everything to a single query:

 INSERT OVERWRITE TABLE bookings_summary PARTITION (ds) SELECT dim_market , SUM(m_bookings) AS m_bookings , ds # For Hive to know we are using dynamic partitions FROM fct_bookings WHERE ds BETWEEN '{{ earliest_ds }}' AND '{{ latest_ds }}' GROUP BY dim_market , ds ; 

Note that we added ds to the SELECT and GROUP BY expressions, extended the range in the WHERE operation, and changed the syntax from PARTITION (ds = '{{ds}}}) to PARTITION (ds) . The beauty of dynamic partitioning is that we wrapped GROUP BY ds around the necessary operations to insert query results into all partitions in one run. This approach is very effective and is used in many Airbnb pipelines.

Now, consider all the concepts studied on the example of ETL Jobs in Airflow.

Directed Acyclic Graph (DAG)


It would seem that from the point of view of the ETL idea, the Jobs are very simple, but in fact they are often very confusing and consist of many combinations of Extract, Transform, and Load operations. In this case, it is very useful to visualize the entire data stream using a graph in which the node displays the operation, and the arrow shows the relationship between the operations. Given that each operation is performed once, and the data goes further along the graph, it is directional and acyclic, hence the name.



One of the features of the Airflow interface is the presence of a mechanism that allows you to visualize the pipeline data through the DAG. The author of the pipeline must define the interrelationships between operations so that Airflow writes the ETL Jobe specification to a separate file.

At the same time, in addition to DAGs, which determine the order in which operations are launched, there are operators in Airflow that specify what needs to be done within the pipeline. Usually there are 3 types of operators, each of which simulates one of the stages of the ETL process:


Simple example


Below is a simple example of how to declare a DAG file and define the structure of a graph using the operators in Airflow, which we discussed above:

 """ A DAG docstring might be a good way to explain at a high level what problem space the DAG is looking at. Links to design documents, upstream dependencies etc are highly recommended. """ from datetime import datetime, timedelta from airflow.models import DAG # Import the DAG class from airflow.operators.sensors import NamedHivePartitionSensor from airflow.operators.hive_operator import HiveOperator ### You can import more operators as you see fit! # from airflow.operators.bash_operator import BashOperator # from airflow.operators.python_operator import PythonOperator # setting some default arguments for the DAG default_args = { 'owner': 'you', 'depends_on_past': False, 'start_date': datetime(2018, 2, 9), } # Instantiate the Airflow DAG dag = DAG( dag_id='anatomy_of_a_dag', description="This describes my DAG", default_args=default_args, schedule_interval=timedelta(days=1)) # This is a daily DAG. # Put upstream dependencies in a dictionary wf_dependencies = { 'wf_upstream_table_1': 'upstream_table_1/ds={{ ds }}', 'wf_upstream_table_2': 'upstream_table_2/ds={{ ds }}', 'wf_upstream_table_3': 'upstream_table_3/ds={{ ds }}', } # Define the sensors for upstream dependencies for wf_task_id, partition_name in wf_dependencies.iteritems(): NamedHivePartitionSensor( task_id=wf_task_id, partition_names=[partition_name], dag=dag ) # Put the tasks in a list tasks = [ ('hql', 'task_1'), ('hql', 'task_2'), ] # Define the operators in the list above for directory, task_name in tasks: HiveOperator( task_id=task_name, hql='{0}/{1}.hql'.format(directory, task_name), dag=dag, ) # Put the dependencies in a map deps = { 'task_1': [ 'wf_upstream_table_1', 'wf_upstream_table_2', ], 'task_2': [ 'wf_upstream_table_1', 'wf_upstream_table_2', 'wf_upstream_table_3', ], } # Explicitly define the dependencies in the DAG for downstream, upstream_list in deps.iteritems(): for upstream in upstream_list: dag.set_dependency(upstream, downstream) 

When the graph is built, you can see the following image:



So, I hope that in this article I managed to quickly and efficiently immerse you in an interesting and diverse area - Data Engineering. We learned what ETL is, the advantages and disadvantages of various ETL platforms. Then they discussed data modeling and the “star” scheme, in particular, and also considered the differences between fact tables and measurement tables. Finally, having considered such concepts as data partitioning and backfilling, we switched to the example of a small ETL job in Airflow. Now you can independently study the work with data, increasing the baggage of your knowledge. See you!

————

Robert notes an insufficient number of data engineering programs in the world, but we are doing so, and not for the first time. In October, Data Engineer 3.0 starts with us, register and expand your professional capabilities!

Source: https://habr.com/ru/post/358530/


All Articles