作者 | sqlboy-yuzhenc

背景介绍

在理论利用中,咱们常常须要将特定的工作告诉给特定的人,尽管 Apache DolphinScheduler 在平安核心提供了告警组和告警实例,然而配置起来绝对简单,并且还须要在定时调度时指定告警组。通过这篇文章,你将学到一个简略的办法,无需任何配置,只须要在用户表(t_ds_user)表中减少字段钉钉名称(dignding_name),创立用户时指定用户的手机号码和保护对应的钉钉名称,就能轻松实现 Apache DolphinScheduler 工作失败时钉钉告警到指定的人。

装置插件plpython3u

psql etl -U postgrescreate extension plpython3u

pip装置requests

cd /opt && wget https://bootstrap.pypa.io/get-pip.pypython get-pip.pypip install requests

创立发送钉钉的存储过程

  • plpython3u为不受信语言,所以只能被超级用户应用
sqlcreate or replace function tool.sp_send(      message json     ,webhook varchar      ,secret varchar )    returns text    language plpython3u    security definer as $function$import requestsimport jsonimport timeimport hmacimport hashlibimport base64import urllib.parse"""/* * 作者 : v-yuzhenc * 性能 : 给钉钉发送一条音讯 * message : 须要发送的音讯,json格局,详情参考https://open.dingtalk.com/document/robots/custom-robot-access * webhook : 钉钉机器人的webhook * secret : 钉钉机器人的secret * */"""v_timestamp = str(round(time.time() * 1000))p_secret = secretsecret_enc = p_secret.encode('utf-8')string_to_sign = '{}\n{}'.format(v_timestamp, p_secret)string_to_sign_enc = string_to_sign.encode('utf-8')hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()v_sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))# 钉钉自定义机器人的webhook地址p_webhook = webhookwebhook_url = p_webhook+"&timestamp="+v_timestamp+"&sign="+v_sign# 要发送的音讯内容p_message = json.loads(message)# 发送POST申请response = requests.post(webhook_url, data=json.dumps(p_message), headers={"Content-Type": "application/json"})# 打印响应后果return response.text$function$;alter function tool.sp_send(json,varchar,varchar) owner to tool;grant execute on function tool.sp_send(json,varchar,varchar) to public;

测试发送钉钉的存储过程

select sp_send('{    "msgtype": "actionCard",    "actionCard": {        "title": "我 20 年前想打造一间苹果咖啡厅,而它正是 Apple Store 的前身",         "text": "![screenshot](/img/bVc9rHN) \n\n #### 乔布斯 20 年前想打造的苹果咖啡厅 \n\n Apple Store 的设计正从原来满满的科技感走向生活化,而其生活化的走向其实能够追溯到 20 年前苹果一个建设咖啡馆的打算",         "btnOrientation": "0",         "btns": [            {                "title": "内容不错",                 "actionURL": "https://www.dingtalk.com/"            },             {                "title": "不感兴趣",                 "actionURL": "https://www.dingtalk.com/"            }        ]    }}'::json);

参考

自定义机器人平安设置 - 钉钉开放平台

自定义机器人接入 - 钉钉开放平台

t_ds_user减少字段

alter table t_ds_user add column dingding_name varchar(100);--人为将海豚账号对应的钉钉用户名更新下来

编写触发器

CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_ding() RETURNS trigger LANGUAGE plpgsqlAS $function$/* * 作者:v-yuzhenc * 性能:海豚调度工作流失败主动告警 * */declare    i record;    v_user varchar;    v_mobile varchar;    v_content text;    v_message varchar;begin    if new.state in (4,5,6) then         for i in (            select                  d.user_name                ,d.phone                 ,d.dingding_name                ,g.name project_name                ,e.name process_name                ,string_agg(distinct b.name||' '||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss'),'\r\n') task_name            from t_ds_process_instance a             inner join t_ds_task_instance b             on (a.id = b.process_instance_id)            inner join t_ds_task_definition c             on (b.task_code = c.code and b.task_definition_version = c."version")            inner join t_ds_user d             on (c.user_id = d.id)            inner join t_ds_process_definition e             on (a.process_definition_code = e.code and a.process_definition_version = e."version")            inner join t_ds_project g             on (e.project_code = g.code)            where c.task_type <> 'SUB_PROCESS'                and a.state = 6                and b.state = 6                and a.id = new.id            group by d.user_name                ,d.phone                 ,d.dingding_name                ,g.name                ,e.name        ) loop             v_mobile := i.phone;            v_user := i.dingding_name;            v_content := '海豚工作流执行失败,请尽快解决!\r\n项目名称:\r\n'||i.project_name||'\r\n工作流名称:\r\n'||i.process_name||'\r\n工作名称:\r\n'||i.task_name;            v_message := $v_message${    "at": {        "atMobiles":[            "$v_message$||v_mobile||$v_message$"        ],        "atUserIds":[            "$v_message$||v_user||$v_message$"        ],        "isAtAll": false    },    "text": {        "content":"$v_message$||v_content||$v_message$"    },    "msgtype":"text"}$v_message$;            --告警            perform tool.sp_send(v_message::json);        end loop;    end if;    return new;end;$function$;create trigger tg_state_ds_process_instance after update on t_ds_process_instance for each row execute procedure tg_ds_udef_alert_ding();

测试

本文转载自CSDN博主sqlboy-yuzhenc文章:https://blog.csdn.net/qq_33445829/article/details/131073349

本文由 白鲸开源科技 提供公布反对!