Flink读取Kafka数据写入Iceberg(HiveCatalog)

发布于:2025-03-19 ⋅ 阅读:(14) ⋅ 点赞:(0)

Readme

环境:Java 8、Flink 1.13、Kafka 3、Iceberg 0.13

链路:Kafka -> Flink -> Iceberg(HiveCatalog)

代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.json.JSONObject;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Streaming job that reads JSON messages from a Kafka topic, converts each message into a
 * three-column Flink {@link RowData}, and appends the rows to an Iceberg table that is resolved
 * through a Hive catalog.
 *
 * <p>The string constants below are sample placeholders (several carry a "例如" / "for example"
 * prefix) and must be replaced with real values before the job is submitted.
 */
public class FlinkIcebergStreamingExample {

    private static final String KFK_BOOTSTRAP_SERVERS = "例如121.0.0.1:9092";
    private static final String KFK_GROUP_ID = "例如groupid";
    private static final String KFK_AUTO_OFFSET_RESET = "例如earliest";
    private static final String ICE_WAREHOUSE_LOCATION = "例如hdfs://cluster/user/hive/warehouse";
    private static final String ICE_URI = "例如thrift://metastore.xx:9083";
    private static final String ICE_CATALOG_IMPL = "例如org.apache.iceberg.hive.HiveCatalog";
    private static final String KAFKA_SOURCE = "来源TOPIC";
    private static final int TARGET_FIELD_NUM = 3; // the example row has three fields
    private static final String FIELD1 = "第一个字段名";
    private static final String FIELD2 = "第二个字段名";
    private static final String FIELD3 = "第三个字段名";
    private static final String ICE_DB = "目标数据库";
    private static final String ICE_TABLE = "目标表表名";

    /** Checkpoint interval applied only when the environment has no checkpointing configured. */
    private static final long CHECKPOINT_INTERVAL_MS = 60_000L;

    /** Number of leading characters of the timestamp text that match {@link #TIMESTAMP_FORMATTER}. */
    private static final int TIMESTAMP_TEXT_LENGTH = 19;

    /** Hours subtracted from the parsed timestamp before it is written. */
    private static final long TIMESTAMP_SHIFT_HOURS = 8L;

    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Builds and runs the Kafka -> Iceberg pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The Iceberg Flink sink commits written files when a checkpoint completes, so a job
        // without checkpointing never publishes a snapshot. Enable it here unless the
        // environment (e.g. the cluster configuration) already did.
        if (!env.getCheckpointConfig().isCheckpointingEnabled()) {
            env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        }

        DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(
                KAFKA_SOURCE,
                new SimpleStringSchema(),
                configureKafka()
        )).name("Kafka Source");

        DataStream<RowData> rowDataStream = kafkaStream.map(jsonMessage -> toRowData(jsonMessage));

        TableLoader tableLoader =
                configureIceberg(ICE_WAREHOUSE_LOCATION, ICE_URI, ICE_CATALOG_IMPL, ICE_DB, ICE_TABLE);
        FlinkSink.forRowData(rowDataStream).tableLoader(tableLoader).append();
        env.execute("Flink Kafka to Iceberg Streaming Job");
    }

    /**
     * Converts one JSON message into a row with {@link #TARGET_FIELD_NUM} fields.
     *
     * <p>The org.json {@code opt*} accessors are used, so a missing numeric field becomes
     * {@code 0} and a missing text field becomes the empty string instead of raising an error.
     *
     * @param jsonMessage the raw Kafka record value, expected to be a JSON object
     * @return the row to hand to the Iceberg sink
     */
    private static RowData toRowData(String jsonMessage) {
        final JSONObject json = new JSONObject(jsonMessage);
        final GenericRowData row = new GenericRowData(TARGET_FIELD_NUM);
        // Numeric field sample.
        row.setField(0, json.optLong(FIELD1));
        // Text field sample.
        row.setField(1, StringData.fromString(json.optString(FIELD2)));
        // Timestamp field sample.
        row.setField(2, parseTimestamp(json.optString(FIELD3)));
        return row;
    }

    /**
     * Parses the leading {@code yyyy-MM-dd HH:mm:ss} part of a timestamp string and shifts it
     * back by {@link #TIMESTAMP_SHIFT_HOURS} hours.
     *
     * <p>NOTE(review): the original comment said the shift exists because "Kafka time is UTC and
     * Iceberg time is UTC+8", but subtracting eight hours is what converts a UTC+8 wall-clock
     * value to UTC. Confirm which zone the source field actually uses.
     *
     * @param rawTimestamp timestamp text from the message; may be empty when the field is absent
     * @return the shifted timestamp, or {@code null} when no timestamp text is present
     * @throws java.time.format.DateTimeParseException if the text is present but malformed
     */
    private static TimestampData parseTimestamp(String rawTimestamp) {
        if (rawTimestamp == null || rawTimestamp.isEmpty()) {
            // Absent field: write a null instead of failing on substring(0, 19).
            return null;
        }
        // Drop any fractional-second / zone suffix; shorter text is passed through unchanged so
        // that the parser reports the offending value.
        final String dateTimeText = rawTimestamp.length() > TIMESTAMP_TEXT_LENGTH
                ? rawTimestamp.substring(0, TIMESTAMP_TEXT_LENGTH)
                : rawTimestamp;
        final LocalDateTime parsed = LocalDateTime.parse(dateTimeText, TIMESTAMP_FORMATTER);
        return TimestampData.fromLocalDateTime(parsed.minusHours(TIMESTAMP_SHIFT_HOURS));
    }

    /**
     * Builds the Kafka consumer configuration from the {@code KFK_*} constants.
     *
     * @return consumer properties holding bootstrap servers, group id and offset-reset policy
     */
    private static Properties configureKafka() {
        // A plain Properties instance; avoids the anonymous subclass created by
        // double-brace initialisation.
        final Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", KFK_BOOTSTRAP_SERVERS);
        kafkaProperties.setProperty("group.id", KFK_GROUP_ID);
        kafkaProperties.setProperty("auto.offset.reset", KFK_AUTO_OFFSET_RESET);
        return kafkaProperties;
    }

    /**
     * Creates a loader for the target Iceberg table through a Hive catalog named "iceberg".
     *
     * @param warehouseLocation value stored under {@link CatalogProperties#WAREHOUSE_LOCATION}
     * @param metastoreUri value stored under {@link CatalogProperties#URI}
     * @param catalogImpl value stored under {@link CatalogProperties#CATALOG_IMPL}
     * @param database database (namespace) of the target table
     * @param table name of the target table
     * @return a table loader for {@code database.table}
     */
    private static TableLoader configureIceberg(String warehouseLocation, String metastoreUri,
                                                String catalogImpl, String database, String table) {
        final Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
        catalogProperties.put(CatalogProperties.URI, metastoreUri);
        catalogProperties.put(CatalogProperties.CATALOG_IMPL, catalogImpl);

        final CatalogLoader catalogLoader =
                CatalogLoader.hive("iceberg", new Configuration(), catalogProperties);
        return TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database, table));
    }

}

Maven依赖

    <properties>
        <!-- Java level; the compiler source/target below are derived from it and declared once
             (the earlier literal "8" declarations duplicated these two properties). -->
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <!-- Scala binary version used as the suffix of the Flink artifact ids. -->
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
        <iceberg.version>0.13.2</iceberg.version>
        <jackson.version>2.13.3</jackson.version> <!-- prefer the latest stable release -->
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Required when checkpoints are stored on HDFS. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- Guava: Google's core Java utility libraries. -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Iceberg integration module for Flink 1.13 (provides FlinkSink, TableLoader, CatalogLoader). -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.13</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-hive-metastore</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-parquet</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <!-- org.json: used by the job to parse the Kafka JSON messages. -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20210307</version>
        </dependency>
        <!-- Flink logs through the SLF4J facade; log4j is used here as the concrete logging backend. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- NOTE(review): 0.20.0 is likely much newer than the libthrift that Hive 3.1.x was built
             against; confirm the metastore client works with it. -->
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.20.0</version>
        </dependency>
        <!-- Hive metastore client. The version is pinned explicitly instead of borrowing
             hadoop.version, so changing the Hadoop version does not silently change the Hive one. -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>


网站公告

今日签到

点亮在社区的每一天
去签到