Flink流处理:多源传感器数据实时处理,基于Scala使用Flink从不同数据源(集合、文件、Kafka、自定义 Source)读取传感器数据

发布于:2025-05-15 ⋅ 阅读:(7) ⋅ 点赞:(0)

package api

import java.util.Properties

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.util.Random

//定义样例类(传感器)
case class SensorReading(id:String,timestamp:Long,temperature:Double)
object Source {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env=StreamExecutionEnvironment.getExecutionEnvironment

    //1、从集合读取
    val dataList=List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718202, 6.7),
      SensorReading("sensor_10", 1547718205, 38.1)
    )
    val stream1=env.fromCollection(dataList)
    //env.fromElements():随便数据类型
    stream1.print()

    //2、从文件读取
    val inputPath="H:\\Scala程序\\Flink\\src\\main\\resources\\source.txt"
    val stream2=env.readTextFile(inputPath)
    stream2.print()

    //3、从kafka读取
    val properties=new Properties()
    properties.setProperty("bootstrap.servers","hadoop101:9092")
    properties.setProperty("group.id","consumer-group")
    val stream3=env.addSource(new FlinkKafkaConsumer011[String]("sensor",new SimpleStringSchema(),properties))
    stream3.print()

    //4、自定义Source
    val stream4=env.addSource( new MySensorSource())
    stream4.print()
    //执行
    env.execute("source")
  }

}
//自定义MySensor
class MySensorSource() extends SourceFunction[SensorReading]{
  //定义一个标志位flag,用来表示数据源是否正常运行发出数据
  var running=true
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    //定义一个随机数发生器
    val rand=new Random()
    //随机生成一组传感器的初始温度(id,temp)
    var curTemp=1.to(10).map(i=>("sensor_"+i,rand.nextDouble()*100))
    //定义无限循环,不停产生数据,除非被cancel
    while (running){
      //在上次温度基础上微调,更新温度
      curTemp=curTemp.map(
        data=>(data._1,data._2+rand.nextGaussian())
      )
      //获取当前时间戳,加入到数据中,调用collect发出数据
      val curTime=System.currentTimeMillis()
      curTemp.foreach(
        data=> sourceContext.collect(SensorReading(data._1,curTime,data._2))
      )
      //间隔100ms
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = running=false
}
//自定義一個函數類
class MyFilter extends FilterFunction[SensorReading] {
  override def filter(t: SensorReading): Boolean = {
    t.id.startsWith("sensor_1")
  }
}

这段代码是一个基于 Apache Flink 的流处理程序,主要功能是从不同数据源(集合、文件、Kafka、自定义 Source)读取传感器数据,并进行简单的处理和输出。以下是代码的总结和原理拓展:


1. 代码结构总结

主要功能模块
  1. 数据源读取
    • 从集合、文件、Kafka 和自定义 Source 读取数据。
  2. 数据处理
    • 使用 print() 方法将数据输出到控制台。
  3. 自定义 Source
    • 实现了一个自定义的 SourceFunction,模拟生成传感器数据。
  4. 自定义 Filter
    • 实现了一个自定义的 FilterFunction,用于过滤传感器数据。
核心类与方法
  • SensorReading
    • 样例类,用于表示传感器的数据(ID、时间戳、温度)。
  • StreamExecutionEnvironment
    • Flink 流处理程序的执行环境。
  • SourceFunction
    • 自定义数据源的基类,需要实现 run() 和 cancel() 方法。
  • FilterFunction
    • 自定义过滤函数的基类,需要实现 filter() 方法。

2. 代码原理拓展

Flink 流处理的核心概念
  1. 数据源(Source)

    • Flink 支持从多种数据源读取数据,如集合、文件、Kafka、Socket 等。
    • 自定义数据源需要实现 SourceFunction 接口,并在 run() 方法中定义数据生成逻辑。
  2. 数据流(DataStream)

    • Flink 中的数据流是一个无界的数据集合,数据以事件流的形式被处理。
    • 数据流可以通过 map()filter()keyBy() 等操作进行转换。
  3. 执行环境(ExecutionEnvironment)

    • StreamExecutionEnvironment 是 Flink 流处理程序的入口,用于设置程序的执行参数(如并行度、状态后端等)。
  4. 数据输出(Sink)

    • Flink 支持将数据输出到多种目标,如控制台、文件、Kafka、数据库等。
    • 代码中使用 print() 方法将数据输出到控制台。

自定义 Source 的原理
  1. run() 方法

    • 在 run() 方法中定义数据生成逻辑,通过 sourceContext.collect() 将数据发送到下游。
    • 代码中使用随机数生成器模拟传感器温度的变化。
  2. cancel() 方法

    • 用于停止数据源的运行,通常通过设置标志位(如 running=false)来实现。
  3. 数据生成逻辑

    • 代码中通过 Thread.sleep(100) 控制数据生成的频率,模拟实时数据流。

自定义 Filter 的原理
  1. filter() 方法
    • 在 filter() 方法中定义过滤逻辑,返回 true 表示保留数据,返回 false 表示丢弃数据。
    • 代码中过滤出 ID 以 "sensor_1" 开头的传感器数据。

3. 代码优化与扩展

优化建议
  1. 异常处理

    • 在读取文件或 Kafka 数据时,增加异常处理逻辑,避免程序因异常而崩溃。
  2. 配置管理

    • 将 Kafka 的配置(如 bootstrap.serversgroup.id)提取到外部配置文件(如 application.conf),便于维护。
  3. 并行度设置

    • 根据数据量和硬件资源,设置合适的并行度,以提高程序性能。
功能扩展
  1. 数据写入到外部存储

    • 将处理后的数据写入到数据库(如 MySQL、HBase)或文件系统(如 HDFS)。
  2. 复杂事件处理

    • 使用 Flink 的 CEP(Complex Event Processing)库,实现复杂事件模式匹配。
  3. 状态管理与窗口计算

    • 使用 Flink 的状态管理和窗口计算功能,实现滑动窗口、会话窗口等复杂计算。

4. 示例代码扩展

以下是一个扩展示例,展示如何将数据写入到文件系统:

scala

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import java.util.Properties

object ExtendedSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从 Kafka 读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop101:9092")
    properties.setProperty("group.id", "consumer-group")
    val stream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // 将数据写入到文件系统
    val sink = new BucketingSink[String]("hdfs://hadoop101:9000/flink/output")
    sink.setWriter(new StringWriter())
    sink.setBatchSize(1024 * 1024 * 100) // 每 100MB 生成一个文件
    stream.addSink(sink)

    env.execute("ExtendedSource")
  }
}

5. 总结

  1. 核心功能
    • 从多种数据源读取数据,并进行简单的处理和输出。
  2. 自定义 Source 和 Filter
    • 通过实现 SourceFunction 和 FilterFunction,可以灵活地生成和过滤数据。
  3. 优化与扩展
    • 通过异常处理、配置管理、并行度设置等功能优化程序,通过写入外部存储、复杂事件处理等功能扩展程序。

通过理解 Flink 的核心概念和代码原理,可以更好地开发和优化流处理程序。