在scala中使用sparkSQL连接MySQL并添加新数据

发布于:2025-05-14 ⋅ 阅读:(8) ⋅ 点赞:(0)

以下是使用 Spark SQL(Scala)连接 MySQL 并添加新数据的完整代码示例:

scala

import org.apache.spark.sql.SparkSession

object MySQLSparkExample {
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder
      .appName("MySQLDataInsertExample")
      .config("spark.master", "local[*]")
      .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
      .getOrCreate()

    // 配置MySQL连接参数
    val jdbcUrl = "jdbc:mysql://localhost:3306/your_database?useSSL=false"
    val connectionProperties = new java.util.Properties()
    connectionProperties.setProperty("user", "your_username")
    connectionProperties.setProperty("password", "your_password")
    connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    try {
      // 1. 读取现有数据示例
      val existingData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
      println("现有数据:")
      existingData.show()

      // 2. 创建要添加的新数据
      import spark.implicits._
      val newData = Seq(
        (1001, "John Doe", "Engineering", 5000.0),
        (1002, "Jane Smith", "Marketing", 6000.0)
      ).toDF("id", "name", "department", "salary")

      // 3. 将新数据追加到MySQL表
      newData.write
        .mode("append")
        .jdbc(jdbcUrl, "employees", connectionProperties)

      println("数据添加成功!")

      // 4. 验证添加后的数据
      val updatedData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
      println("添加后的数据:")
      updatedData.show()

    } catch {
      case e: Exception =>
        println(s"操作失败: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      // 关闭SparkSession
      spark.stop()
    }
  }
}

关键配置说明:

  1. 依赖配置

    scala

    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
    
  2. JDBC URL 格式

    scala

    jdbc:mysql://<host>:<port>/<database>?useSSL=false
    
  3. 写入模式

    • append:追加数据(不会删除现有数据)
    • overwrite:覆盖表(先删除再插入)
    • errorIfExists:如果表存在则报错(默认)

执行步骤:

  1. 确保 MySQL 服务已启动
  2. 创建测试表:

    sql

    CREATE TABLE employees (
      id INT PRIMARY KEY,
      name VARCHAR(50),
      department VARCHAR(50),
      salary DOUBLE
    );
    
  3. 运行 Spark 应用:

    bash

    spark-submit --class MySQLSparkExample \
      --master local[*] \
      --packages mysql:mysql-connector-java:8.0.26 \
      your-application.jar
    

注意事项:

  1. 替换数据库连接参数:

    • your_database
    • your_username
    • your_password
  2. 如果遇到时区问题,可在 URL 中添加:

    scala

    ?serverTimezone=UTC
    
  3. 确保 MySQL 用户有写入权限:

    sql

    GRANT INSERT ON your_database.employees TO 'your_username'@'%';
    
  4. 对于生产环境,建议:

    • 使用连接池(如 HikariCP)
    • 启用 SSL 加密
    • 配置适当的重试机制
    • 监控数据库连接状态

网站公告

今日签到

点亮在社区的每一天
去签到