乐趣区

关于spark:Spark-SQL百万级数据批量读写入MySQL

Spark SQL 读取 MySQL 的形式

Spark SQL 还包含一个能够应用 JDBC 从其余数据库读取数据的数据源。与应用 JdbcRDD 相比,应优先应用此性能。这是因为后果作为 DataFrame 返回,它们能够在 Spark SQL 中轻松解决或与其余数据源连贯。JDBC 数据源也更易于应用 Java 或 Python,因为它不须要用户提供 ClassTag。

能够应用 Data Sources API 将近程数据库中的表加载为 DataFrame 或 Spark SQL 长期视图。用户能够在数据源选项中指定 JDBC 连贯属性。user 和 password 通常作为用于登录数据源的连贯属性。除连贯属性外,Spark 还反对以下不辨别大小写的选项:

属性名称 解释
url 要连贯的 JDBC URL
dbtable 读取或写入的 JDBC 表
query 指定查问语句
driver 用于连贯到该 URL 的 JDBC 驱动类名
partitionColumn, lowerBound, upperBound 如果指定了这些选项,则必须全副指定。另外,numPartitions必须指定
numPartitions 表读写中可用于并行处理的最大分区数。这也确定了并发 JDBC 连贯的最大数量。如果要写入的分区数超过此限度,咱们能够通过 coalesce(numPartitions) 在写入之前进行调用将其升高到此限度
queryTimeout 默认为0,查问超时工夫
fetchsize JDBC 的获取大小,它确定每次要获取多少行。这能够帮忙进步 JDBC 驱动程序的性能
batchsize 默认为 1000,JDBC 批处理大小,这能够帮忙进步 JDBC 驱动程序的性能。
isolationLevel 事务隔离级别,实用于以后连贯。它能够是一个NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ,或SERIALIZABLE,对应于由 JDBC 的连贯对象定义,缺省值为规范事务隔离级别READ_UNCOMMITTED。此选项仅实用于写作。
sessionInitStatement 在向近程数据库关上每个数据库会话之后,在开始读取数据之前,此选项将执行自定义 SQL 语句,应用它来实现会话初始化代码。
truncate 这是与 JDBC writer 相干的选项。当 SaveMode.Overwrite 启用时,就会清空指标表的内容,而不是删除和重建其现有的表。默认为false
pushDownPredicate 用于启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种状况下,Spark 将尽可能将过滤器下推到 JDBC 数据源。

源码

  • SparkSession
/**
   * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
   * `DataFrame`.
   * {{{*   sparkSession.read.parquet("/path/to/file.parquet")
   *   sparkSession.read.schema(schema).json("/path/to/file.json")
   * }}}
   *
   * @since 2.0.0
   */
  def read: DataFrameReader = new DataFrameReader(self)
  • DataFrameReader
  // ... 省略代码...
  /**
   * 所有的数据由 RDD 的一个分区解决,如果你这个表很大,很可能会呈现 OOM
   * 能够应用 DataFrameDF.rdd.partitions.size 办法查看
   */
  def jdbc(url: String, table: String, properties: Properties): DataFrame = {assertNoSpecifiedSchema("jdbc")
    this.extraOptions ++= properties.asScala
    this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
    format("jdbc").load()}
/**
   * @param url 数据库 url
   * @param table 表名
   * @param columnName 分区字段名
   * @param lowerBound  `columnName` 的最小值, 用于分区步长
   * @param upperBound  `columnName` 的最大值, 用于分区步长.
   * @param numPartitions 分区数量 
   * @param connectionProperties 其余参数
   * @since 1.4.0
   */
  def jdbc(
      url: String,
      table: String,
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int,
      connectionProperties: Properties): DataFrame = {
    this.extraOptions ++= Map(
      JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
      JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
      JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
      JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
    jdbc(url, table, connectionProperties)
  }

  /**
   * @param predicates 每个分区的 where 条件
   * 比方:"id <= 1000", "score > 1000 and score <= 2000"
   * 将会分成两个分区
   * @since 1.4.0
   */
  def jdbc(
      url: String,
      table: String,
      predicates: Array[String],
      connectionProperties: Properties): DataFrame = {assertNoSpecifiedSchema("jdbc")
    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
    val options = new JDBCOptions(url, table, params)
    val parts: Array[Partition] = predicates.zipWithIndex.map {case (part, i) =>
      JDBCPartition(part, i) : Partition
    }
    val relation = JDBCRelation(parts, options)(sparkSession)
    sparkSession.baseRelationToDataFrame(relation)
  }

示例

    private def runJdbcDatasetExample(spark: SparkSession): Unit = {// 从 JDBC source 加载数据(load)
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1:3306/test")
      .option("dbtable", "mytable")
      .option("user", "root")
      .option("password", "root")
      .load()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)
    // 指定读取 schema 的数据类型
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)

  }

值得注意的是,下面的形式如果不指定分区的话,Spark 默认会应用一个分区读取数据,这样在数据量特地大的状况下,会呈现 OOM。在读取数据之后,调用 DataFrameDF.rdd.partitions.size 办法能够查看分区数。

Spark SQL 批量写入 MySQL

代码示例如下:

object BatchInsertMySQL {case class Person(name: String, age: Int)
  def main(args: Array[String]): Unit = {

    // 创立 sparkSession 对象
    val conf = new SparkConf()
      .setAppName("BatchInsertMySQL")
    val spark: SparkSession =  SparkSession.builder()
      .config(conf)
      .getOrCreate()
    import spark.implicits._
    // MySQL 连贯参数
    val url = JDBCUtils.url
    val user = JDBCUtils.user
    val pwd = JDBCUtils.password

    // 创立 Properties 对象,设置连贯 mysql 的用户名和明码
    val properties: Properties = new Properties()

    properties.setProperty("user", user) // 用户名
    properties.setProperty("password", pwd) // 明码
    properties.setProperty("driver", "com.mysql.jdbc.Driver")
    properties.setProperty("numPartitions","10")

    // 读取 mysql 中的表数据
    val testDF: DataFrame = spark.read.jdbc(url, "test", properties)
     println("testDF 的分区数:" + testDF.rdd.partitions.size)
   testDF.createOrReplaceTempView("test")
   testDF.persist(StorageLevel.MEMORY_AND_DISK)
   testDF.printSchema()

    val result =
      s"""-- SQL 代码""".stripMargin

    val resultBatch = spark.sql(result).as[Person]
    println("resultBatch 的分区数:" + resultBatch.rdd.partitions.size)

    // 批量写入 MySQL
    // 此处最好对解决的后果进行一次重分区
    // 因为数据量特地大,会造成每个分区数据特地多
    resultBatch.repartition(500).foreachPartition(record => {val list = new ListBuffer[Person]
      record.foreach(person => {
        val name = Person.name
        val age = Person.age
        list.append(Person(name,age))
      })
      upsertDateMatch(list) // 执行批量插入数据
    })
    // 批量插入 MySQL 的办法
    def upsertPerson(list: ListBuffer[Person]): Unit = {

      var connect: Connection = null
      var pstmt: PreparedStatement = null

      try {connect = JDBCUtils.getConnection()
        // 禁用主动提交
        connect.setAutoCommit(false)

        val sql = "REPLACE INTO `person`(name, age)" +
          "VALUES(?, ?)"

        pstmt = connect.prepareStatement(sql)

        var batchIndex = 0
        for (person <- list) {pstmt.setString(1, person.name)
          pstmt.setString(2, person.age)
          // 退出批次
          pstmt.addBatch()
          batchIndex +=1
          // 管制提交的数量,
          // MySQL 的批量写入尽量限度提交批次的数据量,否则会把 MySQL 写挂!!!if(batchIndex % 1000 == 0 && batchIndex !=0){pstmt.executeBatch()
            pstmt.clearBatch()}

        }
        // 提交批次
        pstmt.executeBatch()
        connect.commit()} catch {
        case e: Exception =>
          e.printStackTrace()} finally {JDBCUtils.closeConnection(connect, pstmt)
      }
    }

    spark.close()}
}

JDBC 连贯工具类:

object JDBCUtils {
  val user = "root"
  val password = "root"
  val url = "jdbc:mysql://localhost:3306/mydb"
  Class.forName("com.mysql.jdbc.Driver")
  // 获取连贯
  def getConnection() = {DriverManager.getConnection(url,user,password)
  }
// 开释连贯
  def closeConnection(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {if (pstmt != null) {pstmt.close()
      }
    } catch {case e: Exception => e.printStackTrace()
    } finally {if (connection != null) {connection.close()
      }
    }
  }
}

总结

Spark 写入大量数据到 MySQL 时,在写入之前尽量对写入的 DF 进行重分区解决,防止分区内数据过多。在写入时,要留神应用 foreachPartition 来进行写入,这样能够为每一个分区获取一个连贯,在分区外部设定批次提交,提交的批次不易过大,免得将数据库写挂。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版