Flink-05 Study Notes: Continuing from the previous section, writing from Kafka to Redis with FlinkJedisPoolConfig

Published: 2025-07-05

In the previous section we successfully consumed Kafka data with FlinkKafkaConsumer and printed it to the console. In this section we continue the pipeline and write the results to Redis.

pom.xml

Add the Redis connector dependency to the pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven</groupId>
            <artifactId>maven-plugin-api</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugin-tools</groupId>
            <artifactId>maven-plugin-annotations</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.plexus</groupId>
            <artifactId>plexus-utils</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
            <scope>test</scope>
        </dependency>
        <!-- mybatis dependency -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.5</version>
        </dependency>
        <!-- mysql driver dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>

KafkaTestProducer.java: produce data into Kafka

Same as in the previous section; the full code is listed below.

package org.example.snow.demo5;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author snowsong
 */
public class KafkaTestProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Initial connection address for the Kafka cluster
        props.put("bootstrap.servers", "172.16.1.173:9092");
        // Serializers: convert the Java keys and values into byte arrays
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Message loop: send 50 records to the "xue" topic
        for (int i = 0; i < 50; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }
        // Close the producer
        producer.close();
    }
}
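Optionally (not part of the original example), send() also accepts a callback, so delivery can be confirmed per record instead of relying only on the sleep. A small sketch of the call inside the loop, using the standard kafka-clients API:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed: log the cause
        exception.printStackTrace();
    } else {
        // Delivery succeeded: print the partition and offset that were assigned
        System.out.println("sent " + key + " to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});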

The startup class

Flink consumes from Kafka and writes the result to Redis.
Set up the FlinkJedisPoolConfig:

// Configure the Redis connection pool: set the Redis host and port, then build the config object
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig
        .Builder()
        .setHost(REDIS_SERVER)
        .setPort(REDIS_PORT)
        .build();
// Create the RedisSink that writes records to Redis
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
// Add the RedisSink to the data stream as its sink
wordData.addSink(redisSink);
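If your Redis instance requires a password or a non-default database, the connection pool builder can carry those settings as well. A minimal sketch, assuming the setPassword and setDatabase builder methods are available in the connector version you use (the values below are placeholders):

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost(REDIS_SERVER)
        .setPort(REDIS_PORT)
        .setPassword("your-password")   // only needed when requirepass is enabled on the server
        .setDatabase(0)                 // logical Redis database index
        .build();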

MyRedisMapper
MyRedisMapper implements the RedisMapper interface and defines how records are mapped to Redis: it maps each Tuple2 in the Flink data stream to a Redis command.

public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

    /**
     * Returns the description of the Redis command to execute.
     *
     * @return the command description, declaring the command type as LPUSH
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.LPUSH);
    }

    /**
     * Extracts the key from the given Tuple2.
     *
     * @param data a Tuple2 holding two string elements
     * @return the first element of the tuple, used as the Redis key
     */
    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }

    /**
     * Extracts the value from the given Tuple2.
     *
     * @param data a Tuple2 holding two string elements
     * @return the second element of the tuple, used as the Redis value
     */
    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}
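As an aside (not used by this job): if you preferred to collect the words into a Redis hash rather than a list, the command description names the hash via its second constructor argument. A minimal sketch, assuming the HSET command and a hypothetical hash name l_words_hash:

@Override
public RedisCommandDescription getCommandDescription() {
    // HSET stores field/value pairs inside the hash named by the additional key
    return new RedisCommandDescription(RedisCommand.HSET, "l_words_hash");
}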

The complete code of StartApp is as follows:

package org.example.snow.demo5;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

/**
 * @author snowsong
 */
public class StartApp {
    private static final String REDIS_SERVER = "0.0.0.0";

    private static final Integer REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        // Initialize the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure the Kafka client connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.16.1.173:9092");
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",
                new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);
        // Map each incoming record to a two-element tuple
        SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
            /**
             * Maps the input string to a Tuple2.
             *
             * @param value the input string
             * @return a Tuple2 whose first element is "l_words" and whose second element is the input string
             * @throws Exception if an error occurs during mapping
             */
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });
        // Configure the Redis connection pool: set the Redis host and port, then build the config object
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig
                .Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();
        // Create the RedisSink that writes records to Redis
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        // Add the RedisSink to the data stream as its sink
        wordData.addSink(redisSink);
        env.execute();
    }

    /**
     * MyRedisMapper maps each Tuple2 in the Flink data stream to a Redis command.
     * It implements the RedisMapper interface to define custom mapping rules for Redis.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {

        /**
         * Returns the description of the Redis command to execute.
         *
         * @return the command description, declaring the command type as LPUSH
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        /**
         * Extracts the key from the given Tuple2.
         *
         * @param data a Tuple2 holding two string elements
         * @return the first element of the tuple, used as the Redis key
         */
        @Override
        public String getKeyFromData(Tuple2<String,String> data) {
            return data.f0;
        }

        /**
         * Extracts the value from the given Tuple2.
         *
         * @param data a Tuple2 holding two string elements
         * @return the second element of the tuple, used as the Redis value
         */
        @Override
        public String getValueFromData(Tuple2<String,String> data) {
            return data.f1;
        }
    }

}
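After running the job and the producer, you can verify that the list landed in Redis. A minimal verification sketch, assuming the Jedis client is on the classpath (the Redis connector pulls it in transitively) and that Redis is reachable at the host and port used above:

import redis.clients.jedis.Jedis;

public class RedisCheck {
    public static void main(String[] args) {
        // Connect to the same Redis instance the Flink job wrote to
        try (Jedis jedis = new Jedis("0.0.0.0", 6379)) {
            // LPUSH built a list under the key "l_words"; print its length and the first ten entries
            System.out.println("llen = " + jedis.llen("l_words"));
            System.out.println(jedis.lrange("l_words", 0, 9));
        }
    }
}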