-- Table of points in the "dicts" schema.
-- Each column sits on its own line so that a trailing "--" comment cannot
-- swallow the rest of the statement.
CREATE TABLE dicts.points
(
  id_point serial NOT NULL, -- primary key; "serial" supplies the default value
  "name" character varying(50) DEFAULT ''::character varying, -- at most 50 characters, empty by default
  sync_hour integer DEFAULT 0, -- NOTE(review): presumably the hour used for synchronisation - confirm
  -- ... (further columns are elided in the original listing)
  is_active boolean DEFAULT true, -- NOTE(review): presumably marks the point as in use - confirm
  CONSTRAINT pk_points PRIMARY KEY (id_point)
);
import pg


class Connection:
    """Wrapper around a PyGreSQL classic-interface ``pg.DB`` connection.

    The query methods do not raise on SQL errors: they return ``-1`` and keep
    the error text, which ``GetError`` hands out (and clears) afterwards.
    """

    def __init__(self, dbname, user, password, host, port):
        """Remember the connection parameters; no connection is opened yet."""
        self.dbname = dbname
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.db = None  # pg.DB handle, created by Connect()
        self.query_collector = None  # result object of the last successful SELECT
        self.err = ''  # text of the last recorded error

    def _error_text(self, err):
        """Return the text of *err*.

        ``err.message`` is preferred when present (Python 2 behaviour of the
        original code); ``str(err)`` is the fallback because the attribute no
        longer exists in Python 3.
        """
        message = getattr(err, 'message', '')
        if message:
            return message
        return str(err)

    def _send(self, query):
        """Run *query*, ignoring its result; return 0 on success, -1 on error."""
        try:
            self.db.query(query)
        except pg.ProgrammingError as err:
            self.err = " %s: \n %s" % (__name__, self._error_text(err))
            return -1
        return 0

    def Connect(self):
        """Open the connection.

        Raises ``Exception`` carrying the driver's message when the
        connection cannot be established.
        """
        try:
            self.db = pg.DB(dbname=self.dbname, user=self.user,
                            passwd=self.password, host=self.host,
                            port=self.port)
        except Exception as err:
            raise Exception(" : %s" % (self._error_text(err),))

    def Disconnect(self):
        """Close the connection; does nothing when no connection is open."""
        if self.db is not None:
            self.db.close()
            self.db = None

    def SendQueryReturn(self, query):
        """Run a SELECT query and keep its result for the Get*Result methods.

        Returns the number of rows, or -1 after a ``pg.ProgrammingError``
        (the text is then available through ``GetError``).
        """
        # Drop the previous result first so that a failed query can never be
        # followed by a fetch of stale rows.
        self.query_collector = None
        try:
            self.query_collector = self.db.query(query)
        except pg.ProgrammingError as err:
            self.err = " %s: \n %s" % (__name__, self._error_text(err))
            return -1
        return self.query_collector.ntuples()

    def SendQueryNoreturn(self, query):
        """Run a query whose result is not needed; return 0 or -1."""
        return self._send(query)

    def SendBEGIN(self):
        """Send ``BEGIN``; return 0 or -1."""
        return self._send("BEGIN")

    def SendCOMMIT(self):
        """Send ``COMMIT``; return 0 or -1."""
        return self._send("COMMIT")

    def SendROLLBACK(self):
        """Send ``ROLLBACK``; return 0 or -1."""
        return self._send("ROLLBACK")

    def GetTupleResult(self):
        """Return the stored result via ``getresult()`` and forget it.

        Returns ``[]`` when there is no stored result or fetching fails.
        """
        try:
            res = self.query_collector.getresult()
        except Exception:
            res = []
        self.query_collector = None
        return res

    def GetDictResult(self):
        """Return the stored result via ``dictresult()`` and forget it.

        Returns ``{}`` when there is no stored result or fetching fails
        (kept as in the original for existing callers).
        """
        try:
            res = self.query_collector.dictresult()
        except Exception:
            res = {}
        self.query_collector = None
        return res

    def GetError(self):
        """Return the last recorded error text and clear it."""
        res = self.err
        self.err = ''
        return res

    def GetObjectStruct(self, name):
        """Return ``self.db.get_attnames(name)``.

        On any failure the error text is recorded and ``()`` is returned.
        """
        try:
            return self.db.get_attnames(name)
        except Exception as err:
            self.err = " %s: \n%s" % (__name__, self._error_text(err))
            return ()
def set_connection(conn):
    """Choose the backend module bound to the module-level name ``connection``.

    conn -- backend key; ``'pg'`` is the only key handled in the visible code.
    """
    # The import below rebinds the global, not a local name.
    global connection
    if conn == 'pg':
        import connection.pg_connection as connection
    # NOTE(review): the ellipsis marks where the original listing is cut off
    # (presumably further backends follow) - confirm against the full file.
    ...
from query_constants import *


class QueryLayout:
    """Backend-independent query layer on top of a ``Connection`` object.

    Errors are collected in ``self.err_list`` and handed out by ``GetErrors``.
    ``self.CONNECTED`` tells whether the connection was opened.
    """

    def __init__(self, dbname, user, password, host='localhost', port=5432):
        """Create the backend connection object and try to connect.

        The backend is the module-level ``connection`` name. When it is
        missing or empty the instance is left unconnected with an explanatory
        entry in the error list instead of failing with ``NameError``.
        """
        self.err_list = []
        self.conn = None
        self.CONNECTED = False
        backend = globals().get('connection')
        if backend:
            self.conn = backend.Connection(dbname, user, password, host, port)
        self.CONNECTED = self.SetConnection()

    def _error_text(self, err):
        """Return the text of *err* (``err.message`` is gone in Python 3)."""
        message = getattr(err, 'message', '')
        if message:
            return message
        return str(err)

    def SetConnection(self):
        """Open the connection; return True on success, False otherwise."""
        if self.conn is None:
            self.err_list.append(u"Database backend is not configured")
            return False
        try:
            self.conn.Connect()
        except Exception as err:
            self.err_list.append(self._error_text(err))
            return False
        return True

    def CloseConnection(self):
        """Close the connection if a connection object exists."""
        if self.conn is not None:
            self.conn.Disconnect()

    def QueryNoreturn(self, query):
        """Run a query without a result set; return 1 on success, 0 on error."""
        if self.conn.SendQueryNoreturn(query) == 0:
            return 1
        self.err_list.append(self.conn.GetError())
        return 0

    def QueryReturn(self, query):
        """Run a query with a result set.

        Returns the value of ``SendQueryReturn``; a negative value means an
        error, whose text is appended to the error list.
        """
        res = self.conn.SendQueryReturn(query)
        if res < 0:
            self.err_list.append(self.conn.GetError())
        return res

    def GetDataTuple(self):
        """Return the rows of the last query in tuple form."""
        return self.conn.GetTupleResult()

    def GetDataDict(self):
        """Return the rows of the last query in dict form."""
        return self.conn.GetDictResult()

    def GetErrors(self):
        """Return the collected errors and start a fresh list."""
        res = self.err_list
        self.err_list = []
        return res

    def GetObjectStruct(self, objname):
        """Return the structure of *objname* as reported by the connection."""
        return self.conn.GetObjectStruct(objname)


class CustomQuery(QueryLayout):
    """Adds transaction control and dict-shaped results to ``QueryLayout``.

    ``CustomGet`` and ``CustomSet`` return ``{'res': ..., 'err': [...],
    'inf': ...}`` where a negative ``res`` signals failure.
    """

    def __init__(self, dbname, user, password, host='localhost', port=5432):
        QueryLayout.__init__(self, dbname, user, password, host, port)

    def _transaction(self, send):
        """Call *send*; on failure store the error for ``GetErrors``.

        The connection object offers ``GetError`` (one message), not
        ``GetErrors``, and the message has to land in ``self.err_list`` to be
        visible to the caller.
        """
        if send() < 0:
            self.err_list.append(u" ")
            self.err_list.append(self.conn.GetError())
            return False
        return True

    def BEGIN(self):
        """Start a transaction; return True on success."""
        return self._transaction(self.conn.SendBEGIN)

    def COMMIT(self):
        """Commit the transaction; return True on success."""
        return self._transaction(self.conn.SendCOMMIT)

    def ROLLBACK(self):
        """Roll the transaction back; return True on success."""
        return self._transaction(self.conn.SendROLLBACK)

    def CustomGet(self, query, mode='dict', warn=False):
        """Run a SELECT and return its rows in a result dict.

        mode -- ``'tuple'`` for tuple rows, anything else for dict rows.
        warn -- when true, an empty result is reported as a failure.
        """
        nRes = self.QueryReturn(query)
        if nRes > 0:
            if mode == 'tuple':
                res = self.GetDataTuple()
            else:
                res = self.GetDataDict()
            return {'res': len(res), 'err': [], 'inf': res}
        if nRes == 0:
            if warn:
                return {'res': -1, 'err': [u" "], 'inf': []}
            # 'inf' is an empty dict here, unlike the other branches; kept
            # as in the original for existing callers.
            return {'res': 0, 'err': [], 'inf': {}}
        err_list = self.GetErrors()
        err_list.insert(0, u" ")
        return {'res': -1, 'err': err_list, 'inf': []}

    def CustomSet(self, query):
        """Run a modifying query and return a result dict (``res`` 0 or -1)."""
        if self.QueryNoreturn(query) == 1:
            return {'res': 0, 'err': [], 'inf': []}
        err_list = self.GetErrors()
        err_list.insert(0, u" ")
        return {'res': -1, 'err': err_list, 'inf': []}
# SQL templates for the "points" dictionary; placeholders are filled in with
# the % operator by the code that runs them.

# (name, hour)
ADD_NEW_POINT = (
    "select * from dicts.insert_point"
    "('%s',%s)"
)

# (id)
DELETE_POINT = (
    "select * from dicts.delete_point"
    "(%s)"
)

# (id, name, hour)
EDIT_POINT = (
    "select * from dicts.update_point"
    "(%s,'%s',%s)"
)

# no placeholders
GET_ALL_POINTS = (
    "select * from "
    "dicts.get_all_points"
)
class QueryCollector(CustomQuery):
    """Named queries for the "points" dictionary.

    Every method returns the result dict produced by ``CustomSet`` or
    ``CustomGet``. Arguments are escaped or coerced before they are placed
    into the SQL text, so a quote in a point name can neither break the
    statement nor inject SQL.
    """

    def __init__(self, dbname, user, password, host='localhost', port=5432):
        CustomQuery.__init__(self, dbname, user, password, host, port)

    def _quote_text(self, value):
        """Return *value* escaped for use between single quotes in SQL.

        The driver's ``escape_string`` is used when the connection exposes
        one through ``conn.db``; otherwise single quotes are doubled.
        """
        text = "%s" % (value,)
        escape = getattr(getattr(self.conn, 'db', None), 'escape_string', None)
        if escape is not None:
            return escape(text)
        return text.replace("'", "''")

    def _bad_argument(self, err):
        """Return a failure dict, in ``CustomSet`` shape, for a bad argument."""
        return {'res': -1,
                'err': [u" ", u"Invalid query argument: %s" % (err,)],
                'inf': []}

    def AddNewPoint(self, sPointName, nHour):
        """Insert a point with the given name and hour."""
        try:
            query = ADD_NEW_POINT % (self._quote_text(sPointName), int(nHour))
        except (TypeError, ValueError) as err:
            return self._bad_argument(err)
        return self.CustomSet(query)

    def DeletePoint(self, nIdPoint):
        """Delete the point with the given id."""
        try:
            query = DELETE_POINT % int(nIdPoint)
        except (TypeError, ValueError) as err:
            return self._bad_argument(err)
        return self.CustomSet(query)

    def EditPoint(self, nIdPoint, sPointName, nHour):
        """Update name and hour of the point with the given id."""
        try:
            query = EDIT_POINT % (int(nIdPoint),
                                  self._quote_text(sPointName),
                                  int(nHour))
        except (TypeError, ValueError) as err:
            return self._bad_argument(err)
        return self.CustomSet(query)

    def GetAllPoints(self):
        """Select all points."""
        return self.CustomGet(GET_ALL_POINTS)
# Text type of the running interpreter: ``unicode`` on Python 2, ``str`` on 3.
try:
    _TEXT_TYPE = unicode
except NameError:
    _TEXT_TYPE = str

# String spellings that mean "false"; the empty string is included because
# bool('') is False as well.
_FALSE_LITERALS = frozenset(['f', 'false', 'n', 'no', 'off', '0', ''])


def _cast_bool(value):
    """Convert *value* to bool, reading strings such as 'f' or 'false' as False.

    A plain ``bool(value)`` turns every non-empty string, including 'f',
    into True.
    """
    if isinstance(value, bytes):
        value = value.decode('utf-8', 'replace')
    if isinstance(value, _TEXT_TYPE):
        return value.strip().lower() not in _FALSE_LITERALS
    return bool(value)


def _cast_text(value):
    """Convert *value* to text; byte strings are decoded as UTF-8.

    Values that are already text are returned unchanged, other objects are
    converted with the text type's constructor.
    """
    if isinstance(value, _TEXT_TYPE):
        return value
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return _TEXT_TYPE(value)


# Column type name -> zero-argument callable producing the default value.
COLUMN_TYPES = {'int': int, 'bool': bool, 'text': _TEXT_TYPE}

# Column type name -> one-argument callable converting a raw value.
CAST_TYPES = {'int': int, 'bool': _cast_bool, 'text': _cast_text}

# Description of the "Points" model: class name, the database object its
# structure is read from, and the query-method name for each operation.
POINTS = {
    'obj_name': 'Point',
    'struct': 'dicts.get_all_points',
    'select': 'GetAllPoints',
    'insert': 'AddNewPoint',
    'update': 'EditPoint',
    'delete': 'DeletePoint',
}

# Model name -> model description.
CONST_NAMES = {'Points': POINTS}
class ModelManager:
    """Builds model classes and collections from the descriptions in m_const.

    Classes are created at run time; their query methods are taken from a
    ``QueryCollector`` instance by the names listed in the description.
    """

    def __init__(self, dbname, user, password, host='localhost', port=5432):
        self.query_collector = query_layout.QueryCollector(
            dbname, user, password, host, port)

    def BuildModel(self, name, **props):
        """Return ``[struct, options]`` for the model called *name*.

        struct  -- column name -> type name, as reported by the query layer
                   (``{}`` when nothing is reported).
        options -- column name -> default value of its type, overridden and
                   extended by *props*. Columns of a type missing from
                   ``COLUMN_TYPES`` default to ``None``.

        Raises ``KeyError`` when *name* has no description.
        """
        c_atts = self.GetMetaData(name)
        if not c_atts:
            raise KeyError("unknown model name: %r" % (name,))
        struct = self.query_collector.GetObjectStruct(c_atts['struct']) or {}
        dctOptions = {}
        for column, type_name in struct.items():
            factory = m_const.COLUMN_TYPES.get(type_name)
            if factory is None:
                dctOptions[column] = None
            else:
                dctOptions[column] = factory()
        dctOptions.update(props)
        return [struct, dctOptions]

    def GetMetaData(self, name):
        """Return the description of model *name*, or None if there is none."""
        return m_const.CONST_NAMES.get(name)

    def _lookup_method(self, c_atts, key):
        """Return the query-collector method named by ``c_atts[key]``.

        When the key or the method is missing, a stand-in is returned that
        accepts any arguments and returns a ``Warning`` instance. It must
        accept arguments because it is stored in a class dict and therefore
        receives the instance when called.
        """
        try:
            return getattr(self.query_collector, c_atts[key])
        except (AttributeError, KeyError, TypeError):
            return lambda *args, **kwargs: Warning(
                "This method is not implemented!")

    def _gather_methods(self, name, wanted, methods):
        """Return *methods* plus one entry per ``(public_name, key)`` pair."""
        c_atts = self.GetMetaData(name)
        dctMethods = dict(methods)
        if c_atts:
            for public_name, key in wanted:
                dctMethods[public_name] = self._lookup_method(c_atts, key)
        return dctMethods

    def GetInlineMethods(self, name, **methods):
        """Return the methods of a model object: *methods*, Update, Delete."""
        wanted = (('Update', 'update'), ('Delete', 'delete'))
        return self._gather_methods(name, wanted, methods)

    def GetCollectMethods(self, name, **methods):
        """Return the methods of a collection: *methods*, Insert, Select."""
        wanted = (('Insert', 'insert'), ('Select', 'select'))
        return self._gather_methods(name, wanted, methods)

    def CreateClass(self, name, i_methods=None, props=None):
        """Create and return the class for model *name*.

        i_methods -- extra methods for the class (dict), optional.
        props     -- extra class attributes (dict), optional.

        The class gets a ``struct`` attribute and one class attribute per
        option returned by ``BuildModel``.
        """
        c_atts = self.GetMetaData(name)
        o_meth = self.GetInlineMethods(name, **(i_methods or {}))
        struct, o_prop = self.BuildModel(name, **(props or {}))
        # type() builds the same new-style class without needing the
        # Python-2-only classobj helper.
        obj = type(c_atts['obj_name'], (object,), o_meth)
        setattr(obj, 'struct', struct)
        for attr, value in o_prop.items():
            setattr(obj, attr, value)
        return obj

    def InitObject(self, obj, **values):
        """Instantiate class *obj* and fill it from *values*.

        Only keys that are attributes of the class are used. Each value is
        converted with the cast of its column type; when the conversion
        fails the attribute is set to ``None``.
        """
        class_attrs = obj.__dict__
        new_obj = obj()
        for key, value in values.items():
            if key not in class_attrs:
                continue
            try:
                cast = m_const.CAST_TYPES[new_obj.struct[key]]
                new_obj.__dict__[key] = cast(value)
            except Exception:
                # Best effort: an unknown type or a bad value gives None.
                new_obj.__dict__[key] = None
        return new_obj

    def InitCollection(self, name, **props):
        """Create a collection object for model *name* and load its items.

        *props* become instance attributes of the collection. ``items`` is
        filled from ``Select()`` when that call returns a result dict with a
        non-negative ``res``; otherwise it stays empty.
        """
        o = self.CreateClass(name)
        coll_meth = self.GetCollectMethods(name)
        collection = type(name, (object,), coll_meth)()
        if props:
            collection.__dict__.update(props)
        setattr(collection, 'items', [])
        dctRes = collection.Select()
        # The stand-in for a missing Select returns a Warning, not a dict.
        if isinstance(dctRes, dict) and dctRes['res'] >= 0:
            collection.items = [self.InitObject(o, **row)
                                for row in dctRes['inf']]
        return collection
Source: https://habr.com/ru/post/122923/
All Articles