以下是使用 Spark SQL(Scala)连接 MySQL 并添加新数据的完整代码示例:
scala
import org.apache.spark.sql.SparkSession
object MySQLSparkExample {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder
.appName("MySQLDataInsertExample")
.config("spark.master", "local[*]")
.config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
.getOrCreate()
// 配置MySQL连接参数
val jdbcUrl = "jdbc:mysql://localhost:3306/your_database?useSSL=false"
val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", "your_username")
connectionProperties.setProperty("password", "your_password")
connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
try {
// 1. 读取现有数据示例
val existingData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
println("现有数据:")
existingData.show()
// 2. 创建要添加的新数据
import spark.implicits._
val newData = Seq(
(1001, "John Doe", "Engineering", 5000.0),
(1002, "Jane Smith", "Marketing", 6000.0)
).toDF("id", "name", "department", "salary")
// 3. 将新数据追加到MySQL表
newData.write
.mode("append")
.jdbc(jdbcUrl, "employees", connectionProperties)
println("数据添加成功!")
// 4. 验证添加后的数据
val updatedData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
println("添加后的数据:")
updatedData.show()
} catch {
case e: Exception =>
println(s"操作失败: ${e.getMessage}")
e.printStackTrace()
} finally {
// 关闭SparkSession
spark.stop()
}
}
}
关键配置说明:
依赖配置:
scala
.config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
JDBC URL 格式:
scala
jdbc:mysql://<host>:<port>/<database>?useSSL=false
写入模式:
append
:追加数据(不会删除现有数据)overwrite
:覆盖表(先删除再插入)errorIfExists
:如果表存在则报错(默认)
执行步骤:
- 确保 MySQL 服务已启动
- 创建测试表:
sql
CREATE TABLE employees ( id INT PRIMARY KEY, name VARCHAR(50), department VARCHAR(50), salary DOUBLE );
- 运行 Spark 应用:
bash
spark-submit --class MySQLSparkExample \ --master local[*] \ --packages mysql:mysql-connector-java:8.0.26 \ your-application.jar
注意事项:
替换数据库连接参数:
your_database
your_username
your_password
如果遇到时区问题,可在 URL 中添加:
scala
?serverTimezone=UTC
确保 MySQL 用户有写入权限:
sql
GRANT INSERT ON your_database.employees TO 'your_username'@'%';
对于生产环境,建议:
- 使用连接池(如 HikariCP)
- 启用 SSL 加密
- 配置适当的重试机制
- 监控数据库连接状态