
Automatic migrations with peewee

Today I want to talk about an interesting ORM called peewee. It is light and fast; its query syntax is a bit more involved than the Django ORM's, but in return it lets you keep an eye on the SQL that is actually produced.
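
As a quick illustration, here is what a minimal model and query might look like (the model, fields and credentials are placeholders, not taken from the original project); query.sql() returns the generated SQL together with its parameters:

from peewee import MySQLDatabase, Model, CharField, IntegerField

# Placeholder connection settings - replace with your own
db = MySQLDatabase('my_db', host='localhost', user='user', password='secret')


class Person(Model):
    name = CharField()
    age = IntegerField(default=0)

    class Meta:
        database = db


query = Person.select().where(Person.age > 18)
print(query.sql())  # roughly: ('SELECT ... FROM `person` ... WHERE (`age` > %s)', [18])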



I was working on a Python application that talks to a database, so the choice fell on a simple solution that would standardize database access. Colleagues had used Django in similar applications before, but pulling it in would have made this application needlessly heavy (its requirements already listed too many dependencies).



After a week of work on the project, the manager asked me to add several fields to the database, and the obvious question arose: how do we migrate? Peewee does have migrations; their mechanism lives in playhouse.migrate and is described in the documentation. How exactly to run these migrations, however, is left entirely up to you.
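
For reference, the bare mechanism from playhouse.migrate looks roughly like this (the table and column names are placeholders):

from peewee import CharField, MySQLDatabase
from playhouse.migrate import MySQLMigrator, migrate

# Placeholder connection - in the real application it comes from the Borg class described below
db = MySQLDatabase('my_db', host='localhost', user='user', password='secret')
migrator = MySQLMigrator(db)

migrate(
    migrator.add_column('some_table', 'title', CharField(default='')),
    migrator.rename_column('some_table', 'old_name', 'new_name'),
)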



The migration mechanism is completely bare and unopinionated, which means we can use it however we like. Perhaps the first thing that comes to mind is to make the procedure fully automatic and run it every time the application starts, especially since the application does not need an instant response or lightning-fast startup.


Before describing the logic of the automatic migrations, I would like to outline the general working conditions:



  1. The database connection lives in a separate Borg class whose connection attribute holds a reference to the connection (see the sketch after this list).
  2. We are not talking about generating migration files automatically, only about applying changes automatically.
  3. The second point lets us write each migration however we like and, for every new version of the application, use whatever logic is needed to preserve the existing data.
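
A minimal sketch of such a Borg class, with placeholder database credentials, might look like this:

from peewee import MySQLDatabase


class DBConnection(object):
    """ Borg: every instance shares the same state and therefore the same connection """
    __shared_state = {}

    def __init__(self):
        self.__dict__ = self.__shared_state
        if not hasattr(self, 'connection'):
            # Placeholder credentials - replace with your own settings
            self.connection = MySQLDatabase('my_db', host='localhost', user='user', password='secret')


db_connection = DBConnection()  # the instance referenced by the Migrator class below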




So let's start by creating an abstract class for migrations.



Abstract class for the migrator
import abc

from playhouse.migrate import migrate, MySQLMigrator


class Migrator(object):
    """ Migration interface """
    __metaclass__ = abc.ABCMeta

    connection = db_connection.connection  # db_connection is a Borg instance
    migrator = MySQLMigrator(db_connection.connection)

    @abc.abstractproperty
    def migrations(self):
        """ List of the migration dictionaries

        :param self: class instance
        :return: list
        """
        return [
            {'statement': 1 != 2,
             'migration': ['list', 'of', 'migration', 'options'],
             'migration_kwargs': {},
             'pre_migrations': list(),
             'post_migrations': list()}
        ]  # Just an example

    def migrate(self):
        """ Run migrations """
        for migration in self.migrations:
            if migration['statement']:
                # Run scripts before the migration
                pre_migrations = migration.get('pre_migrations', list())
                for pre_m in pre_migrations:
                    pre_m()

                # Migrate
                with db_connection.connection.transaction():
                    migration_kwargs = migration.get('migration_kwargs', {})
                    migrate(*migration['migration'], **migration_kwargs)

                # Run scripts after the migration
                post_migrations = migration.get('post_migrations', list())
                for post_m in post_migrations:
                    post_m()


The class consists of three parts: the predefined class attributes that reference the database connection, the migrations property that returns the list of migration dictionaries, and the migrate method, which first runs the procedures that precede a migration, then the migration itself, and finally the functions that should be executed after the migration.



Now let's create a controller that will automatically find migrations and run them.



Sample auto migration search
import inspect
import pkgutil
import re
import sys


def get_migration_modules(packages=[]):
    """ Get python modules with migrations

    :param packages: iterable - list or tuple with package names to search
    :return: list - [('module.path', 'module_name'), ...]
    """
    # List of the modules to migrate
    migration_modules = list()

    for pack in packages:
        migration_module = __import__(pack, globals(), locals(), fromlist=[str('migrations')])
        try:
            # Check that the imported object is a module
            if inspect.ismodule(migration_module.migrations):
                # Find submodules inside the module
                for importer, modname, ispkg in pkgutil.iter_modules(migration_module.migrations.__path__):
                    if re.match(r'^\d{3,}_migration_[\d\w_]+$', modname) and not ispkg:
                        migration_modules.append((migration_module.migrations.__name__, modname))
                # Unregister the module
                sys.modules.pop(migration_module.__name__)
        except AttributeError:
            pass

    return migration_modules


def get_migration_classes(migration_modules):
    """ Get list of the migration classes

    :type migration_modules: iterable
    :param migration_modules: array with the migration modules
    :return: list
    """
    migration_classes = list()

    for mig_mod, m in migration_modules:
        mig = __import__(mig_mod, globals(), locals(), fromlist=[m])
        try:
            target_module = mig.__getattribute__(m)
            # Check that the imported object is a module
            if inspect.ismodule(target_module):
                for name, obj in inspect.getmembers(target_module):
                    # Get all contained elements
                    if inspect.isclass(obj) and issubclass(obj, Migrator) and obj != Migrator:
                        # Save these elements
                        migration_classes.append(obj)
                # Remove the imported module from the stack
                sys.modules.pop(mig.__name__)
        except AttributeError:
            pass

    return migration_classes


Auto search starts as follows:



Auto Find and Run Migrations
# Get modules with migrations
m_mods = get_migration_modules(packages=['package_1', 'package_2', 'package_3'])

# Get migration classes
m_classes = get_migration_classes(m_mods)

# Execute migrations
for m_class in m_classes:
    mig = m_class()
    mig.migrate()


Pay attention to the pattern r'^\d{3,}_migration_[\d\w_]+$' - it is used to find migrations by name, which means that migration files must be named according to this template.
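
A quick check of which module names the pattern accepts (the file names are just examples):

import re

pattern = r'^\d{3,}_migration_[\d\w_]+$'

print(bool(re.match(pattern, '001_migration_add_first_fields')))  # True
print(bool(re.match(pattern, '0001_migration_drop_old_column')))  # True
print(bool(re.match(pattern, 'migration_add_first_fields')))      # False - no numeric prefix
print(bool(re.match(pattern, '01_migration_add_fields')))         # False - fewer than three digits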



In the packages package_1, package_2 and package_3 we create migrations sub-packages (with an __init__.py). Then, for example, in package_1.migrations we create the module 001_migration_add_first_fields.py.
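
The resulting layout for one of the packages could look roughly like this (models.py is an assumption about where FirstModel lives):

package_1/
    __init__.py
    models.py
    migrations/
        __init__.py
        001_migration_add_first_fields.py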



Let's look at the contents of this file:



001_migration_add_first_fields.py
from controllers.migrator import Migrator
from package_1.models import FirstModel


class AddNewFields(Migrator):
    """ Append new fields to the FirstModel """

    table_name = FirstModel._meta.db_table  # Get the name of the table for the target model

    def __field_not_exists(self):
        """ Check that the new field does not exist yet

        :return: bool
        """
        q = ('SELECT COUNT(*) FROM information_schema.COLUMNS '
             'WHERE TABLE_NAME = \'{0}\' AND COLUMN_NAME = \'my_new_field\'').format(self.table_name)
        cursor = self.connection.execute_sql(q)
        result = int(cursor.fetchone()[0])
        return result == 0

    @property
    def migrations(self):
        return [
            # add my_new_field column
            {
                'statement': self.__field_not_exists(),
                'migration': [self.migrator.add_column(self.table_name, 'my_new_field', FirstModel.my_new_field)],
            }
        ]


That's it! Now, when the auto search runs, the controller will find AddNewFields and, if its statement evaluates to true, will execute the migration.



The system is described here only briefly, but, as you can see, it has some headroom:



  1. It allows you to run procedures before or after a migration (for example, to back data up and then restore it from the backup); see the sketch after this list.
  2. It allows you to check whether a migration is needed at all.
  3. You can add NOT NULL fields that have no default in the model.
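
To illustrate the first point, a migration class could wire backup and restore helpers into pre_migrations and post_migrations roughly like this (the model, column and helper bodies are purely hypothetical):

from controllers.migrator import Migrator
from package_2.models import SecondModel  # hypothetical model


class DropLegacyField(Migrator):
    """ Hypothetical example: back data up before dropping a column, restore it afterwards """

    table_name = SecondModel._meta.db_table

    def __backup_legacy_data(self):
        # Hypothetical helper: save the values of the column we are about to drop
        pass

    def __restore_legacy_data(self):
        # Hypothetical helper: move the saved values into their new home
        pass

    @property
    def migrations(self):
        return [
            {
                'statement': True,  # or a real check, like __field_not_exists() above
                'migration': [self.migrator.drop_column(self.table_name, 'legacy_field')],
                'pre_migrations': [self.__backup_legacy_data],
                'post_migrations': [self.__restore_legacy_data],
            }
        ]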




Let's look at the last feature in more detail:



Modified def migrations()
@property
def migrations(self):
    # Modify the NULL field: give it a default value for the migration
    my_new_field = FirstModel.my_new_field
    my_new_field.default = 'Rewrite me'

    return [
        # add my_new_field column
        {
            'statement': self.__field_not_exists(),
            'migration': [self.migrator.add_column(self.table_name, 'my_new_field', FirstModel.my_new_field)],
        }
    ]


And, accordingly, if after the migration we need to generate a unique value of the new field for each record, we simply add a 'post_migrations' element to the returned dictionary: a list of functions that will be launched one after another once the field has been added.
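
A sketch of how AddNewFields might be extended for that (using uuid for the unique values is just an assumption about what "unique" means here):

import uuid  # at the top of the module

# Inside the AddNewFields class shown above:

def __fill_unique_values(self):
    """ Hypothetical post-migration step: give every existing row its own value """
    for row in FirstModel.select():
        row.my_new_field = uuid.uuid4().hex
        row.save()

@property
def migrations(self):
    my_new_field = FirstModel.my_new_field
    my_new_field.default = 'Rewrite me'  # temporary default, as above

    return [
        {
            'statement': self.__field_not_exists(),
            'migration': [self.migrator.add_column(self.table_name, 'my_new_field', FirstModel.my_new_field)],
            'post_migrations': [self.__fill_unique_values],
        }
    ]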

Source: https://habr.com/ru/post/262697/


