【Flink】How to consume Kafka data and read/write it to Redis?


Background:

The team recently picked up a new requirement: build a marketing-moment job which, while producing the marketing moment, also writes the idmapping carried in each record into Redis for later reads and writes.

Preparation:

  <!-- dependencies -->
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>1.19.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.1.5</version>
    </dependency>
  </dependencies>
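Note: the demo below calls Jedis directly. The Redis connector (flink-connector-redis_2.11:1.1.5) normally pulls a 2.x Jedis in transitively; if your build does not resolve it, declare Jedis explicitly inside <dependencies> (the version here is only an example):

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>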

Code:

package com.iterge.flink;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Consume a Kafka topic with Flink and read/write Redis inside a RichMapFunction.
 */
@Slf4j
public class FlinkDemo {

    public static void main( String[] args ) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataStreamSource<String> stringDataStreamSource = env.fromData(Arrays.asList("1", "2", "3"));
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("it.erge.test.topic")
                .setGroupId("it.erge.test.topic.1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stringDataStreamSource.map(new RichMapFunction<String, String>() {
            // Jedis is not serializable, so create the pool and connection per subtask in open()
            private transient JedisPool pool;
            private transient Jedis jedis;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                pool = new JedisPool("127.0.0.1", 8423);
                jedis = pool.getResource();
            }

            @Override
            public String map(String s) throws Exception {
                // read from Redis
                log.info("test={}", jedis.get("test"));
                // write to Redis with a 60-second expiry
                jedis.setex("test", 60, s);
                return s;
            }

            @Override
            public void close() throws Exception {
                super.close();
                if (jedis != null) {
                    jedis.close();
                }
                if (pool != null) {
                    pool.close();
                }
            }
        }).print();
        env.execute("test");
    }
}
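The pom also declares flink-connector-redis, which the code above never touches. As an alternative to hand-rolled Jedis calls in a RichMapFunction, the write side could go through the connector's RedisSink. Below is a minimal sketch assuming the same demo host/port; the class and method names (RedisSinkDemo, StringRedisMapper, attachRedisSink) and the fixed key "test" are made up for illustration.

package com.iterge.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSinkDemo {

    /** Maps every record to a Redis SET: key "test", value = the record itself. */
    public static class StringRedisMapper implements RedisMapper<String> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(String data) {
            // demo key; in practice derive the key (e.g. the idmapping id) from the record
            return "test";
        }

        @Override
        public String getValueFromData(String data) {
            return data;
        }
    }

    /** Attaches a RedisSink to the given stream; host/port are the same demo values as above. */
    public static void attachRedisSink(DataStream<String> stream) {
        FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(8423)
                .build();
        stream.addSink(new RedisSink<>(redisConf, new StringRedisMapper()));
    }
}

With this in place, the manual Jedis code in the map function can shrink to pure business logic, and the connector manages the Redis connections per subtask.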