python操作mysql之pymysql(mysql-python)

网友投稿 320 2022-08-28


python操作mysql之pymysql(mysql-python)

pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。但目前pymysql支持python3.x而后者不支持3.x版本。

本文测试python版本:2.7.11。mysql版本:5.6.24

一、安装

pip3 install pymysql

二、使用操作

#!/usr/bin/env pytho# -*- coding:utf-8 -*-import pymysql # 创建连接conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1', charset='utf8')# 创建游标cursor = conn.cursor() # 执行SQL,并返回收影响行数effect_row = cursor.execute("select * from tb7") # 执行SQL,并返回受影响行数effect_row = cursor.execute("update tb7 set pass = '123' where nid = %s", (11,)) # 执行SQL,并返回受影响行数,执行多次effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u1","u1pass","11111"),("u2","u2pass","22222")]) # 提交,不然无法保存新建或者修改的数据conn.commit() # 关闭游标cursor.close()# 关闭连接conn.close()

注意:存在中文的时候,连接需要添加charset='utf8',否则中文显示乱码。

2、获取查询数据

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()cursor.execute("select * from tb7")# 获取剩余结果的第一行数据row_1 = cursor.fetchone()print row_1# 获取剩余结果前n行数据row_2 = cursor.fetchmany(3)# 获取剩余结果所有数据row_3 = cursor.fetchall()conn.commit()cursor.close()conn.close()

3、获取新创建数据自增ID

可以获取到最新自增的ID,也就是最后插入的一条数据ID

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u3","u3pass","11113"),("u4","u4pass","22224")])conn.commit()cursor.close()conn.close()#获取自增idnew_id = cursor.lastrowid print new_id

4、移动游标

操作都是靠游标,那对游标的控制也是必须的

注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:cursor.scroll(1,mode='relative') # 相对当前位置移动cursor.scroll(2,mode='absolute') # 相对绝对位置移动

5、fetch数据类型

关于默认获取的数据是元祖类型,如果想要或者字典类型的数据,即:

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')#游标设置为字典类型cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)cursor.execute("select * from tb7")row_1 = cursor.fetchone()print row_1  #{u'licnese': 213, u'user': '123', u'nid': 10, u'pass': '213'}conn.commit()cursor.close()conn.close()

6、调用存储过程

a、调用无参存储过程

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')#游标设置为字典类型cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)#无参数存储过程cursor.callproc('p2') #等价于cursor.execute("call p2()")row_1 = cursor.fetchone()print row_1conn.commit()cursor.close()conn.close()

b、调用有参存储过程

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)cursor.callproc('p1', args=(1, 22, 3, 4))#获取执行完存储的参数,参数@开头cursor.execute("select @p1,@_p1_1,@_p1_2,@_p1_3") #{u'@_p1_1': 22, u'@p1': None, u'@_p1_2': 103, u'@_p1_3': 24}row_1 = cursor.fetchone()print row_1conn.commit()cursor.close()conn.close()

三、关于pymysql防注入

1、字符串拼接查询,造成注入

正常查询语句:

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1"passwd="u1pass"#正常构造语句的情况sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)#或者#sql=select user,pass from tb7 where user='u1' and pass='u1pass'row_count=cursor.execute(sql) row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()

构造注入语句:

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1' or '1'-- "passwd="u1pass"sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)#拼接语句被构造成下面这样,永真条件,此时就注入成功了。因此要避免这种情况需使用pymysql提供的参数化查询。#select user,pass from tb7 where user='u1' or '1'-- ' and pass='u1pass'row_count=cursor.execute(sql)row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()

2、避免注入,使用pymysql提供的参数化语句

正常参数化查询

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1"passwd="u1pass"#执行参数化查询row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()

构造注入,参数化查询注入失败。

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1' or '1'-- "passwd="u1pass"#执行参数化查询row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))#内部执行参数化生成的SQL语句,对特殊字符进行了加\转义,避免注入语句生成。# sql=cursor.mogrify("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))# print sql#select user,pass from tb7 where user='u1\' or \'1\'-- ' and pass='u1pass'被转义的语句。row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()

结论:excute执行SQL语句的时候,必须使用参数化的方式,否则必然产生SQL注入漏洞。

3、使用存mysql存储过程动态执行SQL防注入

使用MYSQL存储过程自动提供防注入,动态传入SQL到存储过程执行语句。

定义动态调用sql的存储过程:

delimiter \\DROP PROCEDURE IF EXISTS proc_sql \\CREATE PROCEDURE proc_sql ( in nid1 INT, in nid2 INT, in callsql VARCHAR(255) )BEGIN set @nid1 = nid1; set @nid2 = nid2; set @callsql = callsql; PREPARE myprod FROM @callsql;-- PREPARE prod FROM 'select * from tb2 where nid>? and nid

在sql中调用:

set @nid1=12;set @nid2=15;set @callsql = 'select * from tb7 where nid>? and nid

pymsql中调用 :

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()mysql="select * from tb7 where nid>? and nid

四、使用with简化连接过程

每次都连接关闭很麻烦,使用上下文管理,简化连接过程

#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlimport contextlib#定义上下文管理器,连接后自动关闭连接@contextlib.contextmanagerdef mysql(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1',charset='utf8'): conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset) cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) try: yield cursor finally: conn.commit() cursor.close() conn.close()# 执行sqlwith mysql() as cursor: print(cursor) row_count = cursor.execute("select * from tb7") row_1 = cursor.fetchone() print row_count, row_1

五、封装pymysql连接

a.封装函数执行sql语句获取结果

# encoding=utf-8import tracebackfrom pymysql import *class MysqlHelper: def __init__(self, host='10HW173532', user='finmsg_dev', password='zxcZXC$12', db='msgrule', charset='utf8', connect_timeout=120): # 注意这里有默认值的变量一定要放在没有默认值变量的后面 self.host = host self.user = user self.password = password self.db = db self.charset = charset self.connect_timeout = connect_timeout def open(self): self.conn = connect(host=self.host, user=self.user, password=self.password, db=self.db, charset=self.charset, connect_timeout=self.connect_timeout) self.cursor = self.conn.cursor() print("数据库已连接") def close(self): self.cursor.close() self.conn.close() print("数据库已关闭") def rollback(self): self.conn.rollback() self.cursor.close() self.conn.close() print("数据库执行回滚操作") def search_one(self, sql, params=None): try: self.open() self.cursor.execute(sql, params) result = self.cursor.fetchone() print("查询成功") self.close() return result except Exception as e: print(e) print("查询失败") def search_all(self, sql, params=None): # 查询获取多个值 try: self.open() self.cursor.execute(sql, params) result = self.cursor.fetchall() print("查询成功") self.close() return result except Exception as e: print(e) return "查询失败" def insert(self, sql, params=None): # 添加数据 try: self.open() self.cursor.execute(sql, params) try: self.conn.commit() print('添加成功') self.close() return "添加成功" except: self.rollback() return "添加失败" except Exception as e: print(e) return "添加失败" def update(self, sql, params=None): # 修改数据 try: self.open() self.cursor.execute(sql, params) try: self.conn.commit() print('修改成功') self.close() return "修改成功" except: self.rollback() print("修改失败") return "修改失败" except Exception as e: print(e) return "修改失败" def delete(self, sql, params=None): # 删除数据 try: self.open() self.cursor.execute(sql, params) try: self.conn.commit() print('删除成功') self.close() return "删除成功" except: self.rollback() print('删除失败') return "删除失败" except Exception as e: print(e) return "删除失败" def is_connected(self): # 检查数据库是否处于连接中 """Check if the server is alive""" try: self.conn.ping(reconnect=True) print("db is connecting") except: traceback.print_exc() self.conn = self.open() print("db reconnect")

b.封装函数获取数据库中的对象

""" 作用:完成数据库相关工具类封装 分析: 1. 主要方法 get_sql_one(sql) get_sql_all(sql) 2. 辅助方法 1. 获取连接对象 2. 获取游标对象 3. 关闭游标对象方法 4. 关闭连接对象方法"""from config.project import DataBasefrom tools.read_config import ReadConfigimport pymysql# 新建工具类 数据库class ReadDB: # 定义连接对象 类方法 conn = None def __init__(self, database): # 构造函数-->注意这里有默认值的变量一定要放在没有默认值变量的后面 self.host = ReadConfig().read_config(section=database, option="host") # 从config.ini配置文件读取出来的option为字符串类型,这里需要将port强转为int类型 self.port = int(ReadConfig().read_config(section=database, option="port")) self.user = ReadConfig().read_config(section=database, option="user") self.password = ReadConfig().read_config(section=database, option="password") self.db_name = ReadConfig().read_config(section=database, option="db_name") self.charset = 'utf8' self.connect_timeout = 120 # 获取连对象方法封装 def get_conn(self): if self.conn is None: self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password, db=self.db_name, charset=self.charset, connect_timeout=self.connect_timeout) print('数据库连接成功') # 返回连接对象 return self.conn # 获取游标对象方法封装 def get_cursor(self): return self.get_conn().cursor() # 将游标设置为字典类型 def get_dict_cursor(self): return self.get_conn().cursor(cursor=pymysql.cursors.DictCursor) # 关闭游标对象方法封装 @staticmethod def close_cursor(cursor): if cursor: cursor.close() # 关闭连接对象方法封装 def close_conn(self): if self.conn: self.conn.close() # 注意:关闭连接对象后,对象还存在内存中,需要手工设置为None self.conn = None print("数据库关闭成功") # 主要 执行方法 获取单条结果 -> 在外界调用次方法就可以完成数据相应的操作【获取的数据库单条数据的格式为python元组类型】 def get_sql_one(self, sql_statement): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取结果 data = cursor.fetchone() except Exception as e: print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 主要 执行方法 获取单条结果 -> 在外界调用次方法就可以完成数据相应的操作【获取的数据库单条数据的格式为python字典类型】 def get_sql_byDict(self, sql_statement): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_dict_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取结果 data = cursor.fetchone() except Exception as e: print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 获取指定条数的数据库结果集【获取的数据库多条数据集格式为元组嵌套元组】 def get_sql_many(self, sql_statement, n): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取结果 data = cursor.fetchmany(n) except Exception as e: print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 获取 所有数据库结果集 def get_sql_all(self, sql_statement): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取所有结果 data = cursor.fetchall() except Exception as e: print("get_sql_all error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 修改、删除、新增 def update_sql(self, sql_statement): # 定义游标对象 cursor = None effect_row = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法,返回受影响行数 effect_row = cursor.execute(sql_statement) # 提交事务 self.conn.commit() except Exception as e: # 事务回滚 self.conn.rollback() print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回受影响行数 return effect_row def is_connected(self): # 检查数据库是否处于连接中 """Check if the server is alive""" try: self.conn.ping(reconnect=True) print("db is connecting") except Exception as error: print("db connect error:%s" % error) self.get_conn() self.get_dict_cursor() print("db reconnect")if __name__ == '__main__': sql = "SELECT * FROM bomt_clue_db.bo_clue_orig order by CREATE_TIME desc limit 1 " print(ReadDB(database=DataBase.LeaveInformation).get_sql_byDict(sql))

去期待陌生,去拥抱惊喜。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java栈和基础队列的实现详解
下一篇:python多线程详解(python多线程详解(超详细))
相关文章

 发表评论

暂时没有评论,来抢沙发吧~