MySession.py 5.59 KB
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import requests
import json
from commons import common as com
from commons.Logging import Logger
log=Logger()

class mysession():
    "封装了requests的基类,以供后期统一使用"
    
    url="http://test.uap.diligrp.com/login/login.action"
    header={
    "Host": "test.uap.diligrp.com",
    "Connection": "keep-alive",
    "Content-Length": "33",
    "Cache-Control": "max-age=0",
    "Upgrade-Insecure-Requests": "1",
    "Origin": "http://test.uap.diligrp.com",
    "Content-Type": "application/x-www-form-urlencoded",
    "User-Agent": "Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/90.0.4430.212Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "Referer": "http://test.uap.diligrp.com/login/index.html",
    "Accept-Encoding": "gzip,deflate",
    "Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8,en;q=0.7",
    "Cookie": "UAP_accessToken=;UAP_refreshToken=;UAP_loginPath="}
    body="userName=sg_wenze&password=111111"
    
    def __init__(self):
        "如下代码,可以通过配置文件来控制测试环境和灰度环境,http和https"
        self.url=mysession.url.replace("http://test.",com.get_global_config("global_data", "environment", "en") )
        self.header=mysession.header
        self.body=mysession.body
        self.timeout = 60
        self.max_retries = 3
        self.keep_alive = False
        self.ssl_verify=False
        self.proxies=None
        self.allow_redirects=False
        self.proxies={'http': 'http://localhost:8888', 'https': 'http://localhost:8888'}

    def get_session(self,account,**kwargs):
        "如下代码,可以通过配置文件来控制登录的账户session"
        self.body=self.body.replace("sg_wenze", com.get_global_config("global_data", "account", account).split("&")[0])
        self.body=self.body.replace("111111", com.get_global_config("global_data", "account", account).split("&")[1])
        #requests.session()会话保持,比如使用session成功的登录了某个网站,
        #则在再次使用该session对象求求该网站的其他网页都会默认使用该session之前使用的cookie等参数
        self.se=requests.session()
        #使用session对象的方法POST/GET等
        re=self.se.post(url=self.url, headers=self.header,data=self.body,proxies=self.proxies,**kwargs)
        #返回session对象,供其他接口使用
        return self.se    

    def close_session(self):
        "关闭session"
        self.se.close()

    def get(self, url, **kwargs):
        """Sends a GET request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        #记录日志
        log.info("{}\n{}".format(url,kwargs))
        #进行请求
        re=self.se.get(url,proxies=self.proxies,**kwargs)
        return re
    
    def post(self, url, data=None, json=None, **kwargs):
        """Sends a POST request. Returns :class:`Response` object.
        :param url: URL for the new :class:`Request` object.
        :param data: (optional) Dictionary, list of tuples, bytes, or file-like
            object to send in the body of the :class:`Request`.
        :param json: (optional) json to send in the body of the :class:`Request`.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        #记录日志
        log.info("{}\n{}\n{}\n{}".format(url,data,json,kwargs))
        #进行请求
        re=self.se.post(url, data=data, json=json,proxies=self.proxies, **kwargs)
        return re

    def options(self, url, **kwargs):
        """Sends a OPTIONS request. Returns :class:`Response` object.
        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        #记录日志
        log.info(url,kwargs)
        #进行请求
        re=self.se.options(url,**kwargs)
        return re

    def head(self, url, **kwargs):
        """Sends a HEAD request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        #记录日志
        log.info(url,kwargs)
        #进行请求
        re=self.se.head(url,**kwargs)
        return re
    
    def put(self, url, data=None, **kwargs):
        """Sends a PUT request. Returns :class:`Response` object.
        :param url: URL for the new :class:`Request` object.
        :param data: (optional) Dictionary, list of tuples, bytes, or file-like
            object to send in the body of the :class:`Request`.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        #记录日志
        log.info(url,data,kwargs)
        #进行请求
        re=self.se.put(url, data,**kwargs)
        return re
    
    def delete(self, url, **kwargs):
        """Sends a DELETE request. Returns :class:`Response` object.
        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        :rtype: requests.Response
        """
        #记录日志
        log.info(url,kwargs)
        #进行请求
        re=self.se.delete(url,**kwargs)
        return re

my=mysession()
s1=my.get_session("user01")
s2=my.get_session("user02")