关于阿里云:弱结构化日志-Flink-SQL-怎么写SLS-SPL-来帮忙

4次阅读

共计 6649 个字符,预计需要花费 17 分钟才能阅读完成。

作者:潘伟龙(豁朗)

背景

日志服务 SLS 是云原生观测与剖析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,能够将系统日志、业务日志等接入 SLS 进行存储、剖析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景利用宽泛。阿里云 Flink 原生反对阿里云日志服务 SLS 的 Connector,用户能够在阿里云 Flink 平台将 SLS 作为源表或者后果表应用。

阿里云 Flink SLS Connector 对于结构化的日志十分间接,通过配置,SLS 的日志字段能够与 Flink SQL 的 Table 字段列一一映射;而后仍有大量的业务日志并非齐全的结构化,例如会将所有日志内容写入一个字段中,须要正则提前、分隔符拆分等伎俩才能够提取出结构化的字段,基于这个场景,本文介绍一种 应用 SLS SPL 配置 SLS Connector 实现数据结构化 的计划,笼罩日志荡涤与格局规整场景。

弱结构化日志解决的痛点

弱结构化日志现状与结构化解决需要的矛盾

日志数据往往是多种起源,多种格局,往往没有固定的 Schema,所以在数据处理前,须要先对数据进行荡涤、格局规整,而后在进行数据分析;这类数据内容格局是不固定的,可能是 JSON 字符串、CSV 格局,甚至是不规则的 Java 堆栈日志。

Flink SQL 是一种兼容 SQL 语法的实时计算模型,能够基于 SQL 对结构化数据进行剖析,但同时也要求源数据模式固定:字段名称、类型、数量是固定;这也是 SQL 计算模型的根底。

日志数据的弱结构化特点与 Flink SQL 结构化分析之间有着一道鸿沟,逾越这道鸿沟须要一个中间层来进行数据荡涤、规整;这个中间层的计划有多种抉择能够应用,上面会对不同的计划做简略比照,并提出一种新的基于 SLS SPL 的计划来轻量化实现解决数据荡涤规整的工作。

弱结构化日志数据

上面是一条日志示例,日志格局较为简单,既有 JSON 字符串,又有字符串与 JSON 混合的场景。其中:

  • Payload 为 JSON 字符串,其中 schedule 字段的内容也是一段 JSON 构造。
  • requestURL 为一段规范的 URL Path 门路。
  • error 字段是前半部分蕴含 CouldNotExecuteQuery:字符串,后半局部是一段 JSON 构造。
  • tag__:__path 蕴含日志文件的门路,其中 service_a 可能是业务名称。
  • caller 中蕴含文件名与文件行数。
{"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.97.112",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

结构化数据处理需要

对于这样的日志提取出更有价值的信息须要进行数据荡涤,首先须要提取重要的字段,而后对这些字段进行数据分析;本篇关注重要字段的提取,剖析依然能够在 Flink 中进行。

假如提取字段具体需要如下:

  • 提取 error 中的 httpCode、errorCode、errorMessage、requestID。
  • 提取 tag__:__path 中的 service_a 作为 serviceName。
  • 提取 caller 中的 pool.go 作为 fileName,64 作为 fileNo。
  • 提取 Payload 中的 project;提取 Payload 上面的 schedule 中的 type 为 scheuleType。
  • 重命名 source 为 serviceIP。
  • 其余字段舍弃。

最终须要的字段列表如下,基于这样一个表格模型,咱们能够便捷的应用 Flink SQL 进行数据分析。

解决方案

实现这样的数据荡涤,有很多种办法,这里列举几种基于 SLS 与 Flink 的计划,不同计划之间没有相对的优劣,须要依据不同的场景抉择不同的计划。

数据加工计划: 在 SLS 控制台创立指标 Logstore,通过创立数据加工工作,实现对数据的荡涤。

Flink 计划: 将 error 和 payload 指定为源表字段,通过 SQL 正则函数、JSON 函数对字段进行解析,解析后的字段写入长期表,而后对长期表进行剖析。

SPL 计划: 在 Flink SLS Connector 中配置 SPL 语句,对数据进行荡涤,Flink 中源表字段定义为荡涤后的数据结构。

从上述三种计划的原理不难看出,在须要数据荡涤的场景中,在 SLS Connector 中配置 SPL 是一种更轻量化的计划,具备 轻量化、易保护、易扩大 的特点。

在日志数据弱结构化的场景中,SPL 计划既防止了计划一中创立长期两头 Logstore,也防止了计划二中在 Flink 中创立长期表,在离数据源更近的地位进行数据荡涤,在计算平台关注业务逻辑,职责拆散更加清晰。

如何在 Flink 中应用 SPL

接下来以一段弱结构化日志为例,来介绍基于 SLS SPL 的能力来应用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,而后开启一个间断查问以察看成果。在理论应用过程中,仅需批改 SLS 源表配置,即可实现数据荡涤与字段规整。

SLS 筹备数据

  • 开明 SLS,在 SLS 创立 Project,Logstore,并创立具备生产 Logstore 的权限的账号 AK/SK。
  • 以后 Logstore 数据应用 SLS SDK 写入模仿数据,格局应用上述日志片段,其中蕴含 JSON、简单字符串等弱结构化字段。

预览 SPL 成果

在 Logstore 能够能够开启扫描模式,SLS SPL 管道式语法应用 分隔符宰割不同的指令,每次输出一个指令能够即时查看后果,而后减少管道数,渐进式、摸索式获取最终后果。

对上图中的 SPL 进行简略形容:

* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
 | parse-json Payload 
 | project-away Payload 
 | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
 | parse-json errorJson 
 | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
 | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
 | project-rename serviceHost="__tag__:__hostname__" 
 | extend scheduleType = json_extract_scalar(schedule, '$.type') 
 | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project
  • 1 行:project 指令:从原始后果中保留 Payload、error、__tag__:__path__、caller 字段,舍弃其余字段,这些字段用于后续解析。
  • 2 行:parse-json 指令:将 Payload 字符串开展为 JSON,第一层字段呈现在后果中,包含 lastNotified、serviceUri、jobID 等。
  • 3 行:project-away 指令:去除原始 Payload 字段。
  • 4 行:parse-regexp 指令:依照 error 字段中的内容,解析其中的局部 JSON 内容,置于 errorJson 字段。
  • 5 行:parse-json 指令:开展 errorJson 字段,失去 httpCode、errorCode、errorMessage 等字段。
  • 6 行:parse-regexp 指令:通过正则表达式解析出 tag__:__path 种的文件名,并命名为 serviceName。
  • 7 行:parse-regexp 指令:通过正则表达式捕捉组解析出 caller 种的文件名与行数,并置于 fileName、fileNo 字段。
  • 8 行:project-rename 指令:将 tag__:__hostname 字段重命名为 serviceHost。
  • 9 行:extend 指令:应用 json_extract_scalar 函数,提取 schedule 中的 type 字段,并命名为 scheduleType。
  • 10 行:project 指令:保留须要的字段列表,其中 project 字段来自于 Payload。

创立 SQL 作业

在阿里云 Flink 控制台创立一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。

在作业草稿中输出如下创立长期表的语句:

CREATE TEMPORARY TABLE sls_input_complex (
  errorCode STRING,
  errorMessage STRING,
  fileName STRING,
  fileNo STRING,
  httpCode STRING,
  requestID STRING,
  scheduleType STRING,
  serviceHost STRING,
  project STRING,
  proctime as PROCTIME()) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'starttime' = '2024-02-01 10:30:00',
  'project' ='${project}',
  'logstore' ='${logtore}',
  'query' = '* | project Payload, error,"__tag__:__path__","__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error,''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
  );
  • 其中 ${ak},${sk},${project},${logstore} 须要替换为有生产权限的 AK 账号。
  • query 字段,替换为上述 SPL,留神在阿里云 Flink 控制台须要对单引号应用单引号本义,并且打消换行符。
  • SPL 最终失去的字段列表与 TABLE 中字段对应。

间断查问及成果

在作业中输出剖析语句,查看后果数据:

SELECT * FROM sls_input_complex

点击右上角调试按钮,进行调试,能够看到 TABLE 中每一列的值,对应 SPL 解决后的后果。

总结

为了适应弱结构化日志数据的需要,Flink SLS Connector 进行了降级,反对间接通过 Connector 配置 SPL 的形式实现 SLS 数据源的荡涤下推,特地是须要正则字段提取、JSON 字段提取、CSV 字段提取场景下,相较原数据加工计划和原 Flink SLS Connector 计划更轻量级,让数据荡涤的职责更加清晰,在数据源端实现数据荡涤工作,也能够缩小数据的网络传输流量,使得达到 Flink 的数据曾经是规整好的数据,能够更加专一在 Flink 中进行业务数据分析。

同时为了便于 SPL 验证测试,SLS 扫描查问也已反对应用 SPL 进行查问,能够实时看到 SPL 管道式语法执行后果。

参考链接:

[1] 日志服务概述

https://help.aliyun.com/zh/sls/product-overview/what-is-log-s…

[2] SPL 概述

https://help.aliyun.com/zh/sls/user-guide/spl-overview

[3] 阿里云 Flink Connector SLS

https://help.aliyun.com/zh/flink/developer-reference/log-serv…

[4] SLS 扫描查问

https://help.aliyun.com/zh/sls/user-guide/scan-based-query-ov…


参加体验赢惊喜大会门票

通义灵码,是阿里云出品的一款基于通义大模型的智能编码辅助工具,提供行级 / 函数级实时续写、自然语言生成代码、单元测试生成、代码优化、正文生成、代码解释、研发智能问答、异样报错排查等能力,并针对阿里云的云服务应用场景调优,助力开发者高效、晦涩的编码。

官网链接:https://tongyi.aliyun.com/lingma

本次 ArchSummit 架构师峰会期间,通义灵码联结 InfoQ 策动发动 AI 编程体验流动,保留小程序卡片,微信扫码进入小程序,参加通义灵码体验抽奖流动,有机会赢寰球架构师峰会专属收费门票(票价 5440 元)

正文完
 0