一、框架菜单
1.1 common模块
1.2 其他
二、Excel接口测试案例编写
三、读取Excel测试封装(核心封装)
excel_utils.py 读取Excel中的数据
import os import xlrd #内置模块、第三方模块pip install 自定义模块 class ExcelUtils(): def __init__(self,file_path,sheet_name): self.file_path = file_path self.sheet_name = sheet_name self.sheet = self.get_sheet() # 整个表格对象 def get_sheet(self): wb = xlrd.open_workbook(self.file_path) sheet = wb.sheet_by_name(self.sheet_name) return sheet def get_row_count(self): row_count = self.sheet.nrows return row_count def get_col_count(self): col_count = self.sheet.ncols return col_count def __get_cell_value(self,row_index, col_index): cell_value = self.sheet.cell_value(row_index,col_index) return cell_value def get_merged_info(self): merged_info = self.sheet.merged_cells return merged_info def get_merged_cell_value(self,row_index, col_index): \"\"\"既能获取普通单元格的数据又能获取合并单元格数据\"\"\" cell_value = None for (rlow, rhigh, clow, chigh) in self.get_merged_info(): if (row_index >= rlow and row_index < rhigh): if (col_index >= clow and col_index < chigh): cell_value = self.__get_cell_value(rlow, clow) break; # 防止循环去进行判断出现值覆盖的情况 else: cell_value = self.__get_cell_value(row_index, col_index) else: cell_value = self.__get_cell_value(row_index, col_index) return cell_value def get_sheet_data_by_dict(self): all_data_list = [] first_row = self.sheet.row(0) #获取首行数据 for row in range(1, self.get_row_count()): row_dict = {} for col in range(0, self.get_col_count()): row_dict[first_row[col].value] = self.get_merged_cell_value(row, col) all_data_list.append(row_dict) return all_data_list if __name__==\'__main__\': current_path = os.path.dirname(__file__) excel_path = os.path.join( current_path,\'..\',\'samples/data/test_case.xlsx\' ) excelUtils = ExcelUtils(excel_path,\"Sheet1\") for row in excelUtils.get_sheet_data_by_dict(): print( row )
import os from common.excel_utils import ExcelUtils from common.config_utils import config current_path = os.path.dirname(__file__) test_data_path = os.path.join( current_path,\'..\', config.CASE_DATA_PATH ) class TestdataUtils(): def __init__(self,test_data_path = test_data_path): self.test_data_path = test_data_path self.test_data = ExcelUtils(test_data_path,\"Sheet1\").get_sheet_data_by_dict() self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info() def __get_testcase_data_dict(self): testcase_dict = {} for row_data in self.test_data: testcase_dict.setdefault( row_data[\'测试用例编号\'],[] ).append( row_data ) return testcase_dict def def_testcase_data_list(self): testcase_list = [] for k,v in self.__get_testcase_data_dict().items(): one_case_dict = {} one_case_dict[\"case_id\"] = k one_case_dict[\"case_info\"] = v testcase_list.append( one_case_dict ) return testcase_list if __name__==\"__main__\": testdataUtils = TestdataUtils() for i in testdataUtils.def_testcase_data_list(): print( i )
testdata_utils.py 读取Excel中的数据后处理成需要的数据
四、request封装(核心封装)
requests_utils.py 包含post请求,get请求,异常,调用断言
import ast import re import requests import jsonpath from requests.exceptions import RequestException from requests.exceptions import ProxyError from requests.exceptions import ConnectionError from common.config_utils import config from common.check_utils import CheckUtils class RequestsUtils(): def __init__(self): self.hosts = config.hosts self.headers = {\"ContentType\":\"application/json;charset=utf-8\"} self.session = requests.session() self.temp_variables = {} def __get(self,get_info): try: url = self.hosts + get_info[\"请求地址\"] response = self.session.get( url = url, params = ast.literal_eval(get_info[\"请求参数(get)\"]) ) response.encoding = response.apparent_encoding if get_info[\"取值方式\"] == \"json取值\": value = jsonpath.jsonpath( response.json(),get_info[\"取值代码\"] )[0] self.temp_variables[ get_info[\"传值变量\"] ] = value elif get_info[\"取值方式\"] == \"正则取值\": value = re.findall(get_info[\"取值代码\"],response.text)[0] self.temp_variables[get_info[\"传值变量\"]] = value result = CheckUtils(response).run_check(get_info[\'期望结果类型\'], get_info[\'期望结果\']) except ProxyError as e: result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (get_info[\"接口名称\"])} except ConnectionError as e: result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (get_info[\"接口名称\"])} except RequestException as e: result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (get_info[\"接口名称\"], e.__str__())} except Exception as e: result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(get_info[\"接口名称\"],e.__str__())} return result def __post(self,post_info): try: url = self.hosts + post_info[\"请求地址\"] response = self.session.post( url = url, headers = self.headers, params = ast.literal_eval(post_info[\"请求参数(get)\"]), # data = post_infos[\"提交数据(post)\"], json=ast.literal_eval(post_info[\"提交数据(post)\"]) ) response.encoding = response.apparent_encoding if post_info[\"取值方式\"] == \"json取值\": value = jsonpath.jsonpath( response.json(),post_info[\"取值代码\"] )[0] self.temp_variables[ post_info[\"传值变量\"] ] = value elif post_info[\"取值方式\"] == \"正则取值\": value = re.findall(post_info[\"取值代码\"],response.text)[0] self.temp_variables[post_info[\"传值变量\"]] = value #调用CheckUtils() result = CheckUtils(response).run_check(post_info[\'期望结果类型\'],post_info[\'期望结果\']) except ProxyError as e: result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (post_info[\"接口名称\"])} except ConnectionError as e: result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (post_info[\"接口名称\"])} except RequestException as e: result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (post_info[\"接口名称\"], e.__str__())} except Exception as e: result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(post_info[\"接口名称\"],e.__str__())} return result def request(self,step_info): try: request_type = step_info[\"请求方式\"] param_variable_list = re.findall(\'\\\\${\\w+}\', step_info[\"请求参数(get)\"]) if param_variable_list: for param_variable in param_variable_list: step_info[\"请求参数(get)\"] = step_info[\"请求参数(get)\"]\\ .replace(param_variable,\'\"%s\"\' % self.temp_variables.get(param_variable[2:-1])) if request_type == \"get\": result = self.__get( step_info ) elif request_type == \"post\": data_variable_list = re.findall(\'\\\\${\\w+}\', step_info[\"提交数据(post)\"]) if data_variable_list: for param_variable in data_variable_list: step_info[\"提交数据(post)\"] = step_info[\"提交数据(post)\"] \\ .replace(param_variable, \'\"%s\"\' % self.temp_variables.get(param_variable[2:-1])) result = self.__post( step_info ) else: result = {\'code\':1,\'result\':\'请求方式不支持\'} except Exception as e: result = {\'code\':4,\'result\':\'用例编号[%s]的[%s]步骤出现系统异常,原因:%s\'%(step_info[\'测试用例编号\'],step_info[\"测试用例步骤\"],e.__str__())} return result def request_by_step(self,step_infos): self.temp_variables = {} for step_info in step_infos: temp_result = self.request( step_info ) # print( temp_result ) if temp_result[\'code\']!=0: break return temp_result if __name__==\"__main__\": case_info = [ {\'请求方式\': \'get\', \'请求地址\': \'/cgi-bin/token\', \'请求参数(get)\': \'{\"grant_type\":\"client_credential\",\"appid\":\"wxXXXXXxc16\",\"secret\":\"XXXXXXXX\"}\', \'提交数据(post)\': \'\', \'取值方式\': \'json取值\', \'传值变量\': \'token\', \'取值代码\': \'$.access_token\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{\"access_token\":\"(.+?)\",\"expires_in\":(.+?)}\'}, {\'请求方式\': \'post\', \'请求地址\': \'/cgi-bin/tags/create\', \'请求参数(get)\': \'{\"access_token\":${token}}\', \'提交数据(post)\': \'{\"tag\" : {\"name\" : \"衡东\"}}\',\'取值方式\': \'无\', \'传值变量\': \'\', \'取值代码\': \'\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{\"tag\":{\"id\":(.+?),\"name\":\"衡东\"}}\'} ] RequestsUtils().request_by_step(case_info)
五、断言封装(核心封装)
check_utils.py 断言封装,与实际结果核对
import re import ast class CheckUtils(): def __init__(self,check_response=None): self.ck_response=check_response self.ck_rules = { \'无\': self.no_check, \'json键是否存在\': self.check_key, \'json键值对\': self.check_keyvalue, \'正则匹配\': self.check_regexp } self.pass_result = { \'code\': 0, \'response_reason\': self.ck_response.reason, \'response_code\': self.ck_response.status_code, \'response_headers\': self.ck_response.headers, \'response_body\': self.ck_response.text, \'check_result\': True, \'message\': \'\' # 扩招作为日志输出等 } self.fail_result = { \'code\': 2, \'response_reason\': self.ck_response.reason, \'response_code\': self.ck_response.status_code, \'response_headers\': self.ck_response.headers, \'response_body\': self.ck_response.text, \'check_result\': False, \'message\': \'\' # 扩招作为日志输出等 } def no_check(self): return self.pass_result def check_key(self,check_data=None): check_data_list = check_data.split(\',\') #把需要判断的值做切割,取出键值 res_list = [] #存放每次比较的结果 wrong_key = [] #存放比较失败key for check_data in check_data_list: #把切割的键值和取出响应结果中的所有的键一个一个对比 if check_data in self.ck_response.json().keys(): res_list.append(self.pass_result ) else: res_list.append( self.fail_result ) wrong_key.append(check_data) #把失败的键放进来,便于后续日志输出 # print(res_list) # print(wrong_key) if self.fail_result in res_list: return self.fail_result else: return self.pass_result def check_keyvalue(self,check_data=None): res_list = [] # 存放每次比较的结果 wrong_items = [] # 存放比较失败 items for check_item in ast.literal_eval(check_data).items(): #literal_eval()安全性的把字符串转成字典,items()取出键值对 if check_item in self.ck_response.json().items(): res_list.append( self.pass_result ) else: res_list.append( self.fail_result ) wrong_items.append(check_item) # print( res_list ) # print( wrong_items ) if self.fail_result in res_list: return self.fail_result else: return self.pass_result def check_regexp(self,check_data=None): pattern = re.compile(check_data) if re.findall(pattern=pattern,string=self.ck_response.text): #匹配到了,不为空,为true return self.pass_result else: return self.fail_result def run_check(self,check_type=None,check_data=None): code = self.ck_response.status_code if code == 200: if check_type in self.ck_rules.keys(): result=self.ck_rules[check_type](check_data) return result else: self.fail_result[\'message\'] = \'不支持%s判断方法\'%check_type return self.fail_result else: self.fail_result[\'message\'] = \'请求的响应状态码非%s\'%str(code) return self.fail_result if __name__==\"__main__\": # 检查键是否存在,{\"access_token\":\"hello\",\"expires_\":7200} 设为响应结果,\"access_token,expires_in\" 为检查对象值 CheckUtils({\"access_token\":\"hello\",\"expires_\":7200}).check_key(\"access_token,expires_in\") #检查键值对是否存在 CheckUtils({\"access_token\":\"hello\",\"expires_i\":7200}).check_keyvalue(\'{\"expires_in\": 7200}\') #正则对比 #TURE print(CheckUtils(\'{\"access_token\":\"hello\",\"expires_in\":7200}\').check_regexp(\'\"expires_in\":(.+?)\')) #False print(CheckUtils(\'{\"access_token\":\"hello\",\"expires\":7200}\').check_regexp(\'\"expires_in\":(.+?)\'))
六、api_testcase下的api_test.py 封装
import warnings import unittest import paramunittest from common.testdata_utils import TestdataUtils from common.requests_utils import RequestsUtils #如果是mysql数据源的话切换成 def_testcase_data_list_by_mysql() exccel数据源:def_testcase_data_list() case_infos = TestdataUtils().def_testcase_data_list_by_mysql() @paramunittest.parametrized( *case_infos ) class APITest(paramunittest.ParametrizedTestCase): def setUp(self) -> None: warnings.simplefilter(\'ignore\', ResourceWarning) #不会弹出警告提示 def setParameters(self, case_id, case_info): self.case_id = case_id self.case_info = case_info def test_api_common_function(self): \'\'\'测试描述\'\'\' self._testMethodName = self.case_info[0].get(\"测试用例编号\") self._testMethodDoc = self.case_info[0].get(\"测试用例名称\") actual_result = RequestsUtils().request_by_step(self.case_info) self.assertTrue( actual_result.get(\'check_result\'),actual_result.get(\'message\') ) if __name__ == \'__main__\': unittest.main()
七、common下的log_utils.py 封装
import os import logging import time from common.config_utils import config current_path = os.path.dirname(__file__) log_output_path = os.path.join( current_path,\'..\', config.LOG_PATH ) class LogUtils(): def __init__(self,log_path=log_output_path): self.log_name = os.path.join( log_output_path ,\'ApiTest_%s.log\'%time.strftime(\'%Y_%m_%d\') ) self.logger = logging.getLogger(\"ApiTestLog\") self.logger.setLevel( config.LOG_LEVEL ) console_handler = logging.StreamHandler() # 控制台输出 file_handler = logging.FileHandler(self.log_name,\'a\',encoding=\'utf-8\') # 文件输出 formatter = logging.Formatter(\"%(asctime)s %(name)s %(levelname)s %(message)s\") console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) self.logger.addHandler( console_handler ) self.logger.addHandler( file_handler ) console_handler.close() # 防止打印日志重复 file_handler.close() # 防止打印日志重复 def get_logger(self): return self.logger logger = LogUtils().get_logger() # 防止打印日志重复 if __name__ == \'__main__\': logger.info(\'hello\')
八、common下的config_utils.py的封装
配置文件的编写:
对配置文件的读取封装:
import os import configparser current_path = os.path.dirname(__file__) cfgpath = os.path.join(current_path, \"../conf/local_config.ini\") print(cfgpath) class ConfigUtils: def __init__(self,config_path=cfgpath): self.__conf=configparser.ConfigParser() self.__conf.read(config_path, encoding=\"utf-8\") def read_ini(self,sec,option): value=self.__conf.get(sec,option) return value @property def hosts(self): value=self.read_ini(\'default\',\'hosts\') return value @property def LOG_PATH(self): value = self.read_ini(\'path\', \'LOG_PATH\') return value @property def CASE_DATA_PATH(self): value = self.read_ini(\'path\', \'CASE_DATA_PATH\') return value @property def REPORT_PATH(self): value = self.read_ini(\'path\', \'REPORT_PATH\') return value @property def LOG_LEVEL(self): value = int(self.read_ini(\'log\', \'LOG_LEVEL\')) return value @property def smtp_server(self): smtp_server_value = self.read_ini(\'email\', \'smtp_server\') return smtp_server_value @property def smtp_sender(self): smtp_sender_value = self.read_ini(\'email\', \'smtp_sender\') return smtp_sender_value @property def smtp_password(self): smtp_password_value = self.read_ini(\'email\', \'smtp_password\') return smtp_password_value @property def smtp_receiver(self): smtp_receiver_value = self.read_ini(\'email\', \'smtp_receiver\') return smtp_receiver_value @property def smtp_cc(self): smtp_cc_value = self.read_ini(\'email\', \'smtp_cc\') return smtp_cc_value @property def smtp_subject(self): smtp_subject_value = self.read_ini(\'email\', \'smtp_subject\') return smtp_subject_value config=ConfigUtils() if __name__==\'__main__\': current_path = os.path.dirname(__file__) cfgpath = os.path.join(current_path, \"../conf/local_config.ini\") config_u=ConfigUtils() print(config_u.hosts) print(config_u.LOG_LEVEL)
九、test_runner下的run_case.py 封装
class RunCase(): def __init__(self): self.test_case_path = test_case_path self.report_path = test_report_path self.title = \'P1P2接口自动化测试报告\' self.description = \'自动化接口测试框架\' self.tester = \'测试开发组\' def load_test_suite(self): discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path, pattern=\'api_test.py\', top_level_dir=self.test_case_path) all_suite = unittest.TestSuite() all_suite.addTest( discover ) return all_suite def run(self): report_dir = HTMLTestReportCN.ReportDirectory(self.report_path) report_dir.create_dir(self.title) report_file_path = HTMLTestReportCN.GlobalMsg.get_value(\'report_path\') fp = open( report_file_path ,\'wb\' ) runner = HTMLTestReportCN.HTMLTestRunner(stream=fp, title=self.title, description=self.description, tester=self.tester) runner.run( self.load_test_suite() ) fp.close() return report_file_path if __name__==\'__main__\': report_path = RunCase().run() EmailUtils(open(report_path, \'rb\').read(), report_path).send_mail()
十、common下的email_utils.py 封装
import os import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from common.config_utils import config class EmailUtils(): def __init__(self,smtp_body,smtp_attch_path=None): self.smtp_server = config.smtp_server self.smtp_sender = config.smtp_sender self.smtp_password = config.smtp_password self.smtp_receiver = config.smtp_receiver self.smtp_cc = config.smtp_cc self.smtp_subject = config.smtp_subject self.smtp_body = smtp_body self.smtp_attch = smtp_attch_path def mail_message_body(self): message = MIMEMultipart() message[\'from\'] = self.smtp_sender message[\'to\'] = self.smtp_receiver message[\'Cc\'] = self.smtp_cc message[\'subject\'] = self.smtp_subject message.attach( MIMEText(self.smtp_body,\'html\',\'utf-8\') ) if self.smtp_attch: attach_file = MIMEText(open(self.smtp_attch, \'rb\').read(), \'base64\', \'utf-8\') attach_file[\'Content-Type\'] = \'application/octet-stream\' attach_file.add_header(\'Content-Disposition\', \'attachment\', filename=(\'gbk\', \'\', os.path.basename(self.smtp_attch))) message.attach(attach_file) return message def send_mail(self): smtp = smtplib.SMTP() smtp.connect(self.smtp_server) smtp.login(user=self.smtp_sender, password=self.smtp_password) smtp.sendmail(self.smtp_sender,self.smtp_receiver.split(\",\")+ self.smtp_cc.split(\",\"), self.mail_message_body().as_string()) if __name__==\'__main__\': html_path = os.path.dirname(__file__) + \'/../test_reports/接口自动化测试报告V1.1/接口自动化测试报告V1.1.html\' EmailUtils(\'<h3 align=\"center\">自动化测试报告</h3>\',html_path).send_mail()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持自学编程网。