Spark SQL 读取 MySQL 的形式
Spark SQL 还包含一个能够应用 JDBC 从其余数据库读取数据的数据源。与应用 JdbcRDD 相比,应优先应用此性能。这是因为后果作为 DataFrame 返回,它们能够在 Spark SQL 中轻松解决或与其余数据源连贯。JDBC 数据源也更易于应用 Java 或 Python,因为它不须要用户提供 ClassTag。
能够应用 Data Sources API 将近程数据库中的表加载为 DataFrame 或 Spark SQL 长期视图。用户能够在数据源选项中指定 JDBC 连贯属性。user 和 password 通常作为用于登录数据源的连贯属性。除连贯属性外,Spark 还反对以下不辨别大小写的选项:
属性名称 | 解释 |
---|---|
url |
要连贯的 JDBC URL |
dbtable |
读取或写入的 JDBC 表 |
query |
指定查问语句 |
driver |
用于连贯到该 URL 的 JDBC 驱动类名 |
partitionColumn, lowerBound, upperBound |
如果指定了这些选项,则必须全副指定。另外,numPartitions 必须指定 |
numPartitions |
表读写中可用于并行处理的最大分区数。这也确定了并发 JDBC 连贯的最大数量。如果要写入的分区数超过此限度,咱们能够通过 coalesce(numPartitions) 在写入之前进行调用将其升高到此限度 |
queryTimeout |
默认为0 ,查问超时工夫 |
fetchsize |
JDBC 的获取大小,它确定每次要获取多少行。这能够帮忙进步 JDBC 驱动程序的性能 |
batchsize |
默认为 1000,JDBC 批处理大小,这能够帮忙进步 JDBC 驱动程序的性能。 |
isolationLevel |
事务隔离级别,实用于以后连贯。它能够是一个NONE ,READ_COMMITTED ,READ_UNCOMMITTED ,REPEATABLE_READ ,或SERIALIZABLE ,对应于由 JDBC 的连贯对象定义,缺省值为规范事务隔离级别READ_UNCOMMITTED 。此选项仅实用于写作。 |
sessionInitStatement |
在向近程数据库关上每个数据库会话之后,在开始读取数据之前,此选项将执行自定义 SQL 语句,应用它来实现会话初始化代码。 |
truncate |
这是与 JDBC writer 相干的选项。当 SaveMode.Overwrite 启用时,就会清空指标表的内容,而不是删除和重建其现有的表。默认为false |
pushDownPredicate |
用于启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种状况下,Spark 将尽可能将过滤器下推到 JDBC 数据源。 |
源码
- SparkSession
/**
* Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
* `DataFrame`.
* {{{* sparkSession.read.parquet("/path/to/file.parquet")
* sparkSession.read.schema(schema).json("/path/to/file.json")
* }}}
*
* @since 2.0.0
*/
def read: DataFrameReader = new DataFrameReader(self)
- DataFrameReader
// ... 省略代码...
/**
* 所有的数据由 RDD 的一个分区解决,如果你这个表很大,很可能会呈现 OOM
* 能够应用 DataFrameDF.rdd.partitions.size 办法查看
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {assertNoSpecifiedSchema("jdbc")
this.extraOptions ++= properties.asScala
this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
format("jdbc").load()}
/**
* @param url 数据库 url
* @param table 表名
* @param columnName 分区字段名
* @param lowerBound `columnName` 的最小值, 用于分区步长
* @param upperBound `columnName` 的最大值, 用于分区步长.
* @param numPartitions 分区数量
* @param connectionProperties 其余参数
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame = {
this.extraOptions ++= Map(
JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
jdbc(url, table, connectionProperties)
}
/**
* @param predicates 每个分区的 where 条件
* 比方:"id <= 1000", "score > 1000 and score <= 2000"
* 将会分成两个分区
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame = {assertNoSpecifiedSchema("jdbc")
val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
val options = new JDBCOptions(url, table, params)
val parts: Array[Partition] = predicates.zipWithIndex.map {case (part, i) =>
JDBCPartition(part, i) : Partition
}
val relation = JDBCRelation(parts, options)(sparkSession)
sparkSession.baseRelationToDataFrame(relation)
}
示例
private def runJdbcDatasetExample(spark: SparkSession): Unit = {// 从 JDBC source 加载数据(load)
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/test")
.option("dbtable", "mytable")
.option("user", "root")
.option("password", "root")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)
// 指定读取 schema 的数据类型
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)
}
值得注意的是,下面的形式如果不指定分区的话,Spark 默认会应用一个分区读取数据,这样在数据量特地大的状况下,会呈现 OOM。在读取数据之后,调用 DataFrameDF.rdd.partitions.size 办法能够查看分区数。
Spark SQL 批量写入 MySQL
代码示例如下:
object BatchInsertMySQL {case class Person(name: String, age: Int)
def main(args: Array[String]): Unit = {
// 创立 sparkSession 对象
val conf = new SparkConf()
.setAppName("BatchInsertMySQL")
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
// MySQL 连贯参数
val url = JDBCUtils.url
val user = JDBCUtils.user
val pwd = JDBCUtils.password
// 创立 Properties 对象,设置连贯 mysql 的用户名和明码
val properties: Properties = new Properties()
properties.setProperty("user", user) // 用户名
properties.setProperty("password", pwd) // 明码
properties.setProperty("driver", "com.mysql.jdbc.Driver")
properties.setProperty("numPartitions","10")
// 读取 mysql 中的表数据
val testDF: DataFrame = spark.read.jdbc(url, "test", properties)
println("testDF 的分区数:" + testDF.rdd.partitions.size)
testDF.createOrReplaceTempView("test")
testDF.persist(StorageLevel.MEMORY_AND_DISK)
testDF.printSchema()
val result =
s"""-- SQL 代码""".stripMargin
val resultBatch = spark.sql(result).as[Person]
println("resultBatch 的分区数:" + resultBatch.rdd.partitions.size)
// 批量写入 MySQL
// 此处最好对解决的后果进行一次重分区
// 因为数据量特地大,会造成每个分区数据特地多
resultBatch.repartition(500).foreachPartition(record => {val list = new ListBuffer[Person]
record.foreach(person => {
val name = Person.name
val age = Person.age
list.append(Person(name,age))
})
upsertDateMatch(list) // 执行批量插入数据
})
// 批量插入 MySQL 的办法
def upsertPerson(list: ListBuffer[Person]): Unit = {
var connect: Connection = null
var pstmt: PreparedStatement = null
try {connect = JDBCUtils.getConnection()
// 禁用主动提交
connect.setAutoCommit(false)
val sql = "REPLACE INTO `person`(name, age)" +
"VALUES(?, ?)"
pstmt = connect.prepareStatement(sql)
var batchIndex = 0
for (person <- list) {pstmt.setString(1, person.name)
pstmt.setString(2, person.age)
// 退出批次
pstmt.addBatch()
batchIndex +=1
// 管制提交的数量,
// MySQL 的批量写入尽量限度提交批次的数据量,否则会把 MySQL 写挂!!!if(batchIndex % 1000 == 0 && batchIndex !=0){pstmt.executeBatch()
pstmt.clearBatch()}
}
// 提交批次
pstmt.executeBatch()
connect.commit()} catch {
case e: Exception =>
e.printStackTrace()} finally {JDBCUtils.closeConnection(connect, pstmt)
}
}
spark.close()}
}
JDBC 连贯工具类:
object JDBCUtils {
val user = "root"
val password = "root"
val url = "jdbc:mysql://localhost:3306/mydb"
Class.forName("com.mysql.jdbc.Driver")
// 获取连贯
def getConnection() = {DriverManager.getConnection(url,user,password)
}
// 开释连贯
def closeConnection(connection: Connection, pstmt: PreparedStatement): Unit = {
try {if (pstmt != null) {pstmt.close()
}
} catch {case e: Exception => e.printStackTrace()
} finally {if (connection != null) {connection.close()
}
}
}
}
总结
Spark 写入大量数据到 MySQL 时,在写入之前尽量对写入的 DF 进行重分区解决,防止分区内数据过多。在写入时,要留神应用 foreachPartition 来进行写入,这样能够为每一个分区获取一个连贯,在分区外部设定批次提交,提交的批次不易过大,免得将数据库写挂。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包