ConfigDB.py 10.6 KB
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import pymysql
import time
import json
import commons.common as ca
from commons.Logging import log


dbhost=ca.get_global_config('global_data','Database','dbhost')
dbport=int(ca.get_global_config('global_data','Database','dbport'))
dbname=ca.get_global_config('global_data','Database','dbname')
dbuser=ca.get_global_config('global_data','Database','dbuser')
dbpassword=ca.get_global_config('global_data','Database','dbpassword')
#dbcharset=get_global_config('Database','dbcharset')

def mysql_conn_test():
    # 数据库连接重试功能和连接超时功能的DB连接
    _conn_status = True
    _max_retries_count = 3  # 设置最大重试次数
    _conn_retries_count = 0  # 初始重试次数
    _conn_timeout = 3  # 连接超时时间为3秒
    while _conn_status and _conn_retries_count < _max_retries_count:
        try:
            print('连接数据库中..')
            db = pymysql.connect(host=dbhost,  
                                 port=dbport,
                                 user=dbuser, 
                                 passwd=dbpassword, 
                                 database=dbname, 
                                 charset='utf8', 
                                 connect_timeout=_conn_timeout)
            _conn_status = False  # 如果conn成功则_status为设置为False则退出循环,返回db连接对象
            print("连接结果 ",db)
            return db
        except:
            _conn_retries_count += 1
        print(_conn_retries_count)
        print('connect db is error!!')
        time.sleep(3)  # 此为测试看效果
        continue
    raise Exception("pls check your mysql config!")


def mysql_selectOne(select_action):
    action=select_action
    db = pymysql.connect(host=dbhost,  
                         port=dbport,
                         user=dbuser, 
                         passwd=dbpassword, 
                         database=dbname, 
                         charset='utf8', 
                         connect_timeout=3)
    cursor = db.cursor()
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        data = cursor.fetchone()
        return data
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        assert False
    finally:
        # 关闭数据库连接
        db.close()

def mysql_selectAll(select_action):
    action=select_action
    db = pymysql.connect(host=dbhost,  
                         port=dbport,
                         user=dbuser, 
                         passwd=dbpassword, 
                         database=dbname, 
                         charset='utf8', 
                         connect_timeout=3)
    cursor = db.cursor()
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 使用 fetchall() 方法获取所有数据  
        data = cursor.fetchall()
#         print data
        return data
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        assert False
    finally:
        # 关闭数据库连接
        db.close()

def mysql_delete(delete_action):
    action=delete_action
    db = pymysql.connect(host=dbhost,  
                         port=dbport,
                         user=dbuser, 
                         passwd=dbpassword, 
                         database=dbname, 
                         charset='utf8', 
                         connect_timeout=3)
    cursor = db.cursor()
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 提交
        db.commit()
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        # 错误回滚
        db.rollback()
    finally:
        # 关闭数据库连接
        db.close()

def mysql_update(update_action):
    action=update_action
    db = pymysql.connect(host=dbhost,  
                         port=dbport,
                         user=dbuser, 
                         passwd=dbpassword, 
                         database=dbname, 
                         charset='utf8', 
                         connect_timeout=3)
    cursor = db.cursor()
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 提交
        db.commit()
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        # 错误回滚
        db.rollback()
    finally:
        # 关闭数据库连接
        db.close()

def mysql_insert(insert_action):
    action=insert_action
    db = pymysql.connect(host=dbhost,  
                         port=dbport,
                         user=dbuser, 
                         passwd=dbpassword, 
                         database=dbname, 
                         charset='utf8', 
                         connect_timeout=3)
    cursor = db.cursor()
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 提交
        db.commit()
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        # 错误回滚
        db.rollback()
    finally:
        # 关闭数据库连接
        db.close()

def mysql_check_insert(api,section,check_sql,delete_sql,insert_sql):
    log.info(u"======测试数据准备======")
    check=ca.get_api_config(api, section, check_sql)
    delete=ca.get_api_config(api, section, delete_sql)
    insert=ca.get_api_config(api, section, insert_sql)

    try:
        db = pymysql.connect(dbhost, dbuser, dbpassword, dbname, charset='utf8' )
        cursor = db.cursor()
        # 使用execute方法执行SQL语句
        cursor.execute(check)
        result=cursor.fetchall()
        # 提交
        db.commit()
        if result:
            log.info(u"检查到数据库有重复数据%r"%str(result))
            log.info(u"删除查询到的重复数据%r"%str(delete))
            cursor.execute(delete)
            log.info(u"删除数据完成")
            log.info(u"向数据库中插入测试数据%r"%str(insert))
            cursor.execute(insert)
            log.info(u"插入数据完成")
            result=cursor.fetchall()
            db.commit()
            return result
        else:
            log.info(u"数据库没有重复数据直接插入自定义数据%r"%str(insert))
            result=cursor.execute(insert)
            log.info(u"插入数据完成,返回结果为为%r"%str(result))
#             cursor.fetchall()
            db.commit()
            return result
        
    except Exception as e:
        print(u"数据库操作异常:%r" % str(e))
        log.error(u"数据库操作异常:%r" % str(e))
        # 错误回滚
        db.rollback()
        assert False
    finally:
        # 关闭数据库连接
        db.close()

def Check_in_Mysql(in_data,sql_spm):
    log.info(u"======从数据库中查询传入数据======")
    result=mysql_selectAll(sql_spm)
    log.info(u"传入in_data为: %s"%in_data)
    log.info(u"数据库查询到的结果为 %s"%result)

    result=str(result)
    if len(str(in_data)) == 0 and len(result)!=0:
        log.error(u"传入数据为空!!\n")
        assert False
        return
    elif len(str(in_data)) == 0 and len(result)==0:
        log.error(u"传入数据与数据库查询结果都为空!!\n")
        assert False
        return
    elif len(str(in_data)) != 0 and len(result)==0:
        log.error(u"数据库查询结果为空\n")
        assert False
        return    
    elif isinstance(in_data,(list)):
        log.info(u'检查的数据格式 为list类型')
        for i in range(0,len(in_data)):
            in_data[i]=str(in_data[i]).decode("utf-8").encode("unicode-escape")

            if in_data[i] in result:
                assert True
                log.info(u"遍历list第%d次,插入数据%r与数据库查询结果一致"%(i,in_data[i]))
            else:
                log.error(u"#########ERROR#########:\n in_data与数据库查询结果不一致%r"%in_data[i])
                assert False
        return True
    
    elif str(in_data).decode("utf-8").encode("unicode-escape") in result:
        assert True
        log.info(u"in_data与数据库查询结果一致!!%r"%in_data)
        return True
    else:
        log.info(u"#########ERROR#########:\n in_data与数据库查询结果不一致!!%r"%in_data)
        assert False

def Check_in_Response(check_data,src_data):
    "check_data必须为列表形式,src_data必须为r.json()的数据类型"
    log.info(u"======从响应Body中查询传入数据======")
    src_data=json.dumps(src_data)
    src_data=src_data.replace(" ","")
    log.info(u"传入check_data为: %s"%check_data)
    log.info(u"对比的响应数据为 %s"%src_data)
    if len(check_data) == 0 or len(src_data)==0:
        log.error(u"传入数据为空!!\n")
        assert False
        return
    elif isinstance (check_data,str) :
        check_data=check_data.replace(" ","").replace("[","").replace("]","").decode("utf-8").split(",")
        for i in range(0,len(check_data)):
            #由于数据库查询的数据中,中文的格式为unicode-escape,所以传入的数据需要进行encode
            check_data[i]=check_data[i].decode("utf-8").encode("unicode-escape")
#             print type(check_data),check_data[i]
            if check_data[i] in src_data:
                assert True
                log.info(u"遍历list第%d次,字段%s在请求响应结果中"%((i+1),check_data[i].decode("unicode-escape")))
            else:
                log.error(u"#########ERROR#########:\n check_data不在请求响应结果中%s"%check_data[i].decode("unicode-escape"))
                assert False
        return True
    elif isinstance (check_data,list) and len(check_data) > 0:
        for i in range(0,len(check_data)):
            check_data[i]=check_data[i].decode("utf-8").encode("unicode-escape")
            print(type(check_data),check_data[i],str(check_data[i]))
            print(type(src_data),src_data)
            if check_data[i] in src_data:
                assert True
                log.info(u"遍历list第%d次,插入数据%r与数据库查询结果一致"%(i,check_data[i]))
            else:
                log.error(u"#########ERROR#########:\n check_data数据库查询结果不一致%r"%check_data[i])
                assert False
        return True
    else:
        log.info(u"#########ERROR#########:\n 检查数据有问题,请检查!!!!!%r")
        assert False