关于运维:异常追踪频道与-IM-双向互动

55次阅读

共计 5717 个字符,预计需要花费 15 分钟才能阅读完成。

背景

为不便更加及时可不便的获取异样追踪中的新 Issue,咱们能够通过在外部群中创立一个飞书、钉钉或者企业微信的机器人来承受异样追踪中的新 Issue 的揭示,或者是新回复的揭示,这样能够帮忙咱们及时处理 Issue,咱们也能够通过 @机器人的这种形式来疾速进行 Issue 回复来进步咱们的异样解决效率

异样追踪与 IM 互动流程

本文以飞书机器人为例来解说具体实现流程。

筹备工作

  1. 获取观测云须要跟踪 Issue 工作空间的 API Key:https://docs.guance.com/management/api-key
  2. 创立一个飞书机器人助手利用:https://www.feishu.cn/hc/zh-CN/articles/495678957203
  3. 部署一个 Dataflux Func 观测云特别版:https://func.guance.com

试验流程

1. 编写飞书机器人收取 Issue 信息规定

因为飞书不间接提供 outgoing 这种间断对话形式的机器人利用,所以须要咱们通过多个机器人利用流程来实现相似的对话性能。

首先咱们要有一个收取 issue 信息的流程来捕捉新 issue 或新回复信息

咱们通过 Webhook 作为流程触发条件,当咱们监听脚本发现有新 issue 记录或者回复时就能够通过调用飞书的 webhook 来触发机器人的音讯推送流程了

咱们能够通过配置 Webhook 触发器将咱们 Issue 中的一些参数捕捉作为机器人发送飞书音讯的入参

咱们能够通过 markdown 的形式来编写咱们想要出现的音讯款式,同时援用咱们下面在 webhook 中配置的参数来出现音讯

2. 编写监听脚本

在做好收取 Issue 的规定配置后,咱们须要在曾经装置配置好的 Dataflux Func 中编写获取新音讯并通过 webhook 发送到飞书机器人的脚本。

首先咱们须要引入一些常量,比方获取新 Issue 的 OpenAPI 地址、API Key 等

import requests
import time
import json
from datetime import datetime, timedelta

# 增量 1 分钟, 获取 1 分钟前的工夫
one_minute_ago = datetime.now() - timedelta(minutes=1)
one_minute_ago_time = int(one_minute_ago.timestamp())
# 以后工夫戳
current_time = int(time.time())
# 飞书 webhook
feishu_webhook_url = "https://www.feishu.cn/flow/api/trigger-webhook/6af60259bd9691a0fd1xxxxxxx"
# 观测云 OpenAPI 地址
base_url = 'https://openapi.guance.com'
channel_list_url = base_url + '/api/v1/channel/quick_list'
issue_list_url = base_url + '/api/v1/issue/list'
# 观测云 API key
df_api_key = '5K3IcvtWbSZ2inxxxxxxxxxxx'

再引入了咱们须要的常量后咱们须要两个办法来实现新 issue 的获取,首先第一步咱们要理解异样追踪的展现逻辑,在异样追踪模块中所有的 issue 都会被频道治理,然而所有新建的 issue 都会呈现在全副的频道中,所以首先咱们须要一个办法获取所有的频道列表来找出咱们要监听的全副频道

# 获取频道列表
def get_channel_list(df_api_key, channel_list_url):
    # 要发送的参数
    params = {
        'DF-API-KEY': df_api_key,
        'Content-Type': 'application/json;charset=UTF-8'
    }

    try:
        # 发送 GET 申请
        response = requests.get(channel_list_url, headers=params)

        # 查看响应状态码
        if response.status_code == 200:
            # 解析 JSON 响应(如果实用)channel_list = response.json()["content"]
            channels = []
            # 解决响应数据
            # print("Response:", data)
            for channel in channel_list:
                data = {"id": channel["id"],
                    "name": channel["name"],
                    "uuid": channel["uuid"]
                }
                channels.append(data)
            return channels
        else:
            print("Request failed with status code:", response.status_code)

    except requests.exceptions.RequestException as e:
        print("Request error:", e)

再获取到咱们须要的频道 Channel_UUID 后咱们就能够通过频道 ID 来查找以后频道中的新增 Issue 了,同时咱们获取到新 issue 后能够同时发送给咱们的飞书机器人的 webhook

# 获取 issue 列表并发送飞书音讯
def get_issue_list_to_feishu(df_api_key, issue_list_url, channelUUID, feishu_webhook_url):
    # 要发送的参数
    headers = {
        'DF-API-KEY': df_api_key,
        'Content-Type': 'application/json;charset=UTF-8'
    }

    body = {
        'channelUUID': channelUUID,
        'startTime': one_minute_ago_time,
        'endTime': current_time,
    }

    try:
        # 发送 GET 申请
        response = requests.post(issue_list_url, headers=headers, data=json.dumps(body))

        # 查看响应状态码
        if response.status_code == 200:
            # 解析 JSON 响应(如果实用)issue_lists = response.json()['content']
            if len(issue_lists):
                for issue in issue_lists:
                    headers = {'Content-Type': 'application/json',}
                    data = {"issue_uuid": issue["uuid"],
                        "title": issue["name"],
                        "level": issue["level"],
                        "source": issue["resource"],
                        "message": issue["description"],
                        "createAt": datetime.fromtimestamp(issue["createAt"]).strftime('%Y-%m-%d %H:%M:%S')
                    }
                    try:
                        response = requests.post(feishu_webhook_url, headers=headers, data=json.dumps(data))
                        response.raise_for_status()
                        print("Message sent successfully to Feishu")
                    except requests.exceptions.HTTPError as errh:
                        print(f"Http Error: {errh}")
                    except requests.exceptions.ConnectionError as errc:
                        print(f"Error Connecting: {errc}")
                    except requests.exceptions.Timeout as errt:
                        print(f"Timeout Error: {errt}")
                    except requests.exceptions.RequestException as err:
                        print(f"OOps: Something Else {err}")
        else:
            print("Request failed with status code:", response.status_code)

    except requests.exceptions.RequestException as e:
        print("Request error:", e)

最初咱们须要一个能够配置定时工作的主办法,来实现获取新 Issue 的工作能够主动执行

# 主办法
@DFF.API('获取 issue 发送到飞书')
def main():
    channel_list = get_channel_list(df_api_key, channel_list_url)
    print(f"channel_list: {channel_list}")
    for channel in channel_list:
        if channel["name"] == "default":
            get_issue_list_to_feishu(df_api_key, issue_list_url, channel["uuid"], feishu_webhook_url)

再实现脚本编写后咱们须要在【治理 / 主动触发配置】中为咱们刚刚编写的脚本创立主动触发工作,将执行函数选为咱们最初编写的主办法: 获取 issue 发送到飞书即可,同时将 Crontab 配置为每分钟执行一次后,点击保留即可实现工作的创立了。

3. 在飞书中收取新 issue 并配置回复流程

在配置好工作后咱们就能够点击执行来立刻触发一次工作,或当有新 issue 时自行触发。当异样追踪中有新 Issue 时咱们就能够在咱们配置了机器人利用的飞书群中获取到新 Issue 揭示了。

这时咱们要想对本条 Issue 记录行回复评论的时候就须要咱们配置另外一条飞书机器人利用的回复 Issue 规定了

那这条规定的触发条件就是当咱们须要进行 Issue 音讯的回复时 @机器人的动作触发时,咱们能够配置在指定群主中失效该规定,也能够配置指定的触发词等条件

在配置好机器人触发规定后咱们须要创立咱们回复音讯的规定,因为须要拆解咱们回复音讯中的 IssueID 所以咱们须要一个 API 服务作为直达,再向 OpenAPI 发送增加评论申请

因为原音讯会带上 @机器人 所以在抉择参数时要抉择源音讯 (去除 @局部) 咱们能够通过填写 API 的返回示例来验证咱们增加评论是否胜利

当发送评论胜利后咱们能够通过下面获取的返回参数,来给音讯发送者发送音讯胜利发送的告诉

4. 编写直达发送评论脚本

因为须要对发送的音讯进行解析,所以咱们须要在 Dataflux Func 中利用受权链接性能来创立一个 API 服务,在收取到音讯解析后向 OpenAPI 发送新增评论的申请,同时增加返回的 Json 构造不便对音讯胜利的确认。

import requests
import time
import json
from datetime import datetime, timedelta


# 观测云 API key
df_api_key = '5K3IcvtWbSZ2in4ujU07xxxxxxxxxx'
# 观测云 OpenAPI 地址
base_url = 'https://openapi.guance.com'
create_issue_reply_url = base_url + '/api/v1/issue/reply/create'


@DFF.API('Create_Issue_Reply')
def create_issue_reply(content):
    print(f"conntent: {content}")
    param_list = content.split('#')
    issueUUID = param_list[1]
    content = param_list[2]
    # 要发送的参数
    headers = {
        'DF-API-KEY': df_api_key,
        'Content-Type': 'application/json;charset=UTF-8'
    }

    body = {
        'issueUUID': issueUUID,
        'content': content,
        'extend': {}}

    try:
        # 发送 GET 申请
        response = requests.post(create_issue_reply_url, headers=headers, data=json.dumps(body))
        print("repsonse:",response.json())
        # 查看响应状态码
        if response.status_code == 200:
            # 解析 JSON 响应(如果实用)result = response.json()['content']
            return {
                "issueUUID": issueUUID,
                "time": datetime.fromtimestamp(result["updateAt"]).strftime('%Y-%m-%d %H:%M:%S')
            }
        else:
            print("Request failed with status code:", response.status_code)

    except requests.exceptions.RequestException as e:
        print("Request error:", e)

在编写好 API 脚本后咱们须要通过【治理 / 受权链接】配置开启 API 服务来作为飞书 webhook http 申请的接收端

因为咱们须要直达解析参数,所以再 @机器人发送音讯时须要依照指定规定才能够胜利将音讯发送胜利,这里的采纳的形式是 #issueUUID# 音讯内容 的形式来进行的,当获取到正确的参数后就能够将回复内容发送到 OpenAPI

当音讯胜利发送后咱们能够从咱们本人配置的返回构造中获取数据进行告诉

同时也能够在 Studio 中查看咱们回复的音讯内容

总结

通过飞书机器人的形式能够更加便捷的对异样追踪中的 Issue 来进行治理,从而晋升咱们针对与异样的解决效率和合作效率。

正文完
 0