【接口自动化学习笔记】python+requests+excel实现接口自动化
excel驱动的接口自动化+邮件发送参考了上面博主的框架,发现只能实现Excel单个sheets页的自动化。但实际业务可能需要遍历多个sheets页。结合之前学习到的知识,以及实际业务,做了一些改动。1.requests库的封装2.excel读写的封装3.mail的封装。
·
文章内容参考:pytest+requests+Excel+allure接口自动化测试框架实践_JJJims的博客-CSDN博客
功能
excel驱动的接口自动化+邮件发送
背景
参考了上面博主的框架,发现只能实现Excel单个sheets页的自动化。但实际业务可能需要遍历多个sheets页。结合之前学习到的知识,以及实际业务,做了一些改动。
实现效果图


主要封装的模块
1.requests库的封装
2.excel读写的封装
3.mail的封装
贴上文件路径
主要用到的文件:common目录下的excel.py、read_yaml(yaml的读取)、Logger(logger的封装)等,以及excelrelate目录下的所有文件

测试用例模板


相关代码
1.封装excel的读写方法,文件excel.py
import os, openpyxl
from shutil import copyfile
from openpyxl.styles import Font
from common.Logger import logger
class Reader:
"""
用来读取Excel文件内容
只支持xlsx格式excel文件读取
pip install -U openpyxl
"""
def __init__(self):
# 整个excel工作簿缓存
self.workbook = None
# 当前工作sheet
self.sheet = None
# 当前sheet的行数
self.rows = 0
# 当前读取到的行数
self.r = 0
# 打开excel
def open_excel(self, srcfile):
# 如果打开的文件不存在,就报错
if not os.path.isfile(srcfile):
logger.error("%s not exist!" % (srcfile))
return
# 设置读取excel使用utf8编码
openpyxl.Workbook.encoding = "utf8"
# 读取excel内容到缓存workbook
self.workbook = openpyxl.load_workbook(filename=srcfile)
# 设置默认选取第一个sheet页面
self.sheet = self.workbook[self.workbook.sheetnames[0]]
# 设置rows为当前sheet的行数
self.rows = self.sheet.max_row
# 设置默认读取为第一行
self.r = 0
return
# 获取sheet页面
def get_sheets(self):
# 获取所有sheet的名字,并返回为一个列表
sheets = self.workbook.sheetnames
return sheets
# 切换sheet页面
def set_sheet(self, name):
# 通过sheet名字,切换sheet页面
self.sheet = self.workbook[name]
self.rows = self.sheet.max_row
self.r = 0
return
# 逐行读取
def readline(self):
# 存sheet里面所有的行
lines = []
# 遍历sheet里面所有的行
for row in self.sheet.rows:
# 存一行的单元格
line = []
# 取出所有的单元格
for cell in row:
if cell.value is None:
line.append('')
else:
line.append(cell.value)
lines.append(line)
return lines
class Writer:
"""
用来复制写入excel表格内容
只支持xlsx格式excel文件读取
"""
def __init__(self):
# 读取需要复制的excel
self.workbook = None
# 拷贝的工作空间
self.wb = None
# 当前工作的sheet页
self.sheet = None
# 记录生成的文件,用来保存
self.df = None
# 记录写入的行
self.row = 0
# 记录写入的列
self.clo = 0
# 复制并打开excel
def copy_open(self, srcfile, dstfile):
# 判断要复制的文件是否存在
if not os.path.isfile(srcfile):
print(srcfile + " not exist!")
return
# 判断要新建的文档是否存在,存在则提示
if os.path.isfile(dstfile):
logger.warning(dstfile + " file already exist!")
# 记录要保存的文件
self.df = dstfile
# 读取excel到缓存
# formatting_info带格式的复制
self.workbook = openpyxl.load_workbook(filename=srcfile)
# 拷贝,也在内存里面
copyfile(srcfile, dstfile)
# 打开复制后的excel文件
self.wb = openpyxl.load_workbook(filename=dstfile)
return
# 获取sheet页面
def get_sheets(self):
# 获取所有sheet的名字,并返回为一个列表
sheets = self.workbook.sheetnames
return sheets
# 切换sheet页面
def set_sheet(self, name):
# 通过sheet名字,切换sheet页面
self.sheet = self.wb[name]
return
# 写入指定单元格,保留原格式
def write(self, r, c, value, color=None):
"""
:param r: 行
:param c: 列
:param value: 要写入的字符串
:param color: 0,黑色;1,白色;2,红色;3,绿色;4,蓝色;5,黄色
:return:
"""
d = self.sheet.cell(row=r + 1, column=c + 1, value=value)
if color is not None:
if color == 0:
color = "FF000000"
elif color == 1:
color = "FFFFFFFF"
elif color == 2:
color = "FFFF0000"
elif color == 3:
color = "FF00FF00"
elif color == 4:
color = "FF0000FF"
elif color == 5:
color = "FFFFFF00"
else:
color = "FF000000"
font = Font(name='Arial',
size=11,
bold=False,
italic=False,
vertAlign=None,
underline='none',
strike=False,
color=color)
d.font = font
return
# 保存
def save_close(self):
# 保存复制后的文件到硬盘
self.wb.save(self.df)
return
2.封装requests接口,api_key.py
import json
import jsonpath
import requests
from common.Logger import logger
class ApiKey:
def __init__(self):
#定义一个字典用于保存处理后的接口返回值
self.all_val={}
# 用于提取返回结果中所需要的内容
def get_text(self,data,key):
# loads将json格式数据转换为字典格式
dict_data = json.loads(data)
try:
value_list = jsonpath.jsonpath(dict_data,key)
#返回的是列表,取列表第一个值
return value_list[0]
except:
return '查询失败,没有找到key值{}'.format(key)
def save_json(self,data,name,key):
dict_data = json.loads(data)
try:
value = jsonpath.jsonpath(dict_data, key)[0]
self.all_val[name]=value
except:
return '保存json失败,没有找到key值{}'.format(key)
# get请求的封装
def get(self,url, params=None, **kwargs):
res=requests.get(url, params=params, **kwargs)
return res
# post请求的封装
def post(self,**kwargs):
res=requests.post(**kwargs)
return res
3.处理requests接口返回值,判断用例是否成功 runcase.py
def run_case(ak,rowdata):
all_val=ak.all_val
# print('传入的参数是{}'.format(rowdata))
# 判断当前行的第一列的值,是否是数字编号
if type(rowdata[0]) is int:
# 校验字段
assert_value = rowdata[8]
# 预期结果字段
expect_value = rowdata[9]
if rowdata[6]:
# 判断参数是否是字典格式
if str(rowdata[6]).startswith('{'):
# eval官方解释:将字符串str当做有效的表达式来求值并返回计算结果
data = eval(rowdata[6])
else:
data = rowdata[6]
dict_data = {
'url': rowdata[2] + rowdata[3],
# 这里直接给headers一个字典值
'headers': eval(rowdata[5]),
rowdata[7]: data
}
# print('有传参的时候传的参数{}'.format(dict_data))
# 没有传参的时候
else:
dict_data = {
'url': rowdata[2] + rowdata[3],
'headers': eval(rowdata[5])
}
# print('没有传参的时候传的参数{}'.format(dict_data))
# 使用反射模拟请求
res = getattr(ak, rowdata[4])(**dict_data)
# print('---------------结果是-----------'.format(res))
#保存接口返回中的相关数据
if rowdata[12] is not None and rowdata[12]!='':
if ';' in rowdata[12]:
# ===============JSON提取,多参数版================
varStr = rowdata[12]
# 用分号分割varStr字符串,并保存到列表里
varStrList = varStr.split(';')
# 获取varStrList列表长度
length = len(varStrList)
# 遍历分割JSON表达式
jsonStr = rowdata[13]
jsonList = jsonStr.split(';')
# 循环输出列表值
for i in range(length):
# json引用变量名获取
key = varStrList[i]
# json表达式获取
jsonExp = jsonList[i]
# 字典值的保存
ak.save_json(res.text,key, jsonExp)
else:
# ===============JSON提取,单参数版================
# 根据Excel填写的json表达式动态获取值,保存
ak.save_json(res.text, rowdata[12],rowdata[13])
#获取返回结果中,需要校验的字段的值
result=ak.get_text(res.text, assert_value)
print('==========检查信息=========')
print("检验字段:" + rowdata[8])
print("预期结果:" + str(expect_value))
print("实际结果:" + str(result))
#返回实际结果和结果,这里也可以封装assert方法,判断相等,包含等
if rowdata[9]==result:
return str(result),'PASS'
else:
return str(result),'FAIL'
4.mail的封装
from smtplib import SMTP_SSL
from email.header import Header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from common.Logger import logger
from common.read_yaml import ReadYaml
mail_info=ReadYaml('config.yaml').get_yaml_data()['mail']
class Mail:
#filenamey是要发送的excel名称,filepath是要发送的excel的绝对路径
def __init__(self,filenamey,filepath):
# 读取配置文件
self.filenamey=filenamey
self.filepath=filepath
self.mail_info = mail_info
# 发件人
self.mail_info['from'] = self.mail_info['username']
# smtp服务器域名,截取域名,然后拼上smtp.
self.mail_info['hostname'] = 'smtp.' + self.mail_info['username']\
[self.mail_info['username'].rfind('@') + 1:self.mail_info['username'].__len__()]
logger.info(self.mail_info)
def send(self, text):
# 这里使用SMTP_SSL就是默认使用465端口,如果发送失败,可以使用587
smtp = SMTP_SSL(self.mail_info['hostname'])
smtp.set_debuglevel(0)
''' SMTP 'ehlo' command.
Hostname to send for this command defaults to the FQDN of the local
host.
'''
smtp.ehlo(self.mail_info['hostname'])
# 登录
smtp.login(self.mail_info['username'], self.mail_info['password'])
# 支持附件的邮件
msg = MIMEMultipart()
msg.attach(MIMEText(text, 'html', self.mail_info['mail_encoding']))
msg['Subject'] = Header(self.mail_info['mail_subject'], self.mail_info['mail_encoding'])
# 添加自定义昵称
h = Header(self.mail_info['mailnick'], 'utf-8')
h.append('<' + self.mail_info['from'] + '>', 'ascii')
msg["from"] = h
# logger.debug(self.mail_info)
# logger.debug(text)
msg['to'] = ','.join(self.mail_info['to'])
receive = self.mail_info['to']
# 添加附件
att1 = MIMEText(open(self.filepath, 'rb').read(), 'base64', 'utf-8')
att1['Content-Type'] = 'application/octet-stream'
att1.add_header('Content-Disposition', 'attachment', filename=('gbk', '', self.filenamey))
msg.attach(att1)
print("附件加上了")
try:
smtp.sendmail(self.mail_info['from'], receive, msg.as_string())
smtp.quit()
logger.info('邮件发送成功')
except Exception as e:
logger.error('邮件发送失败:')
logger.exception(e)
(附上yaml文件的格式:

)
5.运行文件run.py
import datetime
import os, sys
#部署到jenkins提示No Module,将import报错的模块加入到系统路径中
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
sys.path.append('D:\\pythonprj\\common')
sys.path.append('D:\\pythonprj\\excelrelate')
from excelrelate.sendmail import Mail
from common.excel import *
from excelrelate.runcase import run_case
from excelrelate.api_key import ApiKey
now = datetime.datetime.now()
tt = now.strftime('%Y%m%d%H%M%S')
#运行完后存放excel的地址
dpath = r'D:\pythonprj\data\excelres\XX系统用例{}.xlsx'.format(tt)
ak = ApiKey()
path = r'D:\pythonprj\data\test2.xlsx'
rd = Reader()
#打开excel
rd.open_excel(path)
wt = Writer()
#将excel复制到dpath这个路径,并打开
wt.copy_open(r'D:\pythonprj\data\test2.xlsx', dpath)
sheet = rd.get_sheets()
# print(sheet)
#遍历sheets页
for name in sheet:
print('***********页面是{}***********************'.format(name))
#读和写切换sheet页
rd.set_sheet(name)
wt.set_sheet(name)
line = rd.readline()
for i in range(1, len(line)):
act, res = run_case(ak, line[i])
print('最后的结果是{}'.format(res))
wt.write(i, 10, str(act), 0)
if res == 'PASS':
wt.write(i, 11, 'pass', 3)
else:
wt.write(i, 11, 'fail', 2)
wt.save_close()
#发邮件
ml=Mail('XX系统接口测试用例{}.xlsx'.format(tt),dpath)
ml.send('您有一封接口自动化结果邮件,请查收~')
更多推荐



所有评论(0)