Python reques接口测试框架实现代码

2020-10-08 0 259

一、框架菜单

Python reques接口测试框架实现代码

1.1 common模块

Python reques接口测试框架实现代码

1.2 其他

Python reques接口测试框架实现代码

二、Excel接口测试案例编写

Python reques接口测试框架实现代码

Python reques接口测试框架实现代码

三、读取Excel测试封装(核心封装)

excel_utils.py 读取Excel中的数据

import os
import xlrd  #内置模块、第三方模块pip install 自定义模块


class ExcelUtils():
  def __init__(self,file_path,sheet_name):
    self.file_path = file_path
    self.sheet_name = sheet_name
    self.sheet = self.get_sheet() # 整个表格对象

  def get_sheet(self):
    wb = xlrd.open_workbook(self.file_path)
    sheet = wb.sheet_by_name(self.sheet_name)
    return sheet

  def get_row_count(self):
    row_count = self.sheet.nrows
    return row_count

  def get_col_count(self):
    col_count = self.sheet.ncols
    return col_count

  def __get_cell_value(self,row_index, col_index):
    cell_value = self.sheet.cell_value(row_index,col_index)
    return cell_value

  def get_merged_info(self):
    merged_info = self.sheet.merged_cells
    return merged_info

  def get_merged_cell_value(self,row_index, col_index):
    \"\"\"既能获取普通单元格的数据又能获取合并单元格数据\"\"\"
    cell_value = None
    for (rlow, rhigh, clow, chigh) in self.get_merged_info():
      if (row_index >= rlow and row_index < rhigh):
        if (col_index >= clow and col_index < chigh):
          cell_value = self.__get_cell_value(rlow, clow)
          break; # 防止循环去进行判断出现值覆盖的情况
        else:
          cell_value = self.__get_cell_value(row_index, col_index)
      else:
        cell_value = self.__get_cell_value(row_index, col_index)
    return cell_value

  def get_sheet_data_by_dict(self):
    all_data_list = []
    first_row = self.sheet.row(0) #获取首行数据
    for row in range(1, self.get_row_count()):
      row_dict = {}
      for col in range(0, self.get_col_count()):
        row_dict[first_row[col].value] = self.get_merged_cell_value(row, col)
      all_data_list.append(row_dict)
    return all_data_list

if __name__==\'__main__\':
  current_path = os.path.dirname(__file__)
  excel_path = os.path.join( current_path,\'..\',\'samples/data/test_case.xlsx\' )
  excelUtils = ExcelUtils(excel_path,\"Sheet1\")
  for row in excelUtils.get_sheet_data_by_dict():
    print( row )
import os
from common.excel_utils import ExcelUtils
from common.config_utils import config

current_path = os.path.dirname(__file__)
test_data_path = os.path.join( current_path,\'..\', config.CASE_DATA_PATH )

class TestdataUtils():
  def __init__(self,test_data_path = test_data_path):
    self.test_data_path = test_data_path
    self.test_data = ExcelUtils(test_data_path,\"Sheet1\").get_sheet_data_by_dict()
    self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()


  def __get_testcase_data_dict(self):
    testcase_dict = {}
    for row_data in self.test_data:
      testcase_dict.setdefault( row_data[\'测试用例编号\'],[] ).append( row_data )
    return testcase_dict

  def def_testcase_data_list(self):
    testcase_list = []
    for k,v in self.__get_testcase_data_dict().items():
      one_case_dict = {}
      one_case_dict[\"case_id\"] = k
      one_case_dict[\"case_info\"] = v
      testcase_list.append( one_case_dict )
    return testcase_list


if __name__==\"__main__\":
  testdataUtils = TestdataUtils()
  for i in testdataUtils.def_testcase_data_list():
    print( i )

testdata_utils.py 读取Excel中的数据后处理成需要的数据

四、request封装(核心封装)

requests_utils.py 包含post请求,get请求,异常,调用断言

import ast
import re
import requests
import jsonpath
from requests.exceptions import RequestException
from requests.exceptions import ProxyError
from requests.exceptions import ConnectionError
from common.config_utils import config
from common.check_utils import CheckUtils

class RequestsUtils():
  def __init__(self):
    self.hosts = config.hosts
    self.headers = {\"ContentType\":\"application/json;charset=utf-8\"}
    self.session = requests.session()
    self.temp_variables = {}

  def __get(self,get_info):
    try:
      url = self.hosts + get_info[\"请求地址\"]
      response = self.session.get( url = url,
                     params = ast.literal_eval(get_info[\"请求参数(get)\"])
                     )
      response.encoding = response.apparent_encoding
      if get_info[\"取值方式\"] == \"json取值\":
        value = jsonpath.jsonpath( response.json(),get_info[\"取值代码\"] )[0]
        self.temp_variables[ get_info[\"传值变量\"] ] = value
      elif get_info[\"取值方式\"] == \"正则取值\":
        value = re.findall(get_info[\"取值代码\"],response.text)[0]
        self.temp_variables[get_info[\"传值变量\"]] = value
      result = CheckUtils(response).run_check(get_info[\'期望结果类型\'], get_info[\'期望结果\'])
    except ProxyError as e:
      result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (get_info[\"接口名称\"])}
    except ConnectionError as e:
      result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (get_info[\"接口名称\"])}
    except RequestException as e:
      result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (get_info[\"接口名称\"], e.__str__())}
    except Exception as e:
      result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(get_info[\"接口名称\"],e.__str__())}
    return result

  def __post(self,post_info):
    try:
      url = self.hosts + post_info[\"请求地址\"]
      response = self.session.post( url = url,
                     headers = self.headers,
                     params = ast.literal_eval(post_info[\"请求参数(get)\"]),
                    # data = post_infos[\"提交数据(post)\"],
                     json=ast.literal_eval(post_info[\"提交数据(post)\"])
                    )
      response.encoding = response.apparent_encoding
      if post_info[\"取值方式\"] == \"json取值\":
        value = jsonpath.jsonpath( response.json(),post_info[\"取值代码\"] )[0]
        self.temp_variables[ post_info[\"传值变量\"] ] = value
      elif post_info[\"取值方式\"] == \"正则取值\":
        value = re.findall(post_info[\"取值代码\"],response.text)[0]
        self.temp_variables[post_info[\"传值变量\"]] = value
      #调用CheckUtils()
      result = CheckUtils(response).run_check(post_info[\'期望结果类型\'],post_info[\'期望结果\'])
    except ProxyError as e:
      result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (post_info[\"接口名称\"])}
    except ConnectionError as e:
      result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (post_info[\"接口名称\"])}
    except RequestException as e:
      result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (post_info[\"接口名称\"], e.__str__())}
    except Exception as e:
      result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(post_info[\"接口名称\"],e.__str__())}
    return result

  def request(self,step_info):
    try:
      request_type = step_info[\"请求方式\"]
      param_variable_list = re.findall(\'\\\\${\\w+}\', step_info[\"请求参数(get)\"])
      if param_variable_list:
        for param_variable in param_variable_list:
          step_info[\"请求参数(get)\"] = step_info[\"请求参数(get)\"]\\
            .replace(param_variable,\'\"%s\"\' % self.temp_variables.get(param_variable[2:-1]))
      if request_type == \"get\":
        result = self.__get( step_info )
      elif request_type == \"post\":
        data_variable_list = re.findall(\'\\\\${\\w+}\', step_info[\"提交数据(post)\"])
        if data_variable_list:
          for param_variable in data_variable_list:
            step_info[\"提交数据(post)\"] = step_info[\"提交数据(post)\"] \\
              .replace(param_variable, \'\"%s\"\' % self.temp_variables.get(param_variable[2:-1]))
        result = self.__post( step_info )
      else:
        result = {\'code\':1,\'result\':\'请求方式不支持\'}
    except Exception as e:
      result = {\'code\':4,\'result\':\'用例编号[%s]的[%s]步骤出现系统异常,原因:%s\'%(step_info[\'测试用例编号\'],step_info[\"测试用例步骤\"],e.__str__())}
    return result

  def request_by_step(self,step_infos):
    self.temp_variables = {}
    for step_info in step_infos:
      temp_result = self.request( step_info )
      # print( temp_result )
      if temp_result[\'code\']!=0:
        break
    return temp_result


if __name__==\"__main__\":

  case_info = [
    {\'请求方式\': \'get\', \'请求地址\': \'/cgi-bin/token\', \'请求参数(get)\': \'{\"grant_type\":\"client_credential\",\"appid\":\"wxXXXXXxc16\",\"secret\":\"XXXXXXXX\"}\', \'提交数据(post)\': \'\', \'取值方式\': \'json取值\', \'传值变量\': \'token\', \'取值代码\': \'$.access_token\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{\"access_token\":\"(.+?)\",\"expires_in\":(.+?)}\'},
    {\'请求方式\': \'post\', \'请求地址\': \'/cgi-bin/tags/create\', \'请求参数(get)\': \'{\"access_token\":${token}}\', \'提交数据(post)\': \'{\"tag\" : {\"name\" : \"衡东\"}}\',\'取值方式\': \'无\', \'传值变量\': \'\', \'取值代码\': \'\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{\"tag\":{\"id\":(.+?),\"name\":\"衡东\"}}\'}
  ]
  RequestsUtils().request_by_step(case_info)

五、断言封装(核心封装)

check_utils.py 断言封装,与实际结果核对

import re
import ast

class CheckUtils():
  def __init__(self,check_response=None):
    self.ck_response=check_response
    self.ck_rules = {
      \'无\': self.no_check,
      \'json键是否存在\': self.check_key,
      \'json键值对\': self.check_keyvalue,
      \'正则匹配\': self.check_regexp
    }
    self.pass_result = {
      \'code\': 0,
      \'response_reason\': self.ck_response.reason,
      \'response_code\': self.ck_response.status_code,
      \'response_headers\': self.ck_response.headers,
      \'response_body\': self.ck_response.text,
      \'check_result\': True,
      \'message\': \'\' # 扩招作为日志输出等
    }
    self.fail_result = {
      \'code\': 2,
      \'response_reason\': self.ck_response.reason,
      \'response_code\': self.ck_response.status_code,
      \'response_headers\': self.ck_response.headers,
      \'response_body\': self.ck_response.text,
      \'check_result\': False,
      \'message\': \'\' # 扩招作为日志输出等
    }


  def no_check(self):
    return self.pass_result


  def check_key(self,check_data=None):
    check_data_list = check_data.split(\',\')  #把需要判断的值做切割,取出键值
    res_list = [] #存放每次比较的结果
    wrong_key = [] #存放比较失败key
    for check_data in check_data_list:  #把切割的键值和取出响应结果中的所有的键一个一个对比
      if check_data in self.ck_response.json().keys():
        res_list.append(self.pass_result )
      else:
        res_list.append( self.fail_result )
        wrong_key.append(check_data)   #把失败的键放进来,便于后续日志输出
    # print(res_list)
    # print(wrong_key)
    if self.fail_result in res_list:
      return self.fail_result
    else:
      return self.pass_result

  def check_keyvalue(self,check_data=None):
    res_list = [] # 存放每次比较的结果
    wrong_items = [] # 存放比较失败 items
    for check_item in ast.literal_eval(check_data).items(): #literal_eval()安全性的把字符串转成字典,items()取出键值对
      if check_item in self.ck_response.json().items():
        res_list.append( self.pass_result )
      else:
        res_list.append( self.fail_result )
        wrong_items.append(check_item)
    # print( res_list )
    # print( wrong_items )

    if self.fail_result in res_list:
      return self.fail_result
    else:
      return self.pass_result

  def check_regexp(self,check_data=None):
    pattern = re.compile(check_data)
    if re.findall(pattern=pattern,string=self.ck_response.text): #匹配到了,不为空,为true
      return self.pass_result
    else:
      return self.fail_result

  def run_check(self,check_type=None,check_data=None):
    code = self.ck_response.status_code
    if code == 200:
      if check_type in self.ck_rules.keys():
        result=self.ck_rules[check_type](check_data)
        return result
      else:
        self.fail_result[\'message\'] = \'不支持%s判断方法\'%check_type
        return self.fail_result
    else:
      self.fail_result[\'message\'] = \'请求的响应状态码非%s\'%str(code)
      return self.fail_result




if __name__==\"__main__\":
  # 检查键是否存在,{\"access_token\":\"hello\",\"expires_\":7200} 设为响应结果,\"access_token,expires_in\" 为检查对象值
  CheckUtils({\"access_token\":\"hello\",\"expires_\":7200}).check_key(\"access_token,expires_in\")
  #检查键值对是否存在
  CheckUtils({\"access_token\":\"hello\",\"expires_i\":7200}).check_keyvalue(\'{\"expires_in\": 7200}\')
  #正则对比
  #TURE
  print(CheckUtils(\'{\"access_token\":\"hello\",\"expires_in\":7200}\').check_regexp(\'\"expires_in\":(.+?)\'))
  #False
  print(CheckUtils(\'{\"access_token\":\"hello\",\"expires\":7200}\').check_regexp(\'\"expires_in\":(.+?)\'))

六、api_testcase下的api_test.py 封装

import warnings
import unittest
import paramunittest
from common.testdata_utils import TestdataUtils
from common.requests_utils import RequestsUtils

#如果是mysql数据源的话切换成 def_testcase_data_list_by_mysql()  exccel数据源:def_testcase_data_list()

case_infos = TestdataUtils().def_testcase_data_list_by_mysql()

@paramunittest.parametrized(
  *case_infos
)

class APITest(paramunittest.ParametrizedTestCase):
  def setUp(self) -> None:
    warnings.simplefilter(\'ignore\', ResourceWarning) #不会弹出警告提示

  def setParameters(self, case_id, case_info):
    self.case_id = case_id
    self.case_info = case_info

  def test_api_common_function(self):
    \'\'\'测试描述\'\'\'
    self._testMethodName = self.case_info[0].get(\"测试用例编号\")
    self._testMethodDoc = self.case_info[0].get(\"测试用例名称\")
    actual_result = RequestsUtils().request_by_step(self.case_info)
    self.assertTrue( actual_result.get(\'check_result\'),actual_result.get(\'message\') )

if __name__ == \'__main__\':
  unittest.main()

七、common下的log_utils.py 封装

import os
import logging
import time
from common.config_utils import config

current_path = os.path.dirname(__file__)
log_output_path = os.path.join( current_path,\'..\', config.LOG_PATH )

class LogUtils():
  def __init__(self,log_path=log_output_path):
    self.log_name = os.path.join( log_output_path ,\'ApiTest_%s.log\'%time.strftime(\'%Y_%m_%d\') )
    self.logger = logging.getLogger(\"ApiTestLog\")
    self.logger.setLevel( config.LOG_LEVEL )

    console_handler = logging.StreamHandler() # 控制台输出
    file_handler = logging.FileHandler(self.log_name,\'a\',encoding=\'utf-8\') # 文件输出
    formatter = logging.Formatter(\"%(asctime)s %(name)s %(levelname)s %(message)s\")
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)

    self.logger.addHandler( console_handler )
    self.logger.addHandler( file_handler )

    console_handler.close() # 防止打印日志重复
    file_handler.close()   # 防止打印日志重复

  def get_logger(self):
    return self.logger

logger = LogUtils().get_logger()  # 防止打印日志重复

if __name__ == \'__main__\':
  logger.info(\'hello\')

八、common下的config_utils.py的封装

配置文件的编写:

Python reques接口测试框架实现代码

对配置文件的读取封装:

import os
import configparser

current_path = os.path.dirname(__file__)
cfgpath = os.path.join(current_path, \"../conf/local_config.ini\")
print(cfgpath)


class ConfigUtils:
  def __init__(self,config_path=cfgpath):
    self.__conf=configparser.ConfigParser()
    self.__conf.read(config_path, encoding=\"utf-8\")

  def read_ini(self,sec,option):
    value=self.__conf.get(sec,option)
    return value

  @property
  def hosts(self):
    value=self.read_ini(\'default\',\'hosts\')
    return value

  @property
  def LOG_PATH(self):
    value = self.read_ini(\'path\', \'LOG_PATH\')
    return value

  @property
  def CASE_DATA_PATH(self):
    value = self.read_ini(\'path\', \'CASE_DATA_PATH\')
    return value

  @property
  def REPORT_PATH(self):
    value = self.read_ini(\'path\', \'REPORT_PATH\')
    return value

  @property
  def LOG_LEVEL(self):
    value = int(self.read_ini(\'log\', \'LOG_LEVEL\'))
    return value


  @property
  def smtp_server(self):
    smtp_server_value = self.read_ini(\'email\', \'smtp_server\')
    return smtp_server_value

  @property
  def smtp_sender(self):
    smtp_sender_value = self.read_ini(\'email\', \'smtp_sender\')
    return smtp_sender_value

  @property
  def smtp_password(self):
    smtp_password_value = self.read_ini(\'email\', \'smtp_password\')
    return smtp_password_value

  @property
  def smtp_receiver(self):
    smtp_receiver_value = self.read_ini(\'email\', \'smtp_receiver\')
    return smtp_receiver_value

  @property
  def smtp_cc(self):
    smtp_cc_value = self.read_ini(\'email\', \'smtp_cc\')
    return smtp_cc_value

  @property
  def smtp_subject(self):
    smtp_subject_value = self.read_ini(\'email\', \'smtp_subject\')
    return smtp_subject_value
config=ConfigUtils()

if __name__==\'__main__\':
  current_path = os.path.dirname(__file__)
  cfgpath = os.path.join(current_path, \"../conf/local_config.ini\")
  config_u=ConfigUtils()
  print(config_u.hosts)
  print(config_u.LOG_LEVEL)

九、test_runner下的run_case.py 封装

class RunCase():
  def __init__(self):
    self.test_case_path = test_case_path
    self.report_path = test_report_path
    self.title = \'P1P2接口自动化测试报告\'
    self.description = \'自动化接口测试框架\'
    self.tester = \'测试开发组\'
  def load_test_suite(self):
    discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path,
                            pattern=\'api_test.py\',
                            top_level_dir=self.test_case_path)
    all_suite = unittest.TestSuite()
    all_suite.addTest( discover )
    return all_suite
  def run(self):
    report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
    report_dir.create_dir(self.title)
    report_file_path = HTMLTestReportCN.GlobalMsg.get_value(\'report_path\')
    fp = open( report_file_path ,\'wb\' )
    runner = HTMLTestReportCN.HTMLTestRunner(stream=fp,
                         title=self.title,
                         description=self.description,
                         tester=self.tester)
    runner.run( self.load_test_suite() )
    fp.close()
    return report_file_path


if __name__==\'__main__\':
  report_path = RunCase().run()
  EmailUtils(open(report_path, \'rb\').read(), report_path).send_mail()

十、common下的email_utils.py 封装

import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from common.config_utils import config

class EmailUtils():
  def __init__(self,smtp_body,smtp_attch_path=None):
    self.smtp_server = config.smtp_server
    self.smtp_sender = config.smtp_sender
    self.smtp_password = config.smtp_password
    self.smtp_receiver = config.smtp_receiver
    self.smtp_cc = config.smtp_cc
    self.smtp_subject = config.smtp_subject
    self.smtp_body = smtp_body
    self.smtp_attch = smtp_attch_path

  def mail_message_body(self):
    message = MIMEMultipart()
    message[\'from\'] = self.smtp_sender
    message[\'to\'] = self.smtp_receiver
    message[\'Cc\'] = self.smtp_cc
    message[\'subject\'] = self.smtp_subject
    message.attach( MIMEText(self.smtp_body,\'html\',\'utf-8\') )
    if self.smtp_attch:
      attach_file = MIMEText(open(self.smtp_attch, \'rb\').read(), \'base64\', \'utf-8\')
      attach_file[\'Content-Type\'] = \'application/octet-stream\'
      attach_file.add_header(\'Content-Disposition\', \'attachment\', filename=(\'gbk\', \'\', os.path.basename(self.smtp_attch)))
      message.attach(attach_file)
    return message

  def send_mail(self):
    smtp = smtplib.SMTP()
    smtp.connect(self.smtp_server)
    smtp.login(user=self.smtp_sender, password=self.smtp_password)
    smtp.sendmail(self.smtp_sender,self.smtp_receiver.split(\",\")+ self.smtp_cc.split(\",\"), self.mail_message_body().as_string())

if __name__==\'__main__\':
  html_path = os.path.dirname(__file__) + \'/../test_reports/接口自动化测试报告V1.1/接口自动化测试报告V1.1.html\'
  EmailUtils(\'<h3 align=\"center\">自动化测试报告</h3>\',html_path).send_mail()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持自学编程网。

遇见资源网 Python Python reques接口测试框架实现代码 http://www.ox520.com/26591.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务