Hi, Habr! My name is Dean, and I develop a game data warehouse for analytics tasks at Mail.Ru Group. Our team uses Apache Airflow (hereinafter Airflow) to build batch data processing pipelines; yuryemeliyanov wrote about this in a recent article. Airflow is an open-source library for developing ETL/ELT processes. Individual tasks are combined into periodically executed chains, DAGs (Directed Acyclic Graphs).
As a rule, 80% of an Airflow project consists of standard DAGs. This article is about the remaining 20%: DAGs that require complex branching, communication between tasks, and other non-trivial logic.
Imagine that we are faced with the task of collecting data daily from several shards. We write them into the staging area in parallel and then load them into the target table in the warehouse. If an error occurs along the way, for example, some of the shards are unavailable, the DAG will look like this:
In order to proceed to the next task, we need to handle errors in the previous ones. The operator parameter trigger_rule is responsible for this. Its default value, all_success, means that the task will start if and only if all upstream tasks have completed successfully.
Also, trigger_rule can have the following values:
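At the time of writing, the Airflow documentation listed the following trigger rules (newer versions add a few more):
- all_success: all upstream tasks have succeeded (the default);
- all_failed: all upstream tasks have failed;
- all_done: all upstream tasks have finished, whatever their status;
- one_success: at least one upstream task has succeeded;
- one_failed: at least one upstream task has failed;
- dummy: dependencies are ignored and the task is scheduled unconditionally.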
To implement if-then-else logic, you can use the branch operator BranchPythonOperator. The callable you pass to it must implement the logic for choosing which task runs next and return its task_id. It may also return nothing; in that case all downstream tasks are marked as skipped.
In our example, it turned out that the shard unavailability was related to the periodic shutdown of the game servers; accordingly, while they are off, we lose no data for the period we need. That said, the processing windows have to be built with the number of servers that were actually up taken into account.
Here is what the same DAG looks like with a pair of tasks whose trigger_rule parameter takes the values one_success (at least one upstream task succeeded) and all_done (all upstream tasks finished), plus the select_next_task branch operator instead of a single PythonOperator.
# fires when all upstream tasks have finished, regardless of their status
all_done = DummyOperator(task_id='all_done', trigger_rule='all_done', dag=dag)

# fires when at least one upstream task has succeeded
one_success = DummyOperator(task_id='one_success', trigger_rule='one_success', dag=dag)

# choose the next task depending on how many shards were loaded successfully
def select_next_task():
    success_shard_count = get_success_shard_count()
    if success_shard_count == 0:
        return 'no_data_action'
    elif success_shard_count == 6:
        return 'all_shards_action'
    else:
        return 'several_shards_action'

select_next_task = BranchPythonOperator(task_id='select_next_task',
                                        python_callable=select_next_task,
                                        dag=dag)
Documentation on the trigger_rule parameter
Documentation on the BranchPythonOperator operator
Airflow operators also support rendering of passed parameters with Jinja. It is a powerful template engine; you can read more about it in its documentation, but here I will only cover the aspects we use when working with Airflow.
The template engine processes:
- fields listed in an operator's template_fields (for example, sql in many database operators);
- the templates_dict dictionary passed to a PythonOperator (with provide_context=True it becomes available in kwargs);
- any string rendered manually with the task's render_template method, as in this fragment:

def index_finder(conn_id, task, **kwargs):
    sql = "SELECT MAX(idtransaction) FROM {{ params.billing }}"
    max_id_sql = task.render_template("", sql, kwargs)
    ...
Here is how we apply Jinja in Airflow:
I will give a few examples of parameter rendering as it can be seen in the Airflow interface. In the first, we delete records older than the number of days passed in the cut_days parameter. This is what the SQL looks like with Airflow's Jinja templates:
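The query itself is not preserved in this text version; a minimal sketch of such a statement, assuming a hypothetical game.logins table with a dt date column and an operator created with params={'cut_days': 7}, could look like this:

DELETE FROM game.logins
WHERE dt < '{{ macros.ds_add(ds, -params.cut_days) }}';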
In the rendered SQL, a specific date has already been substituted for the expression:
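Assuming the DAG ran for 2017-12-02 and cut_days = 7, the sketch above would render as:

DELETE FROM game.logins
WHERE dt < '2017-11-25';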
The second example is more complicated. It converts dates to unixtime to simplify filtering the data at the source. The construction "{:.0f}" is used to suppress the decimal part:
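That query is likewise not preserved here; a sketch of the same technique, assuming a hypothetical events table with a created_ts unixtime column (execution_date and next_execution_date are standard Airflow template variables), might be:

SELECT *
FROM events
WHERE created_ts >= {{ "{:.0f}".format((execution_date - macros.datetime(1970, 1, 1)).total_seconds()) }}
  AND created_ts < {{ "{:.0f}".format((next_execution_date - macros.datetime(1970, 1, 1)).total_seconds()) }}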
Jinja replaces the expressions between the double braces with the unixtime values corresponding to the DAG's execution date and the date following it:
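For an execution date of 2017-12-02 00:00 UTC on a daily schedule, the sketch above would render roughly as:

SELECT *
FROM events
WHERE created_ts >= 1512172800
  AND created_ts < 1512259200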
Finally, in the last example we use the truncshift function, which is passed in as a parameter:
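That snippet is not preserved either. One way to pass such a function is through the operator's params, so a sketch with a hypothetical truncshift helper (here it simply shifts the execution date by a given number of days; the sales table, the task and the dag object are also assumptions) could look like this:

from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator

def truncshift(ds, shift_days):
    # hypothetical helper: parse the execution date and shift it by shift_days days
    return (datetime.strptime(ds, '%Y-%m-%d') + timedelta(days=shift_days)).strftime('%Y-%m-%d')

clean_sales = PostgresOperator(
    task_id='clean_sales',
    sql="DELETE FROM sales WHERE dt < '{{ params.truncshift(ds, -90) }}'",
    params={'truncshift': truncshift},
    dag=dag
)

Jinja calls params.truncshift at render time and substitutes its return value into the SQL.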
Instead of this expression, the template engine substitutes the result of the function.
One of our sources has an interesting log storage scheme. Every five days it creates a new table named like squads_02122017: the name contains a date, so the question was how to compute it. For a while we simply queried tables with names derived from each of the five days. Four of the queries failed, but trigger_rule='one_success' saved us (exactly the case where completing all five tasks is optional).
After some time, instead of trigger_rule we started using Airflow's built-in mechanism for exchanging messages between tasks within one DAG: XCom (short for cross-communication). An XCom is defined by a key, a value, and the name of the task that sent it.
An XCom is created in a PythonOperator from the value its callable returns; you can also create one explicitly with the xcom_push function. After the task completes, the value is stored, and any subsequent task can retrieve it with the xcom_pull function, either in another PythonOperator or from a Jinja template inside any rendered string.
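As a minimal sketch (the task names and the value here are made up, and the dag object is assumed to be defined as elsewhere in the article), pushing and pulling a value between two PythonOperators looks like this:

from airflow.operators.python_operator import PythonOperator

def push_value(**kwargs):
    # explicit push; returning a value from the callable would also create an XCom
    kwargs['ti'].xcom_push(key='row_count', value=42)

def pull_value(**kwargs):
    # pull by sending-task id and key in a downstream task
    row_count = kwargs['ti'].xcom_pull(task_ids='push_value', key='row_count')
    print(row_count)

push_task = PythonOperator(task_id='push_value', python_callable=push_value,
                           provide_context=True, dag=dag)

pull_task = PythonOperator(task_id='pull_value', python_callable=pull_value,
                           provide_context=True, dag=dag)

push_task >> pull_task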
Here is how we obtain the table name now:
def get_table_from_mysql(**kwargs):
    """Find the current log table and publish its name via XCom."""
    hook = MySqlHook(conn_name)
    cursor = hook.get_conn().cursor()
    cursor.execute(kwargs['templates_dict']['sql'])
    table_name = cursor.fetchall()
    # push the table name into XCom under the key 'table_name'
    kwargs['ti'].xcom_push(key='table_name', value=table_name[0][1])
    # the returned value would become an XCom as well:
    # return table_name[0][1]

# look up which of the five possible squads_DDMMYYYY tables exists;
# this query goes to the MySQL source (the warehouse itself is PostgreSQL)
select_table_from_mysql_sql = '''
    SELECT table_name
    FROM information_schema.TABLES
    WHERE table_schema = 'jungle_logs'
      AND table_name IN ('squads_{{ macros.ds_format(ds, "%Y-%m-%d", "%d%m%Y") }}',
                         'squads_{{ macros.ds_format(macros.ds_add(ds, -1), "%Y-%m-%d", "%d%m%Y") }}',
                         'squads_{{ macros.ds_format(macros.ds_add(ds, -2), "%Y-%m-%d", "%d%m%Y") }}',
                         'squads_{{ macros.ds_format(macros.ds_add(ds, -3), "%Y-%m-%d", "%d%m%Y") }}',
                         'squads_{{ macros.ds_format(macros.ds_add(ds, -4), "%Y-%m-%d", "%d%m%Y") }}')
'''

select_table_from_mysql = PythonOperator(
    task_id='select_table_from_mysql',
    python_callable=get_table_from_mysql,
    provide_context=True,
    templates_dict={'sql': select_table_from_mysql_sql},
    dag=dag
)

# the sensor query pulls the XCom 'table_name' from the 'select_table_from_mysql' task
sensor_jh_squad_sql = '''
    SELECT 1
    FROM jungle_logs.{{ task_instance.xcom_pull(task_ids='select_table_from_mysql', key='table_name') }}
    LIMIT 1
'''
Another example of using XCom is sending e-mail notifications whose body is generated in a PythonOperator:
kwargs['ti'].xcom_push(key='mail_body', value=mail_body)
And here is how the message body is retrieved inside an EmailOperator:
email_notification_lost_keys = EmailOperator(
    task_id='email_notification_lost_keys',
    to=alert_mails,
    subject='[airflow] Lost keys',
    html_content='''{{ task_instance.xcom_pull(task_ids='find_lost_keys', key='mail_body') }}''',
    dag=dag
)
I have covered branching techniques, communication between tasks, and template substitution. Using Airflow's built-in mechanisms, you can solve a wide variety of problems without departing from the general approach to implementing DAGs. The interesting nuances of Airflow do not end here: my colleagues and I have ideas for further articles on this topic. If you find this tool interesting, write what you would like to read about next time.
Source: https://habr.com/ru/post/344398/