
Overview of the Luigi Framework for building sequences of tasks

Good day! We have opened a completely new area of study, Big Data, so the range of materials we share with you is expanding a little. Today we will look at Luigi, one of the tools covered in our course.



Luigi is a Python framework for building complex pipelines of dependent tasks. A large part of the framework is aimed at transforming data from various sources (MySQL, MongoDB, Redis, HDFS) using various tools (from launching a process to running different kinds of jobs on a Hadoop cluster). It was developed at Spotify and released as open source in 2012.



The framework's main advantage is the ability to build sequences of dependent tasks. It resolves dependencies, tracks the execution graph, manages task startup, handles errors with the option of restarting the necessary tasks, and distributes work among worker processes so that independent parts of the task graph can run in parallel.
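To make the idea of dependency resolution concrete, here is a toy model in plain Python. It is not Luigi's implementation, only a sketch of the principle: before a task runs, everything it depends on is run first, and tasks that are already done are skipped. The names ToyTask and build are invented for this illustration.

```python
class ToyTask:
    """A toy stand-in for a task: a name, its dependencies, and a done flag."""

    def __init__(self, name, requires=()):
        self.name = name
        self.requires = list(requires)
        self.done = False

    def run(self, log):
        # A real task would do its work here; the toy only records its name.
        log.append(self.name)
        self.done = True


def build(task, log):
    """Run `task` after everything it depends on, skipping finished tasks."""
    if task.done:
        return
    for dependency in task.requires:
        build(dependency, log)
    task.run(log)


# A small graph: report needs clean and stats, stats needs clean,
# and clean needs extract.
extract = ToyTask("extract")
clean = ToyTask("clean", requires=[extract])
stats = ToyTask("stats", requires=[clean])
report = ToyTask("report", requires=[clean, stats])

execution_log = []
build(report, execution_log)
print(execution_log)
```

Asking for the last task is enough: the shared dependency clean is executed only once, and calling build(report, execution_log) a second time does nothing because every task is already marked as done.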


Other tools solve the same problems: Oozie, Pinball, and Airflow (currently incubating at Apache, that is, undergoing various checks; a review of it was recently published on Habr). In this article we consider only Luigi.







Installation and documentation



To install, you can use the command:



pip install luigi 


Documentation is available at https://luigi.readthedocs.io/.



Task (the Task class)



In the file luigi_demo_tasks.py we define a class that inherits from luigi.Task. We also add a call to run so that the task can be started from the console.



    from luigi import Task, run


    class MyTask(Task):
        pass


    if __name__ == '__main__':
        run()


Let's run it. We additionally pass the --local-scheduler option so that the central task scheduler is not contacted for now.



 python -m luigi_demo_tasks MyTask --local-scheduler 


Note. The documentation describes another way to start a task: without calling run, by using the luigi command with the --module option after adding the directory to PYTHONPATH.



We see the following result:



    DEBUG: Checking if MyTask() is complete
    /usr/local/lib/python3.4/dist-packages/luigi/worker.py:334: UserWarning: Task MyTask() without outputs has no custom complete() method
      is_complete = task.complete()
    INFO: Informed scheduler that task MyTask__99914b932b has status PENDING
    INFO: Done scheduling tasks
    INFO: Running Worker with 1 processes
    DEBUG: Asking scheduler for work...
    DEBUG: Pending tasks: 1
    INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) running MyTask()
    INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) done MyTask()
    DEBUG: 1 running tasks, waiting for next task to finish
    INFO: Informed scheduler that task MyTask__99914b932b has status DONE
    DEBUG: Asking scheduler for work...
    DEBUG: Done
    DEBUG: There are no more tasks to run at this time
    INFO: Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) was stopped. Shutting down Keep-Alive thread
    INFO:
    ===== Luigi Execution Summary =====

    Scheduled 1 tasks of which:
    * 1 ran successfully:
        - 1 MyTask()

    This progress looks :) because there were no failed tasks or missing external dependencies

    ===== Luigi Execution Summary =====


The messages show that MyTask was run and completed successfully. Running it again gives exactly the same result, because the task declares no outputs and therefore is never considered already complete.



Now let's make MyTask do some work. To do this, override the run method of the base class:



    from luigi import Task, run


    class MyTask(Task):
        def run(self):
            print("Hello world!")


    if __name__ == '__main__':
        run()


The task output now contains the following:



    INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) running MyTask()
    Hello world!
    INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) done MyTask()


Running a task only once



It often happens that a task needs to be performed only once, for example because it is resource-intensive. In Luigi, a task is considered done if it has produced an object (a file on the local machine, a file in HDFS, an artifact in MySQL, a table in Hive, and so on) whose existence can be checked. To specify this object, override the output method in the task and return from it one or more instances of a Target subclass. As an example, we will use LocalTarget, a file in the local file system.



    from luigi import Task, run, LocalTarget


    class MyTask(Task):
        filename = "hello_file.txt"

        def run(self):
            with open(self.filename, 'w') as f:
                f.write("Hello world!")

        def output(self):
            # The task counts as complete once this file exists.
            return LocalTarget(self.filename)


    if __name__ == '__main__':
        run()


The first launch of the task creates the file hello_file.txt. On a second launch, Luigi reports that all tasks have already been completed.



Dependent tasks



In Luigi, tasks can depend on other tasks. To declare a dependency, override the requires method and return from it an instance of another task class (or several). Let's define two dependent tasks, each of which writes a file.



    from luigi import Task, run, LocalTarget


    class MyTaskFirst(Task):
        filename = "first.txt"

        def run(self):
            with open(self.filename, 'w') as f:
                f.write("first!")

        def output(self):
            return LocalTarget(self.filename)


    class MyTaskSecond(Task):
        filename = "second.txt"

        def run(self):
            with open(self.filename, 'w') as f:
                f.write("second!")

        def requires(self):
            return MyTaskFirst()

        def output(self):
            return LocalTarget(self.filename)


    if __name__ == '__main__':
        run()


The launch command changes slightly. We ask for the last task in the chain, and the framework itself determines and runs all of its dependencies.



 python -m luigi_demo_tasks MyTaskSecond --local-scheduler 


External dependencies



Sometimes a task needs data generated by external systems. Since the requires method can only return other tasks, we need a wrapper task for the external data. As an example, consider counting the frequency of each character in the file hello_file.txt:



    from collections import defaultdict

    from luigi import Task, run, LocalTarget, ExternalTask


    class ExternalData(ExternalTask):
        # No run method: the file is produced outside of Luigi.
        def output(self):
            return LocalTarget("hello_file.txt")


    class TaskWithExternalData(Task):
        filename = "char_counts.txt"

        def run(self):
            frequencies = defaultdict(int)
            with open(self.requires().output().path) as f_in:
                for line in f_in:
                    for c in line:
                        frequencies[c] += 1
            with open(self.filename, 'w') as f_out:
                for c, count in frequencies.items():
                    f_out.write('{}\t{}\n'.format(c, count))

        def requires(self):
            return ExternalData()

        def output(self):
            return LocalTarget(self.filename)


    if __name__ == '__main__':
        run()


Scheduler



For centralized execution of tasks you can use the central scheduler. It has two main jobs: to make sure that identical tasks are not executed at the same time, and to visualize all running tasks together with their dependencies.



The scheduler is started with the command from the documentation:



 $ luigid --background --pidfile <PATH_TO_PIDFILE> --logdir <PATH_TO_LOGDIR> --state-path <PATH_TO_STATEFILE> 


Default scheduler address: http://localhost:8082/



Run the previous task through the scheduler, having first deleted the file char_counts.txt:



 python -m luigi_demo_tasks TaskWithExternalData 


In the scheduler's web interface we will see both tasks, as well as the dependency graph.







Parallel task launch



Let's implement several dependent tasks with dependencies shaped like an hourglass: the root task of type 1 depends on ten identical tasks of type 2. These ten tasks of type 2 depend on a single task of type 3. It, in turn, depends on ten tasks of type 4, and all ten of those depend on a single task of type 5.



Each task writes a file containing its name and number, and also sleeps for 10 seconds. The pause is needed so that progress can be followed in the scheduler. Note that a task can take a parameter. This makes it possible to run what are essentially different tasks with the same code.



For the implementation we will use inheritance, since it shortens the code. We run 5 worker processes at once by specifying the option --workers=5:



 python -m luigi_demo_tasks Task1 --Task1-task-index=0 --workers=5 


Refreshing the scheduler page, we will see the tasks progress through the graph. At any moment no more than five tasks are running, and sometimes only one, because all the others depend on it. In that case the remaining workers sit idle.



Running tasks



To launch tasks regularly you need an external scheduler, for example cron. Accordingly, you have to set up on your own the delivery of up-to-date code to run, logging, and the configuration of all tasks.



Additional features



If a task fails, Luigi can send an email.



For each set of tasks you can specify an external settings file by setting the LUIGI_CONFIG_PATH environment variable before launch.



Each task can be given a priority. To do so, set the priority attribute in the task class.



Many task classes are already implemented for typical data processing jobs on a cluster: a Hadoop streaming task in Python, a Hadoop jar task, a Spark task, and others. However, they often require substantial rework.



Any task can be implemented as the launch of a console command, with tracking of its execution.



The library's code is often quite simple. To understand how to build dependent tasks, it is enough to look at the source code of Luigi's base classes. They have a fairly simple interface and good documentation.



It is developed by a large company. At the moment several commits land in master every week, so it will most likely continue to be supported.



Disadvantages



Whether a task is complete is checked only while the dependency graph is being built. Because of this, if you launch a set of tasks more often than it manages to finish, the scheduler may end up starting two identical tasks.



There is no built-in mechanism for launching tasks on a schedule.



There is no way to obtain meta-information about the tasks being run. Without auxiliary tools it is impossible to get documentation on the running tasks or on the data processing flows. For example, there is no way to get the full list of tasks that depend on a given task.



The scheduler's web interface is useful only for finding out why a particular set of tasks is not running. Usually you can look at the dependency graph and see exactly which data is missing.



Setting up logging of task execution is not entirely obvious.



Unexpected behavior occurs fairly often.



Support and development are less active than, for example, in Airflow. For comparison, look at the activity in the luigi and airflow repositories.



Conclusion



Luigi is well suited for building data processing pipelines. However, before adopting the library it makes sense to implement a few typical tasks in different frameworks and choose the one that fits your needs best.



THE END



As always, opinions, questions, and criticism are welcome.

Source: https://habr.com/ru/post/339904/


