EMR Serverless Spark 是一款兼容开源 Spark 的高性能 Lakehouse 产品。它为用户提供任务开发、调试、发布、调度和运维等全方位的产品化服务,显著简化了大数据计算的工作流程,使用户能更专注于数据分析和价值提炼。
StarRocks官方提供了Spark Connector用于Spark和StarRocks之间的数据读写,EMR Serverless Spark可以在开发时添加对应的配置连接StarRocks。本文为您介绍在EMR Serverless Spark中实现StarRocks的读取和写入操作。
EMR Serverless Spark 新用户可免费领取 1000 CU*小时 资源包,快速体验 ETL 开发、任务调度、数据查询与分析全流程。(链接:https://www.aliyun.com/product/emr/getting-started?utm_content=g_1000402199)
前提条件
- 已创建Serverless Spark工作空间,详情请参见创建工作空间。(链接:https://x.sm.cn/AN7vPD2)
- 已创建EMR Serverless StarRocks实例,详情请参见创建实例。(链接:https://x.sm.cn/CAbeJiu)
使用限制
EMR Serverless Spark引擎的版本要求为esr-2.5.0、esr-3.1.0、esr-4.1.0及以上版本。
操作流程
步骤一:获取 Spark Connector JAR 并上传至OSS
- 参见使用Spark Connector读取数据,选择相应的方式下载对应版本的Spark Connector JAR。(链接:https://x.sm.cn/v4QPaz)
例如,本文选择直接下载已经编译好的JAR,即从Maven Central Repository获取不同版本的Connector JAR包。(链接:https://x.sm.cn/DLYRbXP)
说明:Connector JAR包的命名格式为starrocks-spark-connec``tor-${spark_version}_${scala_version}-${connector_version}.jar
。例如,您使用的引擎版本为esr-4.1.0 (Spark 3.5.2, Scala 2.12),想使用1.1.2版本的Connector,则可以选择starrocks-spark-connector-3.5_2.12-1.1.2.jar
。
- 将下载的Spark Connector JAR上传至阿里云OSS中,上传操作可以参见简单上传。(链接:https://x.sm.cn/AcdD9UZ)
步骤二:添加网络连接
- 获取网络信息。
您可以在EMR Serverless Starrocks页面,进入目标StarRocks实例的实例详情页面,以获取该实例的专有网络和交换机信息。(链接:https://x.sm.cn/D5dHUwu)
- 新增网络连接。
- 在EMR Serverless Spark页面,进入目标Spark工作空间的网络连接页面**,单击新增网络连接**。(链接:https://x.sm.cn/FLs7Nq)
- 在新增网络连接对话框中,输入连接名称,并选择之前获取到的StarRocks实例的专有网络和交换机信息,然后单击确定。
更多网络连接信息,请参见 EMR Serverless Spark与其他VPC间网络互通。(链接:https://x.sm.cn/AURlOSD)
步骤三:在 StarRocks 中创建库表
连接StarRocks实例,详情请参见通过EMR StarRocks Manager连接StarRocks实例。(链接:https://x.sm.cn/JGSgejX)
在SQL Editor的查询列表页面,单击文件或者右侧区域的
图标,然后单击确认以新增文件。
在新增的文件中输入以下SQL语句,单击运行。
CREATE DATABASE `testdb`;
CREATE TABLE `testdb`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
通过 Serverless Spark 读写 StarRocks
方式一:使用SQL会话、Notebook会话读写StarRocks
会话类型更多介绍,请参见会话管理。(链接:https://x.sm.cn/8MqzEJP)
SQL 会话
- 通过Serverless Spark向StarRocks写入数据。
a. 创建SQL会话,详情请参见管理SQL会话。(链接:https://x.sm.cn/CxvVvQR)
创建会话时,选择与 StarRocks Connector 版本对应的引擎版本,在网络连接中选择上一步创建好的网络连接,并在 **Spark配置 **中添加以下参数来加载Spark Connector。
spark.user.defined.jars oss://<bucketname>/path/connector.jar
其中,oss://<bucketname>/path/connector.jar
为您步骤一中上传至OSS的Spark Connector的路径。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar
。
b. 在数据开发页面,创建一个SQL > SparkSQL类型的任务,然后在右上角选择创建好的SQL会话。更多操作,请参见SparkSQL开发。(链接:https://x.sm.cn/9rcGjIi)
c. 拷贝如下代码到新增的SparkSQL页签中,并根据需要修改相应的参数信息,然后单击运行。
CREATE TABLE score_board
USING starrocks
OPTIONS
(
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = "<fe_host>:<fe_http_port>",
"starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
"starrocks.user" = "<user>",
"starrocks.password" = "<password>"
);
INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100);
其中,涉及参数说明如下:
<fe_host>
:Serverless StarRocks实例中FE的内网或公网地址。您可以在实例详情页面的FE详情区域查看。- 如果使用内网地址,请确保在同一VPC内。
- 如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见网络访问与安全设置。(链接:https://x.sm.cn/ApMsVuF)
<fe_http_port>
:Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在实例详情页面的FE详情区域查看。<fe_query_port>
:Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在实例详情页面的FE详情区域查看。<user>
:Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过用户管理页面新增用户来连接。新增用户操作,请参见管理用户及数据授权。(链接:https://x.sm.cn/AiIP1PP)<password>
:用户<user>
对应的密码。
- 通过Serverless Spark查询写入的数据。
在本文示例中,我们是在上述的SparkSQL任务中创建了一个临时视图 test_view
,然后通过该视图查询 score_board 表的数据。拷贝如下代码到新增的
SparkSQL页签中,选中代码后单击运行选中。
CREATE TEMPORARY VIEW test_view
USING starrocks
OPTIONS
(
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = "<fe_host>:<fe_http_port>",
"starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
"starrocks.user" = "<user>",
"starrocks.password" = "<password>"
);
SELECT * FROM test_view;
返回信息如下图所示。
Notebook 会话
- 通过Serverless Spark向StarRocks写入数据。
a. 创建Notebook会话,详情请参见管理Notebook会话。(链接:https://x.sm.cn/DZ8b4bR)
创建会话时,选择与StarRocks Connector版本对应的引擎版本,在网络连接中选择上一步创建好的网络连接,并在Spark配置中添加以下参数来加载Spark Connector。
spark.user.defined.jars oss://<bucketname>/path/connector.jar
其中,oss://<bucketname>/path/connector.jar
为您步骤一中上传至OSS的Spark Connector的路径。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar
。
b. 在数据开发页面,选择创建一个Python > Notebook类型的任务,然后在右上角选择创建的Notebook会话。
更多操作,请参见管理Notebook会话。
c. 拷贝如下代码到新增的Notebook的Python单元格中,单击运行。
# 替换为您的Serverless StarRocks配置。
fe_host = "<fe_host>"
fe_http_port = "<fe_http_port>"
fe_query_port = "<fe_query_port>"
user = "<user>"
password = "<password>"
# 创建表
create_table_sql = f"""
CREATE TABLE score_board
USING starrocks
OPTIONS (
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = "{fe_host}:{fe_http_port}",
"starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}",
"starrocks.user" = "{user}",
"starrocks.password" = "{password}"
)
"""
spark.sql(create_table_sql)
#插入数据
insert_data_sql = """
INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100)
"""
spark.sql(insert_data_sql)
填写示例如下图所示。
其中,涉及参数说明如下:
<fe_host>
:Serverless StarRocks实例中FE的内网或公网地址。您可以在实例详情页面的FE详情区域查看。- 如果使用内网地址,请确保在同一VPC内。
- 如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见网络访问与安全设置。(链接:https://x.sm.cn/BMpwm94)
<fe_http_port>
:Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在实例详情页面的FE详情区域查看。<fe_query_port>
:Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在实例详情页面的FE详情区域查看。<user>
:Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过用户管理页面新增用户来连接。新增用户操作,请参见管理用户及数据授权。(链接:https://x.sm.cn/AiIP1PP)<password>
:用户<user>
对应的密码。
- 通过Serverless Spark查询写入的数据。
在本文示例中,我们新增一个Python单元格,在其中创建了一个临时视图 test_view
,然后通过该视图查询 score_board
表的数据。拷贝如下代码到新增的Python单元格中,然后单击运行图标。
#创建view
create_view_sql=f"""
CREATE TEMPORARY VIEW test_view
USING starrocks
OPTIONS (
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = "{fe_host}:{fe_http_port}",
"starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}",
"starrocks.user" = "{user}",
"starrocks.password" = "{password}"
)
"""
spark.sql(create_view_sql)
#查询
query_sql="SELECT * FROM test_view"
result_df = spark.sql(query_sql)
result_df.show()
返回信息如下图所示。
**方式二:使用 Spark 批任务读写 StarRocks
创建Spark批任务。
a. 在EMR Serverless Spark页面,单击左侧的数据开发。
b. 在开发目录页签下,单击运行图标。
c. 在新建对话框中,输入名称,类型选择批任务 > SQL,然后单击确定。
类型您可以根据实际情况进行调整,本文以SQL为例。更多类型参数介绍,请参见Application开发。(链接:https://x.sm.cn/HUsOvJI)
- 通过Spark批任务读写StarRocks。
a. 在新建的任务开发的右上角选择队列。
添加队列的具体操作,请参见管理资源队列。(链接:https://x.sm.cn/2xTsBdJ)
b. 在新建的任务开发中,配置以下信息,其余参数无需配置,然后单击运行。
参数 | 说明 |
---|---|
SQL文件 | 本示例所使用的文件为spark_sql_starrocks.sql,其内容是SQL会话中的SQL语句,请根据实际情况对具体配置进行替换。在使用之前,您需要先下载该文件并进行相应的修改,然后在文件管理页面进行上传。(链接:https://x.sm.cn/AhcNaCt) spark_sql_starrocks.sql参数说明: + <fe_host> :Serverless StarRocks实例中FE的内网或公网地址。您可以在实例详情页面的FE详情区域查看。- 如果使用内网地址,请确保在同一VPC内。 - 如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见网络访问与安全设置。(链接:https://x.sm.cn/BMpwm94) + <fe_http_port> :Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在实例详情页面的FE详情区域查看。+ <fe_query_port> :Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在实例详情页面的FE详情区域查看。+ <user> :Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过用户管理页面新增用户来连接。新增用户操作,请参见管理用户及数据授权。(链接:https://x.sm.cn/BrfbZoN)+ <password> :用户 <user> 对应的密码。 |
引擎版本 | 选择与Spark Connector版本对应的引擎版本。 |
网络连接 | 选择前一步创建好的网络连接。 |
Spark 配置 | 在Spark配置中添加以下参数来加载Spark Connector。 plain spark.user.defined.jars oss://<bucketname>/path/connector.jar 其中,oss://<bucketname>/path/connector.jar 为您步骤一中上传至OSS的Spark Connector的路径。例如,oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar 。 |
- 查看日志信息。
a. 您可以在下方的运行记录区域,单击操作列的详情。
b.单击日志探查页签,查看该任务的日志信息。