共计 7938 个字符,预计需要花费 20 分钟才能阅读完成。
简介: 本文着重从 shuffle、join 形式的抉择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 工作方面做的优化措施。
本文作者为京东算法服务部的张颖和段学浩,并由 Apache Hive PMC,阿里巴巴技术专家李锐帮忙校对。次要内容为:
- 背景
- Flink SQL 的优化
- 总结
GitHub 地址
https://github.com/apache/flink
欢送大家给 Flink 点赞送 star~
一、背景
目前,京东搜寻举荐的数据处理流程如上图所示。能够看到实时和离线是离开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm。
这就造成了以下景象:在一个业务引擎里,用户须要保护两套环境、两套代码,许多共性不能复用,数据的品质和一致性很难失去保障。且因为流批底层数据模型不统一,导致须要做大量的拼凑逻辑;甚至为了数据一致性,须要做大量的同比、环比、二次加工等数据比照,效率极差,并且非常容易出错。
而反对批流一体的 Flink SQL 能够很大水平上解决这个痛点,因而咱们决定引入 Flink 来解决这种问题。
在大多数作业,特地是 Flink 作业中,执行效率的优化始终是 Flink 工作优化的要害,在京东每天数据增量 PB 级状况下,作业的优化显得尤为重要。
写过一些 SQL 作业的同学必定都晓得,对于 Flink SQL 作业,在一些状况下会造成同一个 UDF 被重复调用的状况,这对一些耗费资源的工作十分不敌对;此外,影响执行效率大抵能够从 shuffle、join、failover 策略等方面思考;另外,Flink 工作调试的过程也非常复杂,对于一些线上机器隔离的公司来说尤甚。
为此,咱们实现了内嵌式的 Derby 来作为 Hive 的元数据存储数据库 (allowEmbedded);在工作复原方面,批式作业没有 checkpoint 机制来实现 failover,然而 Flink 特有的 region 策略能够使批式作业疾速复原;此外,本文还介绍了对象重用等相干优化措施。
二、Flink SQL 的优化
1. UDF 重用
在 Flink SQL 工作里会呈现以下这种状况:如果雷同的 UDF 既呈现在 LogicalProject 中,又呈现在 Where 条件中,那么 UDF 会进行屡次调用 (见 https://issues.apache.org/jira/browse/FLINK-20887))。然而如果该 UDF 十分耗 CPU 或者内存,这种多余的计算会十分影响性能,为此咱们心愿能把 UDF 的后果缓存起来下次间接应用。在设计的时候须要思考:(十分重要:请肯定保障 LogicalProject 和 where 条件的 subtask chain 到一起)
- 一个 taskmanager 外面可能会有多个 subtask,所以这个 cache 要么是 thread (THREAD LOCAL) 级别要么是 tm 级别;
- 为了防止出现一些状况导致清理 cache 的逻辑走不到,肯定要在 close 办法里将 cache 清掉;
- 为了避免内存有限增大,选取的 cache 最好能够被动管制 size;至于“超时工夫”,倡议能够配置一下,然而最好不要小于 UDF 先后调用的工夫;
- 上文有提到过,一个 tm 外面可能会有多个 subtask,相当于 tm 外面是个多线程的环境。首先咱们的 cache 须要是线程平安的,而后可依据业务判断需不需要锁。
依据以上思考,咱们用 guava cache 将 UDF 的后果缓存起来,之后调用的时候间接去 cache 外面拿数据,最大可能升高工作的耗费。上面是一个简略的应用(同时设置了最大应用 size、超时工夫,然而没有写锁):
public class RandomFunction extends ScalarFunction {private static Cache<String, Integer> cache = CacheBuilder.newBuilder()
.maximumSize(2)
.expireAfterWrite(3, TimeUnit.SECONDS)
.build();
public int eval(String pvid) {profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());
Integer result = cache.getIfPresent(pvid);
if (null == result) {int tmp = (int)(Math.random() * 1000);
cache.put("pvid", tmp);
return tmp;
}
return result;
}
@Override
public void close() throws Exception {super.close();
cache.cleanUp();}
}
2. 单元测试
大家可能会好奇为什么会把单元测试也放到优化外面,大家都晓得 Flink 工作调试过程非常复杂,对于一些线上机器隔离的公司来说尤甚。京东的本地环境是没有方法拜访工作服务器的,因而在初始阶段调试工作,咱们消耗了很多工夫用来上传 jar 包、查看日志等行为。
为了升高工作的调试工夫、减少代码开发人员的开发效率,实现了内嵌式的 Derby 来作为 Hive 的元数据存储数据库 (allowEmbedded),这算是一种优化开发工夫的办法。具体思路如下:
首先创立 Hive Conf:
public static HiveConf createHiveConf() {ClassLoader classLoader = new HiveOperatorTest().getClass().getClassLoader();
HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
try {TEMPORARY_FOLDER.create();
String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(
HiveConf.ConfVars.METASTOREWAREHOUSE,
TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
hiveConf.set("datanucleus.connectionPoolingType", "None");
hiveConf.set("hive.metastore.schema.verification", "false");
hiveConf.set("datanucleus.schema.autoCreateTables", "true");
return hiveConf;
} catch (IOException e) {throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
}
}
接下来创立 Hive Catalog:(利用反射的形式调用 embedded 的接口)
public static void createCatalog() throws Exception{
Class clazz = HiveCatalog.class;
Constructor c1 = clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});
c1.setAccessible(true);
hiveCatalog = (HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});
hiveCatalog.open();}
创立 tableEnvironment:(同官网)
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
TableConfig tableConfig = tableEnv.getConfig();
Configuration configuration = new Configuration();
configuration.setInteger("table.exec.resource.default-parallelism", 1);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
最初敞开 Hive Catalog:
public static void closeCatalog() {if (hiveCatalog != null) {hiveCatalog.close();
}
}
此外,对于单元测试,构建适合的数据集也是一个十分大的性能,咱们实现了 CollectionTableFactory,容许本人构建适合的数据集,应用办法如下:
CollectionTableFactory.reset();
CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
StringBuilder sbFilesSource = new StringBuilder();
sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + "`pvid` string) with ('connector.type'='COLLECTION','is-bounded'='true')");
tableEnv.executeSql(sbFilesSource.toString());
3. join 形式的抉择
传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种根底的实现形式,别离是 Nested-loop Join、Sort-Merge Join 和 Hash Join。
效率 | 空间 | 备注 | |
---|---|---|---|
Nested-loop Join | 差 | 占用大 | |
Sort-Merge Join | 有 sort merge 开销 | 占用小 | 有序数据集的一种优化措施 |
Hash Join | 高 | 占用大 | 适宜大小表 |
-
Nested-loop Join 最为简略间接,将两个数据集加载到内存,并用内嵌遍历的形式来一一比拟两个数据集内的元素是否合乎 Join 条件。Nested-loop Join 的工夫效率以及空间效率都是最低的,能够应用:table.exec.disabled-operators:NestedLoopJoin 来禁用。
以下两张图片是禁用前和禁用后的成果 (如果你的禁用没有失效,先看一下是不是 Equi-Join):
- Sort-Merge Join 分为 Sort 和 Merge 两个阶段:首先将两个数据集进行别离排序,而后再对两个有序数据集别离进行遍历和匹配,相似于归并排序的合并。(Sort-Merge Join 要求对两个数据集进行排序,然而如果两个输出是有序的数据集,则能够作为一种优化计划)。
-
Hash Join 同样分为两个阶段:首先将一个数据集转换为 Hash Table,而后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。
- 第一阶段和第一个数据集别离称为 build 阶段和 build table;
- 第二个阶段和第二个数据集别离称为 probe 阶段和 probe table。
Hash Join 效率较高然而对空间要求较大,通常是作为 Join 其中一个表为适宜放入内存的小表的状况下的优化计划 (并不是不容许溢写磁盘)。
留神:Sort-Merge Join 和 Hash Join 只实用于 Equi-Join (Join 条件均应用等于作为比拟算子)。
Flink 在 join 之上又做了一些细分,具体包含:
特点 | 应用 | |
---|---|---|
Repartition-Repartition strategy | 对数据集别离进行分区和 shuffle,如果数据集大的时候效率极差 | 两个数据集相差不大 |
Broadcast-Forward strategy | 将小表的数据全副发送到大表数据的机器上 | 两个数据集有较大的差距 |
- Repartition-Repartition strategy:Join 的两个数据集别离对它们的 key 应用雷同的分区函数进行分区,并通过网络发送数据;
- Broadcast-Forward strategy:大的数据集不做解决,另一个比拟小的数据集全副复制到集群中一部分数据的机器上。
家喻户晓,batch 的 shuffle 十分耗时间。
- 如果两个数据集有较大差距,倡议采纳 Broadcast-Forward strategy;
- 如果两个数据集差不多,倡议采纳 Repartition-Repartition strategy。
能够通过:table.optimizer.join.broadcast-threshold 来设置采纳 broadcast 的 table 大小,如果设置为“-1”,示意禁用 broadcast。
下图为禁用前后的成果:
4. multiple input
在 Flink SQL 工作里,升高 shuffle 能够无效的进步 SQL 工作的吞吐量,在理论的业务场景中常常遇到这样的状况:上游产出的数据曾经满足了数据分布要求 (如间断多个 join 算子,其中 key 是雷同的),此时 Flink 的 forward shuffle 是冗余的 shuffle,咱们心愿将这些算子 chain 到一起。Flink 1.12 引入了 mutiple input 的个性,能够打消大部分没必要的 forward shuffle,把 source 的算子 chain 到一起。
table.optimizer.multiple-input-enabled:true
下图为开了 multiple input 和没有开的拓扑图 (operator chain 性能曾经关上):
5. 对象重用
上下游 operator 之间会通过序列化 / 反序列化 / 复制阶段来进行数据传输,这种行为十分影响 Flink SQL 程序的性能,能够通过启用对象重用来进步性能。然而这在 DataStream 外面十分危险,因为可能会产生以下状况:在下一个算子中批改对象意外影响了下面算子的对象。
然而 Flink 的 Table / SQL API 中是十分平安的,能够通过如下形式来启用:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
或者是通过设置:pipeline-object-reuse:true
为什么启用了对象重用会有这么大的性能晋升?在 Blink planner 中,同一工作的两个算子之间的数据交换最终将调用 BinaryString#copy,查看实现代码,能够发现 BinaryString#copy 须要复制底层 MemorySegment 的字节,通过启用对象重用来防止复制,能够无效晋升效率。
下图为没有开启对象重用时相应的火焰图:
6. SQL 工作的 failover 策略
batch 工作模式下 checkpoint 以及其相干的个性全副都不可用,因而针对实时工作的基于 checkpoint 的 failover 策略是不能利用在批工作下面的,然而 batch 工作容许 Task 之间通过 Blocking Shuffle 进行通信,当一个 Task 因为工作未知的起因失败之后,因为 Blocking Shuffle 中存储了这个 Task 所须要的全副数据,所以只须要重启这个 Task 以及通过 Pipeline Shuffle 与其相连的全副上游工作即可:
jobmanager.execution.failover-strategy:region (曾经 finish 的 operator 可间接复原)
table.exec.shuffle-mode:ALL\_EDGES\_BLOCKING (shuffle 策略)。
7. shuffle
Flink 里的 shuffle 分为 pipeline shuffle 和 blocking shuffle。
- pipeline shuffle 性能好,然而对资源的要求高,而且容错比拟差 (会将该 operator 分到后面的一个 region 外面,对于 batch 工作来说,如果这个算子出问题,将从上一个 region 复原);
-
blocking shuffle 就是传统的 batch shuffle,会将数据落盘,这种 shuffle 的容错好,然而会产生大量的磁盘、网络 io (如果为了省心的话,倡议用 blocking suffle)。blocking shuffle 又分为 hash shuffle 和 sort shuffle,
- 如果你的磁盘是 ssd 并且并发不太大的话,能够抉择应用 hash shuffle,这种 shuffle 形式产生的文件多、随机读多,对磁盘 io 影响较大;
- 如果你是 sata 并且并发比拟大,能够抉择用 sort-merge shuffle,这种 shuffle 产生的数据少,程序读,不会产生大量的磁盘 io,不过开销会更大一些 (sort merge)。
相应的控制参数:
table.exec.shuffle-mode,该参数有多个参数,默认是 ALL\_EDGES\_BLOCKING,示意所有的边都会用 blocking shuffle,不过大家能够试一下 POINTWISE\_EDGES\_PIPELINED,示意 forward 和 rescale edges 会主动开始 pipeline 模式。
taskmanager.network.sort-shuffle.min-parallelism,将这个参数设置为小于你的并行度,就能够开启 sort-merge shuffle;这个参数的设置须要思考一些其余的状况,具体的能够依照官网设置。
三、总结
本文着重从 shuffle、join 形式的抉择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 工作方面做的优化措施。另外,感激京东实时计算研发部付海涛等全副共事的反对与帮忙。
版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。