多平台统一管理软件接口,如何实现多平台统一管理软件接口
230
2022-09-02
Python 运用Paramiko实现批量巡检(python培训)
通过封装Paramiko这个SSH模块,我们可以实现远程批量管理Linux主机,在此基础上配合钉钉API接口可实现自动告警机制,定期自动检查设备状态,并推送到钉钉群内。
首先需要配置双网卡模式,我们将无线网卡配置路由让其走外网与钉钉连接,有线网口则负责与内部服务器相连接,只需要配置路由即可实现。
网络目标 网络掩码 网关 接口 跃点数0.0.0.0 0.0.0.0 132.35.93.1 132.35.93.11 210.0.0.0 0.0.0.0 192.168.191.1 192.168.191.3 25 C:\Windows\system32> route delete 0.0.0.0# 所有的外网访问走无线网卡,从网关 192.168.191.1 出去C:\Windows\system32>route -p add 0.0.0.0 mask 0.0.0.0 192.168.191.1# 如果是132网段,则走内部,网关为:132.35.93.1C:\Windows\system32>route -p add 132.35.0.0 mask 255.255.0.0 132.35.93.1
封装钉钉接口: 接口的调用需要传入需要通知特定人的手机号,这个模块命名为Ding.py 代码如下。
import requestsimport urllib.parseimport datetime,time,hmac,hashlib,base64,jsonclass DingToken(): def __init__(self,atAll,atMobiles): self.atAll = atAll self.atMobiles = atMobiles def send_message(self,message): timestamp = str(round(time.time() * 1000)) secret = 'SEC1018485caf7339e38530b' secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote(base64.b64encode(hmac_code)) headers={'Content-Type': 'application/json'} webhook = '+ timestamp + "&sign=" + sign data = { "msgtype": "text", "text": {"content": message }, "at": { "atMobiles": [ self.atMobiles ], "isAtAll": self.atAll } } requests.post(webhook, data=json.dumps(data), headers=headers) # 发送警告信息 def send_warning(self,platform,person,group,address,send_date,type,message): self.send_message( "------------------------------------------------------- \n" "\t\t\t\t\t {0} \n" "------------------------------------------------------- \n" "维护人员: \t {1} \n" "所在分组: \t {2} \n" "系统地址: \t {3} \n" "告警日期: \t {4} \n" "告警类型: \t {5} \n" "------------------------------------------------------- \n" "{6} \n" "-------------------------------------------------------". format(platform,person,group,address,send_date,type,message) ) # 发送Ping连通性报告 def send_ping(self,platform,send_date,success_len,error_len,error_list): self.send_message( "------------------------------------------------------- \n" "\t\t\t\t\t {0} \n" "------------------------------------------------------- \n" "[*] 日期: \t {1} \n[+] 连通主机数: \t {2} \n[-] 失败主机数: \t {3} \n" "------------------------------------------------------- \n" "失败主机列表: \n {4} \n".format(platform,send_date,success_len,error_len,error_list) )if __name__ == "__main__": ding = DingToken(False,"15646596977") ding.send_warning("总部客服","王瑞","CTI服务组","192.168.1.1","2021:01:01","磁盘异常","C:// \t 100% \n")
封装好的MySSH模块: 接着就是封装一个拉取数据到本地的SSH模块,这个模块命名为 MySSH.py 代码如下。
import paramiko, math,jsonclass MySSH: def __init__(self, address, username, password, default_port): self.address = address self.default_port = default_port self.username = username self.password = password def Init(self): try: self.ssh_obj = paramiko.SSHClient() self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3, allow_agent=False, look_for_keys=False) self.sftp_obj = self.ssh_obj.open_sftp() except Exception: return False def BatchCMD(self, command): try: stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3) result = stdout.read() if len(result) != 0: result = str(result).replace("\\n", "\n") result = result.replace("b'", "").replace("'", "") return result else: return None except Exception: return None def CloseSSH(self): try: self.sftp_obj.close() self.ssh_obj.close() except Exception: pass def GetSystemVersion(self): return self.BatchCMD("uname") # 测试主机连通率 def GetPing(self): try: if self.GetSystemVersion() != None: print("{} 已连通.".format(self.address)) return True else: return False except Exception: return False # 拉取磁盘数据到本地,并返回字典 def GetAllDiskSpace(self): ref_dict = {} cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'", "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": ref_dict[str(each.split(":")[1])] = str(each.split(":")[0]) print("利用率字典: {}".format(ref_dict)) return ref_dict except Exception: return False # 拉取内存数据到本地。 def GetAllMemSpace(self): cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'", "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024) mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024) percentage = 100 - int(mem_free / int(mem_total / 100)) print("利用百分比: {} \t 总内存: {} \t 剩余内存: {}".format(percentage,mem_total,mem_free)) return str(percentage) + " %" except Exception: return False # 获取CPU利用率数据 def GetCPUPercentage(self): ref_dict = {} cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'", "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.split(":") ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]} print("CPU利用率数据: {}".format(ref_dict)) return ref_dict except Exception: return False # 获取到系统负载利用率 也就是一分钟负载五分钟负载十五分钟负载 def GetLoadAVG(self): ref_dict = {} cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'", "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'" } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ref_list = os_ref.split("\n") for each in ref_list: if each != "": each = each.replace(",","").split(":") ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]} print("负载利用率: {}".format(ref_dict)) return ref_dict return False except Exception: return False # 检测指定进程是否存活 def CheckProcessStatus(self,processname): cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"), "AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}") } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip()) if ret_flag != "0": return True return False except Exception: return "None" # 检测指定端口是否存活 def CheckPortStatus(self,port): cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}") , "AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}") } try: os_version = self.GetSystemVersion() for version, run_cmd in cmd_dict.items(): if (version == os_version): os_ref = self.BatchCMD(run_cmd) ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip()) if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED": return True return False except Exception: return False
定义巡检过程: 这个模块是最重要的一个模块,主要负责解析JSON文件并巡检,该模块我们就命名为system.py,代码如下:
from Ding import DingTokenfrom MySSH import MySSHimport os,sys,time,datetime,json# --------------------------------------------------------------------------------------------------# 对所有主机设备进行def switch_ping(): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) system = ptr.get("system") success,error = [],[] try: for system_list in system: for k,v in system_list.items(): for item in v: now = datetime.datetime.now() print("[+] 监控范围: Ping测试 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0])) #InspectCPU(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("cpu_limit")) ssh = MySSH(item[0],item[1],item[2],ptr.get("default-port")) ssh.Init() ref = ssh.GetPing() if ref == True: success.append(item[0]) else: error.append(item[0]) ding = DingToken(False, ptr.get("telephone")) ding.send_ping(ptr.get("platform"),datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),len(success),len(error),error) except Exception: pass# --------------------------------------------------------------------------------------------------# 磁盘告警流程# 手机号 地址 账号 密码 端口 系统名称 姓名 分组 磁盘最大值def InspectDisk(telephone,address,username,password,port,platform,person,group,disk_limit): ding = DingToken(False, telephone) ssh = MySSH(address, username, password, port) ssh.Init() ret = ssh.GetAllDiskSpace() ssh.CloseSSH() if ret != False: for k,v in ret.items(): try: space = eval(v.split("%")[0]) if space >= int(disk_limit): now = datetime.datetime.now() strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S') ding.send_warning(platform, person, group, address, strnow, "磁盘过载", "[ 分区: {} \t | \t 负载率: {} ]\n".format(k,v)) except Exception: passdef switch_inspect_disk(): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) system = ptr.get("system") for system_list in system: # 循环所有字典 for k,v in system_list.items(): # 循环每个分组内的主机 for item in v: now = datetime.datetime.now() print("[+] 监控范围: 磁盘检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0])) # InspectDisk("15646596977", "192.168.191.4", "root", "1233", "22", "总部客服(呼叫中心平台)", "王瑞", "CTI 3.4",80) InspectDisk(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("disk_limit"))# --------------------------------------------------------------------------------------------------# 内存告警流程# InspectMemory("15646596977","192.168.191.3","root","1233",22,"总部客服系统","王瑞","CTI组",内存阈值)def InspectMemory(telephone,address,username,password,port,platform,person,group,memory_limit): ding = DingToken(False, telephone) ssh = MySSH(address, username, password, port) ssh.Init() ret = ssh.GetAllMemSpace() ssh.CloseSSH() if ret != False: try: space = eval(ret.split("%")[0]) if space >= int(memory_limit): now = datetime.datetime.now() strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S') ding.send_warning(platform, person, group, address, strnow, "内存过载", "[ 负载率: \t | \t {} ]\n".format(ret)) except Exception: passdef switch_inspect_memory(): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) system = ptr.get("system") for system_list in system: for k,v in system_list.items(): for item in v: now = datetime.datetime.now() print("[+] 监控范围: 内存检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0])) InspectMemory(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("memory_limit"))# --------------------------------------------------------------------------------------------------# 巡检CPU利用率def InspectCPU(telephone,address,username,password,port,platform,person,group,cpu_limit): ding = DingToken(False, telephone) ssh = MySSH(address, username, password, port) ssh.Init() ret = ssh.GetCPUPercentage() ssh.CloseSSH() if ret != False: try: space = int(100 - int(ret.get("idea"))) us = ret.get("us") + "%" sys = ret.get("sys") + "%" if space >= int(cpu_limit): print("CPU 总利用率: {}".format(space)) now = datetime.datetime.now() strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S') ding.send_warning(platform, person, group, address, strnow, "CPU 利用率过载","[ 用户态: \t | \t {} ]\n[ 内核态: \t | \t {} ]\n[ 总利用率: \t | \t {}% ]\n".format(us,sys,space)) except Exception: passdef switch_inspect_cpu(): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) system = ptr.get("system") for system_list in system: for k,v in system_list.items(): for item in v: now = datetime.datetime.now() print("[+] 监控范围: CPU检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0])) InspectCPU(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("cpu_limit"))# --------------------------------------------------------------------------------------------------# 巡检系统平均负载def InspectLoadAvg(telephone,address,username,password,port,platform,person,group,load_avg_limit): ding = DingToken(False, telephone) ssh = MySSH(address, username, password, port) ssh.Init() ret = ssh.GetLoadAVG() ssh.CloseSSH() if ret != False: try: if float(ret.get("1avg")) >= float(load_avg_limit[0]) or float(ret.get("5avg")) >= float(load_avg_limit[1]) or float(ret.get("15avg")) >= float(load_avg_limit[2]): now = datetime.datetime.now() strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S') ding.send_warning(platform, person, group, address, strnow, "LoadAvg 过载","[ 一分钟负载: \t | \t {} ]\n[ 五分钟负载: \t | \t {} ]\n[ 十五分钟负载: \t | \t {} ]\n". format(ret.get("1avg"), ret.get("5avg"),ret.get("15avg"))) except Exception: passdef switch_inspect_avg(): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) system = ptr.get("system") for system_list in system: for k,v in system_list.items(): for item in v: now = datetime.datetime.now() print("[+] 监控范围: 系统负载检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0])) InspectLoadAvg(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,list(ptr.get("load_avg_limit")))# --------------------------------------------------------------------------------------------------# 巡检系统端口是否开放# 首先传入一个IP地址,解析出其用户名密码表def GetAddressAsPasswd(address): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) for each_list in ptr.get("system"): for k,v in each_list.items(): for ser in v: if ser[0] == address: return ser return False# 拼接字符串数据def append_string(src_str,append_str): for x in range(len(append_str)): src_str += append_str[x] return src_str# 检查端口是否被关闭了,如果关闭了则提示关闭def InspectPort(telephone,platform,person): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) port = ptr.get("port") default_port = ptr.get("default-port") for each_list in port: for k,v in each_list.items(): for item in v: this_user_passwd = GetAddressAsPasswd(item[0]) if this_user_passwd != False: try: now = datetime.datetime.now() print("[+] 监控范围: 端口状态检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k, datetime.datetime.strftime( now, '%Y-%m-%d %H:%M:%S'), item[0])) ssh = MySSH(this_user_passwd[0], this_user_passwd[1], this_user_passwd[2], default_port) ssh.Init() close_port_list = [] for check_port in range(1,len(item)): ref = ssh.CheckPortStatus(item[check_port]) print("端口ID: {} \t 状态位: {} 存活端口: {}".format(check_port, ref, item[check_port])) if ref == False or ref == "None": close_port_list.append(item[check_port]) print("端口ID: {} \t 状态位: {} 关闭端口: {}".format(check_port, ref, item[check_port])) ding = DingToken(False,telephone) send = "" for i in close_port_list: im = "[ 端口: {0} \t\t | \t 状态: 已关闭 ]\n".format(i) send = append_string(send,im) if(send != ""): ding.send_warning(platform,person,k,this_user_passwd[0],datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'),"端口关闭",send) ssh.CloseSSH() except Exception: passdef switch_inspect_port(): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) telephone = ptr.get("telephone") platform = ptr.get("platform") person = ptr.get("person") InspectPort(telephone, platform, person)# --------------------------------------------------------------------------------------------------# 巡检系统是否开放指定进程def InspectProcess(telephone,platform,person): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) process = ptr.get("process") default_port = ptr.get("default-port") for each_list in process: for k,v in each_list.items(): for item in v: this_user_passwd = GetAddressAsPasswd(item[0]) if this_user_passwd != False: try: now = datetime.datetime.now() print("[+] 监控范围: 进程状态检测 --> 监测组: {} --> 检测日期: {} --> 检测地址: {}".format(k, datetime.datetime.strftime( now, '%Y-%m-%d %H:%M:%S'), item[0])) ssh = MySSH(this_user_passwd[0], this_user_passwd[1], this_user_passwd[2], default_port) ssh.Init() close_process_list = [] for check_process in range(1,len(item)): ref = ssh.CheckProcessStatus(item[check_process]) print("进程ID: {} \t 状态位: {} 存活进程: {}".format(check_process, ref, item[check_process])) if ref == False or ref == "None": close_process_list.append(item[check_process]) print("进程ID: {} \t 状态位: {} 关闭进程: {}".format(check_process, ref, item[check_process])) ding = DingToken(False,telephone) send = "" for i in close_process_list: im = "[ 进程: {0} \t\t | \t 状态: 已关闭 ]\n".format(i) send = append_string(send,im) if(send != ""): ding.send_warning(platform,person,k,this_user_passwd[0],datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'),"进程退出",send) ssh.CloseSSH() except Exception: passdef switch_inspect_process(): with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) telephone = ptr.get("telephone") platform = ptr.get("platform") person = ptr.get("person") InspectProcess(telephone, platform, person)
以磁盘为例,当超过了我们定义好的阈值时,则会通过钉钉提示用户告警,告警提示如下。
定义配置文件: 配置文件则是巡检时需要解析的内容,我们需要依次写入账号密码等信息。
{ "system_config": [ {"ping_": "True"}, {"disk_": "True"}, {"cpu_": "False"}, {"loadavg_": "False"}, {"memory_": "False"}, {"process_": "False"}, {"port_": "False"} ], "platform": "总部客服系统(呼叫中心)", "person": "王瑞", "telephone": "18264825669", "e-mail": "admin@lyshark.com", "default-port": "22", "set-timeout": "3600", "disk_limit": 75, "memory_limit": 95, "cpu_limit": 90, "load_avg_limit": ["5.0","8.0","10.0"], "system": [ { "Centos 服务器组": [ ["192.168.1.1","root","1233"], ["192.168.1.1","root","1233"], ["192.168.1.1","root","1233"] ], "AIX 服务器组": [ ["192.168.1.1","root","1233"], ["192.168.1.1","root","1233"] ] } ], "process": [ { "CTI进程组": [ ["192.168.1.1","icdcomm","aplogic","ctilink","cnfgsvr","mcp"], ["192.168.1.1","aplogic","ctilink","cnfgsvr","mcp","ccsapp","nis","oas","ivr","ctiserver"] ] } ], "port": [ { "CTI端口检测": [ ["192.168.1.1","111","631","8888","19001","25","2556","5150","5600","34902","10000","34901","10005"], ["192.168.1.1","111","8080","10000","10004","10005","8888","19001","2556","5600","60661"] ] } ]}
定义main入口代码: 入口代码主要负责解析参数与巡检,由于怕影响服务器性能,所有没加多线程支持,不过也够用了。
import systemimport json,timeif __name__ == "__main__": with open("./config.json", "r", encoding="utf-8") as read_config_ptr: ptr = json.loads(read_config_ptr.read()) timeout = ptr.get("set-timeout") configure = ptr.get("system_config") # 默认开关全部关闭 disk_ = "False" cpu_ = "False" memory_ = "False" loadavg_ = "False" process_ = "False" port_ = "False" ping_ = "False" # 判断配置文件开关是否开启,如果开启了则将内存的开关开启 for dic in configure: for k,v in dic.items(): if k == "ping_" and v == "True": ping_ = "True" if k == "disk_" and v == "True": disk_ = "True" if k == "cpu_" and v == "True": cpu_ = "True" if k == "memory_" and v == "True": memory_ = "True" if k == "loadavg_" and v == "True": loadavg_ = "True" if k == "process_" and v == "True": process_ = "True" if k == "port_" and v == "True": port_ = "True" while True: if ping_ == "True": system.switch_ping() if disk_ == "True": system.switch_inspect_disk() if cpu_ == "True": system.switch_inspect_cpu() if memory_ == "True": system.switch_inspect_memory() if loadavg_ == "True": system.switch_inspect_avg() if process_ == "True": system.switch_inspect_process() if port_ == "True": system.switch_inspect_port() time.sleep(int(timeout))
实现正则解析: 前面的配置无法实现交互式问答,智能机器人推送数据,很被动,我们需要开启交互机器人,自己实现业务逻辑。
import os,sys,redef participle(text): ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text) address = ".".join(ref[0]) select = re.findall('查询', text) if(len(address) !=0 and len(select) != 0): if len (re.findall('内存',text)) != 0: return [address,"内存"] elif len (re.findall('磁盘',text)) != 0: return [address,"磁盘"] elif len (re.findall('负载',text)) != 0: return [address,"负载"]if __name__ == "__main__": text = "帮我查询192.168.1.1主机的内存负载情况" text2 = "帮我查询 192.168.1.200 这 台设 备的 磁盘 使用率" text3 = "需要查询192.168.1.200 设备中负载使用情况,请反馈结果" text4 = "发现负载出现告警,请立即查询192.168.1.200这台设备的负载使用率,并立即反馈" print(participle(text)) print(participle(text2)) print(participle(text3)) print(participle(text4)) txt = ''' 主编单位:总部客服系统 ''' addr = re.findall('(主编单位:.*?)\s', txt)[0] print(addr)
这个案例,从文本中提取可用信息,并将其返回为列表格式。
接着配置钉钉开发者平台机器人,此处需要有公网地址,作用是,钉钉群有人at机器人时,机器人会将请求post发送到我们的django应用服务上。
使用django配置接收请求并处理即可,处理代码如下所示。
from django.import HttpResponse, JsonResponseimport json,hmac,hashlib,base64import sys,os,re# 机器人 app_secretapp_secret = "LcKEf0nlqe"# 根据IP查询操作使用def select_participle(text): ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text) # 判断如果地址不为空,则进行查询等操作 if len(ref) != 0: address = ".".join(ref[0]) select = re.findall('查询', text) if(len(address) !=0 and len(select) != 0): if len (re.findall('内存',text)) != 0: return [address,"内存"] elif len (re.findall('负载',text)) != 0: return [address,"负载"] else: if len(re.findall('主机列表', text)) != 0: return ["主机列表"]def index(request): if request.method == "POST": HTTP_SIGN = request.META.get("HTTP_SIGN") HTTP_TIMESTAMP = request.META.get("HTTP_TIMESTAMP") res = json.loads(request.body) # 接收用户的输入并做处理 sendnick = res.get("senderNick") is_admin = res.get("isAdmin") conver = res.get("conversationTitle") content = res.get("text").get("content") print("发送人: {} 是否管理员: {} 所在组: {} 接收数据: {}".format(sendnick,is_admin,conver,content)) # 数据的加密解密 string_to_sign = '{}\n{}'.format(HTTP_TIMESTAMP, app_secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(app_secret.encode("utf-8"), string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = base64.b64encode(hmac_code).decode("utf-8") ref_select_text = select_participle(content) if len(ref_select_text) == 2: if sign == HTTP_SIGN: if "负载" in ref_select_text[1]: return JsonResponse( {"msgtype": "text", "text": { "content": "经过查询,主机: {} 负载率为: 10% 当前为正常状态".format(ref_select_text[0]) } }) if "内存" in ref_select_text[1]: return JsonResponse\ ( {"msgtype": "text", "text": { "content": "经过查询,主机: {} 内存占用率为: 70% 当前为正常状态".format(ref_select_text[0]) } } ) elif len(ref_select_text) == 1: if "主机列表" in ref_select_text[0]: return JsonResponse \ ( {"msgtype": "text", "text": { "content": "目前您所管理的主机,总结: 205台" } } ) return JsonResponse({"error": "没有权限"}) if request.method == "GET": return HttpResponse("没有权限")
django运行后,我们回到钉钉群内,问机器人一些问题看看吧,前提是这些问题已经定义了正则解析。
django 后台则能够接收到,这些数据,并做出回应。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~