java 单机接口限流处理方案
316
2022-09-05
python+mysql+faker高效率插入海量关联随机数据
python+mysql+faker高效率插入海量关联随机数据
# encoding:utf-8import pymysqlimport timeimport randomfrom faker import Fakerhost = '106.52.198.240'port = 3306user = 'root'password = 'Root_2021'db = 'device'faker = Faker("zh_CN")def run_time(func): def wrapper(*args, **kwargs): old_time = time.time() cs = func(*args, **kwargs) new_time = time.time() print('程序运行时间:{}s'.format(round(new_time-old_time), 3)) return cs return wrapper# 连接mysqlclass Mysql_connet(): def __init__(self): self.data = pymysql.connect( host=host, port=port, user=user, password=password, db=db) # 获取mysql操作光标 self.cursor = self.data.cursor() # 初始化变量 self.count = 0 # 设置sql语句循环次数 @run_time def insert_data(self): while self.count < number: self.count += 1 # 定义mysql字段的范围随机数变量 self.device_id = faker.random_number(15) self.no = faker.random_number(15) self.terminal_no = faker.random_number(15) self.capital_id = faker.random_number(15) self.images_id = faker.random_number(15) self.department_id = random.choice([1453377213176082434]) self.type = random.randint(0,1) self.sub_type = random.randint(1,3) self.cover_type = random.randint(0,2) self.quyu = random.randint(440300, 440310) self.jingdu = round(random.uniform(99, 117),6) #随机生成保留6位小数的随机数 self.weidu = round(random.uniform(23, 41),6) self.cellar_well_terminal_id = faker.random_number(15) self.signal = random.randint(0, 31) self.battery = random.randint(0, 100) self.is_online = random.randint(0, 1) self.control_status = random.randint(0, 1) self.now_time = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime()) self.name = 'cs' + str(self.count) self.address = faker.address() self.mac = faker.mac_address().upper() # 生成mysql语句插入语句 sql = "INSERT INTO `t_cellar_well`(`id`,`no`,`terminal_no`,`province_id`,`province_name`,`city_id`,`city_name`,`area_id`,`area_name`,`address`,`spec`,`department_id`,`department_name`,`type`,`sub_type`,`cover_type`,`is_online`,`control_status`,`status`,`is_delete`,`create_at`,`create_by`,`update_at`,`longitude`,`latitude`,`name`,`remark`) VALUES\ ({device_id},{no},{terminal_no},44,'广东省',4403,'深圳市',440303,'南山区','大新路南头街道88-36号','1',{department_id},'自动化测试',{type},{sub_type},{cover_type},{is_online},{control_status},0,0,'{now_time}',NULL,NULL,{jingdu},{weidu},'{name}','{beizhu}')".format( device_id=self.device_id, no=self.no, terminal_no=self.terminal_no, department_id=self.department_id, type=self.type, sub_type=self.sub_type, cover_type=self.cover_type, is_online=self.is_online, control_status=self.control_status, now_time=self.now_time, jingdu=self.jingdu, weidu=self.weidu, name=self.name, beizhu=self.address) sql2 = "INSERT INTO `t_capital` VALUES ({capital_id}, {terminal_no}, 0, {department_id}, '自动化测试', '', '', NULL, NULL, NULL, 0, NULL, '{now_time}', NULL, NULL)".format( capital_id=self.capital_id, terminal_no=self.terminal_no, department_id=self.department_id, now_time=self.now_time) sql3 = "INSERT INTO `t_cellar_well_terminal` VALUES ({cellar_well_terminal_id}, {no}, {terminal_no}, 'IGW-JGW-NB-K-BX-V1.0', 'IWG-7-ZK1.0.0.210922.T', '{battery}', '{signal}', '0', {is_online}, 0, {control_status}, '{now_time}', '{now_time}', 0000000014, 3, '{now_time}', '898602B7031880020463', '20', 4, NULL, 4, NULL, NULL, NULL, '898602B7031880020463', '0', '{mac}', '460040723520463', '5', '0', '0', '0', '0', '0', 0, NULL)".format( cellar_well_terminal_id=self.cellar_well_terminal_id, no=self.no, terminal_no=self.terminal_no, battery=self.battery, signal=self.signal, is_online=self.is_online, control_status=self.control_status, now_time=self.now_time, mac=self.mac) sql4 = "INSERT INTO `t_cellar_well_install_step` VALUES ({cellar_well_terminal_id}, {terminal_no}, 1, 0, 0, 0, 0, 0, '{now_time}', NULL, NULL)".format( cellar_well_terminal_id=self.cellar_well_terminal_id, terminal_no=self.terminal_no, now_time=self.now_time) sql5 = "INSERT INTO `t_images` VALUES ({images_id}, '{terminal_no}', {device_id}, 0, 'NULL, NULL, NULL, 0, '{now_time}');".format( images_id=self.images_id, terminal_no=self.terminal_no, device_id=self.device_id, now_time=self.now_time) # 执行sql语句 try: print('正在插入第{count}数据>>>'.format(count=self.count)) self.cursor.execute(sql) self.cursor.execute(sql2) self.cursor.execute(sql3) self.cursor.execute(sql4) self.cursor.execute(sql5) self.data.commit() # 错误回滚 except: self.data.rollback() print('出现错误,已经回滚!') # 关闭mysql print('{count}条随机数已经生成完毕!!!'.format(count=self.count)) self.data.close() @run_time def delete_data(self): sql6 = "DELETE FROM t_capital WHERE CHAR_LENGTH(id)<16;" sql7 = "DELETE FROM `t_cellar_well` WHERE char_length(id)<16;" sql8 = "DELETE FROM t_cellar_well_terminal WHERE CHAR_LENGTH(id)<16;" sql9 = "DELETE FROM t_cellar_well_install_step WHERE CHAR_LENGTH(id)<16;" sql0 = "DELETE FROM t_images WHERE CHAR_LENGTH(id) < 16;" try: print('正在删除数据,请稍后。。。') self.cursor.execute(sql6) self.cursor.execute(sql7) self.cursor.execute(sql8) self.cursor.execute(sql9) self.cursor.execute(sql0) self.data.commit() # 错误回滚 except: self.data.rollback() print('出现错误,已经回滚!') # 关闭mysql print('随机生成的数据已经全部删除!!!') self.data.close()#程序执行my_connet = Mysql_connet()make = int(input('输入操作, 1:生成随机数据, 2:删除随机数据:'))if make == 1: number = int(input('输入生成的数据量(必须为正整数):')) my_connet.insert_data()elif make == 2: my_connet.delete_data()else: print('输入错误,请重新执行')
其中遇到的问题在插入整个字典到mysql数据的某个字段时,需要使用json.dumps()转换。
另外一个是性能优化问题,看到一篇文章是使用for循环生成一条sql语句插入多个数据。于是便先试一试效果
# encoding:utf-8import pymysqlimport timeimport randomimport jsonimport threadingfrom faker import Fakerimport asynciohost = '106.52.198.240'port = 3306user = 'root'password = 'Root_2021'db = 'device'faker = Faker("zh_CN")def run_time(func): def wrapper(*args, **kwargs): old_time = time.time() cs = func(*args, **kwargs) new_time = time.time() print('程序运行时间:{}s'.format(round(new_time-old_time), 3)) return cs return wrapper# 连接mysqlclass Mysql_connet(): def __init__(self): self.data = pymysql.connect( host=host, port=port, user=user, password=password, db=db) # 获取mysql操作光标 self.cursor = self.data.cursor() # 初始化变量 # count = 0 # 设置sql语句循环次数 @run_time def insert_data(self): for count in range(0, 100): # 生成mysql语句插入语句 sql = "INSERT INTO `t_cellar_well`(`id`,`no`,`terminal_no`,`province_id`,`province_name`,`city_id`,`city_name`,`area_id`,`area_name`,`address`,`spec`,`department_id`,`department_name`,`type`,`sub_type`,`cover_type`,`is_online`,`control_status`,`status`,`is_delete`,`create_at`,`create_by`,`update_at`,`longitude`,`latitude`,`name`,`remark`) VALUES" sql2 = "INSERT INTO `t_capital` VALUES" sql3 = "INSERT INTO `t_cellar_well_terminal` VALUES" sql4 = "INSERT INTO `t_cellar_well_install_step` VALUES" sql5 = "INSERT INTO `t_images` VALUES" sql6 = "INSERT INTO `t_device_alarm` VALUES" for n in range(1,1000): self.device_id = faker.random_number(15) self.no = faker.random_number(15) self.terminal_no = faker.random_number(15) self.capital_id = faker.random_number(15) self.images_id = faker.random_number(15) self.alarm_id = faker.random_number(15) self.cellar_well_terminal_id = faker.random_number(15) self.department_id = random.choice([1453377213176082434]) self.type = random.randint(0, 1) self.sub_type = random.randint(1, 3) self.cover_type = random.randint(0, 2) self.quyu = random.randint(440300, 440310) self.jingdu = round(random.uniform( 111.664816, 116.403484), 6) # 随机生成保留6位小数的随机数 self.weidu = round(random.uniform(22.823273, 24.519951), 6) self.signal = random.randint(0, 31) self.battery = random.randint(0, 100) self.is_online = random.randint(0, 1) self.control_status = random.randint(0, 1) self.now_time = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime()) # self.name = 'cs' + str(count) self.address = faker.address() self.mac = faker.mac_address().upper() self.alarm_type = random.choice( [101, 102, 103, 104, 109, 2, 4, 7, 8, 9, 11, 13, 15, 71, 72, 73, 51, 52, 53]) self.alarm_date = self.now_time self.value = {"alarmIsSwitch": 0, "angleBluetoothSignalValue": 10, "angleKillAlarmValue": "12", "angleQuake": "250", "angleStaticMaxValue": "150", "angleStaticTime": "2", "broadcastCyc": 1, "ch4Level1": "50", "ch4Level2": "20", "ch4Level3": "10", "coverIsSwitch": 1, "domain": "antan.com", "gasHeartbeat": 86400, "ip": "106.52.198.240", "leanangle": 15, "logNum": 0, "moduleType": 7, "monitorModel": 0, "openangle": 15, "port": 9999, "qxAlarmNum": 10, "qxDayAbnormalWakeNum": 200, "sensorAlarmShakeNum": 36, "sensorHeartbeat": 172800, "sensorHeartbeatDuration": 24, "sensorIsSwitch": 0, "siltHeartbeat": 86400, "siltHigh": 1, "terminalHeartbeatDuration": 24, "timeout": 2000, "wakeHeartbeat": 86400, "waterLevel1": 10, "waterLevel2": 20, "waterRemoveQuakeTime": 5} self.now_value = {"bluetoothSignalValue": 49, "bluetoothSleepCyc": 1, "co": "0", "coverIsSwitch": 0, "coverSignalValue": 10, "currentBatteryNum": "13", "currentLogNum": "109", "currentSensorBatteryNum": "90", "firedamp": "0", "gasSensorCheckcycle": "3600", "gateWayMac": "D4:96:69:8C:24:6F", "gatewayAlarmSwitch": 0, "hardwareVer": "IGW-JGW-NB-K-BX-V1.0", "hs": "0", "iccid": "898602B7031880020454", "imei": "869951040891925", "imsi": "460040723520454", "leanAlarmValue": "14", "leanAngle": "3", "moduleType": 7, "mudHigh": "0", "netSignalValue": 49, "openAlarmValue": "15", "openAngle": "0", "semaphore": "9", "sensorAlarmShakeNum": 36, "sensorAlarmSwitch": 0, "sensorHardwareVer": "IGW-IAS-BL-K-PCBV1.0", "sensorMac": "C8:8E:06:59:C2:FA", "sensorSoftwareVer": "SR211102.A", "sensorWakeNum": "0", "sim": "898602B7031880020454", "sinr": "16", "softwareVer": "IWG-7-ZK211104.1", "temperature": "0", "waterHigh": "0"} self.value = json.dumps(self.value) self.now_value = json.dumps(self.now_value) insert_num = count*1000+n self.name = 'cs' + str(insert_num) sql += "({device_id},{no},{terminal_no},44,'广东省',4403,'深圳市',440303,'南山区','大新路南头街道88-36号','1',{department_id},'自动化测试',{type},{sub_type},{cover_type},{is_online},{control_status},0,0,'{now_time}',NULL,NULL,{jingdu},{weidu},'{name}','{beizhu}'), ".format( device_id=self.device_id, no=self.no, terminal_no=self.terminal_no, department_id=self.department_id, type=self.type, sub_type=self.sub_type, cover_type=self.cover_type, is_online=self.is_online, control_status=self.control_status, now_time=self.now_time, jingdu=self.jingdu, weidu=self.weidu, name=self.name, beizhu=self.address) sql2 += "({capital_id}, {terminal_no}, 0, {department_id}, '自动化测试', '', '', NULL, NULL, NULL, 0, NULL, '{now_time}', NULL, NULL),".format( capital_id=self.capital_id, terminal_no=self.terminal_no, department_id=self.department_id, now_time=self.now_time) sql3 += "({cellar_well_terminal_id}, {no}, {terminal_no}, 'IGW-JGW-NB-K-BX-V1.0', 'IWG-7-ZK1.0.0.210922.T', '{battery}', '{signal}', '0', {is_online}, 0, {control_status}, '{now_time}', '{now_time}', 0000000014, 3, '{now_time}', '898602B7031880020463', '20', 4, NULL, 4, '{value}', '{now_value}', NULL, '898602B7031880020463', '0', '{mac}', '460040723520463', '5', '0', '0', '0', '0', '0', 0, NULL), ".format( cellar_well_terminal_id=self.cellar_well_terminal_id, no=self.no, terminal_no=self.terminal_no, battery=self.battery, signal=self.signal, is_online=self.is_online, control_status=self.control_status, now_time=self.now_time, value=self.value, now_value=self.now_value, mac=self.mac) sql4 += "({cellar_well_terminal_id}, {terminal_no}, 1, 0, 0, 0, 0, 0, '{now_time}', NULL, NULL), ".format( cellar_well_terminal_id=self.cellar_well_terminal_id, terminal_no=self.terminal_no, now_time=self.now_time) sql5 += "({images_id}, '{terminal_no}', {device_id}, 0, 'NULL, NULL, NULL, 0, '{now_time}'), ".format( images_id=self.images_id, terminal_no=self.terminal_no, device_id=self.device_id, now_time=self.now_time) sql6 += "({alarm_id}, '{terminal_no}', {alarm_type}, 0, 0, '{alarm_date}', NULL, NULL, 0, NULL, NULL, NULL), ".format( alarm_id=self.alarm_id, terminal_no=self.terminal_no, alarm_type=self.alarm_type, alarm_date=self.alarm_date) self.name = 'cs' + str(count) + '000' self.device_id = faker.random_number(14) self.capital_id = faker.random_number(14) self.images_id = faker.random_number(14) self.alarm_id = faker.random_number(14) self.cellar_well_terminal_id = faker.random_number(14) sql += "({device_id},{no},{terminal_no},44,'广东省',4403,'深圳市',440303,'南山区','大新路南头街道88-36号','1',{department_id},'自动化测试',{type},{sub_type},{cover_type},{is_online},{control_status},0,0,'{now_time}',NULL,NULL,{jingdu},{weidu},'{name}','{beizhu}')".format( device_id=self.device_id, no=self.no, terminal_no=self.terminal_no, department_id=self.department_id, type=self.type, sub_type=self.sub_type, cover_type=self.cover_type, is_online=self.is_online, control_status=self.control_status, now_time=self.now_time, jingdu=self.jingdu, weidu=self.weidu, name=self.name, beizhu=self.address) sql2 += "({capital_id}, {terminal_no}, 0, {department_id}, '自动化测试', '', '', NULL, NULL, NULL, 0, NULL, '{now_time}', NULL, NULL)".format( capital_id=self.capital_id, terminal_no=self.terminal_no, department_id=self.department_id, now_time=self.now_time) sql3 += "({cellar_well_terminal_id}, {no}, {terminal_no}, 'IGW-JGW-NB-K-BX-V1.0', 'IWG-7-ZK1.0.0.210922.T', '{battery}', '{signal}', '0', {is_online}, 0, {control_status}, '{now_time}', '{now_time}', 0000000014, 3, '{now_time}', '898602B7031880020463', '20', 4, NULL, 4, '{value}', '{now_value}', NULL, '898602B7031880020463', '0', '{mac}', '460040723520463', '5', '0', '0', '0', '0', '0', 0, NULL)".format( cellar_well_terminal_id=self.cellar_well_terminal_id, no=self.no, terminal_no=self.terminal_no, battery=self.battery, signal=self.signal, is_online=self.is_online, control_status=self.control_status, now_time=self.now_time, value=self.value, now_value=self.now_value, mac=self.mac) sql4 += "({cellar_well_terminal_id}, {terminal_no}, 1, 0, 0, 0, 0, 0, '{now_time}', NULL, NULL) ".format( cellar_well_terminal_id=self.cellar_well_terminal_id, terminal_no=self.terminal_no, now_time=self.now_time) sql5 += "({images_id}, '{terminal_no}', {device_id}, 0, 'NULL, NULL, NULL, 0, '{now_time}')".format( images_id=self.images_id, terminal_no=self.terminal_no, device_id=self.device_id, now_time=self.now_time) sql6 += "({alarm_id}, '{terminal_no}', {alarm_type}, 0, 0, '{alarm_date}', NULL, NULL, 0, NULL, NULL, NULL) ".format( alarm_id=self.alarm_id, terminal_no=self.terminal_no, alarm_type=self.alarm_type, alarm_date=self.alarm_date) # print(sql) # print(sql2) # print(sql3) # print(sql4) # print(sql5) # 执行sql语句 try: print('第{count}次插入数据>>>'.format(count=count)) self.cursor.execute(sql) self.cursor.execute(sql2) self.cursor.execute(sql3) self.cursor.execute(sql4) self.cursor.execute(sql5) self.cursor.execute(sql6) self.data.commit() # 错误回滚 except: self.data.rollback() print('出现错误,已经回滚!') # 关闭mysql print('{count}条随机数已经生成完毕!!!'.format(count=count)) self.data.close() @run_time def delete_data(self): sql6 = "DELETE FROM t_capital WHERE CHAR_LENGTH(id)<16;" sql7 = "DELETE FROM `t_cellar_well` WHERE char_length(id)<16;" sql8 = "DELETE FROM t_cellar_well_terminal WHERE CHAR_LENGTH(id)<16;" sql9 = "DELETE FROM t_cellar_well_install_step WHERE CHAR_LENGTH(id)<16;" sql0 = "DELETE FROM t_images WHERE CHAR_LENGTH(id) < 16;" sql11 = "DELETE FROM t_device_alarm WHERE CHAR_LENGTH(id) < 16;" try: print('正在删除数据,请稍后。。。') self.cursor.execute(sql6) self.cursor.execute(sql7) self.cursor.execute(sql8) self.cursor.execute(sql9) self.cursor.execute(sql0) self.cursor.execute(sql11) self.data.commit() # 错误回滚 except: self.data.rollback() print('出现错误,已经回滚!') # 关闭mysql print('随机生成的数据已经全部删除!!!') self.data.close()my_connet = Mysql_connet()make = int(input('输入操作, 1:生成mysql随机数据, 2:删除mysql随机数据:'))if make == 1: # number = int(input('输入生成的mysql数据量(必须为正整数):')) my_connet.insert_data()elif make == 2: my_connet.delete_data()else: print('输入错误,请重新执行')
这里优化是直接生成一条sql语句同时生成多条数据的语法,然后执行这一条sql语句,相当只执行一次插入,但是生成的数据和上面的循环插入效果是一样,但是执行时间效率却大大提高了。
过程中有个问题,开始我的sql变量是连接所有插入条目的,但是16节点的cobar在输100万条的时候就连接断开了,而单库直接10条也插不进去,显示mysql连接断开
这个是sql语句长度限制的问题,在mysql的配置文件中有一个max_allowed_packet = 1M,10万条插入语句已经超过这个限额了,100万条分给16个cobar节点也超过了,所以可以把这个参数调大,或者代码里分段执行sql。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~