1. 前序
Greenplum 是一款优良的 mpp 数据库产品,官网举荐了几种将内部数据写入 Greenplum 形式,蕴含:通用的 Jdbc,gpload 以及 Pivotal Greenplum-Spark Connector 等。
- Jdbc:Jdbc 形式,写大数据量会很慢。
- gpload:适宜写大数据量数据,能并行写入。但其毛病是须要装置客户端,包含 gpfdist 等依赖,装置起来很麻烦。须要理解能够参考 gpload。
- Greenplum-Spark Connector:基于 Spark 并行处理,并行写入 Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的局部。
2. Greenplum-Spark Connector 读数据架构
一个 Spark application,是由 Driver 和 Executor 节点形成。当 Spark Application 应用 Greenplum-Spark Connector 加载 Greenplum 数据时,其 Driver 端会通过 JDBC 的形式申请 Greenplum 的 master 节点获取相干的元数据信息。Connector 将会依据这些元数据信息去决定 Spark 的 Executor 去怎么去并行的读取该表的数据。
Greenplum 数据库存储数据是按 segment 组织的,Greenplum-Spark Connector 在加载 Greenplum 数据时,须要指定 Greenplum 表的一个字段作为 Spark 的 partition 字段,Connector 会应用这个字段的值来计算,该 Greenplum 表的某个 segment 该被哪一个或多个 Spark partition 读取。
其读取过程如下:
- Spark Driver 通过 Jdbc 的形式连贯 Greenplum master,并读取指定表的相干元数据信息。而后依据指定的分区字段以及分区个数去决定 segment 怎么调配。
- Spark Executor 端会通过 Jdbc 的形式连贯 Greenplum master,创立 Greenplum 内部表。
- 而后 Spark Executor 通过 Http 形式连贯 Greenplum 的数据节点,获取指定的 segment 的数据。该获取数据的操作在 Spark Executor 并行执行。
3. Greenplum-Spark Connector 写数据流程
1.GSC 在 Spark Executor 端通过 Jetty 启动一个 Http 服务,将该服务封装为反对 Greenplum 的 gpfdist 协定。
2.GSC 在 Spark Executor 端通过 Jdbc 形式连贯 Greenplum master,创立 Greenplum 内部表,该内部表文件地址指向该 Executor 所启动的 gpfdist 协定地址。SQL 示例如下:
CREATE READABLE EXTERNAL TABLE"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')FORMAT 'CSV'(DELIMITER AS '|' NULL AS '')ENCODING'UTF-8'
3.GSC 在 Spark Executor 端通过 Jdbc 形式连贯 Greenplum master,而后执行 insert 语句至实在的表中,数据来源于这张内部表。SQL 示例如下:
INSERT INTO "public"."rank_a1"SELECT *FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"
至于这张内部表的数据,是否落地以后 Executor 服务器,不分明。猜想不会落地,而是间接通过 Http 间接传递给了 Greenplum 对应的 Segment。
4.GSC 监听 onApplicationEnd 事件,在 Spark application 完结后,删除创立的内部表。
4. Greenplum-Spark Connector 应用
1. 下载 GSC Jar 包。下载地址:https://network.pivotal.io/pr…。可间接下载最新版本的 GSC,即 1.6.2,反对 Greenplum5.0 之后的版本。如:
greenplum-spark_2.11-1.6.2.jar
2.Maven 中引入:
<dependency> <groupId>io.pivotal.greenplum.spark</groupId> <artifactId>greenplum-spark_2.11</artifactId> <version>1.6.2</version> </dependency>
3. Spark 提交引入:
- spark-shell 或 spark-submit 时候,通过 -jars 退出 greenplum-spark_2.11-1.6.2.jar。
- 将 greenplum-spark_2.11-1.6.2.jar 与 Spark application 包打成 uber jar 提交。
5. Greenplum-Spark Connector 参数
6. 从 Greenplum 读取数据
1.DataFrameReader.load() 形式:
val gscReadOptionMap = Map("url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table1", "partitionColumn" -> "id")val gpdf = spark.read.format("greenplum") .options(gscReadOptionMap) .load()
2.spark.read.greenplum() 形式:
val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"val tblname = "avgdelay"val jprops = new Properties()jprops.put("user", "user2")jprops.put("password", "changeme")jprops.put("partitionColumn", "airlineid")val gpdf = spark.read.greenplum(url, tblname, jprops)
然而,这种形式必然须要引入一个隐式转换,官网也没介绍。
7. 写数据至 Greenplum
1. 写数据示例:
val gscWriteOptionMap = Map("url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table2",)dfToWrite.write.format("greenplum") .options(gscWriteOptionMap) .save()
在通过 GSC 写到 Greenplum 表时,如果表曾经存在或表中曾经存在数据,可通过 DataFrameWriter.mode(SaveMode savemode) 形式指定其输入模式。相干模式行为如下:
2.GSC 主动建表
2.1 创立的 Greenplum 表将不会有 distribution 列,如下为 GSC 生成的建表语句:
CREATE TABLE "public"."rank_a1" ("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);
2.2 创立的 Greenplum 表的字段名将会应用 Spark DataFrame 中的字段名。
2.3 在 GSC 主动建表时,将会为字段名加上双引号,这将使 Greenplum 辨别大小写。
2.4 当 Spark DataFrame 的字段不为 nullable 时,GSC 主动建表的字段将是 NOT NULL。
2.5 将会对应的 Spark DataFrame 字段类型映射为 Greenplum 的字段类型。参考,字段类型映射表。
3. 提前手动建表
3.1 将 Spark DataFrame 的字段名的数据写至 Greenplum 表的对应的字段中。值得注意的是,GSC 在做映射的时候,是严格辨别大小写的。
3.2 写至 Greenplum 的字段的数据类型,与对应的 Spark DataFrame 统一,具体参见字段类型映射。
3.3 如果 Spark 数据中某列蕴含空数据,需确保对应的 Greenplum 表的列没有被指定为 NOT NULL。
3.4 Greenplum 表中建表时其字段程序能够与 Spark DataFrame 中不统一。但 Greenplum 表中不能呈现不存在在 Spark DataFrame 中的字段。如下例子:
// Greenplum 中的字段 CREATE TABLE public.rank_a1 (id int4 NOT NULL, "rank" text NULL, "year" int4 NOT NULL, gender int4 NOT NULL, count int4 NOT NULL)DISTRIBUTED BY (id);// Spark DataFrame 中的字段 var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")// 在写数据至 public.rank_a1 表时,将会报错如下 Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.Old column names (5): _1, _2, _3, _4, _5New column names (4): id, rank, year, gender at scala.Predef$.require(Predef.scala:224) at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435) at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44) at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14) at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)
3.5 确保指定的用户对于该表有读写的权限,主动建表,须要有建表的权限。
8. Troubleshooting
1. 端口相干问题
2. Greenplum 连接数问题
当连贯 Greenplum 的连接数靠近 Greenplum 数据库配置的最大连接数(max_connections)时。Spark application 将会抛出 connection limit exceeded 谬误。
排查过程:
2.1 查问 Greenplum 数据的最大连接数:
postgres=# show max_connections; max_connections----------------- 250(1 row)
2.2 查问以后连贯 Greenplum 数据库的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity;
2.3 查问指定的用户连贯 Greenplum 数据的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';
2.4 查问 Greenplum 数据库闲暇和流动的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';
2.5 查问连贯 Greenplum 数据库名,用户名,客户端地址,客户端 ip,以后查问语句:
postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;
如果确认是 Spark application 应用连接数过多,则配置 JDBC Connection Pooling 相干参数,缩小连接数。
3. Greenplum Database Data Length Errors
在应用 Greenplum 4.x 或 5.x 的时候,可能会报出“data line too long”谬误。这是因为在 Greenplum 数据库中参数项“gp_max_csv_line_length”默认值是 1M。须要登陆 Greenplum master 批改这个参数值。示例如下,通过 gpconfig 批改该参数的值为 5M:
gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880gpadmin@gpmaster$ gpstop -u
9. 类型映射表
1. Greenplum to Spark
2. Spark to Greenplum
10. 参考
- Greenplum-Spark Connector 官网文档:https://greenplum-spark.docs….
- Greenplum 建表语句文档:https://gpdb.docs.pivotal.io/…_guide/sql_commands/CREATE_EXTERNAL_TABLE.html#topic1
- Greenplum 参数配置官网文档:https://gpdb.docs.pivotal.io/…_guide/config_params/guc-list.html#gp_max_csv_line_length