Readme
环境:Java 8、Flink 1.13、Kafka 3、Iceberg 0.13
链路:Kafka -> Flink -> Iceberg(HiveCatalog)
代码
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.json.JSONObject;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Streaming job that reads JSON messages from a Kafka topic, converts each
 * message into a Flink {@link RowData}, and appends the rows to an Iceberg
 * table through a Hive catalog.
 *
 * <p>Pipeline: Kafka -> Flink -> Iceberg (HiveCatalog).
 */
public class FlinkIcebergStreamingExample {

    private static final String KFK_BOOTSTRAP_SERVERS = "例如121.0.0.1:9092";
    private static final String KFK_GROUP_ID = "例如groupid";
    private static final String KFK_AUTO_OFFSET_RESET = "例如earliest";
    private static final String ICE_WAREHOUSE_LOCATION = "例如hdfs://cluster/user/hive/warehouse";
    private static final String ICE_URI = "例如thrift://metastore.xx:9083";
    private static final String ICE_CATALOG_IMPL = "例如org.apache.iceberg.hive.HiveCatalog";
    private static final String KAFKA_SOURCE = "来源TOPIC";
    // Number of positional fields in the target Iceberg schema (3 in this example).
    private static final int TARGET_FIELD_NUM = 3;
    private static final String FIELD1 = "第一个字段名";
    private static final String FIELD2 = "第二个字段名";
    private static final String FIELD3 = "第三个字段名";
    private static final String ICE_DB = "目标数据库";
    private static final String ICE_TABLE = "目标表表名";
    // DateTimeFormatter is thread-safe, so caching it as a constant is fine.
    private static final DateTimeFormatter TIMESTAMP_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Entry point: wires Kafka source -> JSON-to-RowData map -> Iceberg sink
     * and starts the streaming job.
     *
     * @param args unused
     * @throws Exception if the Flink job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(
                KAFKA_SOURCE,
                new SimpleStringSchema(),
                configureKafka()
        )).name("Kafka Source");

        // Convert each raw JSON payload into a positional RowData matching the target schema.
        DataStream<RowData> rowDataStream = kafkaStream.map(FlinkIcebergStreamingExample::toRowData);

        FlinkSink.forRowData(rowDataStream)
                .tableLoader(configureIceberg(ICE_WAREHOUSE_LOCATION, ICE_URI, ICE_CATALOG_IMPL, ICE_DB, ICE_TABLE))
                .append();

        env.execute("Flink Kafka to Iceberg Streaming Job");
    }

    /**
     * Maps one JSON message to a 3-field {@link RowData}: (long, string, timestamp).
     *
     * @param jsonMessage raw JSON payload read from Kafka
     * @return a {@link GenericRowData} with {@code TARGET_FIELD_NUM} positional fields
     */
    private static RowData toRowData(String jsonMessage) {
        JSONObject jsonObject = new JSONObject(jsonMessage);
        GenericRowData rowData = new GenericRowData(TARGET_FIELD_NUM);
        // Numeric field example.
        rowData.setField(0, jsonObject.optLong(FIELD1));
        // Text field example.
        rowData.setField(1, StringData.fromString(jsonObject.optString(FIELD2)));
        // Timestamp field example. The original author's note: "subtract 8 hours because
        // the Kafka time is UTC while the Iceberg time is UTC+8".
        // NOTE(review): converting UTC to UTC+8 would normally require ADDING 8 hours;
        // the original behavior (minusHours(8)) is preserved here — confirm the offset
        // direction against real data before relying on it.
        // NOTE(review): substring(0, 19) throws StringIndexOutOfBoundsException if the
        // field is missing or shorter than "yyyy-MM-dd HH:mm:ss" — validate upstream.
        rowData.setField(2, TimestampData.fromLocalDateTime(
                LocalDateTime.parse(jsonObject.optString(FIELD3).substring(0, 19), TIMESTAMP_FORMATTER)
                        .minusHours(8)));
        return rowData;
    }

    /**
     * Builds the Kafka consumer properties (bootstrap servers, group id, offset reset).
     *
     * @return consumer {@link Properties} for the Flink Kafka source
     */
    private static Properties configureKafka() {
        // Plain Properties instead of double-brace initialization: the double-brace
        // form creates an anonymous subclass that retains a reference to the enclosing
        // scope and can break serialization in Flink operators.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", KFK_BOOTSTRAP_SERVERS);
        props.setProperty("group.id", KFK_GROUP_ID);
        props.setProperty("auto.offset.reset", KFK_AUTO_OFFSET_RESET);
        return props;
    }

    /**
     * Creates a {@link TableLoader} for the target Iceberg table via a Hive catalog.
     *
     * @param warehouse   warehouse root location, e.g. {@code hdfs://cluster/user/hive/warehouse}
     * @param uri         Hive metastore thrift URI, e.g. {@code thrift://metastore.xx:9083}
     * @param catalogImpl catalog implementation class name.
     *                    NOTE(review): {@code CatalogLoader.hive} already selects the Hive
     *                    catalog, so this property looks redundant — verify before removing.
     * @param database    target database name
     * @param table       target table name
     * @return a loader pointing at {@code database.table}
     */
    private static TableLoader configureIceberg(String warehouse, String uri, String catalogImpl,
                                                String database, String table) {
        // Plain HashMap instead of double-brace initialization (same anonymous-class
        // pitfall as in configureKafka()).
        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
        catalogProperties.put(CatalogProperties.URI, uri);
        catalogProperties.put(CatalogProperties.CATALOG_IMPL, catalogImpl);
        return TableLoader.fromCatalog(
                CatalogLoader.hive("iceberg", new Configuration(), catalogProperties),
                TableIdentifier.of(database, table));
    }
}
Maven依赖
<properties>
<java.version>1.8</java.version>
<!-- compiler source/target derive from java.version; do not declare them twice -->
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
<iceberg.version>0.13.2</iceberg.version>
<jackson.version>2.13.3</jackson.version> <!-- 建议使用最新稳定版 -->
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!--如果保存检查点到hdfs上,需要引入此依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--Guava工程包含了若干被Google的Java项目广泛依赖的核心库,方便开发-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Iceberg Flink Runtime -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.13</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20210307</version>
</dependency>
<!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<!-- Hive版本与Hadoop版本相互独立,不应复用${hadoop.version};此处显式固定为3.1.3 -->
<version>3.1.3</version>
</dependency>
</dependencies>