Flink SQL 的数据脱敏解决方案,反对面向用户级别的数据脱敏访问控制,即特定用户只能拜访到脱敏后的数据。此计划是实时畛域 Flink 的解决思路,相似于离线数仓 Hive 中 Ranger Column Masking 计划。
一、基础知识
1.1 数据脱敏
数据脱敏 (Data Masking) 是一种数据安全技术,用于爱护敏感数据,以避免未经受权的拜访。该技术通过将敏感数据替换为虚伪数据或不可辨认的数据来实现。例如能够应用数据脱敏技术将信用卡号码、社会平安号码等敏感信息替换为随机生成的数字或字母,以爱护这些信息的隐衷和平安。
1.2 业务流程
上面用订单表 orders
的两行数据来举例,示例数据如下:
1.2.1 设置脱敏策略
管理员配置用户、表、字段、脱敏条件,例如上面的配置。
1.2.2 用户拜访数据
当用户在 Flink 上查问 orders
表的数据时,会在底层联合该用户的脱敏条件从新生成 SQL,即让数据脱敏失效。
当用户 A 和用户 B 在执行上面雷同的 SQL 时,会看到不同的后果数据。
SELECT * FROM orders
用户 A 查看到的后果数据如下 ,customer_name
字段的数据被全副覆盖掉。
用户 B 查看到的后果数据如下 ,customer_name
字段的数据只会显示前 4 位,剩下的用 x 代替。
二、Hive 数据脱敏解决方案
在离线数仓工具 Hive 畛域,因为倒退多年已有 Ranger 来反对字段数据的脱敏管制,详见参考文献 [[1]](https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authori…)。
下图是在 Ranger 里配置 Hive 表数据脱敏条件的页面,供参考。
但因为 Flink 实时数仓畛域倒退绝对较短,Ranger 还不反对 Flink SQL,以及依赖 Ranger 的话会导致系统部署和运维过重,因而开始 自研实时数仓的数据脱敏解决工具。当然本文中的核心思想也实用于 Ranger 中,能够基于此较快开发出 ranger-flink 插件。
三、Flink SQL 数据脱敏解决方案
3.1 解决方案
3.1.1 Flink SQL 执行流程
能够参考作者文章 [[FlinkSQL 字段血统解决方案及源码]](https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md),本文依据 Flink 1.16 修改和简化后的执行流程如下图所示。
在 CalciteParser.parse()
解决后会失去一个 SqlNode 类型的形象语法树,本文会针对此形象语法树来组装脱敏条件起初生成新的 AST,以实现数据脱敏管制。
3.1.2 Calcite 对象继承关系
上面章节要用到 Calcite 中的 SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall 和 SqlSelect 等类,此处进行简略介绍以及展现它们间继承关系,以便读者浏览本文源码。
序号 | 类 | 介绍 |
---|---|---|
1 | SqlNode | A SqlNode is a SQL parse tree. |
2 | SqlCall | A SqlCall is a call to an SqlOperator operator. |
3 | SqlIdentifier | A SqlIdentifier is an identifier, possibly compound. |
4 | SqlJoin | Parse tree node representing a JOIN clause. |
5 | SqlBasicCall | Implementation of SqlCall that keeps its operands in an array. |
6 | SqlSelect | A SqlSelect is a node of a parse tree which represents a select statement, the parent class is SqlCall |
3.1.3 解决思路
针对输出的 Flink SQL,在 CalciteParser.parse()
进行语法解析后生成形象语法树 (Abstract Syntax Tree
,简称 AST) 后,采纳自定义 Calcite SqlBasicVisitor
的办法遍历 AST 中的所有 SqlSelect
,获取到外面的每个输出表。如果输出表中字段有配置脱敏条件,则针对输出表生成子查问语句,并把脱敏字段改写成CAST(脱敏函数(字段名) AS 字段类型) AS 字段名
, 再通过CalciteParser.parseExpression()
把子查问转换成 SqlSelect,并用此 SqlSelect 替换原 AST 中的输出表来生成新的 AST,最初失去新的 SQL 来继续执行。
3.2 具体计划
3.2.1 解析输出表
通过对 Flink SQL 语法的剖析和钻研,最终呈现输出表的只蕴含以下两种状况:
- SELECT 语句的 FROM 子句,如果是子查问,则递归持续遍历。
- SELECT … JOIN 语句的 Left 和 Right 子句,如果是多表 JOIN,则递归查问遍历。
因而,上面的次要步骤会依据 FROM 子句的类型来寻找输出表。
3.2.2 次要步骤
次要通过 Calcite 提供的访问者模式自定义 DataMaskVisitor 来实现,遍历 AST 中所有的 SqlSelect 对象用子查问替换外面的输出表。
上面详细描述替换输出表的步骤,整体流程如下图所示。
- 遍历 AST 中 SELECT 语句。
- 判断是否自定义的 SELECT 语句(由上面步骤 10 生成),是则跳转到步骤 11,否则持续步骤 3。
- 判断 SELECT 语句中的 FROM 类型,依照不同类型对应执行上面的步骤 4、5、6 和 11。
- 如果 FROM 是 SqlJoin 类型,则别离遍历其左 Left 和 Right 右节点,即执行以后步骤 4 和步骤 7。因为可能是三张表及以上的 Join,因而进行递归解决,即针对其左 Left 节点跳回到步骤 3。
- 如果 FROM 是 SqlIdentifier 类型,则示意是表。然而输出 SQL 中没有定义表的别名,则用表名作为别名。跳转到步骤 8。
- 如果 FROM 是 SqlBasicCall 类型,则示意带别名。但须要判断是否来自子查问,是则跳转到步骤 11 持续遍历 AST,后续步骤 1 会对子查问中的 SELECT 语句进行解决。否则跳转到步骤 8。
- 递归解决 Join 的右节点,即跳回到步骤 3。
- 遍历表中的每个字段,如果某个字段有定义脱敏条件,则把改字段改写成格局
CAST(脱敏函数(字段名) AS 字段类型) AS 字段名
,否则用原字段名。 - 针对步骤 8 解决后的字段,构建子查问语句,形如
(SELECT 字段名 1, 字段名 2, CAST(脱敏函数(字段名 3) AS 字段类型) AS 字段名 3、字段名 4 FROM 表名) AS 表别名
。 - 对步骤 9 的子查问调用
CalciteParser.parseExpression()
进行解析,生成自定义 SELECT 语句,并替换掉原 FROM。 - 持续遍历 AST,找到外面的 SELECT 语句进行解决,跳回到步骤 1。
3.2.3 Hive 及 Ranger 兼容性
在 Ranger 中,默认的脱敏策略的如下所示。通过调研发现 Ranger 的大部分脱敏策略是通过调用 Hive 自带或自定义的零碎函数实现的。
序号 | 策略名 | 策略阐明 | Hive 零碎函数 |
---|---|---|---|
1 | Redact | 用 x 屏蔽字母字符,用 n 屏蔽数字字符 | mask |
2 | Partial mask: show last 4 | 仅显示最初四个字符, 其余用 x 代替 | mask_show_last_n |
3 | Partial mask: show first 4 | 仅显示前四个字符, 其余用 x 代替 | mask_show_first_n |
4 | Hash | 用值的哈希值替换原值 | mask_hash |
5 | Nullify | 用 NULL 值替换原值 | Ranger 本身实现 |
6 | Unmasked | 原样显示 | Ranger 本身实现 |
7 | Date: show only year | 仅显示日期字符串的年份 | mask |
8 | Custom | Hive UDF 来自定义策略 |
因为 Flink 反对 Hive Catalog,在 Flink 能调用 Hive 零碎函数。因而,本计划也反对在 Flink SQL 配置 Ranger 的脱敏策略。
四、用例测试
用例测试数据来自于 CDC Connectors for Apache Flink[[4]](https://ververica.github.io/flink-cdc-connectors/master/conte…)官网,
本文给 orders
表减少一个 region 字段,同时减少 'connector'='print'
类型的 print_sink 表,其字段和 orders
表的一样。
下载源码后,可通过 Maven 运行单元测试。
$ cd flink-sql-security
$ mvn test
具体测试用例可查看源码中的单测 RewriteDataMaskTest
和ExecuteDataMaskTest
,上面只形容两个案例。
4.1 测试 SELECT
4.1.1 输出 SQL
用户 A 执行下述 SQL:
SELECT order_id, customer_name, product_id, region FROM orders
4.1.2 依据脱敏条件从新生成 SQL
- 输出 SQL 是一个简略 SELECT 语句,其 FROM 类型是
SqlIdentifier
,因为没有定义别名,用表名orders
作为别名。 - 因为用户 A 针对字段
customer_name
定义脱敏条件 MASK(对应函数是脱敏函数是mask
),该字段在流程图中的步骤 8 中被改写为CAST(mask(customer_name) AS STRING) AS customer_name
,其余字段未定义脱敏条件则放弃不变。 - 而后在步骤 9 的操作中,表名
orders
被改写成如下子查问,子查问两侧用括号()
进行包裹,并且用AS 别名
来减少表别名。
(SELECT
order_id,
order_date,
CAST(mask(customer_name) AS STRING) AS customer_name,
product_id,
price,
order_status,
region
FROM
orders
) AS orders
4.1.3 输入 SQL 和运行后果
最终执行的改写后 SQL 如下所示,这样用户 A 查问到的顾客姓名 customer_name
字段都是覆盖后的数据。
SELECT
order_id,
customer_name,
product_id,
region
FROM (
SELECT
order_id,
order_date,
CAST(mask(customer_name) AS STRING) AS customer_name,
product_id,
price,
order_status,
region
FROM
orders
) AS orders
4.2 测试 INSERT-SELECT
4.2.1 输出 SQL
用户 A 执行下述 SQL:
INSERT INTO print_sink SELECT * FROM orders
4.2.2 依据脱敏条件从新生成 SQL
通过自定义 Calcite DataMaskVisitor 拜访生成的 AST,能找到对应的 SELECT 语句是SELECT order_id, customer_name, product_id, region FROM orders
。
针对此 SELECT 语句的改写逻辑同上,不再论述。
4.2.3 输入 SQL 和运行后果
最终执行的改写后 SQL 如下所示,留神插入到 print_sink
表的 customer_name
字段是覆盖后的数据。
INSERT INTO print_sink (
SELECT
*
FROM (
SELECT
order_id,
order_date,
CAST(mask(customer_name) AS STRING) AS customer_name,
product_id,
price,
order_status,
region
FROM
orders
) AS orders
)
五、参考文献
- Apache Ranger Column Masking in Hive
- FlinkSQL 字段血统解决方案及源码
- 从 SQL 语句中解析出源表和后果表
- 基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
- HiveQL—数据脱敏函数
查看更多技术内容
更多内容
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc