package api
import java.util.Properties
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.util.Random
//定义样例类(传感器)
case class SensorReading(id:String,timestamp:Long,temperature:Double)
object Source {
def main(args: Array[String]): Unit = {
//创建执行环境
val env=StreamExecutionEnvironment.getExecutionEnvironment
//1、从集合读取
val dataList=List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10", 1547718205, 38.1)
)
val stream1=env.fromCollection(dataList)
//env.fromElements():随便数据类型
stream1.print()
//2、从文件读取
val inputPath="H:\\Scala程序\\Flink\\src\\main\\resources\\source.txt"
val stream2=env.readTextFile(inputPath)
stream2.print()
//3、从kafka读取
val properties=new Properties()
properties.setProperty("bootstrap.servers","hadoop101:9092")
properties.setProperty("group.id","consumer-group")
val stream3=env.addSource(new FlinkKafkaConsumer011[String]("sensor",new SimpleStringSchema(),properties))
stream3.print()
//4、自定义Source
val stream4=env.addSource( new MySensorSource())
stream4.print()
//执行
env.execute("source")
}
}
//自定义MySensor
class MySensorSource() extends SourceFunction[SensorReading]{
//定义一个标志位flag,用来表示数据源是否正常运行发出数据
var running=true
override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
//定义一个随机数发生器
val rand=new Random()
//随机生成一组传感器的初始温度(id,temp)
var curTemp=1.to(10).map(i=>("sensor_"+i,rand.nextDouble()*100))
//定义无限循环,不停产生数据,除非被cancel
while (running){
//在上次温度基础上微调,更新温度
curTemp=curTemp.map(
data=>(data._1,data._2+rand.nextGaussian())
)
//获取当前时间戳,加入到数据中,调用collect发出数据
val curTime=System.currentTimeMillis()
curTemp.foreach(
data=> sourceContext.collect(SensorReading(data._1,curTime,data._2))
)
//间隔100ms
Thread.sleep(100)
}
}
override def cancel(): Unit = running=false
}
//自定義一個函數類
class MyFilter extends FilterFunction[SensorReading] {
override def filter(t: SensorReading): Boolean = {
t.id.startsWith("sensor_1")
}
}
这段代码是一个基于 Apache Flink 的流处理程序,主要功能是从不同数据源(集合、文件、Kafka、自定义 Source)读取传感器数据,并进行简单的处理和输出。以下是代码的总结和原理拓展:
1. 代码结构总结
主要功能模块
- 数据源读取:
- 从集合、文件、Kafka 和自定义 Source 读取数据。
- 数据处理:
- 使用
print()
方法将数据输出到控制台。
- 使用
- 自定义 Source:
- 实现了一个自定义的
SourceFunction
,模拟生成传感器数据。
- 实现了一个自定义的
- 自定义 Filter:
- 实现了一个自定义的
FilterFunction
,用于过滤传感器数据。
- 实现了一个自定义的
核心类与方法
SensorReading
:- 样例类,用于表示传感器的数据(ID、时间戳、温度)。
StreamExecutionEnvironment
:- Flink 流处理程序的执行环境。
SourceFunction
:- 自定义数据源的基类,需要实现
run()
和cancel()
方法。
- 自定义数据源的基类,需要实现
FilterFunction
:- 自定义过滤函数的基类,需要实现
filter()
方法。
- 自定义过滤函数的基类,需要实现
2. 代码原理拓展
Flink 流处理的核心概念
数据源(Source):
- Flink 支持从多种数据源读取数据,如集合、文件、Kafka、Socket 等。
- 自定义数据源需要实现
SourceFunction
接口,并在run()
方法中定义数据生成逻辑。
数据流(DataStream):
- Flink 中的数据流是一个无界的数据集合,数据以事件流的形式被处理。
- 数据流可以通过
map()
、filter()
、keyBy()
等操作进行转换。
执行环境(ExecutionEnvironment):
StreamExecutionEnvironment
是 Flink 流处理程序的入口,用于设置程序的执行参数(如并行度、状态后端等)。
数据输出(Sink):
- Flink 支持将数据输出到多种目标,如控制台、文件、Kafka、数据库等。
- 代码中使用
print()
方法将数据输出到控制台。
自定义 Source 的原理
run()
方法:- 在
run()
方法中定义数据生成逻辑,通过sourceContext.collect()
将数据发送到下游。 - 代码中使用随机数生成器模拟传感器温度的变化。
- 在
cancel()
方法:- 用于停止数据源的运行,通常通过设置标志位(如
running=false
)来实现。
- 用于停止数据源的运行,通常通过设置标志位(如
数据生成逻辑:
- 代码中通过
Thread.sleep(100)
控制数据生成的频率,模拟实时数据流。
- 代码中通过
自定义 Filter 的原理
filter()
方法:- 在
filter()
方法中定义过滤逻辑,返回true
表示保留数据,返回false
表示丢弃数据。 - 代码中过滤出 ID 以
"sensor_1"
开头的传感器数据。
- 在
3. 代码优化与扩展
优化建议
异常处理:
- 在读取文件或 Kafka 数据时,增加异常处理逻辑,避免程序因异常而崩溃。
配置管理:
- 将 Kafka 的配置(如
bootstrap.servers
、group.id
)提取到外部配置文件(如application.conf
),便于维护。
- 将 Kafka 的配置(如
并行度设置:
- 根据数据量和硬件资源,设置合适的并行度,以提高程序性能。
功能扩展
数据写入到外部存储:
- 将处理后的数据写入到数据库(如 MySQL、HBase)或文件系统(如 HDFS)。
复杂事件处理:
- 使用 Flink 的
CEP
(Complex Event Processing)库,实现复杂事件模式匹配。
- 使用 Flink 的
状态管理与窗口计算:
- 使用 Flink 的状态管理和窗口计算功能,实现滑动窗口、会话窗口等复杂计算。
4. 示例代码扩展
以下是一个扩展示例,展示如何将数据写入到文件系统:
scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import java.util.Properties
object ExtendedSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从 Kafka 读取数据
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop101:9092")
properties.setProperty("group.id", "consumer-group")
val stream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
// 将数据写入到文件系统
val sink = new BucketingSink[String]("hdfs://hadoop101:9000/flink/output")
sink.setWriter(new StringWriter())
sink.setBatchSize(1024 * 1024 * 100) // 每 100MB 生成一个文件
stream.addSink(sink)
env.execute("ExtendedSource")
}
}
5. 总结
- 核心功能:
- 从多种数据源读取数据,并进行简单的处理和输出。
- 自定义 Source 和 Filter:
- 通过实现
SourceFunction
和FilterFunction
,可以灵活地生成和过滤数据。
- 通过实现
- 优化与扩展:
- 通过异常处理、配置管理、并行度设置等功能优化程序,通过写入外部存储、复杂事件处理等功能扩展程序。
通过理解 Flink 的核心概念和代码原理,可以更好地开发和优化流处理程序。