一 Flink Core Programming Overview
From the perspective of the development workflow, a Flink program breaks down into four main parts: Environment, Source, Transform, and Sink.
二 Environment
Nothing complicated here: two lines of code cover everything.
Batch:
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
Streaming:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
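getExecutionEnvironment() automatically returns a local environment when run inside the IDE and a cluster environment when the job is submitted. If you need explicit control, the environment can also be created by hand; a minimal sketch (the host, port, and jar path below are hypothetical):
// Local environment with an explicit parallelism of 2
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(2);
// Remote environment pointing at a hypothetical JobManager, shipping the job jar
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("hadoop102", 8081, "target/my-job.jar");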
三 Source
(一) Preparation
To make the code samples easier to write, we need one dependency and one JavaBean class.
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Water level sensor: holds a water level reading
 *
 * id: sensor ID
 * ts: timestamp
 * vc: water level
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
}
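Lombok's @Data generates the getters, setters, equals/hashCode, and toString, and the two constructor annotations add the no-arg and all-args constructors; together these make WaterSensor a valid Flink POJO. A quick sanity sketch (the sample values are made up):
WaterSensor ws = new WaterSensor("sensor_1", 1607527992000L, 20);
System.out.println(ws.getId() + " -> " + ws); // getters and toString come from Lombok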
(二) Reading Data from a Java Collection
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;
public class SourceJava {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read data from a collection
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource = env.fromCollection(list);
        // Read data from individual elements
        DataStreamSource<String> dataStreamSource = env.fromElements("a", "b", "c", "d");
        // streamSource.print();
        dataStreamSource.print();
        env.execute();
    }
}
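Both methods also accept POJOs, not just strings and numbers. A sketch that drops into the same main() as above, reusing the WaterSensor bean (the sample values are made up):
DataStreamSource<WaterSensor> sensorSource = env.fromElements(
        new WaterSensor("sensor_1", 1607527992000L, 20),
        new WaterSensor("sensor_2", 1607527994000L, 50));
sensorSource.print();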
(三) Reading Data from a File
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Source_File {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read data from a file
        // .setParallelism(2) sets the source parallelism and is optional;
        // "input/niaiwowoaini.txt" is the path to the input file
        DataStreamSource<String> streamSource = env.readTextFile("input/niaiwowoaini.txt").setParallelism(2);
        streamSource.print();
        env.execute();
    }
}
Notes
The argument can be a file or a directory, a relative or an absolute path, and even an HDFS file.
Relative paths are resolved against the user.dir system property: in IDEA that is the project root; in standalone mode it is the root directory of the cluster node.
To read from HDFS, use a path like hdfs://hadoop102:8020/....; since Flink does not ship Hadoop dependencies, add the following to the pom:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
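With that dependency on the classpath, reading from HDFS drops into the same main() as above; a sketch with a hypothetical HDFS path:
DataStreamSource<String> hdfsSource = env.readTextFile("hdfs://hadoop102:8020/flink/input/words.txt");
hdfsSource.print();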
(四) Reading Data from a Socket
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
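This line slots into the same skeleton as the other sources; a minimal runnable sketch (start nc -lk 9999 on hadoop102 first, then type lines into it):
public class Source_Socket {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Each line typed into the socket becomes one record in the stream
        DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
        streamSource.print();
        env.execute();
    }
}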
(五) Reading Data from Kafka
Add the required dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
There are two ways to read data from Kafka here.
Option 1 (my preference), using FlinkKafkaConsumer:
// These imports cover both Kafka examples below
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
public class Flink04_Source_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "520520");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Read topic "sensor" with 3 parallel consumer instances
        DataStreamSource<String> streamSource = env
                .addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(), properties))
                .setParallelism(3);
        streamSource.print();
        env.execute();
    }
}
Option 2, using the newer unified KafkaSource API (added around Flink 1.12):
public class Source_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Build the unified KafkaSource
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setGroupId("520")
                .setTopics("sensor")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // fromSource takes a watermark strategy and a source name
        DataStreamSource<String> streamSource = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "source-kafka");
        streamSource.print();
        env.execute();
    }
}
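OffsetsInitializer offers more starting points than latest(). A sketch of the same builder resuming from committed offsets with an earliest fallback (this additionally needs import org.apache.kafka.clients.consumer.OffsetResetStrategy;):
KafkaSource<String> resumedSource = KafkaSource.<String>builder()
        .setBootstrapServers("hadoop102:9092")
        .setGroupId("520")
        .setTopics("sensor")
        // Resume from the group's committed offsets; fall back to earliest if none exist
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();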
(六) Custom Source
Key points:
1. Implement the SourceFunction interface; if the source should support a parallelism greater than 1, implement ParallelSourceFunction instead.
2. Override the relevant methods: run() holds the main generation logic, cancel() holds the stop logic.
import com.atguigu.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class Source_Custom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> streamSource = env.addSource(new MySource());
        streamSource.print();
        env.execute();
    }
    // For a non-parallel source, implement SourceFunction<WaterSensor> instead:
    // public static class MySource implements SourceFunction<WaterSensor> {
    public static class MySource implements ParallelSourceFunction<WaterSensor> {
        private final Random random = new Random();
        // volatile so the cancel() call from another thread is visible in run()
        private volatile boolean isRunning = true;
        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(new WaterSensor("sensor" + random.nextInt(1000),
                        System.currentTimeMillis(), random.nextInt(100)));
                // Emit one record every 200 ms; the sleep must sit inside the loop
                Thread.sleep(200);
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
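If a source needs setup and teardown (opening a database or socket connection, say), extend RichParallelSourceFunction instead to get the open()/close() lifecycle hooks. A minimal sketch (additionally requires import org.apache.flink.configuration.Configuration; and import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;):
public static class MyRichSource extends RichParallelSourceFunction<WaterSensor> {
    private volatile boolean isRunning = true;
    private Random random;
    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel instance before run(); open connections here
        random = new Random();
    }
    @Override
    public void run(SourceContext<WaterSensor> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new WaterSensor("sensor" + random.nextInt(1000),
                    System.currentTimeMillis(), random.nextInt(100)));
            Thread.sleep(200);
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
    }
    @Override
    public void close() throws Exception {
        // Runs once on shutdown; release connections here
    }
}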