
How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we periodically run into difficulties with some of the tools. And at the moment we hit them, there is not always enough documentation or articles around to help us cope with the problem.



That was the case, for example, in 2015, when we needed a Hadoop cluster with Spark for 35 concurrent users for the “Big Data Specialist” program. It was not clear how to prepare it for such a use case with YARN. In the end, having figured it out and walked the path ourselves, we wrote a post on Habr and also gave a talk at the Moscow Spark Meetup.



Background



This time we will talk about another program, Data Engineer. On it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, as part of batch processing, Airflow is used to transfer logs from HDFS to ClickHouse.



All in all, good: let them build their pipelines. However, there is a "but": all our programs are technology-driven in terms of the learning process itself. To check a lab, we use automatic checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees extended feedback on what they have done. And this is exactly where we approach our problem.



The verification of this lab works like this: we send a control data packet to the participant's Kafka, then Gobblin moves this data packet to HDFS, and then Airflow takes it and puts it into ClickHouse. The trick is that Airflow does not have to do this in real time; it does it on a schedule: every 15 minutes it picks up a batch of files and loads them.
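
For context, a 15-minute batch DAG like the ones the participants write could look roughly like this. This is only a minimal sketch for Airflow 1.10, not the actual DAG from the program; the DAG id, task id and bash command are hypothetical.

# A rough sketch of a 15-minute batch DAG; all names here are made up.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='hdfs_to_clickhouse',                # hypothetical DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval=timedelta(minutes=15),    # pick up a new batch every 15 minutes
    catchup=False,
)

load_batch = BashOperator(
    task_id='load_batch',
    bash_command='echo "move the latest HDFS files into ClickHouse here"',
    dag=dag,
)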



It turns out that we need to trigger their DAG ourselves, on demand, while the checker is running here and now. Googling around, we found out that later versions of Airflow have a so-called Experimental API. The word experimental sounds scary, of course, but what can you do... Maybe it will take off.



Next, we describe the whole path: from installing Airflow to forming a POST request that triggers the DAG using the Experimental API. We will work with Ubuntu 16.04.



1. Installing Airflow



Let's check that we have Python 3 and virtualenv.



$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0


If either of them is missing, install it.



Now create a directory in which we will continue to work with Airflow.



$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p `which python3` venv
$ source venv/bin/activate
(venv) $


Install Airflow:



(venv) $ pip install apache-airflow


The version we worked with: 1.10.



Now we need to create an airflow_home directory where the DAG files and Airflow plugins will be located. After creating the directory, set the AIRFLOW_HOME environment variable.



(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>


The next step is to run the command that will create and initialize the Airflow metadata database in SQLite:



 (venv) $ airflow initdb 


A database will be created in airflow.db by default.
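
The database location is controlled by the sql_alchemy_conn setting in airflow.cfg; by default it points at a SQLite file inside AIRFLOW_HOME (the path below is just an example, not taken from the article):

[core]
sql_alchemy_conn = sqlite:////path/to/my/airflow/workspace/airflow_home/airflow.db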



Check if Airflow is installed:



$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/_  /_/ /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0


If the command completed successfully, Airflow has created its own configuration file airflow.cfg in AIRFLOW_HOME:



$ tree
.
├── airflow.cfg
└── unittests.cfg


Airflow has a web interface. It can be started with the command:



 (venv) $ airflow webserver --port 8081 


Now you can open the web interface in a browser on port 8081 on the host where Airflow is running, for example: <hostname:8081>.
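
One more note: the web server only serves the UI and the API. For DAG runs, including the ones we will trigger below, to actually execute, the Airflow scheduler has to be running as well, in a separate terminal:

(venv) $ airflow scheduler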



2. Working with the Experimental API



At this point Airflow is configured and ready to go. However, we also need to set up access to the Experimental API. Our checkers are written in Python, so all further requests will be made in Python using the requests library.



In fact, the API already works for simple requests. For example, this request lets you check that it is up:



>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # ours is 8081; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'


If you received such a message in response, it means that everything works.



However, as soon as we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.



Setting this up takes a few steps.



First, you need to add this to the config:



[api]
auth_backend = airflow.contrib.auth.backends.password_auth
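
The password backend relies on flask-bcrypt, which is shipped as an optional extra of the apache-airflow package; if it is not installed yet, something like the following should pull it in (and the web server needs to be restarted after editing airflow.cfg):

(venv) $ pip install 'apache-airflow[password]'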


Then, you need to create your user with admin rights:



>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()


Then you need to create a user with normal permissions who will be allowed to trigger the DAG.



>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()


Now everything is ready.



3. Sending the POST request



The POST request itself will look like this:



>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'


The request was processed successfully.



After that, we give the DAG some time to process the data and then query the ClickHouse table, trying to catch the control data packet.
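
As a rough sketch of what that last check might look like: we assume the standard ClickHouse HTTP interface on port 8123, and the table and column names here are hypothetical, not taken from the lab.

>>> # hypothetical table and column names; requests is already imported above
>>> ch_host = '<clickhouse hostname>'
>>> query = "SELECT count() FROM logs WHERE batch_id = 'control'"
>>> resp = requests.post('http://{}:8123/'.format(ch_host), data=query)
>>> # a non-zero count in resp.text means the control packet made it to ClickHouse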



Check complete.




Source: https://habr.com/ru/post/445852/


