python+requests接口自動化框架的實現
為什么要做接口自動化框架
1、業務與配置的分離
2、數據與程序的分離;數據的變更不影響程序
3、有日志功能,實現無人值守
4、自動發送測試報告
5、不懂編程的測試人員也可以進行測試
正常接口測試的流程是什么?
確定接口測試使用的工具----->配置需要的接口參數----->進行測試----->檢查測試結果----->生成測試報告
測試的工具:python+requests
接口測試用例:excel
一、接口框架如下:
1、action包:用來存放關鍵字函數
2、config包:用來存放配置文件
3、TestData:用來存放測試數據,excel表
4、Log包:用來存放日志文件
5、utils包:用來存放公共的類
6、運行主程序interface_auto_test.py
7、Readme.txt:告訴團隊組員使用改框架需要注意的地方
二、接口的數據規范設計---Case設計
一個sheet對應數據庫里面一張表
APIsheet存放編號;從1開始接口的名稱(APIName);請求的url(RequestUrl);請求的方法(RequestMethod);傳參的方式(paramsType):post/get請求方法不一樣用例說明(APITestCase)是否執行(Active)部分接口已測通,下次不用測試,直接把這里設置成N,跳過此接口
post與get的區別
查看post詳情
post請求參數一般是json串,參數放在from表單里面;參數一般不可見,相對來說安全性高些
查看get詳情
get請求參數一般直接放在url里面
2.1注冊接口用例
RequestData:請求的數據(開發制定的傳參方式)RelyData:數據依賴ResponseCode:響應codeResponseData:響應數據DataStore:存儲的依賴數據;如果存在數據庫里面,在表里增加一個字段用來存依賴的數據(存儲的方式是編寫接口自動化的人員來設定的存儲方式)CheckPoint:檢查點Active:是否執行Status:執行用例的狀態,方便查看用例是否執行成功ErrorInfo:case運行失敗,失敗的錯誤信息;eg:是也本身的原因還是case設置失敗,還是其他原因
2.2登錄接口用例
RequestData:請求的數據(開發制定的傳參方式)RelyData:數據依賴(存儲的方式是編寫接口自動化的人員來設定的存儲方式)ResponseCode:響應codeResponseData:響應數據DataStore:存儲的依賴數據;如果存在數據庫里面,在表里增加一個字段用來存依賴的數據(存儲的方式是編寫接口自動化的人員來設定的存儲方式)CheckPoint:檢查點Active:是否執行Status:執行用例的狀態,方便查看用例是否執行成功ErrorInfo:case運行失敗,失敗的錯誤信息;eg:是也本身的原因還是case設置失敗,還是其他原因
重點說明下RelyData:數據依賴采取的是字典:key:value來存儲數據格式;{'request':{'username':'register->1','password':'register->1'},'response':{'code':'register->1'}}
格式化之后:
{ 'request':{ 'username':'register->1', 'password':'register->1' }, 'response':{ 'code':'register->1' }}
三、創建utils包:用來存放公共的類
3.1 ParseExcel.py 操作封裝excel的類(ParseExcel.py)
#encoding=utf-8import openpyxlfrom openpyxl.styles import Border, Side, Fontimport timeclass ParseExcel(object): def __init__(self): self.workbook = None self.excelFile = None self.font = Font(color = None) # 設置字體的顏色 # 顏色對應的RGB值 self.RGBDict = {’red’: ’FFFF3030’, ’green’: ’FF008B00’} def loadWorkBook(self, excelPathAndName): # 將excel文件加載到內存,并獲取其workbook對象 try: self.workbook = openpyxl.load_workbook(excelPathAndName) except Exception as err: raise err self.excelFile = excelPathAndName return self.workbook def getSheetByName(self, sheetName): # 根據sheet名獲取該sheet對象 try: # sheet = self.workbook.get_sheet_by_name(sheetName) sheet = self.workbook[sheetName] return sheet except Exception as err: raise err def getSheetByIndex(self, sheetIndex): # 根據sheet的索引號獲取該sheet對象 try: # sheetname = self.workbook.get_sheet_names()[sheetIndex] sheetname = self.workbook.sheetnames[sheetIndex] except Exception as err: raise err # sheet = self.workbook.get_sheet_by_name(sheetname) sheet = self.workbook[sheetname] return sheet def getRowsNumber(self, sheet): # 獲取sheet中有數據區域的結束行號 return sheet.max_row def getColsNumber(self, sheet): # 獲取sheet中有數據區域的結束列號 return sheet.max_column def getStartRowNumber(self, sheet): # 獲取sheet中有數據區域的開始的行號 return sheet.min_row def getStartColNumber(self, sheet): # 獲取sheet中有數據區域的開始的列號 return sheet.min_column def getRow(self, sheet, rowNo): # 獲取sheet中某一行,返回的是這一行所有的數據內容組成的tuple, # 下標從1開始,sheet.rows[1]表示第一行 try: rows = [] for row in sheet.iter_rows():rows.append(row) return rows[rowNo - 1] except Exception as err: raise err def getColumn(self, sheet, colNo): # 獲取sheet中某一列,返回的是這一列所有的數據內容組成tuple, # 下標從1開始,sheet.columns[1]表示第一列 try: cols = [] for col in sheet.iter_cols():cols.append(col) return cols[colNo - 1] except Exception as err: raise err def getCellOfValue(self, sheet, coordinate = None, rowNo = None, colsNo = None): # 根據單元格所在的位置索引獲取該單元格中的值,下標從1開始, # sheet.cell(row = 1, column = 1).value, # 表示excel中第一行第一列的值 if coordinate != None: try:return sheet[coordinate] except Exception as err:raise err elif coordinate is None and rowNo is not None and colsNo is not None: try:return sheet.cell(row = rowNo, column = colsNo).value except Exception as err:raise err else: raise Exception('Insufficient Coordinates of cell !') def getCellOfObject(self, sheet, coordinate = None, rowNo = None, colsNo = None): # 獲取某個單元格的對象,可以根據單元格所在位置的數字索引, # 也可以直接根據excel中單元格的編碼及坐標 # 如getCellObject(sheet, coordinate = ’A1’) or # getCellObject(sheet, rowNo = 1, colsNo = 2) if coordinate != None: try:# return sheet.cell(coordinate = coordinate)return sheet[coordinate] except Exception as err:raise err elif coordinate == None and rowNo is not None and colsNo is not None: try:return sheet.cell(row = rowNo,column = colsNo) except Exception as err:raise err else: raise Exception('Insufficient Coordinates of cell !') def writeCell(self, sheet, content, coordinate = None, rowNo = None, colsNo = None, style = None): #根據單元格在excel中的編碼坐標或者數字索引坐標向單元格中寫入數據, # 下標從1開始,參style表示字體的顏色的名字,比如red,green if coordinate is not None: try:# sheet.cell(coordinate = coordinate).value = contentsheet[coordinate] = contentif style is not None: sheet[coordinate]. font = Font(color = self.RGBDict[style])self.workbook.save(self.excelFile) except Exception as e:raise e elif coordinate == None and rowNo is not None and colsNo is not None: try:sheet.cell(row = rowNo,column = colsNo).value = contentif style: sheet.cell(row = rowNo,column = colsNo). font = Font(color = self.RGBDict[style])self.workbook.save(self.excelFile) except Exception as e:raise e else: raise Exception('Insufficient Coordinates of cell !') def writeCellCurrentTime(self, sheet, coordinate = None,rowNo = None, colsNo = None): # 寫入當前的時間,下標從1開始 now = int(time.time()) #顯示為時間戳 timeArray = time.localtime(now) currentTime = time.strftime('%Y-%m-%d %H:%M:%S', timeArray) if coordinate is not None: try:sheet.cell(coordinate = coordinate).value = currentTimeself.workbook.save(self.excelFile) except Exception as e:raise e elif coordinate == None and rowNo is not None and colsNo is not None: try:sheet.cell(row = rowNo, column = colsNo ).value = currentTimeself.workbook.save(self.excelFile) except Exception as e:raise e else: raise Exception('Insufficient Coordinates of cell !')if __name__ == ’__main__’: # 測試代碼 pe = ParseExcel() pe.loadWorkBook(r’D:ProgramSourceCodePython Source CodeWorkSpaceInterfaceFrame2018inter_test_data.xlsx’) sheetObj = pe.getSheetByName(u'API') print('通過名稱獲取sheet對象的名字:', sheetObj.title) # print help(sheetObj.rows) print('通過index序號獲取sheet對象的名字:', pe.getSheetByIndex(0).title) sheet = pe.getSheetByIndex(0) print(type(sheet)) print(pe.getRowsNumber(sheet)) #獲取最大行號 print(pe.getColsNumber(sheet)) #獲取最大列號 rows = pe.getRow(sheet, 1) #獲取第一行 for i in rows: print(i.value) # # 獲取第一行第一列單元格內容 # print pe.getCellOfValue(sheet, rowNo = 1, colsNo = 1) # pe.writeCell(sheet, u’我愛祖國’, rowNo = 10, colsNo = 10) # pe.writeCellCurrentTime(sheet, rowNo = 10, colsNo = 11)
3.2 封裝get/post請求(HttpClient.py)
import requestsimport jsonclass HttpClient(object): def __init__(self): pass def request(self, requestMethod, requestUrl, paramsType,requestData, headers =None, **kwargs): if requestMethod == 'post': print('---', requestData, type(requestData)) if paramsType == 'form':response = self.__post(url = requestUrl, data = json.dumps(eval(requestData)), headers = headers, **kwargs)return response elif paramsType == 'json':response = self.__post(url = requestUrl, json = json.dumps(eval(requestData)), headers = headers, **kwargs)return response elif requestMethod == 'get': request_url = requestUrl if paramsType == 'url':request_url = '%s%s' %(requestUrl, requestData) response = self.__get(url = request_url, params = requestData, **kwargs) return response def __post(self, url, data = None, json = None, headers=None,**kwargs): print('----') response = requests.post(url=url, data = data, json=json, headers=headers) return response def __get(self, url, params = None, **kwargs): response = requests.get(url, params = params, **kwargs) return responseif __name__ == '__main__': hc = HttpClient() res = hc.request('get', 'http://39.106.41.11:8080/getBlogContent/', 'url',’2’) print(res.json())
3.3 封裝MD5(md5_encrypt)
import hashlibdef md5_encrypt(text): m5 = hashlib.md5() m5.update(text.encode('utf-8')) value = m5.hexdigest() return valueif __name__ == '__main__': print(md5_encrypt('sfwe'))
3.4 封裝Log
import loggingimport logging.configfrom config.public_data import baseDir# 讀取日志配置文件logging.config.fileConfig(baseDir + 'configLogger.conf')# 選擇一個日志格式logger = logging.getLogger('example02')#或者example01def debug(message): # 定義dubug級別日志打印方法 logger.debug(message)def info(message): # 定義info級別日志打印方法 logger.info(message)def warning(message): # 定義warning級別日志打印方法 logger.warning(message)
3.5 封裝發送Email類
import smtplibfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartfrom email.header import Headerfrom ProjVar.var import *import osimport smtplibfrom email import encodersfrom email.mime.base import MIMEBasefrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartfrom email.header import Headerfrom email.utils import formataddrdef send_mail(): mail_host='smtp.126.com' #設置服務器 mail_user='testman1980' #用戶名 mail_pass='wulaoshi1980' #口令 sender = ’testman1980@126.com’ receivers = [’2055739@qq.com’,'testman1980@126.com'] # 接收郵件,可設置為你的QQ郵箱或者其他郵箱 # 創建一個帶附件的實例 message = MIMEMultipart() message[’From’] = formataddr(['光榮之路吳老師', 'testman1980@126.com']) message[’To’] = ’,’.join(receivers) subject = ’自動化測試執行報告’ message[’Subject’] = Header(subject, ’utf-8’) message['Accept-Language']='zh-CN' message['Accept-Charset']='ISO-8859-1,utf-8,gbk' # 郵件正文內容 message.attach(MIMEText(’最新執行的自動化測試報告,請參閱附件內容!’, ’plain’, ’utf-8’)) # 構造附件1,傳送測試結果的excel文件 att = MIMEBase(’application’, ’octet-stream’) att.set_payload(open(ProjDirPath+'testdatatestdata.xlsx', ’rb’).read()) att.add_header(’Content-Disposition’, ’attachment’, filename=(’gbk’, ’’, '自動化測試報告.xlsx')) encoders.encode_base64(att) message.attach(att) ''' # 構造附件2,傳送當前目錄下的 runoob.txt 文件 att2 = MIMEText(open(’e:a.py’,’rb’).read(), ’base64’, ’utf-8’) att2['Content-Type'] = ’application/octet-stream’ att2['Content-Disposition'] = ’attachment; filename='a.py'’ message.attach(att2) ''' try: smtpObj = smtplib.SMTP(mail_host) smtpObj.login(mail_user, mail_pass) smtpObj.sendmail(sender, receivers, message.as_string()) print('郵件發送成功') except smtplib.SMTPException as e: print('Error: 無法發送郵件', e)if __name__ == '__main__': send_mail()
四、 創建config包 用來存放公共的參數、配置文件、長時間不變的變量值
創建public_data.py
import os# 整個項目的根目錄絕對路勁baseDir = os.path.dirname(os.path.dirname(__file__))# 獲取測試數據文件的絕對路徑file_path = baseDir + '/TestData/inter_test_data.xlsx'API_apiName = 2API_requestUrl = 3API_requestMothod = 4API_paramsType = 5API_apiTestCaseFileName = 6API_active = 7CASE_requestData = 1CASE_relyData = 2CASE_responseCode = 3CASE_responseData = 4CASE_dataStore = 5CASE_checkPoint = 6CASE_active = 7CASE_status = 8CASE_errorInfo = 9# 存儲請求參數里面依賴的數據REQUEST_DATA = {}# 存儲響應對象中的依賴數據RESPONSE_DATA = {}if __name__=='__main__': print(file_path) print(baseDir)
五、創建TestData目錄,用來存放測試文件
inter_test_data.xlsx
六、創建action包,用來存放關鍵字函數
6.1 解決數據依賴 (GetRely.py)
from config.public_data import REQUEST_DATA, RESPONSE_DATAfrom utils.md5_encrypt import md5_encryptREQUEST_DATA = {'用戶注冊':{'1':{'username':'zhangsan', 'password':'dfsdf23'}, 'headers':{'cookie':'asdfwerw'}}}RESPONSE_DATA = {'用戶注冊':{'1':{'code':'00'}, 'headers':{'age':2342}}}class GetRely(object): def __init__(self): pass @classmethod def get(self, dataSource, relyData, headSource = {}): print(type(dataSource)) print(dataSource) data = dataSource.copy() for key, value in relyData.items(): if key == 'request':#說明應該去REQUEST_DATA中獲取for k, v in value.items(): interfaceName, case_idx = v.split('->') val = REQUEST_DATA[interfaceName][case_idx][k] if k == 'password': data[k] = md5_encrypt(val) else: data[k] = val elif key == 'response':# 應該去RESPONSE_DATA中獲取for k, v in value.items(): interfaceName, case_idx = v.split('->') data[k] = RESPONSE_DATA[interfaceName][case_idx][k] elif key == 'headers':if headSource: for key, value in value.items(): if key == 'request': for k, v in value.items():for i in v: headSource[i] = REQUEST_DATA[k]['headers'][i] elif key == 'response': for i, val in value.items():for j in val: headSource[j] = RESPONSE_DATA[i]['headers'][j] return '%s' %dataif __name__ == '__main__': s = {'username': '', 'password': '','code':''} h = {'cookie':'123', 'age':332} rely = {'request': {'username': '用戶注冊->1', 'password': '用戶注冊->1'}, 'response':{'code':'用戶注冊->1'}, 'headers':{'request':{'用戶注冊':['cookie']},'response':{'用戶注冊':['age']}} } print(GetRely.get(s, rely, h))
6.2 解決數據存儲(RelyDataStore.py)
from config.public_data import RESPONSE_DATA, REQUEST_DATAclass RelyDataStore(object): def __init__(self): pass @classmethod def do(cls, storePoint, apiName, caseId, request_source = {}, response_source = {}, req_headers={}, res_headers = {}): for key, value in storePoint.items(): if key == 'request':# 說明需要存儲的依賴數據來自請求參數,應該將數據存儲到REQUEST_DATAfor i in value: if i in request_source: val = request_source[i] if apiName not in REQUEST_DATA: # 說明存儲數據的結構還未生成,需要指明數據存儲結構 REQUEST_DATA[apiName]={str(caseId): {i: val}} else: #說明存儲數據結構中最外層結構已存在 if str(caseId) in REQUEST_DATA[apiName]:REQUEST_DATA[apiName][str(caseId)][i] = val else:# 說明內層結構不完整,需要指明完整的結構REQUEST_DATA[apiName][str(caseId)] = {i: val} else: print('請求參數中不存在字段' + i) elif key == 'response':#說明需要存儲的依賴數據來自接口的響應body,應該將數據存儲到RESPONSE_DATAfor j in value: if j in response_source: val = response_source[j] if apiName not in RESPONSE_DATA: # 說明存儲數據的結構還未生成,需要指明數據存儲結構 RESPONSE_DATA[apiName]={str(caseId): {j: val}} else: #說明存儲數據結構中最外層結構已存在 if str(caseId) in RESPONSE_DATA[apiName]:RESPONSE_DATA[apiName][str(caseId)][j] = val else:# 說明內層結構不完整,需要指明完整的結構RESPONSE_DATA[apiName][str(caseId)] = {j: val} else: print('接口的響應body中不存在字段' + j) elif key == 'headers':for k, v in value.items(): if k == 'request': # 說明需要往REQUEST_DATA變量中寫入存儲數據 for item in v: if item in req_headers:header = req_headers[item]if 'headers' in REQUEST_DATA[apiName]: REQUEST_DATA[apiName]['headers'][item] = headerelse: REQUEST_DATA[apiName]['headers'] = {item: header} elif k == 'response': # 說明需要往RESPONSE_DATA變量中寫入存儲數據 for it in v: if it in res_headers:header = res_headers[it]if 'headers' in RESPONSE_DATA[apiName]: RESPONSE_DATA[apiName]['headers'][it] = headerelse: RESPONSE_DATA[apiName]['headers'] = {item: header} print(REQUEST_DATA) print(RESPONSE_DATA)if __name__ == '__main__': r = {'username': 'srwcx01', 'password': 'wcx123wac1', 'email': 'wcx@qq.com'} req_h = {'cookie':'csdfw23'} res_h = {'age':597232} s = {'request': ['username', 'password'], 'response': ['userid'],'headers':{'request':['cookie'], 'response':['age']}} res = {'userid': 12, 'code': '00'} RelyDataStore.do(s, 'register', 1, r, res, req_headers=req_h, res_headers=res_h) print(REQUEST_DATA) print(RESPONSE_DATA)
6.3 校驗數據結果(CheckResult.py)
import reclass CheckResult(object): def __init__(self): pass @classmethod def check(self, responseObj, checkPoint): responseBody = responseObj.json() # responseBody = {'code': '', 'userid': 12, 'id': '12'} errorKey = {} for key, value in checkPoint.items(): if key in responseBody:if isinstance(value, (str, int)): # 等值校驗 if responseBody[key] != value: errorKey[key] = responseBody[key]elif isinstance(value, dict): sourceData = responseBody[key] if 'value' in value: # 模糊匹配校驗 regStr = value['value'] rg = re.match(regStr, '%s' %sourceData) if not rg: errorKey[key] = sourceData elif 'type' in value: # 數據類型校驗 typeS = value['type'] if typeS == 'N': # 說明是整形校驗 if not isinstance(sourceData, int):errorKey[key] = sourceData else:errorKey[key] = '[%s] not exist' %key return errorKeyif __name__ == '__main__': r = {'code': '00', 'userid': 12, 'id': 12} c = {'code': '00', 'userid': {'type': 'N'}, 'id': {'value': 'd+'}} print(CheckResult.check(r, c))
6.4 往excel里面寫結果
from config.public_data import *def write_result(wbObj, sheetObj, responseData, errorKey, rowNum): try: # 寫響應body wbObj.writeCell(sheetObj, content='%s' %responseData, rowNo = rowNum, colsNo=CASE_responseData) # 寫校驗結果狀態及錯誤信息 if errorKey: wbObj.writeCell(sheetObj, content='%s' %errorKey, rowNo=rowNum, colsNo=CASE_errorInfo) wbObj.writeCell(sheetObj, content='faild', rowNo=rowNum, colsNo=CASE_status, style='red') else: wbObj.writeCell(sheetObj, content='pass', rowNo=rowNum, colsNo=CASE_status, style='green') except Exception as err: raise err
七、創建Log目錄用來存放日志
八、主函數
#encoding=utf-8import requestsimport jsonfrom action.get_rely import GetRelyfrom config.public_data import *from utils.ParseExcel import ParseExcelfrom utils.HttpClient import HttpClientfrom action.data_store import RelyDataStorefrom action.check_result import CheckResultfrom action.write_result import write_resultfrom utils.Log import *def main(): parseE = ParseExcel() parseE.loadWorkBook(file_path) sheetObj = parseE.getSheetByName('API') activeList = parseE.getColumn(sheetObj, API_active) for idx, cell in enumerate(activeList[1:], 2): if cell.value == 'y': #需要被執行 RowObj = parseE.getRow(sheetObj, idx) apiName = RowObj[API_apiName -1].value requestUrl = RowObj[API_requestUrl - 1].value requestMethod = RowObj[API_requestMothod - 1].value paramsType = RowObj[API_paramsType - 1].value apiTestCaseFileName = RowObj[API_apiTestCaseFileName - 1].value # 下一步讀取用例sheet表,準備執行測試用例 caseSheetObj = parseE.getSheetByName(apiTestCaseFileName) caseActiveObj = parseE.getColumn(caseSheetObj, CASE_active) for c_idx, col in enumerate(caseActiveObj[1:], 2):if col.value == 'y': #需要執行的用例 caseRowObj = parseE.getRow(caseSheetObj, c_idx) requestData = caseRowObj[CASE_requestData - 1].value relyData = caseRowObj[CASE_relyData - 1].value responseCode = caseRowObj[CASE_responseCode - 1].value responseData = caseRowObj[CASE_responseData - 1].value dataStore = caseRowObj[CASE_dataStore -1].value checkPoint = caseRowObj[CASE_checkPoint - 1].value #發送接口請求之前需要做一下數據依賴的處理 if relyData: logging.info('處理第%s個接口的第%s條用例的數據依賴!') requestData = GetRely.get(eval(requestData), eval(relyData)) httpC = HttpClient() response = httpC.request(requestMethod=requestMethod, requestData=requestData, requestUrl=requestUrl, paramsType=paramsType ) # 獲取到響應結果后,接下來進行數據依賴存儲邏輯實現 if response.status_code == 200: responseData = response.json() # 進行依賴數據存儲 if dataStore: RelyDataStore.do(eval(dataStore), apiName, c_idx - 1, eval(requestData), responseData) # 接下來就是校驗結果 else: logging.info('接口【%s】的第【%s】條用例,不需要進行依賴數據存儲!' %(apiName, c_idx)) if checkPoint: errorKey = CheckResult.check(response, eval(checkPoint)) write_result(parseE, caseSheetObj, responseData, errorKey, c_idx) else: logging.info('接口【%s】的第【%s】條用例,執行失敗,接口協議code非200!' %(apiName, c_idx))else: logging.info('第%s個接口的第%s條用例,被忽略執行!' %(idx -1, c_idx-1)) else: logging.info('第%s行的接口被忽略執行!' %(idx -1))if __name__=='__main__': main()
框架待完善~~請各路神仙多多指教~~
到此這篇關于python+requests接口自動化框架的實現的文章就介紹到這了,更多相關python requests接口自動化內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章: