关于apache-doris:Apache-Doris-Broker数据导入

5次阅读

共计 15078 个字符,预计需要花费 38 分钟才能阅读完成。

Apache Doris Broker 数据导入

1. 概要

Broker load 是一个异步的导入形式,反对的数据源取决于 Broker 过程反对的数据源。

用户须要通过 MySQL 协定 创立 Broker load 导入,并通过查看导入命令查看导入后果

次要实用于以下场景:

  • 内部数据源(如 HDFS 等)读取数据,导入到 Doris 中。
  • 数据量在 几十到百 GB 级别。
  • 次要用于数据迁徙,或者定时批量导入

Broker load 反对文件类型:PARQUET、ORC、CSV 格局

2. 原理

用户在提交导入工作后,FE 会生成对应的 Plan 并依据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。

BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入零碎。所有 BE 均实现导入,由 FE 最终决定导入是否胜利

3. 应用形式

Apache Doris Broker Load 形式是通过 doris 提供的 Broker load SQL 语句创立。

3.1 SQL 语法

上面是 SQL 语法,具体应用不分明的中央也能够在 Mysql Client 命令行下执行 help broker load查看具体应用办法

LOAD LABEL db_name.label_name 
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ...)]

* data_desc:

    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY separator]
    [(col1, ...)]
    [PRECEDING FILTER predicate]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]

* broker_properties: 

    (key1=value1, ...)

3.2 实例

这里咱们应用 Broker load 形式,从 hive 分区表中将数据导入到 Doris 指定的表中

上面是我的 hive 表构造,数据格式是:ORC,分区字段是:budat

CREATE TABLE purchase_hive_iostock (
  lgort string NULL,
  mblnr string NULL,
  mblpo string NULL,
  werks string NULL,
  ebeln string NULL,
  ebelp string NULL,
  aufnr string NULL,
  rsnum string NULL,
  rspos string NULL,
  kdauf string NULL,
  kdpos string NULL,
  bwart string NULL,
  menge decimal(18, 3) NULL,,
  meins string NULL ,
  matnr string NULL ,
  bukrs string NULL ,
  waers string NULL ,
  dmbtr decimal(18, 3) NULL ,
  shkzg string NULL    ,
  bstme string NULL    ,
  bstmg decimal(13, 2) NULL ,
  temp1 string NULL ,
  temp2 string NULL ,
  temp3 string NULL ,
  temp4 string NULL ,
  temp5 string NULL ,
  rq datetime NULL        
)
COMMENT '出入库记录'
PARTITIONED BY(budat STRING)
....

Doris 中对应的表:

CREATE TABLE `ods_purchase_hive_iostock_delta` (
  `budat` date NOT NULL    ,
  `lgort` varchar(100) NULL,
  `mblnr` varchar(100) NULL,
  `mblpo` varchar(100) NULL,
  `werks` varchar(100) NULL,
  `ebeln` varchar(100) NULL,
  `ebelp` varchar(100) NULL,
  `aufnr` varchar(100) NULL,
  `rsnum` varchar(100) NULL,
  `rspos` varchar(100) NULL,
  `kdauf` varchar(100) NULL,
  `kdpos` varchar(100) NULL,
  `bwart` varchar(100) NULL,
  `menge` decimal(18, 3) NULL ,
  `meins` varchar(100) NULL,
  `matnr` varchar(100) NULL,
  `bukrs` varchar(100) NULL,
  `waers` varchar(100) NULL,
  `dmbtr` decimal(18, 3) NULL,
  `shkzg` varchar(10) NULL   ,
  `bstme` varchar(20) NULL   ,
  `bstmg` decimal(13, 2) NULL,
  `temp1` varchar(100) NULL ,
  `temp2` varchar(100) NULL ,
  `temp3` varchar(100) NULL ,
  `temp4` varchar(100) NULL ,
  `temp5` varchar(100) NULL ,
  `rq` datetime NULL ,
) ENGINE=OLAP
UNIQUE KEY(`budat`, `lgort`, `mblnr`, `mblpo`, `werks`)
COMMENT "出入库记录数据表"
PARTITION BY RANGE(`budat`)
(PARTITION P_000000 VALUES [('0000-01-01'), ('2021-01-01')),
PARTITION P_202101 VALUES [('2021-01-01'), ('2021-02-01')),
PARTITION P_202102 VALUES [('2021-02-01'), ('2021-03-01')),
PARTITION P_202103 VALUES [('2021-03-01'), ('2021-04-01')),
PARTITION P_202104 VALUES [('2021-04-01'), ('2021-05-01')),
PARTITION P_202105 VALUES [('2021-05-01'), ('2021-06-01')),
PARTITION P_202106 VALUES [('2021-06-01'), ('2021-07-01')),
PARTITION P_202107 VALUES [('2021-07-01'), ('2021-08-01')),
PARTITION P_202108 VALUES [('2021-08-01'), ('2021-09-01')),
PARTITION P_202109 VALUES [('2021-09-01'), ('2021-10-01')),
PARTITION P_202110 VALUES [('2021-10-01'), ('2021-11-01')),
PARTITION P_202111 VALUES [('2021-11-01'), ('2021-12-01')))
DISTRIBUTED BY HASH(`werks`) BUCKETS 5
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.replication_num" = "3",
"dynamic_partition.buckets" = "5",
"dynamic_partition.start_day_of_month" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);

上面的语句是将 Hive 分区表中的数据导入到 Doris 对应的表中

LOAD LABEL order_bill_2021_0915_3
(DATA INFILE("hdfs://namenodeservice1/user/data/hive_db/data_ods.db/purchase_hive_iostock/*/*")
    INTO TABLE ods_purchase_hive_iostock_delta
    COLUMNS TERMINATED BY "\\x01"
    FORMAT AS "orc"   (lgort,mblnr,mblpo,werks,ebeln,ebelp,aufnr,rsnum,rspos,kdauf,kdpos,bwart,menge,meins,matnr,bukrs,waers,dmbtr,shkzg,bstme,bstmg,temp1,temp2,temp3,temp4,temp5,rq)
    COLUMNS FROM PATH AS (budat)
    SET (budat=budat,lgort=lgort,mblnr=mblnr,mblpo=mblpo,werks=werks,ebeln=ebeln,ebelp=ebelp,aufnr=aufnr,rsnum=rsnum,rspos=rspos,kdauf=kdauf,kdpos=kdpos,bwart=bwart,menge=menge,meins=meins,matnr=matnr,bukrs=bukrs,waers=waers,dmbtr=dmbtr,shkzg=shkzg,bstme=bstme,bstmg=bstmg,temp1=temp1,temp2=temp2,temp3=temp3,temp4=temp4,temp5=temp5,rq=rq)
    where 1=1
)
WITH BROKER "hdfs_broker"
(
    "dfs.nameservices"="hadoop",
    "dfs.ha.namenodes.eadhadoop" = "nn1,nn2",
    "dfs.namenode.rpc-address.eadhadoop.nn1" = "222:8000",
    "dfs.namenode.rpc-address.eadhadoop.nn2" = "117:8000",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "ddd.COM",
    "kerberos_keytab_content" = "BQHIININ1111URAAE="
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);

而后提交工作就行了

3.3 认证形式

  • 反对简略认证拜访
  • 反对通过 kerberos 认证拜访
  • 反对 HDFS HA 模式拜访

3.3.1 简略认证形式

简略认证即 Hadoop 配置 hadoop.security.authenticationsimple

应用零碎用户拜访 HDFS。或者在 Broker 启动的环境变量中增加:HADOOP_USER_NAME。明码置空即可

(
    "username" = "user",
    "password" = ""
);

3.3.2 Kerberos 认证

下面示例应用的就是 Kerberos 认证形式。

该认证形式需提供以下信息:

  • hadoop.security.authentication:指定认证形式为 kerberos。
  • kerberos_principal:指定 kerberos 的 principal。
  • kerberos_keytab:指定 kerberos 的 keytab 文件门路。该文件必须为 Broker 过程所在服务器上的文件的绝对路径。并且能够被 Broker 过程拜访。
  • kerberos_keytab_content:指定 kerberos 中 keytab 文件内容通过 base64 编码之后的内容。这个跟 kerberos_keytab 配置二选一即可

示例:

(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab" = "/home/doris/my.keytab"
)

或者

(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab_content" = "ASDOWHDLAWIDJHWLDKSALDJSDIWALD"
)

如果采纳 Kerberos 认证形式,则部署 Broker 过程的时候须要 krb5.conf 文件,krb5.conf文件蕴含 Kerberos 的配置信息,通常,您应该将 krb5.conf 文件装置在目录 /etc 中。您能够通过设置环境变量 KRB5_CONFIG 笼罩默认地位。krb5.conf文件的内容示例如下:

[libdefaults]
    default_realm = DORIS.HADOOP
    default_tkt_enctypes = des3-hmac-sha1 des-cbc-crc
    default_tgs_enctypes = des3-hmac-sha1 des-cbc-crc
    dns_lookup_kdc = true
    dns_lookup_realm = false

[realms]
    DORIS.HADOOP = {kdc = kerberos-doris.hadoop.service:7005}

3.3.3 HDFS HA 模式

Doris 只是 HDFS HA 模式,上面这个配置用于拜访以 HA 模式部署的 HDFS 集群。

  • dfs.nameservices:指定 hdfs 服务的名字,自定义,如:”dfs.nameservices” = “my_ha”。
  • dfs.ha.namenodes.xxx:自定义 namenode 的名字, 多个名字以逗号分隔。其中 xxx 为 dfs.nameservices 中自定义的名字,如:“dfs.ha.namenodes.my_ha” = “my_nn”。
  • dfs.namenode.rpc-address.xxx.nn:指定 namenode 的 rpc 地址信息。其中 nn 示意 dfs.ha.namenodes.xxx 中配置的 namenode 的名字,如:”dfs.namenode.rpc-address.my_ha.my_nn” = “host:port”。
  • dfs.client.failover.proxy.provider:指定 client 连贯 namenode 的 provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。

示例如下:

(
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

如果 HA 模式和认证联合应用,示例如下:

(
    "username"="user",
    "password"="passwd",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

对于 HDFS 集群的配置能够写入 hdfs-site.xml 文件中,用户应用 Broker 过程读取 HDFS 集群的信息时,只须要填写集群的文件路径名和认证信息即可。

3.4 主要参数介绍

创立导入的具体语法执行 HELP BROKER LOAD 查看语法帮忙。这里次要介绍 Broker load 的创立导入语法中参数意义和注意事项:

3.4.1 Label

导入工作的标识。每个导入工作,都有一个在单 database 外部惟一的 Label。Label 是用户在导入命令中自定义的名称。通过这个 Label,用户能够查看对应导入工作的执行状况。

Label 的另一个作用,是避免用户反复导入雷同的数据。强烈推荐用户同一批次数据应用雷同的 label。这样同一批次数据的反复申请只会被承受一次,保障了 At-Most-Once 语义

当 Label 对应的导入作业状态为 CANCELLED 时,能够再次应用该 Label 提交导入作业

3.4.2 数据形容类参数

数据形容类参数次要指的是 Broker load 创立导入语句中的属于 data_desc 局部的参数。每组 data_desc 次要表述了本次导入波及到的数据源地址,ETL 函数,指标表及分区等信息

  • 多表导入

    Broker load 反对一次导入工作波及多张表,每个 Broker load 导入工作可在多个 data_desc 申明多张表来实现多表导入。每个独自的 data_desc 还能够指定属于该表的数据源地址。Broker load 保障了单次导入的多张表之间原子性胜利或失败。

  • negative

    data_desc中还能够设置数据取反导入。这个性能次要用于,当数据表中聚合列的类型都为 SUM 类型时。如果心愿撤销某一批导入的数据。则能够通过 negative 参数导入同一批数据。Doris 会主动为这一批数据在聚合列上数据取反,以达到打消同一批数据的性能。

  • partition

    data_desc 中能够指定待导入表的 partition 信息,如果待导入数据不属于指定的 partition 则不会被导入。同时,不在指定 Partition 的数据会被认为是谬误数据。

  • set column mapping

    data_desc 中的 SET 语句负责设置列函数变换,这里的列函数变换反对所有查问的等值表达式变换。如果原始数据的列和表中的列不一一对应,就须要用到这个属性。

  • preceding filter predicate

    用于过滤原始数据。原始数据是未经列映射、转换的数据。用户能够在对转换前的数据前进行一次过滤,选取冀望的数据,再进行转换。

  • where predicate

    data_desc 中的 WHERE 语句中负责过滤曾经实现 transform 的数据,被 filter 的数据不会进入容忍率的统计中。如果多个 data_desc 中申明了同一张表的多个条件的话,则会 merge 同一张表的多个条件,merge 策略是 AND

  • COLUMNS FROM PATH AS

    因为 Hive 的分区是体现在 HDFS 的门路上,咱们在导入的时候,能够通过这个参数动静的通过 HDFS 门路获取 hive 分区字段,

  • FORMAT

    数据源的数据文件格式,如果是 PARQUET 或者 ORC 格局的数据, 须要再文件头的列名与 doris 表中的列名统一

    示例:

    (tmp_c1,tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    )

代表获取在 parquet 或 orc 中以 (tmp_c1, tmp_c2) 为列名的列,映射到 doris 表中的 (id, name) 列。如果没有设置 set, 则以 column 中的列作为映射

3.4.3 导入作业参数

导入作业参数次要指的是 Broker load 创立导入语句中的属于 opt_properties局部的参数。导入作业参数是作用于整个导入作业的。

  • timeout

    导入作业的超时工夫(以秒为单位),用户能够在 opt_properties 中自行设置每个导入的超时工夫。导入工作在设定的 timeout 工夫内未实现则会被零碎勾销,变成 CANCELLED。Broker load 的默认导入超时工夫为 4 小时。

    通常状况下,用户不须要手动设置导入工作的超时工夫。当在默认超时工夫内无奈实现导入时,能够手动设置工作的超时工夫。

    举荐超时工夫

    总文件大小(MB)/ 用户 Doris 集群最慢导入速度(MB/s) > timeout >((总文件大小(MB) 待导入的表及相干 Roll up 表的个数)/ (10 导入并发数))

    导入并发数见文档最初的导入系统配置阐明,公式中的 10 为目前的导入限速 10MB/s。

    例如一个 1G 的待导入数据,待导入表蕴含 3 个 Rollup 表,以后的导入并发数为 3。则 timeout 的 最小值为 (1 * 1024 * 3) / (10 * 3) = 102 秒

    因为每个 Doris 集群的机器环境不同且集群并发的查问工作也不同,所以用户 Doris 集群的最慢导入速度须要用户本人依据历史的导入工作速度进行揣测。

  • max_filter_ratio

    导入工作的最大容忍率,默认为 0 容忍,取值范畴是 0~1。当导入的错误率超过该值,则导入失败。

    如果用户心愿疏忽谬误的行,能够通过设置这个参数大于 0,来保障导入能够胜利。

    计算公式为:

    max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL) )

    dpp.abnorm.ALL 示意数据品质不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。

    dpp.norm.ALL 指的是导入过程中正确数据的条数。能够通过 SHOW LOAD 命令查问导入工作的正确数据量。

    原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL

  • exec_mem_limit

    导入内存限度。默认是 2GB。单位为字节。

  • strict_mode

    Broker load 导入能够开启 strict mode 模式。开启形式为 properties ("strict_mode" = "true")。默认的 strict mode 为敞开。

    strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:

    1. 对于列类型转换来说,如果 strict mode 为 true,则谬误的数据将被 filter。这里的谬误数据是指:原始数据并不为空值,在参加列类型转换后后果为空值的这一类数据。
    2. 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。
    3. 对于导入的某列类型蕴含范畴限度的,如果原始数据能失常通过类型转换,但无奈通过范畴限度的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于能够通过类型转换但不在列申明的范畴内。这种数据 strict 对其不产生影响。
  • merge_type 数据的合并类型,一共反对三种类型 APPEND、DELETE、MERGE 其中,APPEND 是默认值,示意这批数据全副须要追加到现有数据中,DELETE 示意删除与这批数据 key 雷同的所有行,MERGE 语义 须要与 delete 条件联结应用,示意满足 delete 条件的数据依照 DELETE 语义解决其余的依照 APPEND 语义解决

3.4.4 strict mode 与 source data 的导入关系

这里以列类型为 TinyInt 来举例

注:当表中的列容许导入空值时

源数据 源数据示例 string to int strict_mode result
空值 \N N/A true or false NULL
not null aaa or 2000 NULL true 有效数据(被过滤)
not null aaa NULL false NULL
not null 1 1 true or false 正确数据

这里以列类型为 Decimal(1,0) 举例

注:当表中的列容许导入空值时

源数据 源数据示例 string to int strict_mode result
空值 \N N/A true or false NULL
not null aaa NULL true 有效数据(被过滤)
not null aaa NULL false NULL
not null 1 or 10 1 true or false 正确数据

留神:10 尽管是一个超过范畴的值,然而因为其类型合乎 decimal 的要求,所以 strict mode 对其不产生影响。10 最初会在其余 ETL 解决流程中被过滤。但不会被 strict mode 过滤。

3.5 查看后果

Broker load 导入形式因为是异步的,所以用户必须将创立导入的 Label 记录,并且在 查看导入命令中应用 Label 来查看导入后果。查看导入命令在所有导入形式中是通用的,具体语法可执行 HELP SHOW LOAD 查看。

mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
         JobId: 76391
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.1.10:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}

上面次要介绍了查看导入命令返回后果集中参数意义:

  • JobId

    导入工作的惟一 ID,每个导入工作的 JobId 都不同,由零碎主动生成。与 Label 不同的是,JobId 永远不会雷同,而 Label 则能够在导入工作失败后被复用。

  • Label

    导入工作的标识。

  • State

    导入工作以后所处的阶段。在 Broker load 导入过程中次要会呈现 PENDING 和 LOADING 这两个导入中的状态。如果 Broker load 处于 PENDING 状态,则阐明以后导入工作正在期待被执行;LOADING 状态则示意正在执行中。

    导入工作的最终阶段有两个:CANCELLED 和 FINISHED,当 Load job 处于这两个阶段时,导入实现。其中 CANCELLED 为导入失败,FINISHED 为导入胜利。

  • Progress

    导入工作的进度形容。分为两种进度:ETL 和 LOAD,对应了导入流程的两个阶段 ETL 和 LOADING。目前 Broker load 因为只有 LOADING 阶段,所以 ETL 则会永远显示为 N/A

    LOAD 的进度范畴为:0~100%。

    LOAD 进度 = 以后实现导入的表个数 / 本次导入工作设计的总表个数 * 100%

    如果所有导入表均实现导入,此时 LOAD 的进度为 99% 导入进入到最初失效阶段,整个导入实现后,LOAD 的进度才会改为 100%。

    导入进度并不是线性的。所以如果一段时间内进度没有变动,并不代表导入没有在执行。

  • Type

    导入工作的类型。Broker load 的 type 取值只有 BROKER。

  • EtlInfo

    次要显示了导入的数据量指标 unselected.rows , dpp.norm.ALLdpp.abnorm.ALL。用户能够依据第一个数值判断 where 条件过滤了多少行,后两个指标验证以后导入工作的错误率是否超过 max_filter_ratio

    三个指标之和就是原始数据量的总行数。

  • TaskInfo

    次要显示了以后导入工作参数,也就是创立 Broker load 导入工作时用户指定的导入工作参数,包含:clustertimeoutmax_filter_ratio

  • ErrorMsg

    在导入工作状态为 CANCELLED,会显示失败的起因,显示分两局部:type 和 msg,如果导入工作胜利则显示 N/A

    type 的取值意义:

    USER_CANCEL: 用户勾销的工作
    ETL_RUN_FAIL:在 ETL 阶段失败的导入工作
    ETL_QUALITY_UNSATISFIED:数据品质不合格,也就是谬误数据率超过了 max_filter_ratio
    LOAD_RUN_FAIL:在 LOADING 阶段失败的导入工作
    TIMEOUT:导入工作没在超时工夫内实现
    UNKNOWN:未知的导入谬误
  • CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime

    这几个值别离代表导入创立的工夫,ETL 阶段开始的工夫,ETL 阶段实现的工夫,Loading 阶段开始的工夫和整个导入工作实现的工夫。

    Broker load 导入因为没有 ETL 阶段,所以其 EtlStartTime, EtlFinishTime, LoadStartTime 被设置为同一个值。

    导入工作长时间停留在 CreateTime,而 LoadStartTime 为 N/A 则阐明目前导入工作沉积重大。用户可缩小导入提交的频率。

    LoadFinishTime - CreateTime = 整个导入工作所耗费工夫
    LoadFinishTime - LoadStartTime = 整个 Broker load 导入工作执行工夫 = 整个导入工作所耗费工夫 - 导入工作期待的工夫
  • URL

    导入工作的谬误数据样例,拜访 URL 地址既可获取本次导入的谬误数据样例。当本次导入不存在谬误数据时,URL 字段则为 N/A。

  • JobDetails

    显示一些作业的具体运行状态。包含导入文件的个数、总大小(字节)、子工作个数、已解决的原始行数,运行子工作的 BE 节点 Id,未实现的 BE 节点 Id。

    {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}

    其中已解决的原始行数,每 5 秒更新一次。该行数仅用于展现以后的进度,不代表最终理论的解决行数。理论解决行数以 EtlInfo 中显示的为准。

3.6 勾销导入

当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,能够被用户手动勾销。勾销时须要指定待勾销导入工作的 Label。勾销导入命令语法可执行 HELP CANCEL LOAD查看

3.7 Broker 导入相干参数

以下三个参数次要是为了管制导入的速度

  • min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency

    前两个配置限度了单个 BE 解决的数据量的最小和最大值。第三个配置限度了一个作业的最大的导入并发数。最小解决的数据量,最大并发数,源文件的大小和以后集群 BE 的个数 独特决定了本次导入的并发数

    本次导入并发数 = Math.min(源文件大小 / 最小处理量,最大并发数,以后 BE 节点个数)
    本次导入单个 BE 的处理量 = 源文件大小 / 本次导入的并发数

    通常一个导入作业反对的最大数据量为 max_bytes_per_broker_scanner * BE 节点数。如果须要导入更大数据量,则须要适当调整 max_bytes_per_broker_scanner 参数的大小。

    默认配置:

    参数名:min_bytes_per_broker_scanner,默认 64MB,单位 bytes。参数名:max_broker_concurrency,默认 10。参数名:max_bytes_per_broker_scanner,默认 3G,单位 bytes。

4. 最佳实际

应用 Broker load 最适宜的场景就是原始数据在文件系统(HDFS 等)中的场景。其次,因为 Broker load 是单次导入中惟一的一种异步导入的形式,所以如果用户在导入大文件中,须要应用异步接入,也能够思考应用 Broker load

上面以单个 BE 应用 Broker 形式导入,在导入的数据量上的倡议,如果你有多个 BE,数据量应该乘以 BE 的数量来进行计算

  1. 3G 数据

    用户能够间接提交 Broker load 创立导入申请。

  2. 3G 以上

    因为单个导入 BE 最大的处理量为 3G,超过 3G 的待导入文件就须要通过调整 Broker load 的导入参数来实现大文件的导入

    • 最大扫描量和最大并发数

      依据以后 BE 的个数和原始文件的大小批改单个 BE 的最大扫描量和最大并发数

      批改 fe.conf 中配置
      
      max_broker_concurrency = BE 个数
      以后导入工作单个 BE 解决的数据量 = 原始文件大小 / max_broker_concurrency
      max_bytes_per_broker_scanner >= 以后导入工作单个 BE 解决的数据量
      
      比方一个 100G 的文件,集群的 BE 个数为 10 个
      max_broker_concurrency = 10
      max_bytes_per_broker_scanner >= 10G = 100G / 10

      批改后,所有的 BE 会并发的解决导入工作,每个 BE 解决原始文件的一部分。

      留神:上述两个 FE 中的配置均为系统配置,也就是说其批改是作用于所有的 Broker load 的工作的。

    • 超时工夫(timeout)

      在创立导入的时候自定义以后导入工作的 timeout 工夫

      以后导入工作单个 BE 解决的数据量 / 用户 Doris 集群最慢导入速度(MB/s) >= 以后导入工作的 timeout 工夫 >= 以后导入工作单个 BE 解决的数据量 / 10M/s
      
      比方一个 100G 的文件,集群的 BE 个数为 10 个
      timeout >= 1000s = 10G / 10M/s
    • 默认的导入最大超时工夫

      当用户发现第二步计算出的 timeout 工夫超过零碎默认的导入最大超时工夫 4 小时

      这时候不举荐用户将导入最大超时工夫间接改大来解决问题。单个导入工夫如果超过默认的导入最大超时工夫 4 小时,最好是通过切分待导入文件并且分屡次导入来解决问题。次要起因是:单次导入超过 4 小时的话,导入失败后重试的工夫老本很高。

      能够通过如下公式计算出 Doris 集群冀望最大导入文件数据量:

      冀望最大导入文件数据量 = 14400s * 10M/s * BE 个数
      比方:集群的 BE 个数为 10 个
      冀望最大导入文件数据量 = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
      
      留神:个别用户的环境可能达不到 10M/s 的速度,所以倡议超过 500G 的文件都进行文件切分,再导入。

5. 性能剖析

能够在提交 LOAD 作业前,先执行 set is_report_success=true 关上会话变量。而后提交导入作业。待导入作业实现后,能够在 FE 的 web 页面的 Queris 标签中查看到导入作业的 Profile。

这个 Profile 能够帮忙剖析导入作业的运行状态。

以后只有作业胜利执行后,能力查看 Profile。

这个同样实用去其余作业,包含查问

正文完
 0