背景:
最近公司做了一个新需求:加工一个营销时机,并且在加工营销时机的同时,需要把数据内容里的一个 idmapping 存入 Redis,用于后续的读写。
准备:
<!-- 依赖 -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.19.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
</dependencies>
代码:
package com.iterge.flink;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
 * Demo Flink job: consumes strings from a Kafka topic and, for every record,
 * reads the Redis key {@code test}, prints its current value, then overwrites
 * it with the record (60 second TTL). The records themselves are printed too.
 *
 * <p>Redis connections are owned by the map function instance: the pool is
 * created in {@code open} and destroyed in {@code close}. Nothing connects to
 * Redis while this class is merely being loaded, and no {@link Jedis} object
 * is shared between parallel subtasks.
 */
@Slf4j
public class FlinkDemo {

    // NOTE(review): 127.0.0.0 is kept exactly as in the original; it looks like
    // a typo for 127.0.0.1 -- confirm the real Redis address before running.
    private static final String REDIS_HOST = "127.0.0.0";
    private static final int REDIS_PORT = 8423;

    /** Redis key that every record reads and then overwrites. */
    private static final String REDIS_KEY = "test";

    /** Expiry, in seconds, applied on every write of {@link #REDIS_KEY}. */
    private static final int REDIS_TTL_SECONDS = 60;

    /**
     * Builds the Kafka -> Redis read/write -> print pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("it.erge.test.topic")
                .setGroupId("it.erge.test.topic.1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> stringDataStreamSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Print the mapped stream instead of leaving the map result dangling.
        // The map returns its input unchanged, so the printed values are the
        // same as printing the source directly.
        stringDataStreamSource
                .map(new RedisReadWriteFunction())
                .print();

        env.execute("test");
    }

    /**
     * Pass-through map function that reads and then overwrites one Redis key
     * per record.
     *
     * <p>The pool is {@code transient} and built in {@link #open(OpenContext)}
     * so that no live connection is part of the serialized function object.
     */
    private static final class RedisReadWriteFunction extends RichMapFunction<String, String> {

        private static final long serialVersionUID = 1L;

        /** Created in {@code open}, destroyed in {@code close}. */
        private transient JedisPool pool;

        /** Creates the connection pool for this function instance. */
        @Override
        public void open(OpenContext openContext) throws Exception {
            pool = new JedisPool(REDIS_HOST, REDIS_PORT);
        }

        /**
         * Prints the current value of the Redis key, then stores the record
         * under that key with a TTL.
         *
         * @param s the incoming record
         * @return {@code s}, unchanged
         * @throws Exception if the Redis read or write fails
         */
        @Override
        public String map(String s) throws Exception {
            // Borrow a connection for this record only; closing a pooled Jedis
            // hands it back to the pool.
            try (Jedis jedis = pool.getResource()) {
                // Read from Redis.
                System.out.println("test=" + jedis.get(REDIS_KEY));
                // Write to Redis.
                jedis.setex(REDIS_KEY, REDIS_TTL_SECONDS, s);
            }
            return s;
        }

        /** Destroys the connection pool, if one was created. */
        @Override
        public void close() throws Exception {
            try {
                super.close();
            } finally {
                // pool stays null if open() never ran or failed early.
                if (pool != null) {
                    pool.close();
                    pool = null;
                }
            }
        }
    }
}