MySession.py
12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import requests
import re
from commons import common as com
from commons.Logging import log
from commons.clientSession import cliSession
from commons.scripts.pwdCry import pwdCry
class mysession(requests.Session):
"封装了requests的基类,以供后期统一使用"
url = "http://test.uap.diligrp.com/login/login.action"
url_client = "http://test.uap.diligrp.com/api/authenticationApi/loginWeb"
header = {
"Host": "test.uap.diligrp.com",
"Connection": "keep-alive",
"Content-Length": "33",
"Cache-Control": "max-age=0",
"Upgrade-Insecure-Requests": "1",
"Origin": "http://test.uap.diligrp.com",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/90.0.4430.212Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Referer": "http://test.uap.diligrp.com/login/index.html",
"Accept-Encoding": "gzip,deflate",
"Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8,en;q=0.7",
"Cookie": "UAP_accessToken=;UAP_refreshToken=;UAP_loginPath="}
header_client = {
"Content-Type": "text/plain;charset=utf-8",
"Host": "test.uap.diligrp.com",
"Content-Length": "209",
"Expect": "100-continue"}
body = "userName=sg_wenze&password=111111"
body_client= {"userName":"sg_wenze","password":"111111"}
def __init__(self,host=None):
"如下代码,可以通过配置文件来控制测试环境和灰度环境,http和https"
super().__init__()
self.user={}
self.url = mysession.url.replace("http://test.", com.get_global_config("global_data", "environment", "en"))
self.header = mysession.header
self.body = mysession.body
self.url_client = mysession.url_client.replace("http://test.", com.get_global_config("global_data", "environment", "en"))
self.header_client = mysession.header_client
self.body_client = mysession.body_client
self.timeout = (10,10)
self.max_retries = 3
self.keep_alive = False
self.ssl_verify = False
self.proxies = None
# self.proxies={'http': 'http://localhost:8888', 'https': 'http://localhost:8888'}
self.myproxies={'http': 'http://localhost:8888', 'https': 'http://localhost:8888'}
self.allow_redirects = False
self.firmid={"group":"1","hd":"2","cd":"3","qqhe":"4","mdj":"5","gy":"6","cc":"7","sg":"8","sy":"9"}
self.market={"sy":"沈阳","heb":"哈尔滨","sg":"寿光","gy":"贵阳","cc":"长春","hs":"杭水","hg":"杭果"}
self.host= None if host==None else eval(com.get_global_config("global_data", "host_ip", host))
def cliLogin(self,user="sy_userName_01"):
self.webHeaders, self.clientHeaders, self.userInfo = cliSession().loginUser(user=user)
return self
# def get_session(self, account, **kwargs):
# "如下代码,可以通过配置文件来控制登录的账户session"
# self.body = self.body.replace("sg_wenze",
# com.get_global_config("global_data", "account", account).split("&")[0])
# self.body = self.body.replace("111111",
# com.get_global_config("global_data", "account", account).split("&")[1])
# self.se = requests.session()
# co = requests.cookies.RequestsCookieJar()
# #加入UAP_firmId属性
# firm=account.split("_")[0]
# co.set("UAP_firmId", self.firmid[firm])
# self.se.cookies.update(co)
# # 进行登录请求
# re = self.se.post(url=self.url, headers=self.header, data=self.body, proxies=self.proxies, **kwargs)
# self.UAP_accessToken=self.se.cookies["UAP_accessToken"]
# self.UAP_refreshToken=self.se.cookies["UAP_refreshToken"]
# return self.se
#
# def get_login_info(self, account, **kwargs):
# "用于获取用户信息"
# self.body_client.update({"userName":com.get_global_config("global_data", "account", account).split("&")[0]})
# self.body_client.update({"password":pwdCry(com.get_global_config("global_data", "account", account).split("&")[1])})
# tmp = requests.post(url=self.url_client, headers=self.header_client, json=self.body_client, proxies=self.proxies, **kwargs)
# return tmp
def url_pro(self, url, host):
# url = url.replace(" ", "")
if host!=None:
if "http:" in url:
d1 = re.match(r"http://(.+?)/", url).group(1).split(":")[0]
url = re.sub(r"http://(.+?)/", r"http://" + host[d1] + "/", url)
elif "https:" in url:
d1 = re.match(r"https://(.+?)/", url).group(1).split(":")[0]
url = re.sub(r"https://(.+?)/", r"https://" + host[d1] + "/", url)
return url
def get_session_client(self, account, **kwargs):
"get_session和get_session_client的方法只能用一个"
self.body_client.update({"userName": com.get_global_config("global_data", "account", account).split("&")[0]})
self.body_client.update( {"password": pwdCry(com.get_global_config("global_data", "account", account).split("&")[1])})
# 登录请求
self.re = super().post(url=self.url_client, headers=self.header_client, json=self.body_client, **kwargs)
#冗余登录账户信息
self.user[account]=self.re.json()["data"]["user"]
#配置cookie
co = requests.cookies.RequestsCookieJar()
co.set("UAP_firmId", str(self.re.json()["data"]["user"]["firmId"]),domain=".diligrp.com")
co.set("UAP_accessToken", self.re.json()["data"]["accessToken"],domain=".diligrp.com")
co.set("UAP_refreshToken", self.re.json()["data"]["refreshToken"],domain=".diligrp.com")
self.cookies.update(co)
return self
def close_session(self):
"关闭session"
self.close()
def check_login(self, account, **kwargs):
"验证登录接口"
self.body = self.body.replace("sg_wenze",
com.get_global_config("global_data", "account", account).split("&")[0])
self.body = self.body.replace("111111", com.get_global_config("global_data", "account", account).split("&")[1])
# POST请求
print(self.body)
re = super().post(url=self.url, headers=self.header, data=self.body.encode('utf-8'),
allow_redirects=False, **kwargs)
# 判断请求
assert "UAP_accessToken" in re.headers["Set-Cookie"]
assert "UAP_refreshToken" in re.headers["Set-Cookie"]
assert "UAP_loginPath" in re.headers["Set-Cookie"]
print("登录接口验证成功")
return re
def useHeadersRequests(self, method=None, url=None, headers=None, data=None, **kwargs):
"""
模拟客户端接口操作,使用headers进行cookies传递,调用requests库进行接口访问
:param method:接口请求方式,POST,GET等
:param url:接口路径
:param data:接口数据
:kwargs:其他requests.request()支持参数可以直接传递
"""
print(url)
# print(headers)
print(data)
log.info("{0:=^86}".format(''))
log.info("{}\n{}\n{}\n".format(url, data, kwargs))
if "gateway" in url:
# 判断接口路径,通过接口路径钟是否包含gateway来判定接口是否是由客户端进行访问,来判定headers使用
self.clientHeaders = dict(self.clientHeaders, **headers)
res = requests.request(method=method, url=url, data=data, headers=self.clientHeaders, **kwargs)
else:
self.webHeaders = dict(self.webHeaders, **headers)
res = requests.request(method=method, url=url, data=data, headers=self.webHeaders, **kwargs)
return res
def request(self, method ,url, **kwargs):
""":param method: method for the new :class:`Request` object:
``GET``, ``OPTIONS``, ``HEAD``, ``POST``, ``PUT``, ``PATCH``, or ``DELETE``.
"""
# 记录日志
log.info("{0:=^86}".format(''))
log.info(url)
url=self.url_pro(url,self.host)
log.info("{}\n{}\n".format(url, kwargs))
# 进行请求
re = super().request(method , url, **kwargs,timeout=self.timeout)
return re
def get(self, url, **kwargs):
"""Sends a GET request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
# 进行请求
re = super().get(url, **kwargs)
return re
def post(self, url, data=None, json=None, **kwargs):
"""Sends a POST request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, list of tuples, bytes, or file-like
object to send in the body of the :class:`Request`.
:param json: (optional) json to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
# 进行请求
re = super().post(url, data=data, json=json, **kwargs)
return re
def options(self, url, **kwargs):
"""Sends a OPTIONS request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
# 记录日志
log.info(url, kwargs)
# 进行请求
re = self.se.options(url, **kwargs)
return re
def head(self, url, **kwargs):
"""Sends a HEAD request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
# 记录日志
log.info(url, kwargs)
# 进行请求
re = self.se.head(url, **kwargs)
return re
def put(self, url, data=None, **kwargs):
"""Sends a PUT request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, list of tuples, bytes, or file-like
object to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
# 记录日志
log.info(url, data, kwargs)
# 进行请求
re = self.se.put(url, data, **kwargs)
return re
def delete(self, url, **kwargs):
"""Sends a DELETE request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
# 记录日志
log.info(url, kwargs)
# 进行请求
re = self.se.delete(url, **kwargs)
return re
def set_mark(self):
"用户自定义优先级方法"
mark_list = eval(com.get_global_config("global_data", "mark", "list"))
print("预设运行标记:", mark_list)
global _global_mark
if type(mark_list) == type([]) and len(mark_list) != 0:
_global_mark = mark_list
return _global_mark
elif type(mark_list) == type([]) and len(mark_list) == 0:
_global_mark = False
return _global_mark
else:
raise Exception("error,pls check mark data")
def mark(self, m=None):
try:
# 没有输入任何预设值,默认跑全部
if _global_mark == False:
return True
# 输入预设值且未标记用例,默认跑全部
elif _global_mark != False and (m in _global_mark) or len([tag for tag in m if tag in _global_mark]) >0:
return True
else:
return False
except Exception as e:
return False
my = mysession()
my.set_mark()
# 沈阳客户端session
sessionSy = mysession().cliLogin("sy_userName_01")
# 哈尔滨客户端session
sessionHeb = mysession().cliLogin("hd_userName_01")
#获取对应市场session
sy1=mysession("host1").get_session_client("sy_user01")
heb=mysession("host1").get_session_client("heb_user01")
hg=mysession("host2").get_session_client("hg_user01")
# 检测登录接口
sy1.check_login("sy_user01")
# 检测登录接口
hg.check_login("hg_user01")