基于 Flink 的车辆超速监测与数据存储的小实战

发布于:2024-11-28 ⋅ 阅读:(13) ⋅ 点赞:(0)

基于 Flink 的车辆超速监测与数据存储的小实战

一、实战背景与目标

在智能交通管理领域,实时监控车辆行驶速度并精准识别超速行为对于保障道路交通安全和维护交通秩序具有至关重要的意义。本项目旨在构建一个高效的数据处理系统,能够从 Kafka 的 topic-car 主题中读取卡口车辆数据,依据卡口对应的限速信息,精确判断车辆是否超速(当车速超过卡口限速的 1.2 倍时认定为超速),并将超速车辆的详细信息实时写入 MySQL 的 t_speeding_info 表中,以便后续交通管理部门进行统计分析与违规处理。

二、技术选型与环境搭建

  1. 技术选型
    • Apache Flink:作为核心的分布式流处理框架,具备强大的实时数据处理能力,能够高效地处理来自 Kafka 的海量卡口数据,进行复杂的转换和分析操作。
    • Apache Kafka:作为高性能的分布式消息队列,负责稳定地接收和传输卡口数据,确保数据的可靠性和高可用性,实现数据的解耦与缓冲。
    • MySQL:用于存储监控设备信息以及超速车辆数据,提供持久化的数据存储服务,方便数据的管理与查询。
    • FastJSON:一款快速的 JSON 处理库,用于在 Java 程序中便捷地解析和序列化 JSON 格式的卡口数据。
  2. 环境搭建
    • 首先启动 Hadoop 相关服务:执行 start-dfs.sh 启动分布式文件系统。
    • 启动 ZooKeeper 服务:zk.sh start,为 Kafka 提供分布式协调服务。
    • 启动 Kafka 服务:kf.sh start,确保消息队列正常运行。
    • 启动 Flink 集群:start-cluster.sh,创建 Flink 执行环境。
    • 启动 Flink 的历史服务器:historyserver.sh start,方便查看作业执行历史信息。
    • 创建用于接收卡口数据的 Kafka 主题 topic-carkafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic-car,其中 bigdata01 为 Kafka 服务器地址,--partitions 1 表示主题分区数为 1,--replication-factor 3 表示每个分区的副本数为 3。

三、数据格式与数据库表结构

  1. Kafka 卡口数据格式
    • action_timelong 类型,表示摄像头拍摄时间戳,精确到秒,用于记录车辆通过卡口的具体时间。
    • monitor_idstring 类型,代表卡口号,是识别车辆通过哪个卡口的关键标识。
    • camera_idstring 类型,摄像头编号,由于一个卡口可能配备多个摄像头,此编号用于区分不同摄像头拍摄的数据。
    • carstring 类型,车牌号码,用于唯一标识车辆。
    • speeddouble 类型,车辆通过卡口时的速度,是判断车辆是否超速的核心数据。
    • road_idstring 类型,道路 ID,代表城市中每条道路的唯一编号,有助于关联道路相关信息。
    • area_idstring 类型,区域 ID,标识车辆所在的城市行政区域,方便进行区域交通数据统计分析。
    • 示例数据:{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}
  2. MySQL 表结构
    • t_speeding_info 表:
      • idint(11) 类型,自增长主键,用于唯一标识每条超速记录。
      • carvarchar(255) 类型,存储车辆的车牌号码。
      • monitor_idvarchar(255) 类型,记录车辆超速时经过的卡口号。
      • road_idvarchar(255) 类型,对应的道路 ID。
      • real_speeddouble 类型,车辆实际行驶速度。
      • limit_speedint(11) 类型,该卡口的限速值。
      • action_timebigint(20) 类型,车辆超速的时间戳。
    • t_monitor_info 表:
      • monitor_idvarchar(255) 类型,主键,卡口的唯一标识。
      • road_idvarchar(255) 类型,卡口所在道路的 ID。
      • speed_limitint(11) 类型,该卡口的限速信息。
      • area_idvarchar(255) 类型,卡口所在的区域 ID。

四、代码实现详解

以下是使用 Java 结合 Flink 实现的核心代码逻辑:

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;

/**
 * @基本功能: 从 Kafka 读取卡口数据,判断超速并写入 MySQL 超速表
 * @program:FlinkProject
 * @author: BJ
 * @create:2024-11-23 09:09:06
 **/
public class CarSpeedDemo {

    public static void main(String[] args) throws Exception {

        // 1. env - 准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - 加载数据
        // 加载 MySQL 中的监控设备信息
        ArrayList<MonitorInfo> monitorInfos = loadMonitorInfoFromMySQL();

        // 加载 Kafka 中的车辆信息数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "fD1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic-car", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);

        // 3. transformation - 数据处理转换
        SingleOutputStreamOperator<CarInfo> filter = dataStreamSource.process(new ProcessFunction<String, CarInfo>() {
            @Override
            public void processElement(String msg, ProcessFunction<String, CarInfo>.Context context, Collector<CarInfo> collector) throws Exception {
                CarInfo carInfo = JSON.parseObject(msg, CarInfo.class);
                System.out.println(carInfo);
                // 根据监控设备信息设置车辆的限速
                for (MonitorInfo monitorInfo : monitorInfos) {
                    if (Objects.equals(monitorInfo.getMonitor_id(), carInfo.getMonitorId())) {
                        carInfo.setLimitSpeed(monitorInfo.getSpeed_limit());
                    }
                }
                collector.collect(carInfo);
                System.out.println(carInfo);
            }
        }).filter(new FilterFunction<CarInfo>() {
            @Override
            public boolean filter(CarInfo carInfo) throws Exception {
                // 判断车辆是否超速(速度超过限速的 1.2 倍)
                return carInfo.getSpeed() > carInfo.getLimitSpeed() * 1.2;
            }
        });
        filter.print();

        // 4. sink - 数据输出
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withUrl("jdbc:mysql://localhost:3306/zuoye")
              .withDriverName("com.mysql.jdbc.Driver")
              .withUsername("root")
              .withPassword("123456")
              .build();
        filter.addSink(JdbcSink.sink(
                "insert into t_speeding_info values(null,?,?,?,?,?,?)",
                new JdbcStatementBuilder<CarInfo>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, CarInfo student) throws SQLException {
                        preparedStatement.setString(1, student.getCar());
                        preparedStatement.setString(2, student.getMonitorId());
                        preparedStatement.setString(3, student.getRoadId());
                        preparedStatement.setDouble(4, student.getSpeed());
                        preparedStatement.setDouble(5, student.getLimitSpeed());
                        preparedStatement.setLong(6, student.getActionTime());
                    }
                }, JdbcExecutionOptions.builder().withBatchSize(1).build(), jdbcConnectionOptions

        ));

        // 5. execute - 执行
        env.execute();
    }

    // 从 MySQL 加载监控设备信息的方法
    private static ArrayList<MonitorInfo> loadMonitorInfoFromMySQL() throws Exception {
        ArrayList<MonitorInfo> monitorInfos = new ArrayList<>();
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/zuoye", "root", "123456");
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("select monitor_id, road_id, speed_limit,area_id FROM t_monitor_info");
        while (resultSet.next()) {
            String monitor_id = resultSet.getString("monitor_id");
            String road_id = resultSet.getString("road_id");
            Integer speed_limit = resultSet.getInt("speed_limit");
            String area_id = resultSet.getString("area_id");
            MonitorInfo monitorInfo = new MonitorInfo(monitor_id, road_id, speed_limit.doubleValue(), area_id);
            System.out.println(monitorInfo);
            monitorInfos.add(monitorInfo);
        }
        statement.close();
        connection.close();
        return monitorInfos;
    }
}

// 车辆信息类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CarInfo implements Serializable {
    private long actionTime;
    private String monitorId;
    private String cameraId;
    private String car;
    private double speed;
    private String roadId;
    private String areaId;

    // 用于存储车辆对应的限速信息
    private double limitSpeed;
}

// 监控设备信息类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MonitorInfo implements Serializable {
    private String monitor_id;
    private String road_id;
    private Double speed_limit;
    private String area_id;
}
  1. 环境配置与数据加载
    • 首先创建 Flink 的流执行环境 StreamExecutionEnvironment,并设置运行模式为自动模式,让 Flink 根据数据特性自动选择合适的执行模式。
    • 从 MySQL 的 t_monitor_info 表中加载监控设备信息,通过 JDBC 连接数据库,执行查询语句获取监控设备的相关数据,如卡口号、道路 ID、限速信息和区域 ID,将这些信息封装成 MonitorInfo 对象并存储在 ArrayList 中,以便后续在数据处理过程中为车辆信息匹配对应的限速。
    • 配置 Kafka 消费者属性,创建 FlinkKafkaConsumer 连接到 topic-car 主题,获取卡口车辆数据的流 dataStreamSource
  2. 数据处理与转换
    • 使用 process 函数对从 Kafka 读取的车辆信息数据进行处理。在 processElement 方法中,先使用 FastJSON 将 JSON 格式的车辆数据字符串转换为 CarInfo 对象。然后遍历之前加载的监控设备信息列表,根据卡口号匹配车辆对应的监控设备,将限速信息设置到 CarInfo 对象的 limitSpeed 属性中。最后将处理后的 CarInfo 对象收集输出。
    • 通过自定义的 FilterFunction 对车辆信息进行过滤,只保留速度超过限速 1.2 倍的超速车辆信息。
  3. 数据输出
    • 配置 JDBC 连接选项,构建 JdbcSink,将超速车辆信息插入到 MySQL 的 t_speeding_info 表中。在 JdbcStatementBuilder 中指定了插入语句的参数设置,将车辆的相关信息逐个设置到 PreparedStatement 中,准备插入数据库。
  4. 任务执行
    • 最后启动 Flink 任务执行,整个系统开始运行,从数据加载、处理到输出的流程将持续进行,实时监测和处理车辆超速信息。

五、总结与展望

本项目成功地利用 Flink、Kafka 和 MySQL 构建了一个车辆超速监测与数据存储系统,实现了从 Kafka 读取卡口数据、判断车辆超速并将超速信息写入 MySQL 的完整流程。通过实时处理卡口数据,交通管理部门能够及时获取超速车辆信息,有助于加强交通监管力度,提高道路交通安全水平。

然而,在实际应用场景中,还可以对该系统进行进一步的优化与扩展。例如,可以增加数据质量监控模块,确保从 Kafka 读取的数据准确性和完整性;优化 Flink 任务的性能,根据集群资源和数据流量调整并行度等参数;还可以考虑将超速数据进行可视化展示,以便更直观地分析交通超速情况的分布与趋势。未来,随着智能交通技术的不断发展,本系统也可以与其他交通管理系统进行集成,如车辆轨迹分析系统、交通流量预测系统等,为构建更加智能、高效的交通管理平台奠定坚实的基础。

希望通过本文的详细介绍,能够帮助读者深入理解基于 Flink 的实时数据处理在智能交通领域的应用,为相关领域的开发与研究提供有益的参考。