基于SQLAlchemy實現操作MySQL并執行原生sql語句
場景應用
老大我讓爬取內部網站獲取數據,插入到新建的表中,并每天進行爬取更新數據(后面做了定時任務)。然后根據該表統計每日的新增數量/更新數量進行制圖制表,向上級匯報。
思路構建
選用sqlalchemy+mysqlconnector,連接數據庫,創建表,對指定表進行CRUD
from sqlalchemy import exists, Column, Integer, String, ForeignKey, DateTime, Text, funcfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmakerfrom conf.parseConfig import parseConf# 從配置文件中獲取數據庫信息host = parseConf.get_conf(’MySQLInfo’, ’host’)port = parseConf.get_conf(’MySQLInfo’, ’port’)dbname = parseConf.get_conf(’MySQLInfo’, ’dbname’)usernm = parseConf.get_conf(’MySQLInfo’, ’usernm’)passwd = parseConf.get_conf(’MySQLInfo’, ’passwd’)# 連接數據庫engine_str = 'mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'.format(usernm, passwd, host, port, dbname)# 創建的數據庫引擎engine = create_engine(engine_str, encoding=’utf-8’)#創建session類型DBSession = sessionmaker(bind=engine)# 創建session對象,進行增刪改查:session = DBSession()# 實例化官宣模型 - Base 就是 ORM 模型Base = declarative_base()# 創建服務單表 繼承Base基類class ServiceOrder(Base): __tablename__ = ’serviceOrderTable’ serviceOrderId = Column(String(32), primary_key=True, comment=’服務單ID’) serviceDesc = Column(String(512), comment=’服務說明’) transferTimes = Column(String(32), comment=’轉派次數’) # 創建更新時間,對數據的更新進行記錄 updateTime = Column(DateTime, server_default=func.now(), onupdate=func.now())def init_db(): Base.metadata.create_all(engine)def drop_db(): Base.metadata.drop_all(engine)if __name__ == '__main__': # 每次執行時 會判斷表的存在性 對于數據庫中不存在的表進行創建 已存在的表則可以直接進行增刪改查 init_db() ### 首先講一下使用sqlalchemy執行原生的sql語句### # 方式一: res = session.execute(’select * from ServiceOrder’) # res是獲取的對象 all_res_list = res.fetchall() # all_res_list具體的結果 是列表 print(all_res_list ) # 結果: [(’數據提取’,), (’非數據提取’,)] # 方式二: conn = engine.connect() res = conn.execute(’select * from ServiceOrder’) all_res_list = res.fetchall() ### 使用創建好的session對象進行增刪改查 ### # 插入單條數據 # 創建新service0rder對象 new_serviceorder = ServiceOrder(serviceOrderId=’001’, serviceDesc=’ack’, transferTimes=’9’) # 添加到session session.add(new_serviceorder) # 提交即保存到數據庫 session.commit() # 插入多條數據 serviceorder_list = [ServiceOrder(serviceOrderId=’002’, serviceDesc=’好的’, transferTimes=’9’),ServiceOrder(serviceOrderId=’003’, serviceDesc=’起床’, transferTimes=’9’)] session.add_all(serviceorder_list) session.commit() # session.close() # 查詢 # 查詢是否存在 結果是布爾值 it_exists = session.query( exists().where(ServiceOrder.serviceOrderId == ’002’) ).scalar() # 創建Query查詢,filter是where條件 # 調用one() first()返回唯一行,如果調用all()則已列表的形式返回所有行: server_order = session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == ’003’).first() print(server_order.serviceDesc) serciceorders = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == ’好的’).all() # 改 更新數據 # 數據更新,將值為Mack的serviceDesc修改為Danny update_obj = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == ’Mack’).update({'serviceDesc': 'Danny'}) # 或則 update_objp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == ’Mack’).first() update_objp.serviceDesc = ’Danny’ session.commit() # 刪除 update_objk = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == ’Mack’).delete() # 或則 update_objkp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == ’Mack’).one() update_objkp.delete() session.commit() session.close()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。
相關文章: