java中的接口是类吗
320
2022-08-28
python操作mysql之pymysql(mysql-python)
pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。但目前pymysql支持python3.x而后者不支持3.x版本。
本文测试python版本:2.7.11。mysql版本:5.6.24
一、安装
pip3 install pymysql
二、使用操作
#!/usr/bin/env pytho# -*- coding:utf-8 -*-import pymysql # 创建连接conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1', charset='utf8')# 创建游标cursor = conn.cursor() # 执行SQL,并返回收影响行数effect_row = cursor.execute("select * from tb7") # 执行SQL,并返回受影响行数effect_row = cursor.execute("update tb7 set pass = '123' where nid = %s", (11,)) # 执行SQL,并返回受影响行数,执行多次effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u1","u1pass","11111"),("u2","u2pass","22222")]) # 提交,不然无法保存新建或者修改的数据conn.commit() # 关闭游标cursor.close()# 关闭连接conn.close()
注意:存在中文的时候,连接需要添加charset='utf8',否则中文显示乱码。
2、获取查询数据
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()cursor.execute("select * from tb7")# 获取剩余结果的第一行数据row_1 = cursor.fetchone()print row_1# 获取剩余结果前n行数据row_2 = cursor.fetchmany(3)# 获取剩余结果所有数据row_3 = cursor.fetchall()conn.commit()cursor.close()conn.close()
3、获取新创建数据自增ID
可以获取到最新自增的ID,也就是最后插入的一条数据ID
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u3","u3pass","11113"),("u4","u4pass","22224")])conn.commit()cursor.close()conn.close()#获取自增idnew_id = cursor.lastrowid print new_id
4、移动游标
操作都是靠游标,那对游标的控制也是必须的
注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:cursor.scroll(1,mode='relative') # 相对当前位置移动cursor.scroll(2,mode='absolute') # 相对绝对位置移动
5、fetch数据类型
关于默认获取的数据是元祖类型,如果想要或者字典类型的数据,即:
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')#游标设置为字典类型cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)cursor.execute("select * from tb7")row_1 = cursor.fetchone()print row_1 #{u'licnese': 213, u'user': '123', u'nid': 10, u'pass': '213'}conn.commit()cursor.close()conn.close()
6、调用存储过程
a、调用无参存储过程
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')#游标设置为字典类型cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)#无参数存储过程cursor.callproc('p2') #等价于cursor.execute("call p2()")row_1 = cursor.fetchone()print row_1conn.commit()cursor.close()conn.close()
b、调用有参存储过程
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)cursor.callproc('p1', args=(1, 22, 3, 4))#获取执行完存储的参数,参数@开头cursor.execute("select @p1,@_p1_1,@_p1_2,@_p1_3") #{u'@_p1_1': 22, u'@p1': None, u'@_p1_2': 103, u'@_p1_3': 24}row_1 = cursor.fetchone()print row_1conn.commit()cursor.close()conn.close()
三、关于pymysql防注入
1、字符串拼接查询,造成注入
正常查询语句:
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1"passwd="u1pass"#正常构造语句的情况sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)#或者#sql=select user,pass from tb7 where user='u1' and pass='u1pass'row_count=cursor.execute(sql) row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()
构造注入语句:
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1' or '1'-- "passwd="u1pass"sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)#拼接语句被构造成下面这样,永真条件,此时就注入成功了。因此要避免这种情况需使用pymysql提供的参数化查询。#select user,pass from tb7 where user='u1' or '1'-- ' and pass='u1pass'row_count=cursor.execute(sql)row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()
2、避免注入,使用pymysql提供的参数化语句
正常参数化查询
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1"passwd="u1pass"#执行参数化查询row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()
构造注入,参数化查询注入失败。
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()user="u1' or '1'-- "passwd="u1pass"#执行参数化查询row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))#内部执行参数化生成的SQL语句,对特殊字符进行了加\转义,避免注入语句生成。# sql=cursor.mogrify("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))# print sql#select user,pass from tb7 where user='u1\' or \'1\'-- ' and pass='u1pass'被转义的语句。row_1 = cursor.fetchone()print row_count,row_1conn.commit()cursor.close()conn.close()
结论:excute执行SQL语句的时候,必须使用参数化的方式,否则必然产生SQL注入漏洞。
3、使用存mysql存储过程动态执行SQL防注入
使用MYSQL存储过程自动提供防注入,动态传入SQL到存储过程执行语句。
定义动态调用sql的存储过程:
delimiter \\DROP PROCEDURE IF EXISTS proc_sql \\CREATE PROCEDURE proc_sql ( in nid1 INT, in nid2 INT, in callsql VARCHAR(255) )BEGIN set @nid1 = nid1; set @nid2 = nid2; set @callsql = callsql; PREPARE myprod FROM @callsql;-- PREPARE prod FROM 'select * from tb2 where nid>? and nid'; 传入的值为字符串,?为占位符-- 用@p1,和@p2填充占位符 EXECUTE myprod USING @nid1,@nid2; DEALLOCATE prepare myprod; END\\delimiter ;定义动态调用sql的存储过程
在sql中调用:
set @nid1=12;set @nid2=15;set @callsql = 'select * from tb7 where nid>? and nid';CALL proc_sql(@nid1,@nid2,@callsql)
pymsql中调用 :
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')cursor = conn.cursor()mysql="select * from tb7 where nid>? and nid"cursor.callproc('proc_sql', args=(11, 15, mysql))rows = cursor.fetchall()print rows #((12, 'u1', 'u1pass', 11111), (13, 'u2', 'u2pass', 22222), (14, 'u3', 'u3pass', 11113))conn.commit()cursor.close()conn.close()
四、使用with简化连接过程
每次都连接关闭很麻烦,使用上下文管理,简化连接过程
#! /usr/bin/env python# -*- coding:utf-8 -*-# __author__ = "TKQ"import pymysqlimport contextlib#定义上下文管理器,连接后自动关闭连接@contextlib.contextmanagerdef mysql(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1',charset='utf8'): conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset) cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) try: yield cursor finally: conn.commit() cursor.close() conn.close()# 执行sqlwith mysql() as cursor: print(cursor) row_count = cursor.execute("select * from tb7") row_1 = cursor.fetchone() print row_count, row_1
五、封装pymysql连接
a.封装函数执行sql语句获取结果
# encoding=utf-8import tracebackfrom pymysql import *class MysqlHelper: def __init__(self, host='10HW173532', user='finmsg_dev', password='zxcZXC$12', db='msgrule', charset='utf8', connect_timeout=120): # 注意这里有默认值的变量一定要放在没有默认值变量的后面 self.host = host self.user = user self.password = password self.db = db self.charset = charset self.connect_timeout = connect_timeout def open(self): self.conn = connect(host=self.host, user=self.user, password=self.password, db=self.db, charset=self.charset, connect_timeout=self.connect_timeout) self.cursor = self.conn.cursor() print("数据库已连接") def close(self): self.cursor.close() self.conn.close() print("数据库已关闭") def rollback(self): self.conn.rollback() self.cursor.close() self.conn.close() print("数据库执行回滚操作") def search_one(self, sql, params=None): try: self.open() self.cursor.execute(sql, params) result = self.cursor.fetchone() print("查询成功") self.close() return result except Exception as e: print(e) print("查询失败") def search_all(self, sql, params=None): # 查询获取多个值 try: self.open() self.cursor.execute(sql, params) result = self.cursor.fetchall() print("查询成功") self.close() return result except Exception as e: print(e) return "查询失败" def insert(self, sql, params=None): # 添加数据 try: self.open() self.cursor.execute(sql, params) try: self.conn.commit() print('添加成功') self.close() return "添加成功" except: self.rollback() return "添加失败" except Exception as e: print(e) return "添加失败" def update(self, sql, params=None): # 修改数据 try: self.open() self.cursor.execute(sql, params) try: self.conn.commit() print('修改成功') self.close() return "修改成功" except: self.rollback() print("修改失败") return "修改失败" except Exception as e: print(e) return "修改失败" def delete(self, sql, params=None): # 删除数据 try: self.open() self.cursor.execute(sql, params) try: self.conn.commit() print('删除成功') self.close() return "删除成功" except: self.rollback() print('删除失败') return "删除失败" except Exception as e: print(e) return "删除失败" def is_connected(self): # 检查数据库是否处于连接中 """Check if the server is alive""" try: self.conn.ping(reconnect=True) print("db is connecting") except: traceback.print_exc() self.conn = self.open() print("db reconnect")
b.封装函数获取数据库中的对象
""" 作用:完成数据库相关工具类封装 分析: 1. 主要方法 get_sql_one(sql) get_sql_all(sql) 2. 辅助方法 1. 获取连接对象 2. 获取游标对象 3. 关闭游标对象方法 4. 关闭连接对象方法"""from config.project import DataBasefrom tools.read_config import ReadConfigimport pymysql# 新建工具类 数据库class ReadDB: # 定义连接对象 类方法 conn = None def __init__(self, database): # 构造函数-->注意这里有默认值的变量一定要放在没有默认值变量的后面 self.host = ReadConfig().read_config(section=database, option="host") # 从config.ini配置文件读取出来的option为字符串类型,这里需要将port强转为int类型 self.port = int(ReadConfig().read_config(section=database, option="port")) self.user = ReadConfig().read_config(section=database, option="user") self.password = ReadConfig().read_config(section=database, option="password") self.db_name = ReadConfig().read_config(section=database, option="db_name") self.charset = 'utf8' self.connect_timeout = 120 # 获取连对象方法封装 def get_conn(self): if self.conn is None: self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password, db=self.db_name, charset=self.charset, connect_timeout=self.connect_timeout) print('数据库连接成功') # 返回连接对象 return self.conn # 获取游标对象方法封装 def get_cursor(self): return self.get_conn().cursor() # 将游标设置为字典类型 def get_dict_cursor(self): return self.get_conn().cursor(cursor=pymysql.cursors.DictCursor) # 关闭游标对象方法封装 @staticmethod def close_cursor(cursor): if cursor: cursor.close() # 关闭连接对象方法封装 def close_conn(self): if self.conn: self.conn.close() # 注意:关闭连接对象后,对象还存在内存中,需要手工设置为None self.conn = None print("数据库关闭成功") # 主要 执行方法 获取单条结果 -> 在外界调用次方法就可以完成数据相应的操作【获取的数据库单条数据的格式为python元组类型】 def get_sql_one(self, sql_statement): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取结果 data = cursor.fetchone() except Exception as e: print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 主要 执行方法 获取单条结果 -> 在外界调用次方法就可以完成数据相应的操作【获取的数据库单条数据的格式为python字典类型】 def get_sql_byDict(self, sql_statement): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_dict_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取结果 data = cursor.fetchone() except Exception as e: print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 获取指定条数的数据库结果集【获取的数据库多条数据集格式为元组嵌套元组】 def get_sql_many(self, sql_statement, n): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取结果 data = cursor.fetchmany(n) except Exception as e: print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 获取 所有数据库结果集 def get_sql_all(self, sql_statement): # 定义游标对象及数据变量 cursor = None data = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法 cursor.execute(sql_statement) # 获取所有结果 data = cursor.fetchall() except Exception as e: print("get_sql_all error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回执行结果 return data # 修改、删除、新增 def update_sql(self, sql_statement): # 定义游标对象 cursor = None effect_row = None try: # 获取游标对象 cursor = self.get_cursor() # 调用执行方法,返回受影响行数 effect_row = cursor.execute(sql_statement) # 提交事务 self.conn.commit() except Exception as e: # 事务回滚 self.conn.rollback() print("get_sql_one error:", e) finally: # 关闭游标对象 self.close_cursor(cursor) # 关闭连接对象 self.close_conn() # 返回受影响行数 return effect_row def is_connected(self): # 检查数据库是否处于连接中 """Check if the server is alive""" try: self.conn.ping(reconnect=True) print("db is connecting") except Exception as error: print("db connect error:%s" % error) self.get_conn() self.get_dict_cursor() print("db reconnect")if __name__ == '__main__': sql = "SELECT * FROM bomt_clue_db.bo_clue_orig order by CREATE_TIME desc limit 1 " print(ReadDB(database=DataBase.LeaveInformation).get_sql_byDict(sql))
去期待陌生,去拥抱惊喜。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~