Spark SQL读取MySQL的形式
Spark SQL还包含一个能够应用JDBC从其余数据库读取数据的数据源。与应用JdbcRDD相比,应优先应用此性能。这是因为后果作为DataFrame返回,它们能够在Spark SQL中轻松解决或与其余数据源连贯。JDBC数据源也更易于应用Java或Python,因为它不须要用户提供ClassTag。
能够应用Data Sources API将近程数据库中的表加载为DataFrame或Spark SQL长期视图。用户能够在数据源选项中指定JDBC连贯属性。 user和password通常作为用于登录数据源的连贯属性。除连贯属性外,Spark还反对以下不辨别大小写的选项:
属性名称 | 解释 |
---|---|
url | 要连贯的JDBC URL |
dbtable | 读取或写入的JDBC表 |
query | 指定查问语句 |
driver | 用于连贯到该URL的JDBC驱动类名 |
partitionColumn, lowerBound, upperBound | 如果指定了这些选项,则必须全副指定。另外, numPartitions 必须指定 |
numPartitions | 表读写中可用于并行处理的最大分区数。这也确定了并发JDBC连贯的最大数量。如果要写入的分区数超过此限度,咱们能够通过coalesce(numPartitions) 在写入之前进行调用将其升高到此限度 |
queryTimeout | 默认为0 ,查问超时工夫 |
fetchsize | JDBC的获取大小,它确定每次要获取多少行。这能够帮忙进步JDBC驱动程序的性能 |
batchsize | 默认为1000,JDBC批处理大小,这能够帮忙进步JDBC驱动程序的性能。 |
isolationLevel | 事务隔离级别,实用于以后连贯。它能够是一个NONE ,READ_COMMITTED ,READ_UNCOMMITTED ,REPEATABLE_READ ,或SERIALIZABLE ,对应于由JDBC的连贯对象定义,缺省值为规范事务隔离级别READ_UNCOMMITTED 。此选项仅实用于写作。 |
sessionInitStatement | 在向近程数据库关上每个数据库会话之后,在开始读取数据之前,此选项将执行自定义SQL语句,应用它来实现会话初始化代码。 |
truncate | 这是与JDBC writer相干的选项。当SaveMode.Overwrite 启用时,就会清空指标表的内容,而不是删除和重建其现有的表。默认为false |
pushDownPredicate | 用于启用或禁用谓词下推到JDBC数据源的选项。默认值为true,在这种状况下,Spark将尽可能将过滤器下推到JDBC数据源。 |
源码
- SparkSession
/** * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a * `DataFrame`. * {{{ * sparkSession.read.parquet("/path/to/file.parquet") * sparkSession.read.schema(schema).json("/path/to/file.json") * }}} * * @since 2.0.0 */ def read: DataFrameReader = new DataFrameReader(self)
- DataFrameReader
// ...省略代码... /** *所有的数据由RDD的一个分区解决,如果你这个表很大,很可能会呈现OOM *能够应用DataFrameDF.rdd.partitions.size办法查看 */ def jdbc(url: String, table: String, properties: Properties): DataFrame = { assertNoSpecifiedSchema("jdbc") this.extraOptions ++= properties.asScala this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table) format("jdbc").load() }/** * @param url 数据库url * @param table 表名 * @param columnName 分区字段名 * @param lowerBound `columnName`的最小值,用于分区步长 * @param upperBound `columnName`的最大值,用于分区步长. * @param numPartitions 分区数量 * @param connectionProperties 其余参数 * @since 1.4.0 */ def jdbc( url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame = { this.extraOptions ++= Map( JDBCOptions.JDBC_PARTITION_COLUMN -> columnName, JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString, JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString, JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString) jdbc(url, table, connectionProperties) } /** * @param predicates 每个分区的where条件 * 比方:"id <= 1000", "score > 1000 and score <= 2000" * 将会分成两个分区 * @since 1.4.0 */ def jdbc( url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame = { assertNoSpecifiedSchema("jdbc") val params = extraOptions.toMap ++ connectionProperties.asScala.toMap val options = new JDBCOptions(url, table, params) val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) => JDBCPartition(part, i) : Partition } val relation = JDBCRelation(parts, options)(sparkSession) sparkSession.baseRelationToDataFrame(relation) }
示例
private def runJdbcDatasetExample(spark: SparkSession): Unit = { // 从JDBC source加载数据(load) val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/test") .option("dbtable", "mytable") .option("user", "root") .option("password", "root") .load() val connectionProperties = new Properties() connectionProperties.put("user", "root") connectionProperties.put("password", "root") val jdbcDF2 = spark.read .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties) // 指定读取schema的数据类型 connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING") val jdbcDF3 = spark.read .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties) }
值得注意的是,下面的形式如果不指定分区的话,Spark默认会应用一个分区读取数据,这样在数据量特地大的状况下,会呈现OOM。在读取数据之后,调用DataFrameDF.rdd.partitions.size办法能够查看分区数。
Spark SQL批量写入MySQL
代码示例如下:
object BatchInsertMySQL { case class Person(name: String, age: Int) def main(args: Array[String]): Unit = { // 创立sparkSession对象 val conf = new SparkConf() .setAppName("BatchInsertMySQL") val spark: SparkSession = SparkSession.builder() .config(conf) .getOrCreate() import spark.implicits._ // MySQL连贯参数 val url = JDBCUtils.url val user = JDBCUtils.user val pwd = JDBCUtils.password // 创立Properties对象,设置连贯mysql的用户名和明码 val properties: Properties = new Properties() properties.setProperty("user", user) // 用户名 properties.setProperty("password", pwd) // 明码 properties.setProperty("driver", "com.mysql.jdbc.Driver") properties.setProperty("numPartitions","10") // 读取mysql中的表数据 val testDF: DataFrame = spark.read.jdbc(url, "test", properties) println("testDF的分区数: " + testDF.rdd.partitions.size) testDF.createOrReplaceTempView("test") testDF.persist(StorageLevel.MEMORY_AND_DISK) testDF.printSchema() val result = s"""-- SQL代码 """.stripMargin val resultBatch = spark.sql(result).as[Person] println("resultBatch的分区数: " + resultBatch.rdd.partitions.size) // 批量写入MySQL // 此处最好对解决的后果进行一次重分区 // 因为数据量特地大,会造成每个分区数据特地多 resultBatch.repartition(500).foreachPartition(record => { val list = new ListBuffer[Person] record.foreach(person => { val name = Person.name val age = Person.age list.append(Person(name,age)) }) upsertDateMatch(list) //执行批量插入数据 }) // 批量插入MySQL的办法 def upsertPerson(list: ListBuffer[Person]): Unit = { var connect: Connection = null var pstmt: PreparedStatement = null try { connect = JDBCUtils.getConnection() // 禁用主动提交 connect.setAutoCommit(false) val sql = "REPLACE INTO `person`(name, age)" + " VALUES(?, ?)" pstmt = connect.prepareStatement(sql) var batchIndex = 0 for (person <- list) { pstmt.setString(1, person.name) pstmt.setString(2, person.age) // 退出批次 pstmt.addBatch() batchIndex +=1 // 管制提交的数量, // MySQL的批量写入尽量限度提交批次的数据量,否则会把MySQL写挂!!! if(batchIndex % 1000 == 0 && batchIndex !=0){ pstmt.executeBatch() pstmt.clearBatch() } } // 提交批次 pstmt.executeBatch() connect.commit() } catch { case e: Exception => e.printStackTrace() } finally { JDBCUtils.closeConnection(connect, pstmt) } } spark.close() }}
JDBC连贯工具类:
object JDBCUtils { val user = "root" val password = "root" val url = "jdbc:mysql://localhost:3306/mydb" Class.forName("com.mysql.jdbc.Driver") // 获取连贯 def getConnection() = { DriverManager.getConnection(url,user,password) }// 开释连贯 def closeConnection(connection: Connection, pstmt: PreparedStatement): Unit = { try { if (pstmt != null) { pstmt.close() } } catch { case e: Exception => e.printStackTrace() } finally { if (connection != null) { connection.close() } } }}
总结
Spark写入大量数据到MySQL时,在写入之前尽量对写入的DF进行重分区解决,防止分区内数据过多。在写入时,要留神应用foreachPartition来进行写入,这样能够为每一个分区获取一个连贯,在分区外部设定批次提交,提交的批次不易过大,免得将数据库写挂。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包