stevensu1/EC0720
表 API 和 SQL
Flink 提供表 API（Table API）和 SQL 两种关系型 API，用于统一流处理和批处理。表 API 是适用于 Java、Scala 和 Python 的语言集成查询 API，它允许以非常直观的方式组合关系运算符（例如 selection、filter 和 join）来构建查询。Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite。无论输入是连续的（流式处理：无界）还是有界的（批处理：有界），在任一接口中指定的查询都具有相同的语义，并产生相同的结果。
我们的目标是同步 MySQL 的表结构和数据。
首先添加 Maven 依赖，这里我们只需引入 flink-table-api-java：
概览 |Apache Flink
如果在 IDE 中运行，还需要引入 flink-clients、flink-table-runtime 和 flink-table-planner-loader 三个模块（参见：概览 | Apache Flink）。
接着引入与 MySQL 连接相关的依赖（参见：JDBC | Apache Flink）。
JDBC SQL 连接器
JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据和将数据写入任何关系数据库。本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。
如果在 DDL 上定义了主键，JDBC sink 会以 upsert（更新插入）模式运行，从而能够与外部系统交换 UPDATE/DELETE 消息；否则它以追加（append）模式运行，不支持 UPDATE/DELETE 消息。
依次引入对应的 Maven 依赖：flink-connector-jdbc-core、mysql-connector-java 和 flink-connector-jdbc-mysql。
到此所需的依赖已全部引入。不过程序通常需要打包，并通过 Web UI 上传到 Flink 服务器上运行。Flink 通过 Java SPI 服务发现机制（META-INF/services）加载 jar 中的连接器工厂等实现，因此打包时必须正确合并这些服务描述文件。关于 Java SPI 接口，前面的文章《关于Red Hat Single Sign-On的User Storage SPI》里有提到过。
这是官网的插件配置说明（参见：第一步 | Apache Flink）。
因此需要添加官方提供的 Maven 打包插件（参见：使用 Maven | Apache Flink）。
最终完整的 pom.xml 如下：
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FLINKTAS-TEST-Catalog</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>FLINKTAS-TEST-Catalog</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single source of truth for the Flink core version; every Flink core artifact below uses it. -->
        <flink.version>2.0.0</flink.version>
        <!-- The JDBC connector is released separately from Flink core (format: connectorVersion-flinkVersion). -->
        <flink.connector.jdbc.version>4.0.0-2.0</flink.connector.jdbc.version>
    </properties>
    <dependencies>
        <!-- flink-clients, flink-table-runtime, flink-table-planner-loader: needed to run from the IDE.
             NOTE(review): a Flink cluster already ships these (and flink-table-api-java); when deploying
             the shaded jar, consider marking them <scope>provided</scope> to avoid class conflicts, and
             enable "include provided scope" in the IDE run configuration instead. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Table API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- MySQL JDBC driver plus the Flink JDBC connector (core and MySQL dialect/catalog) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-mysql</artifactId>
            <version>${flink.connector.jdbc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-core</artifactId>
            <version>${flink.connector.jdbc.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job -->
                                    <mainClass>org.example.App</mainClass>
                                </transformer>
                                <!-- Merges META-INF/services files so SPI-discovered factories
                                     (e.g. the JDBC connector) survive shading. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Pin the plugin version so builds are reproducible (and Maven stops warning). -->
                <version>3.11.0</version>
                <configuration>
                    <!-- NOTE(review): must not exceed the Java version the Flink cluster runs on. -->
                    <source>15</source>
                    <target>15</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
现在来实现 Java 处理流程：
先理解一下 Catalog：它可以把整个数据库一次性注册到表环境 TableEnvironment 中。
flink-connector-jdbc-mysql 模块已经为 MySQL 实现了 Catalog，即 MySqlCatalog，但它不能创建物理表，因此需要对其进行扩展，实现对应的建表逻辑。
这是我的实现:
package org.example;
import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.types.DataType;
import java.sql.*;
import java.util.*;
public class MyMySqlCatalog extends MySqlCatalog {
public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
}
public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String baseUrl, Properties connectionProperties) {
super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);
}
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
// 检查数据库是否存在
if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
// 检查表是否已存在
if (tableExists(tablePath)) {
if (!ignoreIfExists) {
return;
}
}
Connection conn = null;
try {
conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), this.getUsername(), this.getPassword());
String createTableSql = generateCreateTableSql(tablePath.getObjectName(), table);
try (PreparedStatement stmt = conn.prepareStatement(createTableSql)) {
stmt.execute();
}
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed to create table %s", tablePath.getFullName()), e);
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
private String generateCreateTableSql(String tableName, CatalogBaseTable table) {
StringBuilder sql = new StringBuilder();
sql.append("CREATE TABLE IF NOT EXISTS `").append(tableName).append("` (");
// 构建列定义
Schema schema = table.getUnresolvedSchema();
List<String> columnDefs = new ArrayList<>();
for (Schema.UnresolvedColumn column : schema.getColumns()) {
if (column instanceof Schema.UnresolvedPhysicalColumn) {
Schema.UnresolvedPhysicalColumn physicalColumn =
(Schema.UnresolvedPhysicalColumn) column;
String columnDef = String.format("`%s` %s",
physicalColumn.getName(),
convertFlinkTypeToMySql((DataType) physicalColumn.getDataType()));
columnDefs.add(columnDef);
}
}
sql.append(String.join(", ", columnDefs));
sql.append(")");
return sql.toString();
}
private String convertFlinkTypeToMySql(DataType dataType) {
// 简化的类型转换,您可以根据需要扩展
String typeName = dataType.getLogicalType().getTypeRoot().name();
switch (typeName) {
case "INTEGER":
return "INT";
case "VARCHAR":
return "VARCHAR(255)";
case "BIGINT":
return "BIGINT";
case "DOUBLE":
return "DOUBLE";
case "BOOLEAN":
return "BOOLEAN";
case "TIMESTAMP_WITHOUT_TIME_ZONE":
return "TIMESTAMP";
default:
return "TEXT";
}
}
}
最后贴出执行数据同步的代码：
package org.example;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import java.util.List;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;
/**
 * Registers a MySQL database as a Flink catalog, creates {@code my_table_03} if it is missing,
 * and copies the rows of {@code my_table_01} into it.
 */
public class App {
    public static void main(String[] args) throws DatabaseNotExistException, TableAlreadyExistException {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "my_catalog";
        String defaultDatabase = "test";
        String username = "root";
        String password = "root";
        String baseUrl = "jdbc:mysql://localhost:3306";

        // Use the loader of this job class rather than the system class loader: when the jar is
        // submitted to a Flink cluster, the MySQL driver and JDBC connector live in the user jar,
        // which the system class loader cannot see.
        MyMySqlCatalog catalog = new MyMySqlCatalog(
                App.class.getClassLoader(),
                name,
                defaultDatabase,
                username,
                password,
                baseUrl);

        tableEnv.registerCatalog(name, catalog);
        // Make the MySQL catalog current so the unqualified table names below resolve in it.
        tableEnv.useCatalog(name);

        ObjectPath targetPath = new ObjectPath(defaultDatabase, "my_table_03");
        // Create the target table if it does not exist yet.
        if (!catalog.tableExists(targetPath)) {
            Schema schema = Schema.newBuilder()
                    .column("id", INT().notNull())
                    .column("name", STRING())
                    .primaryKey("id")
                    .build();
            CatalogTable catalogTable = CatalogTable.newBuilder()
                    .schema(schema)
                    .build();
            catalog.createTable(targetPath, catalogTable, true);
        }

        tableEnv.executeSql("SELECT * FROM my_table_01").print();
        tableEnv.executeSql("SELECT * FROM my_table_03").print();
        // Run the synchronization. NOTE(review): for INSERT, executeSql may return before the job
        // finishes; call await() on the returned TableResult if the caller must wait for it.
        tableEnv.executeSql("INSERT INTO my_table_03 SELECT id, name FROM my_table_01");
        System.out.println("Hello World!");
    }
}
执行结果:
但是如果数据库中的表太多，注册整个 Catalog 可能会消耗较多的 Flink 内存，因此也可以只把需要的表注册到表环境中：
package org.example;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
/**
 * Alternative to registering a whole catalog: registers only the two MySQL tables the job needs
 * and copies the rows of {@code my_table_01} into {@code my_table_02}.
 */
public class App {
    /** JDBC URL of the MySQL database that holds both tables. */
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/test";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "root";

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        registerMySqlTable(tableEnv); // my_table_01
        Table table1 = tableEnv.from("my_table_01");
        table1.printSchema();
        tableEnv.executeSql("SELECT * FROM my_table_01").print();

        registerMySqlTable02(tableEnv); // my_table_02
        Table table2 = tableEnv.from("my_table_02");
        table2.printSchema();
        tableEnv.executeSql("SELECT * FROM my_table_02").print();

        // Run the synchronization.
        tableEnv.executeSql("INSERT INTO my_table_02 SELECT id, name FROM my_table_01");
        System.out.println("Hello World!");
    }

    /**
     * Registers the MySQL table my_table_02 in the Flink table environment.
     */
    public static void registerMySqlTable02(TableEnvironment tableEnv) {
        registerJdbcTable(tableEnv, "my_table_02");
    }

    /**
     * Registers the MySQL table my_table_01 in the Flink table environment.
     */
    public static void registerMySqlTable(TableEnvironment tableEnv) {
        registerJdbcTable(tableEnv, "my_table_01");
    }

    /**
     * Declares a Flink table backed by the JDBC connector whose Flink name equals the MySQL table
     * name. The declared primary key makes the JDBC sink write in upsert mode.
     */
    private static void registerJdbcTable(TableEnvironment tableEnv, String tableName) {
        tableEnv.executeSql(
                "CREATE TABLE " + tableName + " (" +
                "id INT PRIMARY KEY NOT ENFORCED, " +
                "name STRING" +
                ") WITH (" +
                "'connector' = 'jdbc', " +
                "'url' = '" + JDBC_URL + "', " +
                "'table-name' = '" + tableName + "', " +
                "'username' = '" + USERNAME + "', " +
                "'password' = '" + PASSWORD + "'" +
                ")"
        );
    }
}
这样同样可以实现数据同步。最后，一个优化建议是使用 JDBC 连接池技术。