Hello! My name is Anton. At Rostelecom I develop the central data warehouse. Our warehouse consists of modules orchestrated by several Informatica instances, some of which we want to migrate to Airflow as part of the move to an open-source solution. Since Informatica and Airflow are fundamentally different tools, simply repeating the existing implementation is not that easy. We wanted a workflow that is, on the one hand, as close as possible to the current implementation and, on the other hand, uses the most interesting principle of Airflow: dynamism, which gives flexibility.
In this short article I want to talk about truly dynamic generation of DAGs in Airflow. Most of what you find online on this topic amounts to "you can generate DAGs dynamically in Airflow, for example: <example generating 10 HelloWorld tasks/DAGs>". We were interested in generating DAGs whose number and task names vary over time.
At the moment Airflow runs the module that generates data packets on remote source servers for further loading into the warehouse. It runs on a simple schedule, so there is little point in examining it in detail. Soon the module that delivers data packets for layer-by-layer loading into staging will also be orchestrated through Airflow. Here a number of pitfalls awaited us; I have not found them described anywhere, so I want to share the experience.
There are a couple of articles about Airflow on Habr from Mail.ru developers that describe the basics well:
General Description of Airflow
Branching, parametrization through Jinja, and communication within a DAG via XCom
DAG — directed acyclic graph. Here it means a sequence of interdependent actions that do not form cycles.
SubDAG — the same as a DAG, but nested inside another DAG, launched as part of the parent DAG (i.e., being one of its tasks) and having no schedule of its own.
Operator — a specific step in a DAG that performs a specific action, for example PythonOperator.
Task — a concrete operator instance within a DAG run, visualized as a square in the web interface. For example, a PythonOperator named run_task running in the DAG check_dag. A minimal example tying these terms together follows below.
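To make the glossary concrete, here is a minimal sketch using the names from the definition above (the schedule, dates, and the hello callable are arbitrary illustration, not part of our project):

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def hello():
    # the action the operator performs
    print('hello')

# check_dag is a DAG: a named graph of actions with a schedule
check_dag = DAG(
    dag_id='check_dag',
    start_date=datetime(2018, 11, 12),
    schedule_interval='@daily',
)

# PythonOperator is an operator; on each run of check_dag this
# instance becomes a task named run_task
run_task = PythonOperator(
    task_id='run_task',
    python_callable=hello,
    dag=check_dag,
)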
Input data:
The orchestrator's repository has a table, let's call it PKG_TABLE.
There is a mechanism that adds a row to PKG_TABLE when a data packet is ready to be loaded; a sketch of how we read this table follows below.
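The real table structure is internal, so here is only a hypothetical sketch of the select_pkg_data function imported in the main code below. The column names and the READY status are assumptions; the only thing that matters is that pkg_id is the second element of each returned tuple, which is what the DAG factory expects:

from airflow.hooks.oracle_hook import OracleHook

def select_pkg_data(oracle_hook):
    """Returns a list of tuples describing packages ready to be loaded."""
    # Hypothetical columns and status value; pkg_id must be the
    # second column, since the factory code takes pkg[1].
    sql = """
        SELECT load_id, pkg_id, pkg_name
          FROM pkg_table
         WHERE status = 'READY'
    """
    # get_records comes from Airflow's common DbApiHook interface
    return oracle_hook.get_records(sql)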
What we wanted:
A DAG that is generated from the packages ready to be loaded and launches their loading (spoiler: in the end it all worked out).
Using the code below, we generate a DAG consisting of a LatestOnlyOperator task and a dependent SubDAG task. The SubDAG is created by the pkg_subdag_factory function, which fetches the list of packages from PKG_TABLE and generates one PythonOperator per package. If there are no packages to load, a DummyOperator is generated instead.
We decided to make the first version with a single PythonOperator per package, planning to later rework it into a detailed workflow using Airflow tools.
# -*- coding: utf-8 -*-
"""Package-loading DAG"""
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.hooks.oracle_hook import OracleHook
from datetime import datetime
import logging

from scripts.lib import run_load, select_pkg_data


def pkg_subdag_factory(
        oracle_hook, parent_dag_name, child_dag_name, start_date,
        schedule_interval, param_dict):
    """
    Generates a DAG of PythonOperators (1 package - 1 PythonOperator).
    Params:
    oracle_hook - airflow.hooks.oracle_hook.OracleHook to the metadata DB
    parent_dag_name - name of the "parent" DAG
    child_dag_name - name of the generated subDAG
    start_date - start date of the DAG
    schedule_interval - schedule of the DAG
    param_dict - dictionary of load parameters
    """
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
        catchup=False
    )
    logging.info('selecting pkg data...')
    pkg_set = select_pkg_data(oracle_hook)
    if len(pkg_set):
        logging.info('pkg_set:')
        logging.info(pkg_set)
        for pkg in pkg_set:
            pkg_id = pkg[1]
            pkg_dict = {'pkg_data_' + str(pkg_id): pkg}
            param_dict.update(pkg_dict)
            task_name = 'pkg_' + str(pkg_id)
            PythonOperator(
                task_id=task_name,
                python_callable=run_load,
                op_kwargs={
                    'oracle_hook': oracle_hook,
                    'param_dict': param_dict,
                    'pkg_id': pkg_id
                },
                retries=0,
                dag=dag
            )
    else:
        logging.info('Undelivered packages not found')
        DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag)
    return dag


interval = '*/10 * * * *'
args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 11, 12)
}
oracle_hook = OracleHook('ora_meta')
main_dag_name = 'load'
load_dag_name = 'load_packages'
param_dict = {
    # static load parameters go here
}

main_dag = DAG(
    dag_id=main_dag_name,
    default_args=args,
    schedule_interval=interval,
    catchup=False
)

subdag = SubDagOperator(
    subdag=pkg_subdag_factory(
        oracle_hook,
        main_dag_name,
        load_dag_name,
        args['start_date'],
        interval,
        param_dict
    ),
    task_id=load_dag_name,
    dag=main_dag
)

# so that missed schedule intervals are not executed on start
latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag)
subdag.set_upstream(latest_only)
The following screenshots show what this looks like in the end.
Appearance of the DAG:
Appearance of the subDAG when there are no packages to deliver:
Appearance of the subDAG when there are packages to deliver:
As the documentation says, "A key capability of Airflow is that these DAG Runs are atomic, idempotent items, <...>", which implies that a DAG is expected to be generated the same way every time. Because we violated this "key capability", we had to learn a few lessons (the links at the end of the article touch on some of them).
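A toy illustration of the violation, where random.randint stands in for the PKG_TABLE query: every time the scheduler or the web server parses this file, the task set can come out different, so runs stop being atomic, idempotent items.

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
import random

dag = DAG(
    dag_id='nondeterministic_demo',
    start_date=datetime(2018, 11, 12),
    schedule_interval='@daily',
)

# External state (here a random number, in our case the answer from
# the DB) decides which tasks exist, so two parses of the same file
# can disagree about the shape of the DAG.
for i in range(random.randint(1, 3)):
    DummyOperator(task_id='pkg_%d' % i, dag=dag)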
I will finish with one more nuance of Airflow that is confusing at first and not described in plain words in other articles: execution_date (which is displayed in all logs, in the interface, etc.) versus the actual start time. The description exists in the Airflow documentation and FAQ, but the consequence is not obvious, so I think it deserves an explanation.
Documentation: "Scheduler runs your job at the end of a period."
Result: if you create a DAG with, for example, an @daily schedule, the run with execution_date "2018-01-01 00:00:00" will actually start at "2018-01-02 00:00:00", once the period it covers has ended.
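A minimal sketch to observe the gap yourself (the dag_id and task are made up for illustration): the task prints its execution_date next to the real clock time.

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def show_dates(**context):
    # With @daily, execution_date lags the real start by about a day:
    # the run stamped 2018-01-01 actually starts on 2018-01-02.
    print('execution_date:', context['execution_date'])
    print('actual time:   ', datetime.now())

dag = DAG(
    dag_id='execution_date_demo',
    start_date=datetime(2018, 11, 12),
    schedule_interval='@daily',
)

PythonOperator(
    task_id='show_dates',
    python_callable=show_dates,
    provide_context=True,  # pass execution_date etc. into the callable
    dag=dag,
)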
Catchup documentation
Documentation for LatestOnlyOperator
More LatestOnlyOperator documentation
Example for LatestOnlyOperator
Some nuances
Question about dependencies from the previous launch
A small example about dynamic generation
A question about dynamic generation with a small description.
Source: https://habr.com/ru/post/435746/