No operators defined in streaming topology. Cannot generate StreamGraph
原因是博主想通过在flinkjar中写flinksql,以jar的方式提交application模式的作业(因为无法直接使用yarn-application提交flinksql)。
代码中有create和insert intosql,但是运行时报上面错误,各方面查找资料,其原因是flink没有识别insert操作,解决方案就是显示提交insert
错误代码
public static void execute(String content, TableEnvironment tEnv) {
List<String> insertSql = new ArrayList<>();
// 分割SQL语句
Arrays.stream(content.split(";"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.forEach(sql -> {
sql = sql.replaceAll(SEMICOLON, ";").replaceAll(SY, "\"").replaceAll(SDY, "`");
if (!sql.endsWith(";")) {
sql = sql + ";"; // 加上分号
}
System.out.println("执行SQL>>>" + sql);
tEnv.executeSql(sql);
});
}
修改后
public static void execute(String content, TableEnvironment tEnv) {
List<String> insertSql = new ArrayList<>();
// 分割SQL语句
Arrays.stream(content.split(";"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.forEach(sql -> {
if (!sql.endsWith(";")) {
sql = sql + ";"; // 加上分号
}
if (sql.toUpperCase().startsWith("INSERT")) {
insertSql.add(sql);
} else {
System.out.println("执行SQL>>>" + sql);
tEnv.executeSql(sql);
}
});
// 统一执行insert
// 显式触发, 否则会报错The main method caused an error: No operators defined in streaming topology. Cannot execute.
if (insertSql.size() > 0) {
StatementSet stmtSet = tEnv.createStatementSet();
for (String insert : insertSql) {
// 添加多个 INSERT 语句
System.out.println("执行SQL>>>" + insert);
stmtSet.addInsertSql(insert);
}
try {
// 执行并设置作业名称
TableResult result = stmtSet.execute();
JobClient jobClient = result.getJobClient().orElseThrow(
() -> new RuntimeException("无法获取JobClient")
);
System.out.println("等待作业启动...");
// 等待作业进入运行状态
JobStatus status = jobClient.getJobStatus().get(STATUS_TIMEOUT, TimeUnit.SECONDS);
if (status == JobStatus.RUNNING) {
System.out.println("作业已正常运行");
// 添加2秒延迟确保状态同步
Thread.sleep(2000);
} else {
throw new IllegalStateException("作业启动失败,状态: " + status);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
ERROR org.apache.flink.shaded.curator5.org.apache.curator.ConnectionState - Authentication failed
在flink的conf.yml添加
zookeeper:
sasl:
disable: true
hadoop日志里中文显示成了问号
在flink的conf.yml添加
env:
java:
opts:
all: -Dfile.encoding=UTF-8
Invalid event: APP_UPDATE_SAVED at ACCEPTED
flink/lib下添加flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;
flink/lib下添加commons-cli-1.5.0.jar
yarn启动容器使用随机端口的问题
需要节点间端口互通,否则经常会因为端口不通而导致容器启动不起来
#假设要允许192.168.0.66和192.168.0.67访问本机所有端口
sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.0.66" accept'
sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="192.168.0.67" accept'
sudo firewall-cmd --reload