ConfigDB.py 12.1 KB
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import MySQLdb
import commons.common as ca
import json
from commons.Logging import Logger
# import chardet
log=Logger()


dbhost=ca.get_global_config('global_data','Database','dbhost')
dbport=ca.get_global_config('global_data','Database','dbport')
dbname=ca.get_global_config('global_data','Database','dbname')
dbuser=ca.get_global_config('global_data','Database','dbuser')
dbpassword=ca.get_global_config('global_data','Database','dbpassword')
#dbcharset=get_global_config('Database','dbcharset')

def mysql_selectOne(select_action):
    action=select_action
    db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname, charset='utf8' )
    cursor = db.cursor()
    # 使用cursor()方法获取操作游标 
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 使用 fetchone() 方法获取一条数据  
        data = cursor.fetchone()
#         print data
        return data
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        assert False
    finally:
        # 关闭数据库连接
        db.close()

def mysql_selectAll(select_action):
    action=select_action
    db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname, charset='utf8' )
    #游标加上cursorclass=MySQLdb.cursors.DictCursor后,返回值会变成字典,不加时返回数据格式如:(("001","test"),)加了后变成({'name': test, 'id': 001L,})
    #cursor = db.cursor(cursorclass=MySQLdb.cursors.DictCursor)
    cursor = db.cursor()
    # 使用cursor()方法获取操作游标 
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 使用 fetchall() 方法获取所有数据  
        data = cursor.fetchall()
#         print data
        return data
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        assert False
    finally:
        # 关闭数据库连接
        db.close()

def mysql_delete(delete_action):
    action=delete_action
    db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname, charset='utf8' )
    cursor = db.cursor()
    # 使用cursor()方法获取操作游标 
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 提交
        db.commit()
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        # 错误回滚
        db.rollback()
    finally:
        # 关闭数据库连接
        db.close()

def mysql_update(update_action):
    action=update_action
    db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname, charset='utf8' )
    cursor = db.cursor()
    # 使用cursor()方法获取操作游标 
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 提交
        db.commit()
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        # 错误回滚
        db.rollback()
    finally:
        # 关闭数据库连接
        db.close()

def mysql_insert(insert_action):
    action=insert_action
    db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname, charset='utf8' )
    cursor = db.cursor()
    # 使用cursor()方法获取操作游标 
    try:
        # 使用execute方法执行SQL语句
        cursor.execute(action)
        # 提交
        db.commit()
    except Exception as e:
        print("数据库操作异常:%s" % str(e))
        log.error("数据库操作异常:%s" % str(e))
        # 错误回滚
        db.rollback()
    finally:
        # 关闭数据库连接
        db.close()

def mysql_check_insert(api,section,check_sql,delete_sql,insert_sql):
    log.info(u"======测试数据准备======")
    check=ca.get_api_config(api, section, check_sql)
    delete=ca.get_api_config(api, section, delete_sql)
    insert=ca.get_api_config(api, section, insert_sql)
#     print check
#     print delete
#     print insert
    try:
        db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname, charset='utf8' )
        cursor = db.cursor()
        # 使用cursor()方法获取操作游标 
        # 使用execute方法执行SQL语句
        cursor.execute(check)
        result=cursor.fetchall()
        # 提交
        db.commit()
        if result:
            log.info(u"检查到数据库有重复数据%r"%str(result))
            log.info(u"删除查询到的重复数据%r"%str(delete))
            cursor.execute(delete)
            log.info(u"删除数据完成")
            log.info(u"向数据库中插入测试数据%r"%str(insert))
            cursor.execute(insert)
            log.info(u"插入数据完成")
            result=cursor.fetchall()
            db.commit()
            return result
        else:
            log.info(u"数据库没有重复数据直接插入自定义数据%r"%str(insert))
            result=cursor.execute(insert)
            log.info(u"插入数据完成,返回结果为为%r"%str(result))
#             cursor.fetchall()
            db.commit()
            return result
        
    except Exception as e:
        print(u"数据库操作异常:%r" % str(e))
        log.error(u"数据库操作异常:%r" % str(e))
        # 错误回滚
        db.rollback()
        assert False
    finally:
        # 关闭数据库连接
        db.close()



def mysql_select_order(orderNum):
    return mysql_selectOne("SELECT * FROM `order`.orders WHERE code = '%s';"%orderNum)
    
def mysql_delete_order(orderNum):
    mysql_delete("DELETE FROM `order`.orders WHERE code = '%s';"%orderNum)
    
def mysql_select_coupon(couponNum):
    return mysql_selectOne("SELECT * FROM `customer`.customer_coupon WHERE id = '%s';"%couponNum)

def mysql_delete_coupon(couponNum):
    mysql_delete("DELETE FROM `customer`.customer_coupon WHERE id = '%s';"%couponNum)


def Check_in_Mysql(in_data,sql_spm):
    log.info(u"======从数据库中查询传入数据======")
    result=mysql_selectAll(sql_spm)
    log.info(u"传入in_data为: %s"%in_data)
    log.info(u"数据库查询到的结果为 %s"%result)

    result=str(result)
#     print "1111111",result.decode("utf-8")
    if len(str(in_data)) == 0 and len(result)!=0:
        log.error(u"传入数据为空!!\n")
        assert False
        return
    elif len(str(in_data)) == 0 and len(result)==0:
        log.error(u"传入数据与数据库查询结果都为空!!\n")
        assert False
        return
    elif len(str(in_data)) != 0 and len(result)==0:
        log.error(u"数据库查询结果为空\n")
        assert False
        return    
    elif isinstance(in_data,(list)):
        log.info(u'检查的数据格式 为list类型')
        for i in range(0,len(in_data)):
            #由于数据库查询的数据中,中文的格式为unicode-escape,所以传入的数据需要进行encode
            in_data[i]=str(in_data[i]).decode("utf-8").encode("unicode-escape")
#             log.info(u"正在检查输入数据:%s"%in_data[i])
            if in_data[i] in result:
                assert True
                log.info(u"遍历list第%d次,插入数据%r与数据库查询结果一致"%(i,in_data[i]))
            else:
                log.error(u"#########ERROR#########:\n in_data与数据库查询结果不一致%r"%in_data[i])
                assert False
        return True
    
    elif str(in_data).decode("utf-8").encode("unicode-escape") in result:
        assert True
        log.info(u"in_data与数据库查询结果一致!!%r"%in_data)
        return True
    else:
        log.info(u"#########ERROR#########:\n in_data与数据库查询结果不一致!!%r"%in_data)
        assert False

def Check_in_Response(check_data,src_data):
    #check_data必须为列表形式,src_data必须为r.json()的数据类型
    log.info(u"======从响应Body中查询传入数据======")
    src_data=json.dumps(src_data)
    src_data=src_data.replace(" ","")
    log.info(u"传入check_data为: %s"%check_data)
    log.info(u"对比的响应数据为 %s"%src_data)
    if len(check_data) == 0 or len(src_data)==0:
        log.error(u"传入数据为空!!\n")
        assert False
        return
    elif isinstance (check_data,str) :
        check_data=check_data.replace(" ","").replace("[","").replace("]","").decode("utf-8").split(",")
        for i in range(0,len(check_data)):
            #由于数据库查询的数据中,中文的格式为unicode-escape,所以传入的数据需要进行encode
            check_data[i]=check_data[i].decode("utf-8").encode("unicode-escape")
#             print type(check_data),check_data[i]
            if check_data[i] in src_data:
                assert True
                log.info(u"遍历list第%d次,字段%s在请求响应结果中"%((i+1),check_data[i].decode("unicode-escape")))
            else:
                log.error(u"#########ERROR#########:\n check_data不在请求响应结果中%s"%check_data[i].decode("unicode-escape"))
                assert False
        return True
    elif isinstance (check_data,list) and len(check_data) > 0:
        for i in range(0,len(check_data)):
            #由于数据库查询的数据中,中文的格式为unicode-escape,所以传入的数据需要进行encode
            check_data[i]=check_data[i].decode("utf-8").encode("unicode-escape")
            print(type(check_data),check_data[i],str(check_data[i]))
            print(type(src_data),src_data)
            if check_data[i] in src_data:
                assert True
                log.info(u"遍历list第%d次,插入数据%r与数据库查询结果一致"%(i,check_data[i]))
            else:
                log.error(u"#########ERROR#########:\n check_data数据库查询结果不一致%r"%check_data[i])
                assert False
        return True
    else:
        log.info(u"#########ERROR#########:\n 检查数据有问题,请检查!!!!!%r")
        assert False


# test=ca.get_api_config('listCoupon', 'RequstHeader', 'testlist')
# print type(test),test
# print test.replace(" ","").replace("[","").replace("]","").decode("utf-8").split(",")
# test1=test.replace(" ","").replace("[","").replace("]","").decode("utf-8").split(",")
# 
# a='couponBizType'
# b='code'
# data1=[a,b:"200"]
# data2={"code":"200","data":{"code":u"你好","data":[{"couponBizType":1,"couponCode":"004","couponCode123":"004"}],"result":"OK","success":2},"result":"OK","success":2}
#  
# print Check_in_Response(data1, data2)

# d=mysql_selectAll("select id as ddd,created_id,name from `product`.tag where id =1")
# d1=mysql_selectOne("select id,created_id,name from `product`.tag where id =1111")
# print d
# print d1
# print d[0][0]
# print chardet.detect(str(d))  
# print chardet.detect(str(d[0][2]))  
# mysql_insert(ca.get_api_config('submitOrder', 'coupon', 'insert'))
# d=ca.get_api_config('submitOrder', 'coupon', 'insert')
# d=d.decode("utf-8")
# print d
# mysql_insert(d)

# c={"couponBizType":1,"couponCode":"004"}
# a={"code":"200","data":{"code":"200","data":[],"result":"OK","success":1},"result":"OK","success":2}
# b={"code":"200","data":{"code":"200","data":[{"couponBizType":1,"couponCode":"004","couponCode123":"004"}],"result":"OK","success":2},"result":"OK","success":2}
# # print json_cmp(a, b)
# print json_cmp(a, b)
# print 123
# mysql_check_insert('listCoupon', 'listCoupon01',1,2,3)
# a=mysql_selectAll("select * from `order`.orders where id =271")
# print a
# if a:
#     print 111

# def string(str1):
#     a=[]
#     b=[]
#     str1= list(str1)
#     print type(str1[0]),type(str1[1]),len(str1)
#     for i in range(len(str1)):
#         if str1[i] in '0123456789':
#             b.append(str1[i])
#         else:
#             a.append(str1[i])
#     a= "".join(a)
#     b= "".join(b)
#     c=a+b
#     return c
# 
# print string("A1B2c3")
# 
# assert None, "add assert message"