# MySession.py 12.7 KB
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import requests
import re
from commons import common as com
from commons.Logging import log
from commons.clientSession import cliSession
from commons.scripts.pwdCry import pwdCry


class mysession(requests.Session):
    """Base class wrapping ``requests.Session`` so later code can use it uniformly."""

    # Web login endpoint (posted to by check_login). The "http://test." prefix
    # is replaced in __init__ with the configured environment value.
    url = "http://test.uap.diligrp.com/login/login.action"
    # Client login endpoint (posted to by get_session_client); same prefix
    # replacement applies.
    url_client = "http://test.uap.diligrp.com/api/authenticationApi/loginWeb"

    # Header template for the web login request.
    # The Content-Length value equals the length of the default ``body`` below
    # (33 characters).
    header = {
        "Host": "test.uap.diligrp.com",
        "Connection": "keep-alive",
        "Content-Length": "33",
        "Cache-Control": "max-age=0",
        "Upgrade-Insecure-Requests": "1",
        "Origin": "http://test.uap.diligrp.com",
        "Content-Type": "application/x-www-form-urlencoded",
        "User-Agent": "Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/90.0.4430.212Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Referer": "http://test.uap.diligrp.com/login/index.html",
        "Accept-Encoding": "gzip,deflate",
        "Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8,en;q=0.7",
        "Cookie": "UAP_accessToken=;UAP_refreshToken=;UAP_loginPath="}
    # Header template for the client login request.
    header_client = {
        "Content-Type": "text/plain;charset=utf-8",
        "Host": "test.uap.diligrp.com",
        "Content-Length": "209",
        "Expect": "100-continue"}

    # Form body template for the web login; "sg_wenze" and "111111" are
    # placeholders that check_login replaces with the configured credentials.
    body = "userName=sg_wenze&password=111111"
    # JSON payload template for the client login; get_session_client fills in
    # the configured user name and the encrypted password.
    body_client= {"userName":"sg_wenze","password":"111111"}

    def __init__(self, host=None):
        """Create a session pre-configured from the global configuration.

        The ``http://test.`` prefix of the class-level login URLs is replaced
        with the value stored under ``global_data / environment / en``, so the
        configuration file decides which environment (test or gray release)
        and which scheme (http or https) the session talks to.

        :param host: optional key under ``global_data / host_ip``. When given,
            the configured value is evaluated and stored in ``self.host``,
            which ``request`` hands to ``url_pro`` for host rewriting. When
            ``None`` (the default) no rewriting takes place.
        """
        super().__init__()
        self.user = {}

        # Look the environment prefix up once and apply it to both endpoints.
        env_prefix = com.get_global_config("global_data", "environment", "en")
        self.url = mysession.url.replace("http://test.", env_prefix)
        self.url_client = mysession.url_client.replace("http://test.", env_prefix)

        # Copy the mutable class-level templates so that changes made through
        # one instance never leak into the class or into other instances.
        self.header = dict(mysession.header)
        self.body = mysession.body
        self.header_client = dict(mysession.header_client)
        self.body_client = dict(mysession.body_client)

        # NOTE(review): only ``timeout`` is read inside this class (by
        # ``request``); the remaining settings look informational - confirm
        # before relying on them.
        self.timeout = (10, 10)
        self.max_retries = 3
        self.keep_alive = False
        self.ssl_verify = False
        self.proxies = None
        # Ready-made mapping for a proxy on localhost:8888. It is not applied
        # by default; assign it to ``self.proxies`` to route traffic through it.
        self.myproxies = {'http': 'http://localhost:8888', 'https': 'http://localhost:8888'}
        self.allow_redirects = False

        # Lookup tables keyed by the short market / firm code.
        self.firmid = {"group": "1", "hd": "2", "cd": "3", "qqhe": "4", "mdj": "5",
                       "gy": "6", "cc": "7", "sg": "8", "sy": "9"}
        self.market = {"sy": "沈阳", "heb": "哈尔滨", "sg": "寿光", "gy": "贵阳",
                       "cc": "长春", "hs": "杭水", "hg": "杭果"}

        # NOTE(review): eval() executes whatever the configuration holds. If the
        # value is always a plain literal, ast.literal_eval would be the safer
        # choice - confirm the config format before switching.
        if host is None:
            self.host = None
        else:
            self.host = eval(com.get_global_config("global_data", "host_ip", host))


    def cliLogin(self, user="sy_userName_01"):
        """Log in through ``cliSession`` and keep the returned header sets.

        The three values returned by ``cliSession().loginUser`` are stored as
        ``self.webHeaders``, ``self.clientHeaders`` and ``self.userInfo``.

        :param user: user key handed to ``cliSession().loginUser``.
        :return: this session, so the call can be chained.
        """
        login_result = cliSession().loginUser(user=user)
        self.webHeaders, self.clientHeaders, self.userInfo = login_result
        return self

    # def get_session(self, account, **kwargs):
    #     "如下代码,可以通过配置文件来控制登录的账户session"
    #     self.body = self.body.replace("sg_wenze",
    #                                   com.get_global_config("global_data", "account", account).split("&")[0])
    #     self.body = self.body.replace("111111",
    #                                   com.get_global_config("global_data", "account", account).split("&")[1])
    #     self.se = requests.session()
    #     co = requests.cookies.RequestsCookieJar()
    #     #加入UAP_firmId属性
    #     firm=account.split("_")[0]
    #     co.set("UAP_firmId", self.firmid[firm])
    #     self.se.cookies.update(co)
    #     # 进行登录请求
    #     re = self.se.post(url=self.url, headers=self.header, data=self.body, proxies=self.proxies, **kwargs)
    #     self.UAP_accessToken=self.se.cookies["UAP_accessToken"]
    #     self.UAP_refreshToken=self.se.cookies["UAP_refreshToken"]
    #     return self.se
    #
    # def get_login_info(self, account, **kwargs):
    #     "用于获取用户信息"
    #     self.body_client.update({"userName":com.get_global_config("global_data", "account", account).split("&")[0]})
    #     self.body_client.update({"password":pwdCry(com.get_global_config("global_data", "account", account).split("&")[1])})
    #     tmp = requests.post(url=self.url_client, headers=self.header_client, json=self.body_client, proxies=self.proxies, **kwargs)
    #     return tmp

    def url_pro(self, url, host):
        """Rewrite the ``host[:port]`` section of ``url`` using the ``host`` mapping.

        :param url: URL to process. Only URLs that start with ``http://`` or
            ``https://`` are rewritten; anything else is returned unchanged.
        :param host: mapping from the original host name (port stripped) to
            the text that replaces the whole ``host[:port]`` section, or
            ``None`` to disable rewriting.
        :return: the processed URL.
        :raises KeyError: if the URL's host name is not a key of ``host``.
        """
        if host is None:
            return url

        # Anchor on the scheme at the start of the URL and end the authority at
        # the first "/", "?" or "#". This keeps URLs without a path working and
        # never touches a URL embedded later in the path or query string.
        found = re.match(r"(https?)://([^/?#]+)", url)
        if found is None:
            return url

        scheme, authority = found.groups()
        hostname = authority.split(":")[0]
        return scheme + "://" + host[hostname] + url[found.end():]

    def get_session_client(self, account, **kwargs):
        """Log in through the client API and install the UAP cookies on this session.

        Original note, translated: only one of ``get_session`` and
        ``get_session_client`` may be used.

        :param account: key under ``global_data / account``. The configured
            value is split on ``&``; the first part is used as the user name
            and the second part as the password (encrypted with ``pwdCry``
            before sending).
        :param kwargs: extra keyword arguments forwarded to the POST call.
        :return: this session, so the call can be chained.
        """
        credentials = com.get_global_config("global_data", "account", account).split("&")

        # Build a new payload instead of updating the existing dict in place, so
        # a dict shared with the class or another instance is never modified.
        self.body_client = dict(
            self.body_client,
            userName=credentials[0],
            password=pwdCry(credentials[1]),
        )

        # Login request.
        self.re = super().post(url=self.url_client, headers=self.header_client,
                               json=self.body_client, **kwargs)

        # Decode the response body once and reuse it.
        payload = self.re.json()["data"]

        # Keep the account's user record for later use.
        self.user[account] = payload["user"]

        # Install the login cookies for every *.diligrp.com host.
        jar = requests.cookies.RequestsCookieJar()
        jar.set("UAP_firmId", str(payload["user"]["firmId"]), domain=".diligrp.com")
        jar.set("UAP_accessToken", payload["accessToken"], domain=".diligrp.com")
        jar.set("UAP_refreshToken", payload["refreshToken"], domain=".diligrp.com")
        self.cookies.update(jar)
        return self

    def close_session(self):
        """Close the session by delegating to ``close()``."""
        self.close()

    def check_login(self, account, **kwargs):
        """Verify the web login endpoint for ``account``.

        Posts the form body to ``self.url`` without following redirects and
        checks that the response sets the three UAP cookies.

        :param account: key under ``global_data / account``. The configured
            value is split on ``&`` into user name and password.
        :param kwargs: extra keyword arguments forwarded to the POST call.
        :return: the login response.
        :raises AssertionError: if ``UAP_accessToken``, ``UAP_refreshToken`` or
            ``UAP_loginPath`` is missing from the ``Set-Cookie`` header (also
            when the header is absent altogether).
        """
        credentials = com.get_global_config("global_data", "account", account).split("&")

        # Render the body into a local variable and leave ``self.body`` as the
        # template; writing the result back would remove the placeholders and
        # make every later call reuse the first account's credentials.
        body = self.body.replace("sg_wenze", credentials[0]).replace("111111", credentials[1])

        # POST request.
        print(body)
        resp = super().post(url=self.url, headers=self.header, data=body.encode('utf-8'),
                            allow_redirects=False, **kwargs)

        # Validate explicitly rather than with ``assert`` so the check still
        # runs under ``python -O`` and reports which cookie is missing.
        set_cookie = resp.headers.get("Set-Cookie", "")
        expected = ("UAP_accessToken", "UAP_refreshToken", "UAP_loginPath")
        missing = [name for name in expected if name not in set_cookie]
        if missing:
            raise AssertionError(
                "login response did not set cookie(s): {}".format(", ".join(missing)))
        print("登录接口验证成功")
        return resp

    def useHeadersRequests(self, method=None, url=None, headers=None, data=None, **kwargs):
        """Call an interface the way the client does, carrying cookies in headers.

        The request is sent with ``requests.request`` and the header set stored
        by ``cliLogin``: ``self.clientHeaders`` when ``url`` contains
        ``gateway`` (treated as a client-side interface), otherwise
        ``self.webHeaders``. ``cliLogin`` must therefore have been called first.

        Note that ``headers`` is merged into the stored header set, so the
        extra entries remain in effect for later calls as well.

        :param method: HTTP method, e.g. ``POST`` or ``GET``.
        :param url: interface URL.
        :param headers: optional extra headers; ``None`` means no extras.
        :param data: request data.
        :param kwargs: any other argument supported by ``requests.request``.
        :return: the response.
        """
        print(url)
        print(data)
        log.info("{0:=^86}".format(''))
        log.info("{}\n{}\n{}\n".format(url, data, kwargs))

        # Treat the default ``None`` as "no extra headers"; unpacking None
        # with ** would raise TypeError.
        extra_headers = headers or {}

        if "gateway" in url:
            # A URL containing "gateway" marks an interface accessed by the
            # client, which selects the client header set.
            self.clientHeaders = dict(self.clientHeaders, **extra_headers)
            merged = self.clientHeaders
        else:
            self.webHeaders = dict(self.webHeaders, **extra_headers)
            merged = self.webHeaders
        return requests.request(method=method, url=url, data=data, headers=merged, **kwargs)


    def request(self, method, url, **kwargs):
        """Log, rewrite the host via ``url_pro`` and send the request.

        :param method: HTTP method for the request: ``GET``, ``OPTIONS``,
            ``HEAD``, ``POST``, ``PUT``, ``PATCH`` or ``DELETE``.
        :param url: target URL; its host is rewritten through ``url_pro``
            using ``self.host``.
        :param kwargs: optional arguments accepted by
            ``requests.Session.request``. ``timeout`` defaults to
            ``self.timeout`` when the caller does not supply one.
        :return: the response.
        """
        # Log the URL as given, then the rewritten URL with the call arguments.
        log.info("{0:=^86}".format(''))
        log.info(url)
        url = self.url_pro(url, self.host)
        log.info("{}\n{}\n".format(url, kwargs))

        # Apply the session timeout only as a default: passing it as a second
        # explicit keyword raises TypeError when the caller already set one.
        kwargs.setdefault("timeout", self.timeout)
        return super().request(method, url, **kwargs)


    def get(self, url, **kwargs):
        """Send a GET request through the parent session.

        :param url: URL of the request.
        :param kwargs: optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        return super().get(url, **kwargs)

    def post(self, url, data=None, json=None, **kwargs):
        """Send a POST request through the parent session.

        :param url: URL of the request.
        :param data: (optional) dictionary, list of tuples, bytes, or file-like
            object to send in the body.
        :param json: (optional) JSON-serialisable object to send in the body.
        :param kwargs: optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        return super().post(url, data=data, json=json, **kwargs)

    def options(self, url, **kwargs):
        """Send an OPTIONS request through this session.

        :param url: URL of the request.
        :param kwargs: optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        # Delegate to requests.Session like get() and post() do; the class never
        # assigns a ``self.se`` delegate. The call passes through request(),
        # which logs the URL and arguments, so no extra logging is needed here.
        return super().options(url, **kwargs)

    def head(self, url, **kwargs):
        """Send a HEAD request through this session.

        :param url: URL of the request.
        :param kwargs: optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        # Delegate to requests.Session like get() and post() do; the class never
        # assigns a ``self.se`` delegate. The call passes through request(),
        # which logs the URL and arguments, so no extra logging is needed here.
        return super().head(url, **kwargs)

    def put(self, url, data=None, **kwargs):
        """Send a PUT request through this session.

        :param url: URL of the request.
        :param data: (optional) dictionary, list of tuples, bytes, or file-like
            object to send in the body.
        :param kwargs: optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        # Delegate to requests.Session like get() and post() do; the class never
        # assigns a ``self.se`` delegate. The call passes through request(),
        # which logs the URL and arguments (including ``data``).
        return super().put(url, data=data, **kwargs)

    def delete(self, url, **kwargs):
        """Send a DELETE request through this session.

        :param url: URL of the request.
        :param kwargs: optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        # Delegate to requests.Session like get() and post() do; the class never
        # assigns a ``self.se`` delegate. The call passes through request(),
        # which logs the URL and arguments, so no extra logging is needed here.
        return super().delete(url, **kwargs)

    def set_mark(self):
        """Load the preset run marks from the configuration.

        Reads ``global_data / mark / list``, evaluates it and stores the result
        in the module-level ``_global_mark``: the list itself when it is
        non-empty, ``False`` when it is empty.

        :return: the value stored in ``_global_mark``.
        :raises Exception: if the configured value does not evaluate to a list.
        """
        global _global_mark
        # NOTE(review): eval() executes whatever the configuration holds;
        # confirm the value is always a trusted list literal.
        mark_list = eval(com.get_global_config("global_data", "mark", "list"))
        print("预设运行标记:", mark_list)

        # Anything other than an exact list is a configuration error.
        if type(mark_list) is not list:
            raise Exception("error,pls check mark data")

        # An empty list means "no preset marks", recorded as False.
        _global_mark = mark_list if len(mark_list) != 0 else False
        return _global_mark

    def mark(self, m=None):
        """Decide whether a case tagged ``m`` runs under the preset marks.

        Returns ``True`` when no marks are preset (``_global_mark`` equals
        ``False``), when ``m`` itself is one of the preset marks, or when
        iterating ``m`` yields at least one preset mark. Any exception raised
        along the way - for example ``m`` being ``None`` while marks are
        preset, or ``_global_mark`` not being defined yet - yields ``False``.

        :param m: a single mark or an iterable of marks.
        :return: ``True`` if the case should run, otherwise ``False``.
        """
        try:
            preset = _global_mark
            # No preset marks were configured: run everything.
            # (Equality, not identity, is intentional here.)
            if preset == False:
                return True
            # ``m`` is itself one of the preset marks.
            if m in preset:
                return True
            # Otherwise count how many of the tags in ``m`` are preset marks.
            hits = 0
            for tag in m:
                if tag in preset:
                    hits += 1
            return hits > 0
        except Exception:
            return False


# Module-level setup: everything below runs when this module is imported.
# Load the preset run marks into the module-level ``_global_mark``.
my = mysession()
my.set_mark()
# Shenyang client session
sessionSy = mysession().cliLogin("sy_userName_01")
# Harbin client session
# NOTE(review): the account key uses the "hd" prefix - confirm it is the
# intended Harbin account.
sessionHeb = mysession().cliLogin("hd_userName_01")
# Sessions for the individual markets
sy1=mysession("host1").get_session_client("sy_user01")
heb=mysession("host1").get_session_client("heb_user01")
hg=mysession("host2").get_session_client("hg_user01")
# Check the login endpoint
sy1.check_login("sy_user01")
# Check the login endpoint
hg.check_login("hg_user01")