Dolphinscheduler中Hive慢查询的耗时最时间最长告警(dolphinscheduler hive)

网友投稿 374 2022-08-30


Dolphinscheduler中Hive慢查询的耗时最时间最长告警(dolphinscheduler hive)

1.Dolphinscheduler钉钉告警

import pymysqlimport jsonimport requestsimport hmacimport hashlibimport timeimport base64import urllibfrom DBUtils.PooledDB import PooledDB# 数据库连接池 需要执行安装 pip3 install DBUtils==1.3from apscheduler.schedulers.blocking import BlockingSchedulerfrom dolphinscheduler import analysis_processingPOOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='*************', database='dolphinscheduler', charset='utf8')def get_data_fetchall(sql): connect = POOL.connection() cursor = connect.cursor() try: cursor.execute(sql) results = cursor.fetchall() return results except: print("Error: unable to fetch data") connect.close()def main(): # 获取任务列表 exec_sql = ''' SELECT c.id, c.NAME AS project_name, b.NAME AS process_name, a.NAME AS task_name, a.submit_time, a.start_time, a.end_time, round((unix_timestamp(a.end_time)-unix_timestamp(a.start_time))/3600,2) AS duration FROM t_ds_task_instance a LEFT JOIN t_ds_process_definition b ON a.process_definition_id=b.id LEFT JOIN t_ds_project c ON b.project_id=c.id WHERE a.submit_time>=date_format(now(),"%Y-%m-%d 00:00:00") AND a.task_type="SHELL" ORDER BY duration DESC LIMIT 10 ''' result_all = get_data_fetchall(exec_sql) # 打印受影响的行数 sendtext = "任务慢查询 [ %d ]条数据:" % result_all.__len__() print(sendtext) # 任务告警相关 if result_all is not None: for result in result_all: id = result[0] # id project_name = result[1] # 项目名称 process_name = result[2] # 执行进程名称 task_name = result[3] # Task名称 submit_time = result[4].strftime('%Y-%m-%d %H:%M:%S') # 提交时间 start_time = result[5].strftime('%Y-%m-%d %H:%M:%S') # 执行开始时间 end_time = result[6].strftime('%Y-%m-%d %H:%M:%S') # 执行结束时间 duration = result[7] # 任务耗时 alert_msg_template = " `【%s - 任务检测到慢查询!!!】` /n>**项目名称:** *%s* /n>**执行进程:** *%s* /n>**Task名称:** *%s* /n>**任务提交时间:** *%s* /n>**执行开始时间:** *%s* /n>**执行结束时间:** *%s* /n>**任务耗时:** *%s* /n" % ( task_name, project_name, process_name, task_name, submit_time, start_time, end_time, duration) title = '【%s - 任务检测到慢查询!!!】' % task_name send_dingding(title, alert_msg_template)def send_dingding(title, text): timestamp, sign = get_timestamp_sign() url = "+ \ "×tamp=" + timestamp + "&sign=" + sign h = {"Content-type": "application/json;charset=utf-8"} values = { "msgtype": "markdown", "markdown": { "title": "%s" % title, "text": "%s" % text }, "at": { "isAtAll": True } } x_msg = json.dumps(values).replace('/n', '\n\n') res = requests.post(url, data=x_msg, headers=h) errmsg = json.loads(res.text)['errmsg'] if errmsg == 'ok': return 'ok' return 'fail: %s' % res.textdef get_timestamp_sign(): timestamp = str(round(time.time() * 1000)) secret = "72b70fb27f1d0d5e218d676ed9c250126e4d1df3fe98e422a88d6dece337f6d4" # SEC开头的 secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return (timestamp, sign)if __name__ == '__main__': main()

效果展示

2.Grafana查询展示

select c.id as "序号", c.name as "项目名称" , b.name as "任务名称" , a.name as "Task名称" , a.submit_time as "任务提交时间", a.start_time as "执行开始时间", a.end_time as "执行结束时间", round((unix_timestamp(a.end_time)-unix_timestamp(a.start_time))/3600,2) as "耗时"from dolphinschedulernew.t_ds_task_instance aleft join t_ds_process_definition b on a.process_definition_id = b.idleft join t_ds_project c on b.project_id = c.idwhere a.submit_time >= date_format(now(),'%Y-%m-%d 00:00:00') and a.task_type = 'SHELL'order by 8 desclimit 50


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:centos 解决python3.7 安装时No module named _ssl 亲测有效(centos7重置root密码)
下一篇:mybatis中orderBy(排序字段)和sort(排序方式)引起的bug及解决
相关文章

 发表评论

暂时没有评论,来抢沙发吧~