Python采用并发查询mysql以及调用API灌数据 (八)- 异步并发加锁,保证数据安全...(python并发写入mysql)
Python采用并发查询mysql以及调用API灌数据 (八)- 异步并发加锁,保证数据安全...(python并发写入mysql)
前情回顾
上一篇文章已经编写了异步并发API请求灌数据,那么本章节我们来继续编写异步并发加锁,保证数据安全
实战任务
本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。
执行流程如下
那么根据流程所需要的功能,需要以下的实例进行支撑: 1.并发实例 2.查询数据实例 3.执行post请求实例
目标:循环查询处理并发数据,并且加锁保证数据安全
给查询数据表添加is_import字段,在mysql表中添加查询标识,插入成功则为1,无插入则为0
然后初始化 is_import = 0 即可,下面来给我们之前的model方法的查询中添加条件查询。
编写model类中selectTable方法,增加条件查询
# 根据设置的旧表字段,查询旧库的数据库数据 def selectTable(self,DB_NAME,TABLE_NAME,fields,order,cond_dict=''): # 选择数据库 self.mydb.selectDataBase(DB_NAME) # 数据查询 result = self.mydb.select(TABLE_NAME, fields=fields,order=order,cond_dict=cond_dict) # 关闭连接 self.mydb.close() # 返回查询的数据 return result
增加条件查询cond_dict字典,测试使用。
测试成功之后,就要在model方法中增加一个更新is_import为1的方法了。
在model类中增加更新is_import为1的方法
有些时候,因为传入的可能字段名不是is_import,可能是is_import_xxx。那么就要根据传入的字典获取字段名称了。
# 更新is_import字段为1的方法 def updateIsImport(self, TABLE_NAME, attrs_dict, cond_dict): """更新数据 args: tablename :表名字 attrs_dict :更新属性键值对字典 cond_dict :更新条件字典 example: params = {"name" : "caixinglong", "age" : "38"} cond_dict = {"name" : "liuqiao", "age" : "18"} mydb.update(table, params, cond_dict) """ # 选择数据库 result = self.mydb.update(TABLE_NAME, attrs_dict=attrs_dict, cond_dict=cond_dict) return result
写好了,更新字段的方法之后,下面我们在API请求成功之后进行使用。
在消费者方法中引用更新方法
此时消费者已经在上一个篇章中写了异步并发的方法,但是这样调用的话,会导致mysql更新的时候报错。 为了保证数据安全,我只能降低效率,增加锁了。
首先先看一个线程加锁的伪代码
#-* coding: utf-8 -*import threadingimport timeimport osdef func1(k): global lock while True: lock.acquire() # 开始锁进程 .... 执行任务 ... lock.release() # 释放进程锁if __name__=='__main__': # 初始化进程锁 lock = threading.Lock() # 使用4个CPU开启进程并发 for k in range(4): new_thread = threading.Thread(target=func1,args=(k,)) # 开启一个进程调用func1,并且传入参数k new_thread.start()
从示例代码可以看出,进程锁的基本使用方法。下面我们来使用一下进程锁来保证数据安全。
使用进程锁
result_row = [] lock = threading.Lock() # 初始化进程锁 for row in select_result: lock.acquire() # 开启进程锁 consume(row, url, model,lock=lock) # 消费请求API
初步代码基本就写到这里了。后面肯定有很多需要优化的地方。 例如: 1、使用查询分页再开启线程并发处理。 2、拆分生产者与消费者,加入rabbitmq等中间件来对付异常处理
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~