Python3 从零单排14_flask模块&mysql操作封装

  1.flask模块介绍

  在测试的过程中,往往会遇到功能受接口影响而导致测试无法正常进行,往往要等接口完成之后才能做功能测试,但这往往很托节奏。这个时候就可以考虑,将接口和功能剥离,分开测试,同时进行,保证项目进度。

  问题是怎么分开呢?要研发先帮忙开发接口、调试好,然后测试?显然太low,python这个时候就可以来装一波X,我们自己动手按照需求开发一套接口来供功能测试,其第三方模块flask提供接口服务,早就有大神开发好接口服务的第三方模块,我们直接用就可以了,这就是python的强大之处了,flask模块使用如下:

import flask  #导入模块
server=flask.Flask(__name__)  #创建接口服务,格式:flask.Flask(__name__),类似于redis.Redis,其中(__name__)是指当前的python文件,也就是说接口是基于此文档在运行
@server.route('/api',methods=['get','post'])#创建好接口服务后,需要指定路径,路径包含后缀以及请求方法,必须要是@装饰,下面的函数才能正常运行
def reg():#接口主体部分,运行内容
    return ('hello world!')
server.run(port=9999)#服务创建好,并指定好路径,接口内容编写完后,需要启动服务,启动的时候需要指定端口

  上面就是最简洁的接口服务了,运行后就可以访问了,在浏览器输入:http://127.0.0.1:9999/api,浏览器会输出:“hello world!”。但如果想要获取到用户的请求数据、以及局域网内别的测试人员要访问你的接口咋办,代码如下:

import flask  #导入模块
from flask import request#导入request方法,用来接收传参
server=flask.Flask(__name__)  #创建接口服务,格式:flask.Flask(__name__),类似于redis.Redis,其中(__name__)是指当前的python文件,也就是说接口是基于此文档在运行
@server.route('/login',methods=['get','post'])#创建好接口服务后,需要指定路径,路径包含后缀以及请求方法,必须要是@装饰,下面的函数才能正常运行
def reg():#接口主体部分,运行内容
    name=request.values.get("username")#request.values.get("username")获取指定传参的value,用来判断、处理
    passwd=request.values.get("password")#如果传参是json类型,方法是request.json.get("username")
    if name and passwd:
        if name.strip() and passwd.strip():
            return '{"code":200,"msg":"注册成功!"}'
    return '{"code":300,"msg":"参数不能为空!"}'
server.run(port=9999,host='0.0.0.0',debug=True)#指定host为“0.0.0.0”后,局域网内其他IP就都可以访问了

  上述代码基本可以满足接口需求了,然后你可以在接口主体内容内,对传参进行分析、处理,比如得到传参后,与数据库里的数据进行比对、分析,给出正确响应。譬如用户输入用户名和密码,取到参数后,查询数据库中是否存在用户名,且用户名密码是否正确,用来判断是否允许登录。既然如此那么我们要初始化一些数据了,就是在运行接口之前要判断数据库、表以及数据是否存在,不存在则需要初始化创建上述一系列数据,创建数据库的函数如下:

  2.mysql操作函数封装

def oprate_sql(sql, HOST, USER, PASSWD, charset='utf8', port=3306):#定义操作sql语句的函数,如果是select或show则返回查询内容
    import pymysql
    conn = pymysql.connect(host=HOST, user=USER, password=PASSWD, charset=charset)
    cur = conn.cursor()
    cur.execute(sql)
    if sql.strip().startswith('select') or sql.strip().startswith('show'):
        res = cur.fetchall()
    else:
        conn.commit()
        res = 'ok'
    cur.close()
    conn.close()
    return res

def check_in(args,res):#判断a,是否在res的元素里,这里的res必须是sql执行返回的数据,用于注册,登录
    lis=[]
    for i in res:
        lis.append(i[0])
    if args in lis:
        return True
#调用:
sql='show databases;'
res=oprate_sql(sql, HOST, USER, PASSWD)
if not check_in(DBNAME, res):
    sql = 'create database %s;' % DBNAME
    oprate_sql(sql, HOST, USER, PASSWD)

  可以看到,在连接数据库的时候不能指定数据库名称,因为不确定是否存在数据库,所以先查询当前数据库存在哪些数据库名,判断是否存在,不存在则建立数据库名,创建表类似。

  但是后面也还要操作数据库,比如增删改查,这些都要指定具体数据库名称才可以,那么久不能用上面的函数了,因为数据库名是必填参数,上述函数并没有,代码如下:

def oprate_sql(sql, HOST, USER, PASSWD, DBNAME, charset='utf8', port=3306):#定义操作sql语句的函数,如果是select或show则返回查询内容
    import pymysql
    conn = pymysql.connect(host=HOST, user=USER, password=PASSWD, db=DBNAME, charset=charset)
    cur = conn.cursor()
    cur.execute(sql)
    if sql.strip().startswith('select') or sql.strip().startswith('show'):
        res = cur.fetchall()
    else:
        conn.commit()
        res = 'ok'
    cur.close()
    conn.close()
    return res
#调用:
sql = 'select username from %s' % (USERTB)
res=oprate_sql(sql, HOST, USER, PASSWD,DBNAME)

  那么问题来了,怎么合并这两个处理sql的函数呢,还记得我们函数有默认参数吧,我们把DBNAME默认为空字符,调用的时候传了DBNAME,那么我们就连接到具体的数据库名,如果调用的时候没有传DBNAME,那么我们就不连接数据库名,代码如下:

def oprate_sql(sql, HOST, USER, PASSWD, DBNAME='', charset='utf8', port=3306):#定义操作sql语句的函数,如果是select或show则返回查询内容
    import pymysql
    if DBNAME:
        conn = pymysql.connect(host=HOST, user=USER, password=PASSWD, db=DBNAME, charset=charset)
    else:
        conn = pymysql.connect(host=HOST, user=USER, password=PASSWD, charset=charset)
    cur = conn.cursor()
    cur.execute(sql)
    if sql.strip().startswith('select') or sql.strip().startswith('show'):
        res = cur.fetchall()
    else:
        conn.commit()
        res = 'ok'
    cur.close()
    conn.close()
    return res
#不传DBNAME
sql='show databases;'
res=oprate_sql(sql, HOST, USER, PASSWD)
#传DBNAME:
sql = 'select username from %s' % (USERTB)
res=oprate_sql(sql, HOST, USER, PASSWD,DBNAME=Table_Name)

  3.后面学到了类,可以进一步用类封装mysql操作:

 1 import pymysql
 2 
 3 
 4 class MySql(object):
 5     def __init__(self, host, port, user, pwd, dbname="", charset="utf8"):
 6         """初始化数据库信息以及连接"""
 7         self.host = host
 8         self.port = port
 9         self.user = user
10         self.pwd = pwd
11         self.dbname = dbname
12         self.charset = charset
13         self.conn = None  # 给析构函数用的,如果存在链接,则需要关闭链接,没有就不用管
14         self.conn = self.get_conn
15         self.cur = None
16         self.cur = self.get_cur
17 
18     @property
19     def get_conn(self):
20         """根据传参判断是否有传数据库名,然后做不同的链接操作"""
21         try:
22             if self.dbname:
23                 conn = pymysql.connect(host=self.host, port=self.port, user=self.user,
24                                             password=self.pwd, database=self.dbname, charset=self.charset)
25                 return conn
26             else:
27                 conn = pymysql.connect(host=self.host, port=self.port, user=self.user,
28                                             password=self.pwd, charset=self.charset)
29                 return conn
30         except Exception as e:
31             print(e)
32 
33     @property
34     def get_cur(self):
35         """获取游标"""
36         if self.conn:
37             return self.conn.cursor()
38 
39     def exec_sql(self, sql, *args):
40         """
41         执行sql,根据不同的sql语法做不同的操作:
42         查询就返回查询结果列表,其他成功就返回受影响的行数整型,报错返回报错信息字符串
43         *args是为了防止sql注入,自己拼接的sql语法字符串可能被sql注入,
44         这里的用法是sql不需要自己拼接,直接将变量按顺序传进来,pymysql自动拼接,而且避免了sql注入的问题
45         如:
46         sql = "insert into test.student VALUES (19,%s,'English',%s);"
47         res = test_mysql.exec_sql(sql,"yangxga",100)
48         """
49         try:
50             rows = self.cur.execute(sql, args)  # rows记录受影响的行
51             if sql.strip().startswith('select') or sql.strip().startswith('show'):
52                 res = self.cur.fetchall()
53                 res = self.format_res(res)
54                 return res
55             else:
56                 self.conn.commit()  # 非查询语句需要提交才能生效
57                 res = rows
58                 return res
59         except Exception as e:
60             return e
61 
62     @staticmethod
63     def format_res(res):
64         """格式化数据库查找到的结果,如果"""
65         res_lis = []
66         if res:
67             for i in res:
68                 res_lis.append(i)
69         return res_lis
70 
71     def __del__(self):
72         """析构函数,关闭鼠标,断开连接"""
73         if self.cur:
74             self.cur.close()
75         if self.conn:
76             self.conn.close()