def func(who): print "Hello, %s!" % who func() FunctionDef( name='func', # args=arguments( # args=[Name(id='who', ctx=Param())], vararg=None, kwarg=None, defaults=[]), body=[ # Print(dest=None, values=[ BinOp(left=Str(s='Hello %s!'), op=Mod(), right=Name(id='who', ctx=Load()))], nl=True)], decorator_list=[]), # Expr(value=Call( # func=Name(id='func', ctx=Load()), # args=[], # keywords=[], # kv starargs=None, # *args kwargs=None)) # **kwargs @shortgen and a binary shift operator << . mkdir tornado-shortgen cd tornado-shortgen/ virtualenv .env source .env/bin/activate pip install tornado import tornado.ioloop import tornado.web import tornado.gen import os class Handler(web.RequestHandler): @asynchronous @gen.engine @shortgen def get_short(self): (result, status) << self.db.posts.find_e({'name': 'post'}) @asynchronous @gen.engine def get(self): (result, status) = yield gen.Task(self.db.posts.find_e, {'name': 'post'}) application = tornado.web.Application([ (r"/", Handler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start() $ python >>> import ast >>> print ast.dump(ast.parse(open("shortgen_test.py").read())) get_short and getget_short is an initial function with a binary shift and a decorator FunctionDef( name='get_short', args=arguments(args=[Name(id='self', ctx=Param())], vararg=None, kwarg=None, defaults=[]), body=[ Expr(value=BinOp( # 2- left=Tuple( # - elts=[Name(id='result', ctx=Load()), Name(id='status', ctx=Load())], ctx=Load()), op=LShift(), # right=Call( # - self.db.posts.find_e func=Attribute( value=Attribute( value=Attribute( value=Name(id='self', ctx=Load()), attr='db', ctx=Load()), attr='posts', ctx=Load()), attr='find_e', ctx=Load()), args=[Dict(keys=[Str(s='name')], values=[Str(s='post')])], # keywords=[], starargs=None, kwargs=None)))], decorator_list=[ # Attribute(value=Name(id='web', ctx=Load()), attr='asynchronous', ctx=Load()), Attribute(value=Name(id='gen', ctx=Load()), attr='engine', ctx=Load()), Name(id='shortgen', ctx=Load())]) # ! get - desired result FunctionDef( name='get', args=arguments(args=[Name(id='self', ctx=Param())], vararg=None, kwarg=None, defaults=[]), body=[ Assign( # targets=[ Tuple(elts=[ # = - tuple, ctx Store() Name(id='result', ctx=Store()), Name(id='status', ctx=Store())], ctx=Store())], value=Yield( # - yield value=Call( # gen.Task func=Attribute( value=Name(id='gen', ctx=Load()), attr='Task', ctx=Load()), args=[Attribute( # - self.db.posts.find_e value=Attribute( value=Attribute( value=Name(id='self', ctx=Load()), attr='db', ctx=Load()), attr='posts', ctx=Load()), attr='find_e', ctx=Load()), Dict(keys=[Str(s='name')], values=[Str(s='post')])], keywords=[], # starargs=None, kwargs=None)))], decorator_list=[ Name(id='asynchronous', ctx=Load()), Attribute(value=Name(id='gen', ctx=Load()), attr='engine', ctx=Load())]) # shortgen Expr completely goneBinOp(left, op, right) now Assign(targets, value)ctx value has changed from Load to Storeself.db.posts.find_e(...) replaced by gen.Task(self.db.posts.find_e, ...)@shortgen@shortgenBinOpctx from Load to Store , from the right operand, extract the function name and its arguments (positional, kw, and "star" - *, **)self.db.posts.find_e ) as the first positional argument (i.e. in our example, we get the positional arguments [self.db.posts.find_e, {'name': 'post'}] , and all the rest are emptyCall , but already functions gen.Task with these argumentsAssign(targets, value) and as targets take the previously left BinOp operand and, as the value, Yield we just createdExpr source tree with our freshly picked Assignvisit_[NodeType] in it, for example, visit_FunctionDef or visit_Expr . The value that the method returns will be the new value of the AST element. And the Visitor itself just recursively goes around the tree, calling our methods when the corresponding element is encountered in the tree. This will help us more conveniently organize our code.visit_FunctionDef method to catch the decorated function. In it, we check that the function is wrapped in the decorator, if wrapped, we remove the decorator and put the mark self.decoratedvisit_Expression method for catching the binary shift. In it, we check that the self.decorated flag is self.decorated and that Expr is a binary shift. We Expr remaining manipulations (conversion of Expr to Assign ) manually. Fortunately, all the necessary data is already side by side. # -*- coding: utf-8 -*- ''' Created on 2012-10-07 @author: Sergey <me@seriyps.ru> http://habrahabr.ru/post/153595/ AST ''' import ast import marshal import py_compile import time import os.path class RewriteGenTask(ast.NodeTransformer): def __init__(self, *args, **kwargs): self.on_decorator = [] self.on_assign = [] super(RewriteGenTask, self).__init__(*args, **kwargs) def shortgen_deco_pos(self, decorator_list): # , # shortgen . for pos, deco in enumerate(decorator_list): # Name(id='shortgen', ctx=Load()) if isinstance(deco, ast.Name) and deco.id == 'shortgen': return pos return -1 def visit_FunctionDef(self, node): """ , shortgen. , . FunctionDef( name='get_short', args=arguments(...), body=[...], decorator_list=[ Attribute(value=Name(id='web', ...), attr='asynchronous', ...), Attribute(value=Name(id='gen', ...), attr='engine', ...), Name(id='shortgen', ctx=Load())]) """ deco_pos = self.shortgen_deco_pos(node.decorator_list) if deco_pos >= 0: # shortgen , , # Visitor # self.on_decorator.append(True) node.decorator_list.pop(deco_pos) self.generic_visit(node) # self.on_decorator.pop() return node def visit_Expr(self, expr): """ == == result2 << func(arg, k=v, *args, **kwargs) result2 = gen.Task(func, arg, k=v, *args, **kwargs) AST "stmt << func(...)" ( ): Expr(value=BinOp(left=Name(id='result', ctx=Load()), op=LShift(), right=Call( func=Name(id='fetch', ctx=Load()), args=[Num(n=1)], keywords=[keyword(arg='k', value=Num(n=2))], starargs=Tuple(elts=[Num(n=3)], ctx=Load()), kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)]))))) ---- vvvvvvvvvvv ---- AST "stmt = yield func(...)" (): Assign(targets=[Name(id='result', ctx=Store())], value=Yield(value=Call( func=Attribute(value=Name(id='gen', ctx=Load()), attr='Task', ctx=Load()), args=[Name(id='fetch', ctx=Load()), Num(n=1)], keywords=[keyword(arg='k', value=Num(n=2))], starargs=Tuple(elts=[Num(n=3)], ctx=Load()), kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)])))) """ node = expr.value # BinOp if not (self.on_decorator and isinstance(expr.value, ast.BinOp) and isinstance(node.op, ast.LShift)): # (on_decorator ), # return expr # , LShift, , # gen.Task() # (stmt <<) # (stmt =). ctx=Load # ctx=Store ( self.visit_Load()) self.on_assign.append(True) assign_target = self.visit(node.left) self.on_assign.pop() # ... = ... (new_node, ) = ast.Assign( targets = [assign_target], value = ast.Yield( value=self.construct_gen_task_call(node.right))), # new_node = ast.fix_missing_locations(ast.copy_location(new_node, expr)) return new_node def construct_gen_task_call(self, func_call): """ gen.Task func(arg, k=v, *args, **kwargs) gen.Task(func, arg, k=v, *args, **kwargs) AST "func(...)": Call( func=Name(id='fetch', ctx=Load()), args=[Num(n=1)], keywords=[keyword(arg='k', value=Num(n=2))], starargs=Tuple(elts=[Num(n=3)], ctx=Load()), kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)]))) ---- vvvvvvvvv ---- AST "gen.Task(func, ...)": Call( func=Attribute(value=Name(id='gen', ctx=Load()), attr='Task', ctx=Load()), args=[Name(id='fetch', ctx=Load()), Num(n=1)], keywords=[keyword(arg='k', value=Num(n=2))], starargs=Tuple(elts=[Num(n=3)], ctx=Load()), kwargs=Dict(keys=[Str(s='k2')], values=[Num(n=4)])) """ # gen.Task gen_task = ast.Attribute( value=ast.Name(id='gen', ctx=ast.Load()), attr='Task', ctx=ast.Load()) # gen.Task(func, ...) call = ast.Call( func=gen_task, # - 1- : args=[func_call.func] + func_call.args, keywords=func_call.keywords, starargs=func_call.starargs, kwargs=func_call.kwargs) return self.visit(call) def visit_Load(self, node): # Load() Store() if self.on_assign: return ast.copy_location(ast.Store(), node) return node def shortgen(f): raise RuntimeError("ERROR! file must be compiled with yield_ast!") def compile_file(filepath): path, filename = os.path.split(filepath) with open(filepath) as src: orig_ast = ast.parse(src.read()) new_ast = RewriteGenTask().visit(orig_ast) code = compile(new_ast, filename, 'exec') pyc_filename = os.path.splitext(filename)[0] + '.pyc' pyc_filepath = os.path.join(path, pyc_filename) with open(pyc_filepath, 'wb') as fc: fc.write(py_compile.MAGIC) py_compile.wr_long(fc, long(time.time())) marshal.dump(code, fc) fc.flush() if __name__ == '__main__': import sys if len(sys.argv) < 2: print "Usage: %s file_to_compile1.py [file2.py] ..." % sys.argv[0] for filename in sys.argv[1:]: compile_file(filename) with open(filepath) as src: orig_ast = ast.parse(src.read()) new_ast = RewriteGenTask().visit(orig_ast) code = compile(new_ast, filename, 'exec') exec code python my_module.pyo @asynchronous @gen.engine @shortgen How to use AST with just one @shortgen and how without AST?@shortgen , @my_module.shortgen will not work anymore. Plus it is required that the tornado.gen module be imported as from tornado import gen ; import tornado.gen or import tornado.gen or from tornado.gen import Task no longer a roll. How to fix it?Source: https://habr.com/ru/post/153949/
All Articles