1. Overview of the Complete Execution Flow
2. SQL to Logical Execution Plan (Logical RelNode)
2.1 Parsing Flow
SQL parsing: Apache Calcite parses the SQL string.
// TableEnvironmentImpl.executeSql()
public TableResult executeSql(String statement) {
    List<Operation> operations = getParser().parse(statement);
    Operation operation = operations.get(0);
    return executeInternal(operation);
}
2.2 Core Components
- CatalogManager: metadata management
- SqlNodeToOperationConversion: converts a parsed SqlNode into an Operation
e.g.
protected Operation parse(String sql) {
    FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
    final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
    SqlNode node = parser.parse(sql);
    return SqlNodeToOperationConversion.convert(planner, catalogManager, node).get();
}
- QueryOperationConverter: converts an Operation into a RelNode
e.g.
public RelNode visit(ProjectQueryOperation projection) {
    List<RexNode> rexNodes = convertToRexNodes(projection.getProjectList());
    return relBuilder
        .project(rexNodes, projection.getResolvedSchema().getColumnNames(), true)
        .build();
}
3. Logical Optimization Phase (Optimizer)
3.1 Optimization Flow
Logical RelNode → rule-based optimization (RBO) → cost-based optimization (CBO) → optimized logical plan
1. SQL parsing → produces the logical relational algebra tree (Logical RelNode)
2. Rule-based optimization (RBO) → rewrites the logical plan
- HepPlanner (the RBO optimizer): applies rule sets in a fixed order.
- RelOptRule: base class for optimization rules; Flink extends it with stream-specific rules.
FlinkStreamProgram
// project rewrite
chainedProgram.addLast(
    PROJECT_REWRITE,
    FlinkHepRuleSetProgramBuilder.newBuilder
        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
        .add(FlinkStreamRuleSets.PROJECT_RULES)
        .build()
)
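The rule-based rewrite loop above can be illustrated with a minimal, self-contained sketch. The toy expression tree and rules below are hypothetical stand-ins, not Flink's or Calcite's actual classes: each rule rewrites one node, and rules are applied bottom-up until the tree no longer changes, mirroring HepMatchOrder.BOTTOM_UP over a rule collection.

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class RboSketch {
    // Toy expression tree: a leaf or a binary operation.
    sealed interface Expr permits Leaf, BinOp {}
    record Leaf(String name) implements Expr {}
    record BinOp(String op, Expr left, Expr right) implements Expr {}

    // A "rule" rewrites a matching node, or returns it unchanged.
    static final UnaryOperator<Expr> ADD_ZERO = e ->
        (e instanceof BinOp b && b.op().equals("+") && b.right().equals(new Leaf("0")))
            ? b.left() : e;
    static final UnaryOperator<Expr> MUL_ONE = e ->
        (e instanceof BinOp b && b.op().equals("*") && b.right().equals(new Leaf("1")))
            ? b.left() : e;

    // Bottom-up application: rewrite children first, then the node itself.
    static Expr apply(Expr e, List<UnaryOperator<Expr>> rules) {
        if (e instanceof BinOp b) {
            e = new BinOp(b.op(), apply(b.left(), rules), apply(b.right(), rules));
        }
        for (UnaryOperator<Expr> r : rules) e = r.apply(e);
        return e;
    }

    public static void main(String[] args) {
        // (x * 1) + 0  simplifies to  x
        Expr plan = new BinOp("+",
            new BinOp("*", new Leaf("x"), new Leaf("1")), new Leaf("0"));
        System.out.println(apply(plan, List.of(ADD_ZERO, MUL_ONE))); // prints "Leaf[name=x]"
    }
}
```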
3. Cost-based optimization (CBO) → selects the optimal physical plan
- VolcanoPlanner: searches for the optimal plan tree using dynamic programming.
- FlinkCost: defines Flink's cost calculation logic (CPU/memory/network weights).
- RelMetadataProvider: supplies statistics (requires Catalog metadata integration; see FlinkDefaultRelMetadataProvider).
FlinkStreamProgram
chainedProgram.addLast(
    LOGICAL,
    FlinkVolcanoProgramBuilder.newBuilder
        .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES)
        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
        .build()
)
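Cost-based selection can be sketched as follows. This is a toy cost model with made-up weights and plan numbers, not FlinkCost's actual formula: each equivalent plan candidate is scored as a weighted sum over CPU, memory, and network, and the planner keeps the cheapest.

```java
import java.util.Comparator;
import java.util.List;

public class CboSketch {
    // Hypothetical per-resource weights; FlinkCost defines its own.
    static final double CPU_W = 1.0, MEM_W = 0.5, NET_W = 2.0;

    record Plan(String desc, double cpu, double mem, double net) {
        double cost() { return cpu * CPU_W + mem * MEM_W + net * NET_W; }
    }

    // Keep the candidate with the lowest total cost.
    static Plan cheapest(List<Plan> candidates) {
        return candidates.stream()
            .min(Comparator.comparingDouble(Plan::cost))
            .orElseThrow();
    }

    public static void main(String[] args) {
        // Two equivalent join strategies with different resource profiles.
        Plan a = new Plan("broadcast join", 10, 80, 5);  // 10 + 40 + 10 = 60
        Plan b = new Plan("hash join", 30, 20, 15);      // 30 + 10 + 30 = 70
        System.out.println(cheapest(List.of(a, b)).desc()); // prints "broadcast join"
    }
}
```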
4. Generate the optimized logical plan → translate to Flink physical operators
Translates the optimized RelNode tree into Flink's physical execution plan (Transformations).
- FlinkPhysicalRel: base class for physical plan nodes.
- PlannerBase: entry point for physical plan generation; calls translateToPlan().
- Transformation: Flink's low-level operator representation.
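The translation step can be sketched as a post-order walk that turns each physical plan node into an operator description. The types below are toy stand-ins (Flink's translateToPlan() produces real Transformation objects and handles inputs, parallelism, and state):

```java
import java.util.ArrayList;
import java.util.List;

public class TranslateSketch {
    // Toy physical plan node: a name plus at most one input.
    record PhysicalNode(String name, PhysicalNode input) {}

    // Post-order translation: translate the input first, then the node itself,
    // so upstream operators appear before downstream ones.
    static List<String> translate(PhysicalNode node, List<String> out) {
        if (node.input() != null) translate(node.input(), out);
        out.add("Transformation[" + node.name() + "]");
        return out;
    }

    public static void main(String[] args) {
        PhysicalNode plan = new PhysicalNode("Sink",
            new PhysicalNode("Calc", new PhysicalNode("Source", null)));
        System.out.println(translate(plan, new ArrayList<>()));
        // prints "[Transformation[Source], Transformation[Calc], Transformation[Sink]]"
    }
}
```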
4. Execution Entry Point
Pipeline pipeline =
execEnv.createPipeline(
transformations,
tableConfig.getConfiguration(),
defaultJobName,
jobStatusHookList);
JobClient jobClient = execEnv.executeAsync(pipeline);