乐趣区

关于mysql:SparkSQL执行update操作修改mysql数据

这是不须要改源码的形式

//user 表样例类
case class User1(id: Long, name: String, password: String, imgUrl: String, update_date: String)

object SparkSQLUpdateMySQLOfJDBC {def main(args: Array[String]): Unit = {
    //SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkSqlToMysql")
      .master("local")
      .getOrCreate()

    // 读取 json/csv 文件数据
    val df = spark.read.json("data/user.json")
    df.show()
    val data: Array[Row] = df.collect()

    // 创立数据库连贯
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/ssm?characterEcoding=UTF-8",
      "root", "000000")
      
    // 申明执行的 SQL
    val statement = connection.prepareStatement("update user set name=?, password=?, imgUrl=?, update_date=? where id=?")

    // 组装参数
    val now: String = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now)
    data.foreach(
      u => {// statement.setObject(sql 参数占位符的地位, 样例类的属性)
        statement.setObject(1, u.getString(2))
        statement.setObject(2, u.getString(3))
        statement.setObject(3, u.getString(4))
        statement.setObject(4, now)
        statement.setObject(5, u.getLong(0))
        statement.addBatch() // 批量执行}
    )

    // 执行 SQL
    statement.executeBatch()

    spark.stop()}
}

写入中文后有呈现乱码 ’??’ 的状况, 有晓得的大佬指导一下

退出移动版