# Python 2 sample used to illustrate what an AST looks like.
def func(who):
    # Python 2 print statement with %-formatting.
    print "Hello, %s!" % who

# Called with no argument: this is enough to produce a Call node in the AST,
# but actually executing it would raise TypeError because `who` is required.
func()
FunctionDef( name='func', # args=arguments( # args=[Name(id='who', ctx=Param())], vararg=None, kwarg=None, defaults=[]), body=[ # Print(dest=None, values=[ BinOp(left=Str(s='Hello %s!'), op=Mod(), right=Name(id='who', ctx=Load()))], nl=True)], decorator_list=[]), # Expr(value=Call( # func=Name(id='func', ctx=Load()), # args=[], # keywords=[], # kv starargs=None, # *args kwargs=None)) # **kwargs
@shortgen
and a binary shift operator <<
. mkdir tornado-shortgen cd tornado-shortgen/ virtualenv .env source .env/bin/activate pip install tornado
# Sample module (shortgen_test.py) used to compare the AST of the shorthand
# form (get_short) with the AST of the hand-written form (get).
import os

import tornado.gen
import tornado.ioloop
import tornado.web
# The class body refers to the bare names `web`, `gen` and `asynchronous`;
# plain `import tornado.web` only binds `tornado`, so bind them explicitly.
from tornado import gen, web
from tornado.web import asynchronous


class Handler(web.RequestHandler):
    """Request handler with the same query written in both notations."""

    # NOTE(review): `shortgen` is not imported here and `self.db` is not
    # defined in this snippet -- the decorator is expected to be stripped by
    # the AST transformer before the module is executed; confirm how `db`
    # is attached in a real application.
    @asynchronous
    @gen.engine
    @shortgen
    def get_short(self):
        # Shorthand: to be rewritten into `... = yield gen.Task(...)`.
        (result, status) << self.db.posts.find_e({'name': 'post'})

    @asynchronous
    @gen.engine
    def get(self):
        # The explicit form that get_short should be transformed into.
        (result, status) = yield gen.Task(self.db.posts.find_e, {'name': 'post'})


application = tornado.web.Application([
    (r"/", Handler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
$ python >>> import ast >>> print ast.dump(ast.parse(open("shortgen_test.py").read()))
get_short
and get
get_short
is an initial function with a binary shift and a decorator FunctionDef( name='get_short', args=arguments(args=[Name(id='self', ctx=Param())], vararg=None, kwarg=None, defaults=[]), body=[ Expr(value=BinOp( # 2- left=Tuple( # - elts=[Name(id='result', ctx=Load()), Name(id='status', ctx=Load())], ctx=Load()), op=LShift(), # right=Call( # - self.db.posts.find_e func=Attribute( value=Attribute( value=Attribute( value=Name(id='self', ctx=Load()), attr='db', ctx=Load()), attr='posts', ctx=Load()), attr='find_e', ctx=Load()), args=[Dict(keys=[Str(s='name')], values=[Str(s='post')])], # keywords=[], starargs=None, kwargs=None)))], decorator_list=[ # Attribute(value=Name(id='web', ctx=Load()), attr='asynchronous', ctx=Load()), Attribute(value=Name(id='gen', ctx=Load()), attr='engine', ctx=Load()), Name(id='shortgen', ctx=Load())]) # !
get
- desired result FunctionDef( name='get', args=arguments(args=[Name(id='self', ctx=Param())], vararg=None, kwarg=None, defaults=[]), body=[ Assign( # targets=[ Tuple(elts=[ # = - tuple, ctx Store() Name(id='result', ctx=Store()), Name(id='status', ctx=Store())], ctx=Store())], value=Yield( # - yield value=Call( # gen.Task func=Attribute( value=Name(id='gen', ctx=Load()), attr='Task', ctx=Load()), args=[Attribute( # - self.db.posts.find_e value=Attribute( value=Attribute( value=Name(id='self', ctx=Load()), attr='db', ctx=Load()), attr='posts', ctx=Load()), attr='find_e', ctx=Load()), Dict(keys=[Str(s='name')], values=[Str(s='post')])], keywords=[], # starargs=None, kwargs=None)))], decorator_list=[ Name(id='asynchronous', ctx=Load()), Attribute(value=Name(id='gen', ctx=Load()), attr='engine', ctx=Load())]) # shortgen
Expr
is completely gone; BinOp(left, op, right)
now Assign(targets, value)
ctx
value has changed from Load
to Store
self.db.posts.find_e(...)
replaced by gen.Task(self.db.posts.find_e, ...)
@shortgen
@shortgen
BinOp
ctx
from Load
to Store
, from the right operand, extract the function name and its arguments (positional, kw, and "star" - *, **). Insert the function itself (self.db.posts.find_e
) as the first positional argument (i.e. in our example, we get the positional arguments [self.db.posts.find_e, {'name': 'post'}]
, and all the rest are empty). Create a new Call
, but this time of the function gen.Task
with these arguments. Create an Assign(targets, value)
and as targets take the previously left BinOp
operand and, as the value, Yield
we just created. Replace the Expr in the
source tree with our freshly picked Assign
visit_[NodeType]
in it, for example, visit_FunctionDef
or visit_Expr
. The value that the method returns will be the new value of the AST element. And the Visitor itself just recursively goes around the tree, calling our methods when the corresponding element is encountered in the tree. This will help us more conveniently organize our code. We define a visit_FunctionDef
method to catch the decorated function. In it, we check that the function is wrapped in the decorator, if wrapped, we remove the decorator and put the mark self.decorated
. We define a visit_Expr
method for catching the binary shift. In it, we check that the self.decorated
flag is set
and that Expr
is a binary shift. We do the
remaining manipulations (conversion of Expr
to Assign
) manually. Fortunately, all the necessary data is already side by side.

# -*- coding: utf-8 -*-
'''
Created on 2012-10-07

@author: Sergey <me@seriyps.ru>

http://habrahabr.ru/post/153595/

AST transformer: inside functions decorated with ``@shortgen`` it rewrites
``target << func(...)`` into ``target = yield gen.Task(func, ...)`` and
compiles the resulting tree to a .pyc file.
'''
import ast
import marshal
import py_compile
import time
import os.path


class RewriteGenTask(ast.NodeTransformer):
    """Rewrite ``<<`` expression statements inside ``@shortgen`` functions."""

    def __init__(self, *args, **kwargs):
        # Both lists are used as stacks of flags; "non-empty" means "active".
        # on_decorator: pushed while the body of a @shortgen function is
        # being visited.
        self.on_decorator = []
        # on_assign: pushed while the left operand of ``<<`` is being
        # visited, so that visit_Load swaps Load() for Store().
        self.on_assign = []
        super(RewriteGenTask, self).__init__(*args, **kwargs)

    def shortgen_deco_pos(self, decorator_list):
        """Return the index of the ``shortgen`` decorator, or -1 if absent.

        Only a bare name is recognised; an attribute access such as
        ``module.shortgen`` is not an ``ast.Name`` and is not matched.
        """
        # Scan the decorators for one that is exactly the name `shortgen`.
        for pos, deco in enumerate(decorator_list):
            # Name(id='shortgen', ctx=Load())
            if isinstance(deco, ast.Name) and deco.id == 'shortgen':
                return pos
        return -1

    def visit_FunctionDef(self, node):
        """Strip ``@shortgen`` from a function and transform its body.

        Example of a node that matches:

        FunctionDef(
            name='get_short',
            args=arguments(...),
            body=[...],
            decorator_list=[
                Attribute(value=Name(id='web', ...), attr='asynchronous', ...),
                Attribute(value=Name(id='gen', ...), attr='engine', ...),
                Name(id='shortgen', ctx=Load())])

        Functions without the decorator are returned as-is and their bodies
        are not visited by this method.
        """
        deco_pos = self.shortgen_deco_pos(node.decorator_list)
        if deco_pos >= 0:
            # The function is wrapped in shortgen: remove the decorator,
            # set the flag so the visitor knows it is inside a decorated
            # function, and process the function body.
            self.on_decorator.append(True)
            node.decorator_list.pop(deco_pos)
            self.generic_visit(node)  # recurse into the function body
            self.on_decorator.pop()
        return node

    def visit_Expr(self, expr):
        """Turn ``target << func(...)`` into ``target = yield gen.Task(func, ...)``.

        Source form and the form it is rewritten to:

            result2 << func(arg, k=v, *args, **kwargs)
            result2 = yield gen.Task(func, arg, k=v, *args, **kwargs)

        AST of "stmt << func(...)" (before):

        Expr(value=BinOp(left=Name(id='result', ctx=Load()),
                         op=LShift(),
                         right=Call(
                            func=Name(id='fetch', ctx=Load()),
                            args=[Num(n=1)],
                            keywords=[keyword(arg='k', value=Num(n=2))],
                            starargs=Tuple(elts=[Num(n=3)], ctx=Load()),
                            kwargs=Dict(keys=[Str(s='k2')],
                                        values=[Num(n=4)]))))

        ---- vvvvvvvvvvv ----

        AST of "stmt = yield gen.Task(func, ...)" (after):

        Assign(targets=[Name(id='result', ctx=Store())],
               value=Yield(value=Call(
                    func=Attribute(value=Name(id='gen', ctx=Load()),
                                   attr='Task', ctx=Load()),
                    args=[Name(id='fetch', ctx=Load()), Num(n=1)],
                    keywords=[keyword(arg='k', value=Num(n=2))],
                    starargs=Tuple(elts=[Num(n=3)], ctx=Load()),
                    kwargs=Dict(keys=[Str(s='k2')],
                                values=[Num(n=4)]))))

        Any other expression statement, or one outside a ``@shortgen``
        function, is returned unchanged.
        """
        node = expr.value  # candidate BinOp
        if not (self.on_decorator
                and isinstance(expr.value, ast.BinOp)
                and isinstance(node.op, ast.LShift)):
            # Not inside a decorated function (on_decorator is empty), or
            # not a left-shift expression: leave the statement untouched.
            return expr
        # We are inside a decorated function and the statement is an LShift,
        # so the right operand must be wrapped in gen.Task() and the left
        # operand must change from an expression being read (stmt <<) into an
        # assignment target (stmt =). That means replacing ctx=Load with
        # ctx=Store, which is done by self.visit_Load() while on_assign is set.
        self.on_assign.append(True)
        assign_target = self.visit(node.left)
        self.on_assign.pop()
        # Build "... = yield ...". The trailing comma makes a 1-tuple that is
        # immediately unpacked into new_node.
        (new_node, ) = ast.Assign(
            targets = [assign_target],
            value = ast.Yield(
                value=self.construct_gen_task_call(node.right))),
        # Copy the position of the original statement onto the new one and
        # fill in positions for the freshly created child nodes.
        new_node = ast.fix_missing_locations(ast.copy_location(new_node, expr))
        return new_node

    def construct_gen_task_call(self, func_call):
        """Wrap a call node into a ``gen.Task`` call node.

            func(arg, k=v, *args, **kwargs)
            gen.Task(func, arg, k=v, *args, **kwargs)

        AST of "func(...)":

        Call(
             func=Name(id='fetch', ctx=Load()),
             args=[Num(n=1)],
             keywords=[keyword(arg='k', value=Num(n=2))],
             starargs=Tuple(elts=[Num(n=3)], ctx=Load()),
             kwargs=Dict(keys=[Str(s='k2')],
                         values=[Num(n=4)]))

        ---- vvvvvvvvv ----

        AST of "gen.Task(func, ...)":

        Call(
             func=Attribute(value=Name(id='gen', ctx=Load()),
                            attr='Task', ctx=Load()),
             args=[Name(id='fetch', ctx=Load()), Num(n=1)],
             keywords=[keyword(arg='k', value=Num(n=2))],
             starargs=Tuple(elts=[Num(n=3)], ctx=Load()),
             kwargs=Dict(keys=[Str(s='k2')],
                         values=[Num(n=4)]))

        The new node is passed through ``self.visit`` before being returned.
        """
        # The attribute expression gen.Task
        gen_task = ast.Attribute(
            value=ast.Name(id='gen', ctx=ast.Load()),
            attr='Task',
            ctx=ast.Load())
        # The call gen.Task(func, ...)
        call = ast.Call(
            func=gen_task,
            # the original callee becomes the first positional argument:
            args=[func_call.func] + func_call.args,
            keywords=func_call.keywords,
            starargs=func_call.starargs,
            kwargs=func_call.kwargs)
        return self.visit(call)

    def visit_Load(self, node):
        """Replace Load() with Store() while an assignment target is visited."""
        if self.on_assign:
            return ast.copy_location(ast.Store(), node)
        return node


def shortgen(f):
    """Placeholder decorator that always raises RuntimeError.

    RewriteGenTask removes ``@shortgen`` from the tree, so this body only
    runs when a module using the decorator was not compiled through it.
    """
    raise RuntimeError("ERROR! file must be compiled with yield_ast!")


def compile_file(filepath):
    """Parse, transform and compile *filepath*, writing a .pyc next to it.

    The output file gets the same base name with a ``.pyc`` extension and
    contains the magic number, the current time and the marshalled code.
    """
    path, filename = os.path.split(filepath)
    with open(filepath) as src:
        orig_ast = ast.parse(src.read())
    new_ast = RewriteGenTask().visit(orig_ast)
    code = compile(new_ast, filename, 'exec')
    pyc_filename = os.path.splitext(filename)[0] + '.pyc'
    pyc_filepath = os.path.join(path, pyc_filename)
    with open(pyc_filepath, 'wb') as fc:
        # Python 2 .pyc layout: magic, timestamp, marshalled code object.
        fc.write(py_compile.MAGIC)
        py_compile.wr_long(fc, long(time.time()))
        marshal.dump(code, fc)
        fc.flush()


if __name__ == '__main__':
    import sys
    if len(sys.argv) < 2:
        # No files given: print usage; the loop below then has nothing to do.
        print "Usage: %s file_to_compile1.py [file2.py] ..." % sys.argv[0]
    for filename in sys.argv[1:]:
        compile_file(filename)
# Transform a module and run it immediately instead of writing a .pyc file.
# Expects `filepath`, `filename` and RewriteGenTask to be defined already.
with open(filepath) as src:
    orig_ast = ast.parse(src.read())
new_ast = RewriteGenTask().visit(orig_ast)
code = compile(new_ast, filename, 'exec')
# Python 2 exec statement: execute the compiled code object.
exec code
python my_module.pyc
@asynchronous @gen.engine @shortgen
How to use AST with just one @shortgen
and how to do it without AST? @shortgen
, @my_module.shortgen
will not work anymore. Plus it is required that the tornado.gen module be imported as from tornado import gen
; import tornado.gen
or import tornado.gen
or from tornado.gen import Task
will no longer work. How to fix it? Source: https://habr.com/ru/post/153949/
All Articles