【Flink】How to Consume Kafka Data?

Published: 2024-07-27

To write an example that uses Apache Flink to read messages from Apache Kafka, we need an environment where both Flink and Kafka are installed and running, and the project must declare the corresponding dependencies. The following is a step-by-step guide covering dependency setup, code, and execution.

1. Environment Setup

Make sure Apache Kafka and Apache Flink are installed and that Kafka is running. Kafka listens on port 9092 by default, and ZooKeeper (a service Kafka depends on) listens on port 2181 by default.
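Before writing any Flink code, a quick TCP connect to those ports confirms the services are at least reachable. This is a rough liveness probe only, not a protocol-level health check; the hostnames and ports below assume the default local setup described above:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if something is accepting TCP connections at host:port
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Kafka reachable:     " + isReachable("localhost", 9092, 1000));
        System.out.println("ZooKeeper reachable: " + isReachable("localhost", 2181, 1000));
    }
}
```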

2. Maven Project Setup

Create a new Maven project and add the following dependencies to pom.xml:

<dependencies>  
    <!-- Flink dependencies -->  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-streaming-java_2.12</artifactId>  
        <version>1.13.2</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-connector-kafka_2.12</artifactId>  
        <version>1.13.2</version>  
    </dependency>  
  
    <!-- Kafka client dependency -->  
    <dependency>  
        <groupId>org.apache.kafka</groupId>  
        <artifactId>kafka-clients</artifactId>  
        <version>2.8.0</version>  
    </dependency>  
  
    <!-- Logging -->  
    <dependency>  
        <groupId>org.slf4j</groupId>  
        <artifactId>slf4j-log4j12</artifactId>  
        <version>1.7.30</version>  
    </dependency>  
</dependencies>

Note: adjust the dependencies above to match the Scala or Java version you use, as well as your Flink and Kafka versions.

3. Writing the Flink Kafka Consumer Code

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.io.IOException;
import java.util.Properties;
  
public class FlinkKafkaConsumerDemo {  
  
    public static void main(String[] args) throws Exception {  
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Note: FlinkKafkaConsumer uses the DeserializationSchema passed below,
        // so the key/value deserializer properties are not strictly required here
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  
        // Create the Kafka consumer
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
            "input-topic",            // Kafka topic
            new SimpleStringSchema(), // deserialization schema (defined below)
            props);
  
        // Add the Kafka consumer as a source
        DataStream<String> stream = env.addSource(myConsumer);  
  
        // Process the data
        stream.map(new MapFunction<String, String>() {  
            @Override  
            public String map(String value) throws Exception {  
                return "Received: " + value;  
            }  
        }).print();  
  
        // Execute the streaming program
        env.execute("Flink Kafka Consumer Example");  
    }  
  
    // A simple string deserialization schema
    public static final class SimpleStringSchema implements DeserializationSchema<String> {  
  
        @Override  
        public String deserialize(byte[] message) throws IOException {  
            return new String(message, "UTF-8");  
        }  
  
        @Override  
        public boolean isEndOfStream(String nextElement) {  
            return false;  
        }  
  
        @Override  
        public TypeInformation<String> getProducedType() {  
            return BasicTypeInfo.STRING_TYPE_INFO;  
        }  
    }  
}
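Each record goes through two steps in the program above: the UTF-8 decode in SimpleStringSchema and the prefixing in the MapFunction. Both can be sanity-checked without a cluster by reproducing their logic in plain Java (the sample payload below is made up for illustration):

```java
import java.nio.charset.StandardCharsets;

public class RecordPathCheck {
    // Mirrors SimpleStringSchema.deserialize: raw Kafka bytes -> String
    static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    // Mirrors the MapFunction in the pipeline above
    static String map(String value) {
        return "Received: " + value;
    }

    public static void main(String[] args) {
        byte[] payload = "hello flink".getBytes(StandardCharsets.UTF_8);
        System.out.println(map(deserialize(payload))); // prints "Received: hello flink"
    }
}
```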

4. Running the Program

  1. Make sure Kafka is running and that a topic named input-topic exists (if not, create it first).
  2. Compile and run your Maven project.
