自定义数据源 [了解]
SourceFunction:非并行数据源(并行度只能=1) --接口
RichSourceFunction:多功能非并行数据源(并行度只能=1) --类
ParallelSourceFunction:并行数据源(并行度能够>=1) --接口
RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】
Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。
通过ParallelSourceFunction创建可并行Source
/**
* 自定义多并行度Source
*/
public class CustomerSourceWithParallelDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
mySource.print();
env.execute();
}
public static class MySource implements ParallelSourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect(UUID.randomUUID().toString());
/*
如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据
*/
}
@Override
public void cancel() {}
}
}
如果代码换成ParallelSourceFunction,每次生成12个数据,假如是12核数的话。
总结:Rich富函数总结 ctrl + o
Rich 类型的Source可以比非Rich的多出有:
- open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
- close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
- getRuntimeContext 方法可以获得当前的Runtime对象(底层API)
Rich模板
/**
* 自定义一个RichParallelSourceFunction的实现
*/
public class CustomerRichSourceWithParallelDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
mySource.print();
env.execute();
}
/*
Rich 类型的Source可以比非Rich的多出有:
- open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
- close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
- getRuntime方法可以获得当前的Runtime对象(底层API)
*/
public static class MySource extends RichParallelSourceFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("open......");
}
@Override
public void close() throws Exception {
super.close();
System.out.println("close......");
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect(UUID.randomUUID().toString());
}
@Override
public void cancel() {}
}
}
Kafka Source [重要] --从kafka中读取数据
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
// 添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
创建一个topic1 这个主题:
cd /opt/installs/kafka3/
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1
通过控制台向topic1发送消息:
bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1
package com.bigdata.day02;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "bigdata01:9092");
properties.setProperty("group.id", "g1");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
// 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已
// 返回true 的数据就保留下来,返回false 直接丢弃
dataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String word) throws Exception {
// 查看单词中是否包含success 字样
return word.contains("success");
}
}).print();
env.execute();
}
}